diff --git a/DB-SYNC.md b/DB-SYNC.md index d333e1f..fc2bf65 100644 --- a/DB-SYNC.md +++ b/DB-SYNC.md @@ -9,6 +9,24 @@ This document covers the mail-to-database sync layer only, not the UI. - JMAP accounts can be stored in the database. - JMAP endpoint discovery is implemented. - JMAP connection testing is implemented. +- `sync_state` table stores server-side state tokens per (account, resource type). +- `pending_changes` table provides a protocol-agnostic outbound mutation queue. +- `JmapClient` fetches the JMAP Session object, extracts `apiUrl` and `accountId`, + and provides a `call()` helper for API requests. +- `syncMailboxes` for JMAP: first run uses `Mailbox/get`; subsequent runs use + `Mailbox/changes` with the stored state token. +- `syncEmails` for JMAP: first run uses `Email/query` + `Email/get`; subsequent runs + use `Email/changes`. Chains both calls in a single API request via `#ids` back-reference. +- `Email/query` pagination: cursor-based loop with `position` offset and `calculateTotal` + handles mailboxes larger than 500 emails. +- JMAP background sync worker (`_JmapAccountSync`): session → flush outbound queue → + syncMailboxes → syncEmails per mailbox → 30 s poll → repeat. Exponential backoff + 5–300 s on failure. +- Local mutations (flag, move, delete) on JMAP accounts are written to `pending_changes` + with an optimistic local update. `flushPendingChanges` drains the queue via `Email/set` + at the start of each sync cycle. +- Email bodies are fetched on demand via `Email/get` with `bodyValues` and cached in + `email_bodies` so subsequent opens are instant. ### IMAP @@ -16,123 +34,40 @@ This document covers the mail-to-database sync layer only, not the UI. - Mailbox lists and mailbox counters are synced into the local database. - Email headers, flags, and attachment metadata are pulled from IMAP into the local database. - Email bodies are fetched on demand and cached locally. -- User-triggered changes are sent to the server immediately: seen, flagged, move, delete, and send. +- All mailboxes (not just INBOX) are synced each cycle. +- Incremental sync: `(lastUid, uidValidity)` checkpoint stored in `sync_state`; only + new UIDs are fetched on subsequent runs; UID-validity change triggers a full re-scan. +- Deletion reconciliation: server UID set is compared against local rows; any email + absent from the server is removed from the local DB. +- Local mutations (flag, move, delete) are written to `pending_changes` with an + optimistic local update; `flushPendingChanges` drains the queue over a single + IMAP connection at the start of each sync cycle. - Sent messages are appended to the Sent folder after SMTP delivery. - Sync retries use exponential backoff after failures. ---- +### Cross-protocol -## Plan - -Goal: make bidirectional DB↔JMAP sync easy and correct. JMAP is the preferred protocol -long-term because its state-based change tracking is cleaner than IMAP's UID/MODSEQ model. -All DB foundations are protocol-agnostic so IMAP can use the same tables later. - -### Step 1 — `sync_state` table `[x]` - -A single table that stores the server-side state token per (account, resource type). -For JMAP this is the opaque `state` string returned by `Mailbox/get` and `Email/get`. -For IMAP it will hold a JSON checkpoint (last UID, MODSEQ) per mailbox. - -Schema: - -```sql -sync_state ( - account_id TEXT NOT NULL, - resource_type TEXT NOT NULL, -- e.g. "Mailbox", "Email", "INBOX" - state TEXT NOT NULL, -- JMAP state string or IMAP checkpoint JSON - synced_at DATETIME NOT NULL, - PRIMARY KEY (account_id, resource_type) -) -``` - -### Step 2 — `pending_changes` table `[x]` - -Protocol-agnostic outbound queue. Any local mutation (flag, move, delete) is written -here first. A sync worker drains the queue and sends to server. Enables offline-first. - -Schema: - -```sql -pending_changes ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - account_id TEXT NOT NULL, - resource_type TEXT NOT NULL, -- "Email" - resource_id TEXT NOT NULL, -- local email id - change_type TEXT NOT NULL, -- "flag_seen" | "flag_flagged" | "move" | "delete" - payload TEXT NOT NULL, -- JSON, e.g. {"seen": true} or {"dest": "Archive"} - created_at DATETIME NOT NULL, - attempts INTEGER NOT NULL DEFAULT 0, - last_error TEXT -) -``` - -### Step 3 — JMAP session client `[x]` - -Implement `JmapSession`: parse the JMAP Session object from `GET {jmapUrl}`, -extract `apiUrl`, primary `accountId`, and capabilities. Store nothing extra in the -DB (re-fetch session on start). Provide a `call(methodCalls)` helper that POSTs to -`apiUrl` and decodes responses. - -### Step 4 — JMAP Mailbox sync `[x]` - -Implement `syncMailboxes(accountId)` for JMAP: - -- First run: `Mailbox/get` → upsert all mailboxes, persist state in `sync_state`. -- Subsequent runs: `Mailbox/changes` using stored state → apply additions, updates, - removals, then update state. - -Reuse the existing `Mailboxes` table. No new DB columns needed. - -### Step 5 — JMAP Email sync `[x]` - -Implement `syncEmails(accountId, mailboxId)` for JMAP: - -- First run: `Email/query` (sorted by receivedAt desc, limit 500) + `Email/get` for - the returned ids → upsert into `Emails`, persist state. -- Subsequent runs: `Email/changes` using stored state → fetch new/changed via - `Email/get`, delete removed rows, update state. - -No new DB columns needed beyond `sync_state`. - -### Step 6 — JMAP background sync worker `[x]` - -Add JMAP handling to `AccountSyncManager`: - -- When a JMAP account appears, start a `_JmapAccountSync` loop. -- Loop: session → syncMailboxes → syncEmails for each mailbox → wait (poll or - EventSource if server supports it) → repeat. -- Reuse the existing exponential backoff pattern from `_AccountSync`. - -### Step 7 — JMAP outbound changes `[x]` - -Wire local mutations (flag, move, delete) for JMAP accounts into `pending_changes` -instead of direct server calls. Add a queue-draining step at the start of each sync -loop that issues `Email/set` for queued changes and removes them on success. +- `sync_log` table records each sync cycle's account, result (ok / error), error + message, start time, and finish time. Used for debugging and "last synced" UI. --- -## Missing features (to be addressed after the plan above) +## Next steps +### JMAP hardening -### JMAP missing features +- **Body caching during sync**: `syncEmails` currently syncs headers only. Include + `bodyValues` + `htmlBody`/`textBody` in the `Email/get` properties list so bodies + are written to `email_bodies` during the sync pass, not just on first open. +- **JMAP send**: implement outgoing mail via `EmailSubmission/set` in addition to the + current SMTP path. +- **Push instead of polling**: upgrade `_JmapAccountSync._wait()` to use an + `EventSource` connection to the JMAP push URL when the server advertises push + capability. Fall back to 30 s polling when push is unavailable. +- **Conflict handling**: pass `ifInState` to `Email/set` in `flushPendingChanges` so + the server can reject a stale mutation; retry the affected change after re-syncing. -- Everything in the plan above (Steps 3–7). -- No conflict handling (deferred; JMAP's `ifInState` provides the hook for it later). -- No sync log in database (deferred). +### Shared / cross-protocol -### IMAP missing features - -- Background sync refreshes only INBOX; other folders need the same treatment. -- No incremental sync checkpoints (will use `sync_state` once Step 1 is done). -- No durable outbound queue (will use `pending_changes` once Step 2 is done). -- No full reconciliation for remote deletions. -- No explicit conflict-resolution strategy. -- No sync log or audit trail. - -## Current summary - -- IMAP: partially implemented and already usable, but not full bidirectional sync. -- JMAP: account setup exists, but actual sync is still missing. -- Plan above targets JMAP first; IMAP improvements follow naturally once the shared - DB foundations (Steps 1–2) are in place. +- **Explicit conflict-resolution strategy**: decide and document the policy (last-write- + wins vs. server-wins) and implement it consistently across both protocols. diff --git a/lib/data/jmap/jmap_client.dart b/lib/data/jmap/jmap_client.dart index 3508548..6df99ed 100644 --- a/lib/data/jmap/jmap_client.dart +++ b/lib/data/jmap/jmap_client.dart @@ -132,3 +132,12 @@ class JmapException implements Exception { @override String toString() => 'JmapException: $message'; } + +/// Thrown when the server rejects an `Email/set` because our `ifInState` +/// token no longer matches the server's current state (RFC 8620 §5.3). +class JmapStateMismatchException implements Exception { + const JmapStateMismatchException(); + + @override + String toString() => 'JmapStateMismatchException: state token is stale'; +} diff --git a/lib/data/repositories/email_repository_impl.dart b/lib/data/repositories/email_repository_impl.dart index 54269ea..98ee1bc 100644 --- a/lib/data/repositories/email_repository_impl.dart +++ b/lib/data/repositories/email_repository_impl.dart @@ -172,41 +172,7 @@ class EmailRepositoryImpl implements EmailRepository { final emailData = (result['list'] as List).first as Map; - final bodyValues = - emailData['bodyValues'] as Map? ?? {}; - final textBodyParts = emailData['textBody'] as List? ?? []; - final htmlBodyParts = emailData['htmlBody'] as List? ?? []; - final jmapAttachments = emailData['attachments'] as List? ?? []; - - String? textBody; - if (textBodyParts.isNotEmpty) { - final partId = - (textBodyParts.first as Map)['partId'] as String?; - if (partId != null) { - textBody = - (bodyValues[partId] as Map?)?['value'] as String?; - } - } - - String? htmlBody; - if (htmlBodyParts.isNotEmpty) { - final partId = - (htmlBodyParts.first as Map)['partId'] as String?; - if (partId != null) { - htmlBody = - (bodyValues[partId] as Map?)?['value'] as String?; - } - } - - final attachmentsJson = jsonEncode(jmapAttachments.map((a) { - final att = a as Map; - return { - 'filename': att['name'] ?? '', - 'contentType': att['type'] ?? '', - 'size': att['size'] ?? 0, - 'fetchPartId': att['blobId'] ?? '', - }; - }).toList()); + final (textBody, htmlBody, attachmentsJson) = _parseJmapBody(emailData); await _db.into(_db.emailBodies).insertOnConflictUpdate( EmailBodiesCompanion.insert( @@ -375,8 +341,14 @@ class EmailRepositoryImpl implements EmailRepository { static const _emailProperties = [ 'id', 'mailboxIds', 'subject', 'sentAt', 'receivedAt', 'from', 'to', 'cc', 'keywords', 'hasAttachment', 'preview', + 'textBody', 'htmlBody', 'bodyValues', 'attachments', ]; + static const _emailGetBodyOptions = { + 'fetchHTMLBodyValues': true, + 'fetchTextBodyValues': true, + }; + Future _syncEmailsJmap( account_model.Account account, String password, @@ -428,6 +400,7 @@ class EmailRepositoryImpl implements EmailRepository { 'accountId': jmap.accountId, '#ids': {'resultOf': '0', 'name': 'Email/query', 'path': '/ids'}, 'properties': _emailProperties, + ..._emailGetBodyOptions, }, '1', ], @@ -473,6 +446,7 @@ class EmailRepositoryImpl implements EmailRepository { 'accountId': jmap.accountId, 'ids': toFetch, 'properties': _emailProperties, + ..._emailGetBodyOptions, }, '1', ] @@ -526,9 +500,64 @@ class EmailRepositoryImpl implements EmailRepository { hasAttachment: Value((m['hasAttachment'] as bool?) ?? false), ), ); + + // Cache body if the server included bodyValues in this response. + if (m.containsKey('bodyValues')) { + final (textBody, htmlBody, attachmentsJson) = _parseJmapBody(m); + await _db.into(_db.emailBodies).insertOnConflictUpdate( + EmailBodiesCompanion.insert( + emailId: dbId, + textBody: Value(textBody), + htmlBody: Value(htmlBody), + attachmentsJson: Value(attachmentsJson), + ), + ); + } } } + /// Extracts text body, HTML body, and attachments JSON from a JMAP Email object + /// that was fetched with fetchHTMLBodyValues/fetchTextBodyValues. + (String? textBody, String? htmlBody, String attachmentsJson) _parseJmapBody( + Map m) { + final bodyValues = m['bodyValues'] as Map? ?? {}; + final textBodyParts = m['textBody'] as List? ?? []; + final htmlBodyParts = m['htmlBody'] as List? ?? []; + final jmapAttachments = m['attachments'] as List? ?? []; + + String? textBody; + if (textBodyParts.isNotEmpty) { + final partId = + (textBodyParts.first as Map)['partId'] as String?; + if (partId != null) { + textBody = + (bodyValues[partId] as Map?)?['value'] as String?; + } + } + + String? htmlBody; + if (htmlBodyParts.isNotEmpty) { + final partId = + (htmlBodyParts.first as Map)['partId'] as String?; + if (partId != null) { + htmlBody = + (bodyValues[partId] as Map?)?['value'] as String?; + } + } + + final attachmentsJson = jsonEncode(jmapAttachments.map((a) { + final att = a as Map; + return { + 'filename': att['name'] ?? '', + 'contentType': att['type'] ?? '', + 'size': att['size'] ?? 0, + 'fetchPartId': att['blobId'] ?? '', + }; + }).toList()); + + return (textBody, htmlBody, attachmentsJson); + } + // ── sync_state helpers ──────────────────────────────────────────────────── Future _loadSyncState(String accountId, String resourceType) async { @@ -718,12 +747,36 @@ class EmailRepositoryImpl implements EmailRepository { password: password, ); + final ifInState = await _loadSyncState(account.id, 'Email'); + for (final row in rows) { try { - await _applyPendingChangeJmap(jmap, row); + final newState = + await _applyPendingChangeJmap(jmap, row, ifInState: ifInState); await (_db.delete(_db.pendingChanges) ..where((t) => t.id.equals(row.id))) .go(); + // Keep our checkpoint in sync with whatever the server returned. + if (newState != null) { + await _saveSyncState(account.id, 'Email', newState); + } + } on JmapStateMismatchException { + // Server rejected the mutation because our state token is stale. + // Drop the cached state so the next sync cycle does a full re-fetch, + // after which this change will be retried with a fresh token. + await (_db.delete(_db.syncStates) + ..where((t) => + t.accountId.equals(account.id) & + t.resourceType.equals('Email'))) + .go(); + await (_db.update(_db.pendingChanges) + ..where((t) => t.id.equals(row.id))) + .write(PendingChangesCompanion( + attempts: Value(row.attempts + 1), + lastError: const Value('stateMismatch — will retry after re-sync'), + )); + // State is now stale for all remaining rows too; stop processing. + break; } catch (e) { await (_db.update(_db.pendingChanges) ..where((t) => t.id.equals(row.id))) @@ -801,79 +854,99 @@ class EmailRepositoryImpl implements EmailRepository { } } - Future _applyPendingChangeJmap( - JmapClient jmap, PendingChangeRow row) async { + /// Applies a single pending change to the JMAP server. + /// + /// Returns the `newState` from the server's `Email/set` response so the + /// caller can keep the local checkpoint in sync. + /// + /// Throws [JmapStateMismatchException] when the server rejects the request + /// because [ifInState] is stale (RFC 8620 §5.3 `stateMismatch`). + Future _applyPendingChangeJmap( + JmapClient jmap, + PendingChangeRow row, { + String? ifInState, + }) async { final payload = jsonDecode(row.payload) as Map; // Extract the JMAP email ID from the DB id (format: "accountId:jmapId"). final jmapEmailId = row.resourceId.contains(':') ? row.resourceId.substring(row.resourceId.indexOf(':') + 1) : row.resourceId; + Map setArgs(Map extra) => { + 'accountId': jmap.accountId, + if (ifInState != null) 'ifInState': ifInState, + ...extra, + }; + + List responses; switch (row.changeType) { case 'flag_seen': final seen = payload['seen'] as bool; - await jmap.call([ + responses = await jmap.call([ [ 'Email/set', - { - 'accountId': jmap.accountId, + setArgs({ 'update': { - jmapEmailId: { - 'keywords/\$seen': seen, - }, + jmapEmailId: {'keywords/\$seen': seen}, }, - }, + }), '0', ] ]); case 'flag_flagged': final flagged = payload['flagged'] as bool; - await jmap.call([ + responses = await jmap.call([ [ 'Email/set', - { - 'accountId': jmap.accountId, + setArgs({ 'update': { - jmapEmailId: { - 'keywords/\$flagged': flagged, - }, + jmapEmailId: {'keywords/\$flagged': flagged}, }, - }, + }), '0', ] ]); case 'move': final destMailboxId = payload['dest'] as String; - await jmap.call([ + responses = await jmap.call([ [ 'Email/set', - { - 'accountId': jmap.accountId, + setArgs({ 'update': { jmapEmailId: { 'mailboxIds/$destMailboxId': true, 'mailboxIds/${row.resourceId}': null, }, }, - }, + }), '0', ] ]); case 'delete': - await jmap.call([ + responses = await jmap.call([ [ 'Email/set', - { - 'accountId': jmap.accountId, - 'destroy': [jmapEmailId], - }, + setArgs({'destroy': jmapEmailId}), '0', ] ]); + + default: + return null; } + + final result = _responseArgs(responses, 0, 'Email/set'); + + // stateMismatch is returned as a top-level error in the Email/set response + // (not the per-method error handled by _responseArgs). + if (result['type'] == 'stateMismatch') { + throw const JmapStateMismatchException(); + } + + return result['newState'] as String?; } @override diff --git a/test/unit/email_repository_impl_test.dart b/test/unit/email_repository_impl_test.dart index f33f016..473663d 100644 --- a/test/unit/email_repository_impl_test.dart +++ b/test/unit/email_repository_impl_test.dart @@ -1318,5 +1318,155 @@ void main() { expect(changes.first.attempts, 1); expect(changes.first.lastError, isNotNull); }); + + test('passes ifInState when sync_state exists', () async { + late Map capturedBody; + final client = MockClient((req) async { + if (req.url.path.contains('well-known')) { + return http.Response( + jsonEncode({ + 'apiUrl': 'https://jmap.example.com/api/', + 'accounts': {'acct1': {}}, + 'primaryAccounts': { + 'urn:ietf:params:jmap:core': 'acct1', + 'urn:ietf:params:jmap:mail': 'acct1', + }, + 'capabilities': {}, + 'username': 'alice@example.com', + 'state': 'sess1', + }), + 200, + ); + } + capturedBody = jsonDecode(req.body) as Map; + return http.Response( + jsonEncode({'sessionState': 's1', 'methodResponses': [ + ['Email/set', {'accountId': 'acct1', 'newState': 'est2', 'updated': {}}, '0'], + ]}), + 200, + ); + }); + + final r = _makeRepos(httpClient: client); + await r.accounts.addAccount(_jmapAccount, 'pw'); + await r.db.into(r.db.syncStates).insertOnConflictUpdate(SyncStatesCompanion.insert( + accountId: 'jmap-1', resourceType: 'Email', + state: 'est1', syncedAt: DateTime.now(), + )); + await r.db.into(r.db.pendingChanges).insert(PendingChangesCompanion.insert( + accountId: 'jmap-1', resourceType: 'Email', resourceId: 'jmap-1:e1', + changeType: 'flag_seen', payload: '{"seen":true}', createdAt: DateTime.now(), + )); + + await r.emails.flushPendingChanges('jmap-1', 'pw'); + + final firstCall = (capturedBody['methodCalls'] as List).first as List; + final args = firstCall[1] as Map; + expect(args['ifInState'], 'est1'); + + // newState returned by server should update our checkpoint + final states = await r.db.select(r.db.syncStates).get(); + expect(states.first.state, 'est2'); + }); + + test('stateMismatch clears sync state and marks change as failed', () async { + final client = MockClient((req) async { + if (req.url.path.contains('well-known')) { + return http.Response( + jsonEncode({ + 'apiUrl': 'https://jmap.example.com/api/', + 'accounts': {'acct1': {}}, + 'primaryAccounts': { + 'urn:ietf:params:jmap:core': 'acct1', + 'urn:ietf:params:jmap:mail': 'acct1', + }, + 'capabilities': {}, + 'username': 'alice@example.com', + 'state': 'sess1', + }), + 200, + ); + } + // Server responds with stateMismatch error inside Email/set + return http.Response( + jsonEncode({'sessionState': 's1', 'methodResponses': [ + ['Email/set', {'accountId': 'acct1', 'type': 'stateMismatch'}, '0'], + ]}), + 200, + ); + }); + + final r = _makeRepos(httpClient: client); + await r.accounts.addAccount(_jmapAccount, 'pw'); + await r.db.into(r.db.syncStates).insertOnConflictUpdate(SyncStatesCompanion.insert( + accountId: 'jmap-1', resourceType: 'Email', + state: 'est1', syncedAt: DateTime.now(), + )); + await r.db.into(r.db.pendingChanges).insert(PendingChangesCompanion.insert( + accountId: 'jmap-1', resourceType: 'Email', resourceId: 'jmap-1:e1', + changeType: 'flag_seen', payload: '{"seen":true}', createdAt: DateTime.now(), + )); + + await r.emails.flushPendingChanges('jmap-1', 'pw'); + + // Sync state should be cleared so next cycle does a full re-sync + expect(await r.db.select(r.db.syncStates).get(), isEmpty); + + // Change should still be present but with attempt count bumped + final changes = await r.db.select(r.db.pendingChanges).get(); + expect(changes.first.attempts, 1); + }); + }); + + group('JMAP syncEmails body caching', () { + Map jmapEmailWithBody({ + required String id, + required String mailboxId, + String? textContent, + String? htmlContent, + }) => + { + ..._jmapEmail(id: id, mailboxId: mailboxId), + 'textBody': [if (textContent != null) {'partId': 'text1', 'type': 'text/plain'}], + 'htmlBody': [if (htmlContent != null) {'partId': 'html1', 'type': 'text/html'}], + 'bodyValues': { + if (textContent != null) 'text1': {'value': textContent, 'isEncodingProblem': false, 'isTruncated': false}, + if (htmlContent != null) 'html1': {'value': htmlContent, 'isEncodingProblem': false, 'isTruncated': false}, + }, + 'attachments': [], + }; + + test('full sync caches bodies when bodyValues are present', () async { + final r = _makeRepos( + httpClient: _mockJmapEmails(apiResponses: [ + _emailGetResponse(state: 'est1', list: [ + jmapEmailWithBody(id: 'e1', mailboxId: 'mbx1', + textContent: 'Hello text', htmlContent: '

Hello

'), + ]), + ]), + ); + await r.accounts.addAccount(_jmapAccount, 'pw'); + await r.emails.syncEmails('jmap-1', 'mbx1'); + + final bodies = await r.db.select(r.db.emailBodies).get(); + expect(bodies, hasLength(1)); + expect(bodies.first.textBody, 'Hello text'); + expect(bodies.first.htmlBody, '

Hello

'); + }); + + test('full sync does not write body row when bodyValues absent', () async { + final r = _makeRepos( + httpClient: _mockJmapEmails(apiResponses: [ + _emailGetResponse(state: 'est1', list: [ + _jmapEmail(id: 'e1', mailboxId: 'mbx1'), + ]), + ]), + ); + await r.accounts.addAccount(_jmapAccount, 'pw'); + await r.emails.syncEmails('jmap-1', 'mbx1'); + + final bodies = await r.db.select(r.db.emailBodies).get(); + expect(bodies, isEmpty); + }); }); }