feat: conflict resolution hardening — server-wins policy, max-retry eviction

- Check notUpdated/notDestroyed per-item errors in Email/set; throw
  JmapSetItemException for permanent failures (notFound, forbidden) so
  they are discarded immediately rather than retried
- Add _maxChangeAttempts=5 constant; _recordChangeError() evicts the
  pending-change row when attempts reach the limit, preventing unbounded
  queue growth from transient errors
- Both IMAP and JMAP flush paths now use _recordChangeError() consistently
- Document server-wins conflict-resolution policy in DB-SYNC.md

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Thomas Güttler
2026-04-19 21:05:48 +02:00
co-authored by Claude Sonnet 4.6
parent 795001d268
commit 93ac5afbcf
4 changed files with 168 additions and 28 deletions
+8 -4
View File
@@ -68,7 +68,11 @@ This document covers the mail-to-database sync layer only, not the UI.
### Shared / cross-protocol
- **Conflict-resolution hardening**: document and enforce the server-wins policy
consistently — check `notUpdated`/`notDestroyed` per-item errors in JMAP `Email/set`
responses, handle IMAP `NO`/`BAD` gracefully, and evict changes that exceed a
maximum retry threshold (e.g. 5 attempts) to prevent queues from growing unboundedly.
- Conflict-resolution policy: **server-wins**. The next sync cycle always
overwrites local state with server values. Outbound mutations in
`pending_changes` are retried up to 5 times before being evicted, preventing
unbounded queue growth. Permanent per-item JMAP errors (`notFound`,
`forbidden`) are discarded immediately; transient errors (network, 500)
are retried up to the limit.
- `ifInState` guard on every JMAP `Email/set` call; `notUpdated`/`notDestroyed`
per-item errors are detected and treated as permanent failures.
+16
View File
@@ -208,3 +208,19 @@ class JmapStateMismatchException implements Exception {
@override
String toString() => 'JmapStateMismatchException: state token is stale';
}
/// Thrown when an individual email update or destroy inside an `Email/set`
/// is rejected by the server (RFC 8620 §5.3 `notUpdated` / `notDestroyed`).
///
/// This is a permanent per-item error (e.g. `notFound`, `forbidden`) rather
/// than a transient transport failure, so the pending change should be
/// discarded rather than retried indefinitely.
class JmapSetItemException implements Exception {
JmapSetItemException(this.type, this.description);
final String type;
final String? description;
@override
String toString() =>
'JmapSetItemException: $type${description != null ? '$description' : ''}';
}
@@ -11,6 +11,7 @@ import 'package:path_provider/path_provider.dart';
import 'package:http/http.dart' as http;
import '../../core/models/account.dart' as account_model;
import '../../core/utils/logger.dart';
import '../../core/models/email.dart' as model;
import '../../core/repositories/account_repository.dart';
import '../../core/repositories/email_repository.dart';
@@ -339,6 +340,10 @@ class EmailRepositoryImpl implements EmailRepository {
static const _jmapPageSize = 500;
/// Pending changes exceeding this attempt count are evicted rather than
/// retried, preventing unbounded queue growth from permanent server errors.
static const _maxChangeAttempts = 5;
static const _emailProperties = [
'id', 'mailboxIds', 'subject', 'sentAt', 'receivedAt',
'from', 'to', 'cc', 'keywords', 'hasAttachment', 'preview',
@@ -559,6 +564,27 @@ class EmailRepositoryImpl implements EmailRepository {
return (textBody, htmlBody, attachmentsJson);
}
// ── Pending-change helpers ────────────────────────────────────────────────
/// Records a failure for [row]: increments attempt count and stores the
/// error message. When attempts reach [_maxChangeAttempts] the row is
/// deleted instead — the change is permanently abandoned.
Future<void> _recordChangeError(PendingChangeRow row, Object error) async {
final next = row.attempts + 1;
if (next >= _maxChangeAttempts) {
await (_db.delete(_db.pendingChanges)
..where((t) => t.id.equals(row.id)))
.go();
} else {
await (_db.update(_db.pendingChanges)
..where((t) => t.id.equals(row.id)))
.write(PendingChangesCompanion(
attempts: Value(next),
lastError: Value(error.toString()),
));
}
}
// ── sync_state helpers ────────────────────────────────────────────────────
Future<String?> _loadSyncState(String accountId, String resourceType) async {
@@ -867,21 +893,19 @@ class EmailRepositoryImpl implements EmailRepository {
t.accountId.equals(account.id) &
t.resourceType.equals('Email')))
.go();
await (_db.update(_db.pendingChanges)
..where((t) => t.id.equals(row.id)))
.write(PendingChangesCompanion(
attempts: Value(row.attempts + 1),
lastError: const Value('stateMismatch — will retry after re-sync'),
));
await _recordChangeError(
row, 'stateMismatch — will retry after re-sync');
// State is now stale for all remaining rows too; stop processing.
break;
} catch (e) {
await (_db.update(_db.pendingChanges)
} on JmapSetItemException catch (e) {
// Permanent per-item rejection (e.g. notFound, forbidden) — discard
// the change so the queue doesn't grow unboundedly.
await (_db.delete(_db.pendingChanges)
..where((t) => t.id.equals(row.id)))
.write(PendingChangesCompanion(
attempts: Value(row.attempts + 1),
lastError: Value(e.toString()),
));
.go();
log('JMAP permanent error for change ${row.id}: $e');
} catch (e) {
await _recordChangeError(row, e);
}
}
}
@@ -893,13 +917,9 @@ class EmailRepositoryImpl implements EmailRepository {
client =
await _imapConnect(account, _effectiveUsername(account), password);
} catch (e) {
// Connection-level failure — bump all rows, they'll retry next cycle.
for (final row in rows) {
await (_db.update(_db.pendingChanges)
..where((t) => t.id.equals(row.id)))
.write(PendingChangesCompanion(
attempts: Value(row.attempts + 1),
lastError: Value(e.toString()),
));
await _recordChangeError(row, e);
}
return;
}
@@ -911,12 +931,7 @@ class EmailRepositoryImpl implements EmailRepository {
..where((t) => t.id.equals(row.id)))
.go();
} catch (e) {
await (_db.update(_db.pendingChanges)
..where((t) => t.id.equals(row.id)))
.write(PendingChangesCompanion(
attempts: Value(row.attempts + 1),
lastError: Value(e.toString()),
));
await _recordChangeError(row, e);
}
}
} finally {
@@ -1044,6 +1059,24 @@ class EmailRepositoryImpl implements EmailRepository {
throw const JmapStateMismatchException();
}
// Check for per-item rejection (notUpdated / notDestroyed).
final notUpdated = result['notUpdated'] as Map<String, dynamic>?;
if (notUpdated != null && notUpdated.containsKey(jmapEmailId)) {
final err = notUpdated[jmapEmailId] as Map<String, dynamic>;
throw JmapSetItemException(
err['type'] as String? ?? 'unknown',
err['description'] as String?,
);
}
final notDestroyed = result['notDestroyed'] as Map<String, dynamic>?;
if (notDestroyed != null && notDestroyed.containsKey(jmapEmailId)) {
final err = notDestroyed[jmapEmailId] as Map<String, dynamic>;
throw JmapSetItemException(
err['type'] as String? ?? 'unknown',
err['description'] as String?,
);
}
return result['newState'] as String?;
}
+87
View File
@@ -906,6 +906,34 @@ void main() {
expect(changes.first.attempts, 1);
expect(changes.first.lastError, isNotNull);
});
test('evicts IMAP change after max attempts (5)', () async {
final r = _makeReposWithFakes();
await r.accounts.addAccount(_account, 'pw');
// Pre-seed a flag_seen at attempts=4
await r.db.into(r.db.pendingChanges).insert(PendingChangesCompanion.insert(
accountId: _account.id,
resourceType: 'Email',
resourceId: '${_account.id}:1',
changeType: 'flag_seen',
payload: '{"uid":1,"mailboxPath":"INBOX","seen":true}',
createdAt: DateTime.now(),
attempts: const Value(4),
));
// Force connection failure so the attempt counter increments
final failingEmails = EmailRepositoryImpl(
r.db,
r.accounts,
imapConnect: (_, __, ___) => Future.error(Exception('forced failure')),
smtpConnect: _noSmtpConnect,
);
await failingEmails.flushPendingChanges(_account.id, 'pw');
// 4+1 = 5 = _maxChangeAttempts → evicted
expect(await r.db.select(r.db.pendingChanges).get(), isEmpty);
});
});
group('JMAP getEmailBody', () {
@@ -1418,6 +1446,65 @@ void main() {
final changes = await r.db.select(r.db.pendingChanges).get();
expect(changes.first.attempts, 1);
});
test('discards change immediately on notUpdated (permanent error)', () async {
final client = MockClient((req) async {
if (req.url.path.contains('well-known')) {
return http.Response(
jsonEncode({
'apiUrl': 'https://jmap.example.com/api/',
'accounts': {'acct1': {}},
'primaryAccounts': {
'urn:ietf:params:jmap:core': 'acct1',
'urn:ietf:params:jmap:mail': 'acct1',
},
'capabilities': {},
'username': 'alice@example.com',
'state': 'sess1',
}),
200,
);
}
// Server responds with notUpdated — permanent per-item error
return http.Response(
jsonEncode({'sessionState': 's1', 'methodResponses': [
['Email/set', {
'accountId': 'acct1',
'notUpdated': {'e1': {'type': 'notFound'}},
}, '0'],
]}),
200,
);
});
final r = _makeRepos(httpClient: client);
await r.accounts.addAccount(_jmapAccount, 'pw');
await r.db.into(r.db.pendingChanges).insert(PendingChangesCompanion.insert(
accountId: 'jmap-1', resourceType: 'Email', resourceId: 'jmap-1:e1',
changeType: 'flag_seen', payload: '{"seen":true}', createdAt: DateTime.now(),
));
await r.emails.flushPendingChanges('jmap-1', 'pw');
// Permanent error — change is immediately evicted
expect(await r.db.select(r.db.pendingChanges).get(), isEmpty);
});
test('evicts change after max attempts (5)', () async {
final r = _makeRepos(httpClient: mockFlush(500));
await r.accounts.addAccount(_jmapAccount, 'pw');
// Seed a change already at attempts=4 (one below the eviction threshold)
await r.db.into(r.db.pendingChanges).insert(PendingChangesCompanion.insert(
accountId: 'jmap-1', resourceType: 'Email', resourceId: 'jmap-1:e1',
changeType: 'flag_seen', payload: '{"seen":true}', createdAt: DateTime.now(),
attempts: const Value(4),
));
await r.emails.flushPendingChanges('jmap-1', 'pw');
// 4+1 = 5 = _maxChangeAttempts → evicted
expect(await r.db.select(r.db.pendingChanges).get(), isEmpty);
});
});
group('JMAP syncEmails body caching', () {