From 0797dd914b7cad8a6e94f644c82da7755caeea50 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Thomas=20G=C3=BCttler?= Date: Sun, 19 Apr 2026 16:25:44 +0200 Subject: [PATCH] feat: JMAP outbound changes via pending_changes queue (Step 7) For JMAP accounts, setFlag/moveEmail/deleteEmail now write to the pending_changes table instead of making direct server calls, enabling offline-first mutation with durable retries. flushPendingChanges() drains the queue at the start of each JMAP sync cycle via Email/set (flag updates use keyword patches; move updates mailboxIds; delete uses Email/set destroy). On failure the attempt count and last error are recorded; the change remains queued. Local DB is updated optimistically on mutation so the UI responds immediately. Co-Authored-By: Claude Sonnet 4.6 --- DB-SYNC.md | 2 +- lib/core/repositories/email_repository.dart | 4 + lib/core/sync/account_sync_manager.dart | 11 +- .../repositories/email_repository_impl.dart | 180 +++++++++++++++++- .../account_sync_manager_test.dart | 3 + test/unit/account_sync_manager_test.dart | 3 + test/unit/email_repository_impl_test.dart | 163 ++++++++++++++++ test/widget/helpers.dart | 3 + 8 files changed, 362 insertions(+), 7 deletions(-) diff --git a/DB-SYNC.md b/DB-SYNC.md index 38bf2d8..d333e1f 100644 --- a/DB-SYNC.md +++ b/DB-SYNC.md @@ -104,7 +104,7 @@ Add JMAP handling to `AccountSyncManager`: EventSource if server supports it) → repeat. - Reuse the existing exponential backoff pattern from `_AccountSync`. -### Step 7 — JMAP outbound changes `[ ]` +### Step 7 — JMAP outbound changes `[x]` Wire local mutations (flag, move, delete) for JMAP accounts into `pending_changes` instead of direct server calls. Add a queue-draining step at the start of each sync diff --git a/lib/core/repositories/email_repository.dart b/lib/core/repositories/email_repository.dart index 78d2421..3cdd468 100644 --- a/lib/core/repositories/email_repository.dart +++ b/lib/core/repositories/email_repository.dart @@ -26,4 +26,8 @@ abstract class EmailRepository { String mailboxPath, String query, ); + + /// Sends any queued local mutations for [accountId] to the server. + /// No-op for IMAP accounts (mutations are applied synchronously). + Future flushPendingChanges(String accountId, String password); } diff --git a/lib/core/sync/account_sync_manager.dart b/lib/core/sync/account_sync_manager.dart index 4dd101c..19a7b6c 100644 --- a/lib/core/sync/account_sync_manager.dart +++ b/lib/core/sync/account_sync_manager.dart @@ -33,7 +33,7 @@ class AccountSyncManager { AccountType.imap => _AccountSync(account, _accounts, _mailboxes, _emails), AccountType.jmap => - _JmapAccountSync(account, _mailboxes, _emails), + _JmapAccountSync(account, _mailboxes, _emails, _accounts), }; _active[account.id] = loop; loop.start(); @@ -164,11 +164,12 @@ class _AccountSync implements _SyncLoop { // ── JMAP ────────────────────────────────────────────────────────────────────── class _JmapAccountSync implements _SyncLoop { - _JmapAccountSync(this.account, this._mailboxes, this._emails); + _JmapAccountSync(this.account, this._mailboxes, this._emails, this._accounts); final Account account; final MailboxRepository _mailboxes; final EmailRepository _emails; + final AccountRepository _accounts; bool _running = false; int _backoffSeconds = 5; @@ -209,9 +210,13 @@ class _JmapAccountSync implements _SyncLoop { } Future _sync() async { + final password = await _accounts.getPassword(account.id); + + // Drain outbound queue before pulling from server. + await _emails.flushPendingChanges(account.id, password); + await _mailboxes.syncMailboxes(account.id); - // Sync emails for each known mailbox. final mailboxes = await _mailboxes.observeMailboxes(account.id).first; for (final mailbox in mailboxes) { if (!_running) break; diff --git a/lib/data/repositories/email_repository_impl.dart b/lib/data/repositories/email_repository_impl.dart index f91d524..0b69e82 100644 --- a/lib/data/repositories/email_repository_impl.dart +++ b/lib/data/repositories/email_repository_impl.dart @@ -392,8 +392,29 @@ class EmailRepositoryImpl implements EmailRepository { ..where((t) => t.id.equals(emailId))) .getSingle(); final account = (await _accounts.getAccount(row.accountId))!; + + if (account.type == account_model.AccountType.jmap) { + if (seen != null) { + await _enqueueChange(account.id, emailId, 'flag_seen', + jsonEncode({'seen': seen})); + } + if (flagged != null) { + await _enqueueChange(account.id, emailId, 'flag_flagged', + jsonEncode({'flagged': flagged})); + } + // Optimistic local update. + await (_db.update(_db.emails)..where((t) => t.id.equals(emailId))).write( + EmailsCompanion( + isSeen: seen != null ? Value(seen) : const Value.absent(), + isFlagged: flagged != null ? Value(flagged) : const Value.absent(), + ), + ); + return; + } + final password = await _accounts.getPassword(account.id); - final client = await _imapConnect(account, _effectiveUsername(account), password); + final client = + await _imapConnect(account, _effectiveUsername(account), password); try { await client.selectMailboxByPath(row.mailboxPath); final seq = imap.MessageSequence.fromId(row.uid, isUid: true); @@ -425,8 +446,18 @@ class EmailRepositoryImpl implements EmailRepository { ..where((t) => t.id.equals(emailId))) .getSingle(); final account = (await _accounts.getAccount(row.accountId))!; + + if (account.type == account_model.AccountType.jmap) { + await _enqueueChange(account.id, emailId, 'move', + jsonEncode({'dest': destMailboxPath})); + // Optimistic: remove from current view; next sync will reconcile. + await (_db.delete(_db.emails)..where((t) => t.id.equals(emailId))).go(); + return; + } + final password = await _accounts.getPassword(account.id); - final client = await _imapConnect(account, _effectiveUsername(account), password); + final client = + await _imapConnect(account, _effectiveUsername(account), password); try { await client.selectMailboxByPath(row.mailboxPath); await client.uidMove( @@ -445,8 +476,17 @@ class EmailRepositoryImpl implements EmailRepository { ..where((t) => t.id.equals(emailId))) .getSingle(); final account = (await _accounts.getAccount(row.accountId))!; + + if (account.type == account_model.AccountType.jmap) { + await _enqueueChange( + account.id, emailId, 'delete', jsonEncode({})); + await (_db.delete(_db.emails)..where((t) => t.id.equals(emailId))).go(); + return; + } + final password = await _accounts.getPassword(account.id); - final client = await _imapConnect(account, _effectiveUsername(account), password); + final client = + await _imapConnect(account, _effectiveUsername(account), password); try { await client.selectMailboxByPath(row.mailboxPath); final seq = imap.MessageSequence.fromId(row.uid, isUid: true); @@ -458,6 +498,140 @@ class EmailRepositoryImpl implements EmailRepository { } } + // ── pending_changes queue ────────────────────────────────────────────────── + + Future _enqueueChange( + String accountId, + String resourceId, + String changeType, + String payload, + ) async { + await _db.into(_db.pendingChanges).insert( + PendingChangesCompanion.insert( + accountId: accountId, + resourceType: 'Email', + resourceId: resourceId, + changeType: changeType, + payload: payload, + createdAt: DateTime.now(), + ), + ); + } + + /// Drains pending changes for [accountId] via JMAP Email/set. + /// Called at the start of each JMAP sync cycle. + @override + Future flushPendingChanges( + String accountId, String password) async { + final rows = await (_db.select(_db.pendingChanges) + ..where((t) => t.accountId.equals(accountId)) + ..orderBy([(t) => OrderingTerm.asc(t.createdAt)])) + .get(); + if (rows.isEmpty) return; + + final account = (await _accounts.getAccount(accountId))!; + final jmapUrl = account.jmapUrl; + if (jmapUrl == null || jmapUrl.isEmpty) return; + + final jmap = await JmapClient.connect( + httpClient: _httpClient, + jmapUrl: Uri.parse(jmapUrl), + username: _effectiveUsername(account), + password: password, + ); + + for (final row in rows) { + try { + await _applyPendingChange(jmap, row); + await (_db.delete(_db.pendingChanges) + ..where((t) => t.id.equals(row.id))) + .go(); + } catch (e) { + await (_db.update(_db.pendingChanges) + ..where((t) => t.id.equals(row.id))) + .write(PendingChangesCompanion( + attempts: Value(row.attempts + 1), + lastError: Value(e.toString()), + )); + } + } + } + + Future _applyPendingChange( + JmapClient jmap, PendingChangeRow row) async { + final payload = jsonDecode(row.payload) as Map; + // Extract the JMAP email ID from the DB id (format: "accountId:jmapId"). + final jmapEmailId = row.resourceId.contains(':') + ? row.resourceId.substring(row.resourceId.indexOf(':') + 1) + : row.resourceId; + + switch (row.changeType) { + case 'flag_seen': + final seen = payload['seen'] as bool; + await jmap.call([ + [ + 'Email/set', + { + 'accountId': jmap.accountId, + 'update': { + jmapEmailId: { + 'keywords/\$seen': seen, + }, + }, + }, + '0', + ] + ]); + + case 'flag_flagged': + final flagged = payload['flagged'] as bool; + await jmap.call([ + [ + 'Email/set', + { + 'accountId': jmap.accountId, + 'update': { + jmapEmailId: { + 'keywords/\$flagged': flagged, + }, + }, + }, + '0', + ] + ]); + + case 'move': + final destMailboxId = payload['dest'] as String; + await jmap.call([ + [ + 'Email/set', + { + 'accountId': jmap.accountId, + 'update': { + jmapEmailId: { + 'mailboxIds/$destMailboxId': true, + 'mailboxIds/${row.resourceId}': null, + }, + }, + }, + '0', + ] + ]); + + case 'delete': + await jmap.call([ + [ + 'Email/set', + { + 'accountId': jmap.accountId, + 'destroy': [jmapEmailId], + }, + '0', + ] + ]); + } + } + @override Future sendEmail(String accountId, model.EmailDraft draft) async { final account = (await _accounts.getAccount(accountId))!; diff --git a/test/integration/account_sync_manager_test.dart b/test/integration/account_sync_manager_test.dart index c460a53..2abe95b 100644 --- a/test/integration/account_sync_manager_test.dart +++ b/test/integration/account_sync_manager_test.dart @@ -82,6 +82,9 @@ class _FakeEmails implements EmailRepository { @override Future deleteEmail(String id) async {} + @override + Future flushPendingChanges(String accountId, String password) async {} + @override Future sendEmail(String a, EmailDraft d) async {} diff --git a/test/unit/account_sync_manager_test.dart b/test/unit/account_sync_manager_test.dart index 1249953..fb822fa 100644 --- a/test/unit/account_sync_manager_test.dart +++ b/test/unit/account_sync_manager_test.dart @@ -82,6 +82,9 @@ class FakeEmailRepository implements EmailRepository { @override Future deleteEmail(String emailId) async {} + @override + Future flushPendingChanges(String accountId, String password) async {} + @override Future sendEmail(String accountId, EmailDraft draft) async {} diff --git a/test/unit/email_repository_impl_test.dart b/test/unit/email_repository_impl_test.dart index 5f8dd16..8ff6c60 100644 --- a/test/unit/email_repository_impl_test.dart +++ b/test/unit/email_repository_impl_test.dart @@ -747,4 +747,167 @@ void main() { expect(states.first.state, 'est1'); }); }); + + group('JMAP setFlag / moveEmail / deleteEmail enqueue pending_changes', () { + Future seedJmapEmail( + AppDatabase db, AccountRepositoryImpl accounts) async { + await accounts.addAccount(_jmapAccount, 'pw'); + await db.into(db.emails).insert(EmailsCompanion.insert( + id: 'jmap-1:e1', + accountId: 'jmap-1', + mailboxPath: 'mbx1', + uid: 0, + receivedAt: DateTime(2024), + )); + } + + test('setFlag seen enqueues flag_seen change and updates local DB', () async { + final r = _makeRepos(); + await seedJmapEmail(r.db, r.accounts); + + await r.emails.setFlag('jmap-1:e1', seen: true); + + final changes = await r.db.select(r.db.pendingChanges).get(); + expect(changes, hasLength(1)); + expect(changes.first.changeType, 'flag_seen'); + expect(changes.first.payload, contains('true')); + + final email = await r.emails.getEmail('jmap-1:e1'); + expect(email?.isSeen, isTrue); + }); + + test('setFlag flagged enqueues flag_flagged change', () async { + final r = _makeRepos(); + await seedJmapEmail(r.db, r.accounts); + + await r.emails.setFlag('jmap-1:e1', flagged: true); + + final changes = await r.db.select(r.db.pendingChanges).get(); + expect(changes.first.changeType, 'flag_flagged'); + }); + + test('moveEmail enqueues move change and removes email from local DB', () async { + final r = _makeRepos(); + await seedJmapEmail(r.db, r.accounts); + + await r.emails.moveEmail('jmap-1:e1', 'mbx2'); + + final changes = await r.db.select(r.db.pendingChanges).get(); + expect(changes.first.changeType, 'move'); + expect(changes.first.payload, contains('mbx2')); + + expect(await r.emails.getEmail('jmap-1:e1'), isNull); + }); + + test('deleteEmail enqueues delete change and removes email from local DB', () async { + final r = _makeRepos(); + await seedJmapEmail(r.db, r.accounts); + + await r.emails.deleteEmail('jmap-1:e1'); + + final changes = await r.db.select(r.db.pendingChanges).get(); + expect(changes.first.changeType, 'delete'); + expect(await r.emails.getEmail('jmap-1:e1'), isNull); + }); + }); + + group('JMAP flushPendingChanges', () { + http.Client mockFlush(int apiStatus) { + return MockClient((req) async { + if (req.url.path.contains('well-known')) { + return http.Response( + jsonEncode({ + 'apiUrl': 'https://jmap.example.com/api/', + 'accounts': {'acct1': {'name': 'alice@example.com', 'isPersonal': true}}, + 'primaryAccounts': { + 'urn:ietf:params:jmap:core': 'acct1', + 'urn:ietf:params:jmap:mail': 'acct1', + }, + 'capabilities': {}, + 'username': 'alice@example.com', + 'state': 'sess1', + }), + 200, + ); + } + return http.Response( + jsonEncode({'sessionState': 's1', 'methodResponses': [ + ['Email/set', {'accountId': 'acct1', 'updated': {}, 'destroyed': []}, '0'], + ]}), + apiStatus, + ); + }); + } + + Future seedChange(AppDatabase db, AccountRepositoryImpl accounts, + {String changeType = 'flag_seen', String payload = '{"seen":true}'}) async { + await accounts.addAccount(_jmapAccount, 'pw'); + await db.into(db.pendingChanges).insert(PendingChangesCompanion.insert( + accountId: 'jmap-1', + resourceType: 'Email', + resourceId: 'jmap-1:e1', + changeType: changeType, + payload: payload, + createdAt: DateTime.now(), + )); + } + + test('no-op when no pending changes', () async { + final r = _makeRepos(httpClient: mockFlush(200)); + await r.accounts.addAccount(_jmapAccount, 'pw'); + await r.emails.flushPendingChanges('jmap-1', 'pw'); + expect(await r.db.select(r.db.pendingChanges).get(), isEmpty); + }); + + test('sends flag_seen and removes change on success', () async { + final r = _makeRepos(httpClient: mockFlush(200)); + await seedChange(r.db, r.accounts); + + await r.emails.flushPendingChanges('jmap-1', 'pw'); + + expect(await r.db.select(r.db.pendingChanges).get(), isEmpty); + }); + + test('sends flag_flagged and removes change on success', () async { + final r = _makeRepos(httpClient: mockFlush(200)); + await seedChange(r.db, r.accounts, + changeType: 'flag_flagged', payload: '{"flagged":true}'); + + await r.emails.flushPendingChanges('jmap-1', 'pw'); + + expect(await r.db.select(r.db.pendingChanges).get(), isEmpty); + }); + + test('sends move and removes change on success', () async { + final r = _makeRepos(httpClient: mockFlush(200)); + await seedChange(r.db, r.accounts, + changeType: 'move', payload: '{"dest":"mbx2"}'); + + await r.emails.flushPendingChanges('jmap-1', 'pw'); + + expect(await r.db.select(r.db.pendingChanges).get(), isEmpty); + }); + + test('sends delete and removes change on success', () async { + final r = _makeRepos(httpClient: mockFlush(200)); + await seedChange(r.db, r.accounts, + changeType: 'delete', payload: '{}'); + + await r.emails.flushPendingChanges('jmap-1', 'pw'); + + expect(await r.db.select(r.db.pendingChanges).get(), isEmpty); + }); + + test('records attempt count and error on API failure', () async { + final r = _makeRepos(httpClient: mockFlush(500)); + await seedChange(r.db, r.accounts); + + await r.emails.flushPendingChanges('jmap-1', 'pw'); + + final changes = await r.db.select(r.db.pendingChanges).get(); + expect(changes, hasLength(1)); + expect(changes.first.attempts, 1); + expect(changes.first.lastError, isNotNull); + }); + }); } diff --git a/test/widget/helpers.dart b/test/widget/helpers.dart index ad1f47d..db189a5 100644 --- a/test/widget/helpers.dart +++ b/test/widget/helpers.dart @@ -168,6 +168,9 @@ class FakeEmailRepository implements EmailRepository { @override Future deleteEmail(String emailId) async {} + @override + Future flushPendingChanges(String accountId, String password) async {} + @override Future sendEmail(String accountId, EmailDraft draft) async {}