feat: JMAP outbound changes via pending_changes queue (Step 7)
For JMAP accounts, setFlag/moveEmail/deleteEmail now write to the pending_changes table instead of making direct server calls, enabling offline-first mutation with durable retries. flushPendingChanges() drains the queue at the start of each JMAP sync cycle via Email/set (flag updates use keyword patches; move updates mailboxIds; delete uses Email/set destroy). On failure the attempt count and last error are recorded; the change remains queued. Local DB is updated optimistically on mutation so the UI responds immediately. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
co-authored by
Claude Sonnet 4.6
parent
559eb9a467
commit
0797dd914b
+1
-1
@@ -104,7 +104,7 @@ Add JMAP handling to `AccountSyncManager`:
|
||||
EventSource if server supports it) → repeat.
|
||||
- Reuse the existing exponential backoff pattern from `_AccountSync`.
|
||||
|
||||
### Step 7 — JMAP outbound changes `[ ]`
|
||||
### Step 7 — JMAP outbound changes `[x]`
|
||||
|
||||
Wire local mutations (flag, move, delete) for JMAP accounts into `pending_changes`
|
||||
instead of direct server calls. Add a queue-draining step at the start of each sync
|
||||
|
||||
@@ -26,4 +26,8 @@ abstract class EmailRepository {
|
||||
String mailboxPath,
|
||||
String query,
|
||||
);
|
||||
|
||||
/// Sends any queued local mutations for [accountId] to the server.
|
||||
/// No-op for IMAP accounts (mutations are applied synchronously).
|
||||
Future<void> flushPendingChanges(String accountId, String password);
|
||||
}
|
||||
|
||||
@@ -33,7 +33,7 @@ class AccountSyncManager {
|
||||
AccountType.imap =>
|
||||
_AccountSync(account, _accounts, _mailboxes, _emails),
|
||||
AccountType.jmap =>
|
||||
_JmapAccountSync(account, _mailboxes, _emails),
|
||||
_JmapAccountSync(account, _mailboxes, _emails, _accounts),
|
||||
};
|
||||
_active[account.id] = loop;
|
||||
loop.start();
|
||||
@@ -164,11 +164,12 @@ class _AccountSync implements _SyncLoop {
|
||||
// ── JMAP ──────────────────────────────────────────────────────────────────────
|
||||
|
||||
class _JmapAccountSync implements _SyncLoop {
|
||||
_JmapAccountSync(this.account, this._mailboxes, this._emails);
|
||||
_JmapAccountSync(this.account, this._mailboxes, this._emails, this._accounts);
|
||||
|
||||
final Account account;
|
||||
final MailboxRepository _mailboxes;
|
||||
final EmailRepository _emails;
|
||||
final AccountRepository _accounts;
|
||||
|
||||
bool _running = false;
|
||||
int _backoffSeconds = 5;
|
||||
@@ -209,9 +210,13 @@ class _JmapAccountSync implements _SyncLoop {
|
||||
}
|
||||
|
||||
Future<void> _sync() async {
|
||||
final password = await _accounts.getPassword(account.id);
|
||||
|
||||
// Drain outbound queue before pulling from server.
|
||||
await _emails.flushPendingChanges(account.id, password);
|
||||
|
||||
await _mailboxes.syncMailboxes(account.id);
|
||||
|
||||
// Sync emails for each known mailbox.
|
||||
final mailboxes = await _mailboxes.observeMailboxes(account.id).first;
|
||||
for (final mailbox in mailboxes) {
|
||||
if (!_running) break;
|
||||
|
||||
@@ -392,8 +392,29 @@ class EmailRepositoryImpl implements EmailRepository {
|
||||
..where((t) => t.id.equals(emailId)))
|
||||
.getSingle();
|
||||
final account = (await _accounts.getAccount(row.accountId))!;
|
||||
|
||||
if (account.type == account_model.AccountType.jmap) {
|
||||
if (seen != null) {
|
||||
await _enqueueChange(account.id, emailId, 'flag_seen',
|
||||
jsonEncode({'seen': seen}));
|
||||
}
|
||||
if (flagged != null) {
|
||||
await _enqueueChange(account.id, emailId, 'flag_flagged',
|
||||
jsonEncode({'flagged': flagged}));
|
||||
}
|
||||
// Optimistic local update.
|
||||
await (_db.update(_db.emails)..where((t) => t.id.equals(emailId))).write(
|
||||
EmailsCompanion(
|
||||
isSeen: seen != null ? Value(seen) : const Value.absent(),
|
||||
isFlagged: flagged != null ? Value(flagged) : const Value.absent(),
|
||||
),
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
final password = await _accounts.getPassword(account.id);
|
||||
final client = await _imapConnect(account, _effectiveUsername(account), password);
|
||||
final client =
|
||||
await _imapConnect(account, _effectiveUsername(account), password);
|
||||
try {
|
||||
await client.selectMailboxByPath(row.mailboxPath);
|
||||
final seq = imap.MessageSequence.fromId(row.uid, isUid: true);
|
||||
@@ -425,8 +446,18 @@ class EmailRepositoryImpl implements EmailRepository {
|
||||
..where((t) => t.id.equals(emailId)))
|
||||
.getSingle();
|
||||
final account = (await _accounts.getAccount(row.accountId))!;
|
||||
|
||||
if (account.type == account_model.AccountType.jmap) {
|
||||
await _enqueueChange(account.id, emailId, 'move',
|
||||
jsonEncode({'dest': destMailboxPath}));
|
||||
// Optimistic: remove from current view; next sync will reconcile.
|
||||
await (_db.delete(_db.emails)..where((t) => t.id.equals(emailId))).go();
|
||||
return;
|
||||
}
|
||||
|
||||
final password = await _accounts.getPassword(account.id);
|
||||
final client = await _imapConnect(account, _effectiveUsername(account), password);
|
||||
final client =
|
||||
await _imapConnect(account, _effectiveUsername(account), password);
|
||||
try {
|
||||
await client.selectMailboxByPath(row.mailboxPath);
|
||||
await client.uidMove(
|
||||
@@ -445,8 +476,17 @@ class EmailRepositoryImpl implements EmailRepository {
|
||||
..where((t) => t.id.equals(emailId)))
|
||||
.getSingle();
|
||||
final account = (await _accounts.getAccount(row.accountId))!;
|
||||
|
||||
if (account.type == account_model.AccountType.jmap) {
|
||||
await _enqueueChange(
|
||||
account.id, emailId, 'delete', jsonEncode(<String, dynamic>{}));
|
||||
await (_db.delete(_db.emails)..where((t) => t.id.equals(emailId))).go();
|
||||
return;
|
||||
}
|
||||
|
||||
final password = await _accounts.getPassword(account.id);
|
||||
final client = await _imapConnect(account, _effectiveUsername(account), password);
|
||||
final client =
|
||||
await _imapConnect(account, _effectiveUsername(account), password);
|
||||
try {
|
||||
await client.selectMailboxByPath(row.mailboxPath);
|
||||
final seq = imap.MessageSequence.fromId(row.uid, isUid: true);
|
||||
@@ -458,6 +498,140 @@ class EmailRepositoryImpl implements EmailRepository {
|
||||
}
|
||||
}
|
||||
|
||||
// ── pending_changes queue ──────────────────────────────────────────────────
|
||||
|
||||
Future<void> _enqueueChange(
|
||||
String accountId,
|
||||
String resourceId,
|
||||
String changeType,
|
||||
String payload,
|
||||
) async {
|
||||
await _db.into(_db.pendingChanges).insert(
|
||||
PendingChangesCompanion.insert(
|
||||
accountId: accountId,
|
||||
resourceType: 'Email',
|
||||
resourceId: resourceId,
|
||||
changeType: changeType,
|
||||
payload: payload,
|
||||
createdAt: DateTime.now(),
|
||||
),
|
||||
);
|
||||
}
|
||||
|
||||
/// Drains pending changes for [accountId] via JMAP Email/set.
|
||||
/// Called at the start of each JMAP sync cycle.
|
||||
@override
|
||||
Future<void> flushPendingChanges(
|
||||
String accountId, String password) async {
|
||||
final rows = await (_db.select(_db.pendingChanges)
|
||||
..where((t) => t.accountId.equals(accountId))
|
||||
..orderBy([(t) => OrderingTerm.asc(t.createdAt)]))
|
||||
.get();
|
||||
if (rows.isEmpty) return;
|
||||
|
||||
final account = (await _accounts.getAccount(accountId))!;
|
||||
final jmapUrl = account.jmapUrl;
|
||||
if (jmapUrl == null || jmapUrl.isEmpty) return;
|
||||
|
||||
final jmap = await JmapClient.connect(
|
||||
httpClient: _httpClient,
|
||||
jmapUrl: Uri.parse(jmapUrl),
|
||||
username: _effectiveUsername(account),
|
||||
password: password,
|
||||
);
|
||||
|
||||
for (final row in rows) {
|
||||
try {
|
||||
await _applyPendingChange(jmap, row);
|
||||
await (_db.delete(_db.pendingChanges)
|
||||
..where((t) => t.id.equals(row.id)))
|
||||
.go();
|
||||
} catch (e) {
|
||||
await (_db.update(_db.pendingChanges)
|
||||
..where((t) => t.id.equals(row.id)))
|
||||
.write(PendingChangesCompanion(
|
||||
attempts: Value(row.attempts + 1),
|
||||
lastError: Value(e.toString()),
|
||||
));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Future<void> _applyPendingChange(
|
||||
JmapClient jmap, PendingChangeRow row) async {
|
||||
final payload = jsonDecode(row.payload) as Map<String, dynamic>;
|
||||
// Extract the JMAP email ID from the DB id (format: "accountId:jmapId").
|
||||
final jmapEmailId = row.resourceId.contains(':')
|
||||
? row.resourceId.substring(row.resourceId.indexOf(':') + 1)
|
||||
: row.resourceId;
|
||||
|
||||
switch (row.changeType) {
|
||||
case 'flag_seen':
|
||||
final seen = payload['seen'] as bool;
|
||||
await jmap.call([
|
||||
[
|
||||
'Email/set',
|
||||
{
|
||||
'accountId': jmap.accountId,
|
||||
'update': {
|
||||
jmapEmailId: {
|
||||
'keywords/\$seen': seen,
|
||||
},
|
||||
},
|
||||
},
|
||||
'0',
|
||||
]
|
||||
]);
|
||||
|
||||
case 'flag_flagged':
|
||||
final flagged = payload['flagged'] as bool;
|
||||
await jmap.call([
|
||||
[
|
||||
'Email/set',
|
||||
{
|
||||
'accountId': jmap.accountId,
|
||||
'update': {
|
||||
jmapEmailId: {
|
||||
'keywords/\$flagged': flagged,
|
||||
},
|
||||
},
|
||||
},
|
||||
'0',
|
||||
]
|
||||
]);
|
||||
|
||||
case 'move':
|
||||
final destMailboxId = payload['dest'] as String;
|
||||
await jmap.call([
|
||||
[
|
||||
'Email/set',
|
||||
{
|
||||
'accountId': jmap.accountId,
|
||||
'update': {
|
||||
jmapEmailId: {
|
||||
'mailboxIds/$destMailboxId': true,
|
||||
'mailboxIds/${row.resourceId}': null,
|
||||
},
|
||||
},
|
||||
},
|
||||
'0',
|
||||
]
|
||||
]);
|
||||
|
||||
case 'delete':
|
||||
await jmap.call([
|
||||
[
|
||||
'Email/set',
|
||||
{
|
||||
'accountId': jmap.accountId,
|
||||
'destroy': [jmapEmailId],
|
||||
},
|
||||
'0',
|
||||
]
|
||||
]);
|
||||
}
|
||||
}
|
||||
|
||||
@override
|
||||
Future<void> sendEmail(String accountId, model.EmailDraft draft) async {
|
||||
final account = (await _accounts.getAccount(accountId))!;
|
||||
|
||||
@@ -82,6 +82,9 @@ class _FakeEmails implements EmailRepository {
|
||||
@override
|
||||
Future<void> deleteEmail(String id) async {}
|
||||
|
||||
@override
|
||||
Future<void> flushPendingChanges(String accountId, String password) async {}
|
||||
|
||||
@override
|
||||
Future<void> sendEmail(String a, EmailDraft d) async {}
|
||||
|
||||
|
||||
@@ -82,6 +82,9 @@ class FakeEmailRepository implements EmailRepository {
|
||||
@override
|
||||
Future<void> deleteEmail(String emailId) async {}
|
||||
|
||||
@override
|
||||
Future<void> flushPendingChanges(String accountId, String password) async {}
|
||||
|
||||
@override
|
||||
Future<void> sendEmail(String accountId, EmailDraft draft) async {}
|
||||
|
||||
|
||||
@@ -747,4 +747,167 @@ void main() {
|
||||
expect(states.first.state, 'est1');
|
||||
});
|
||||
});
|
||||
|
||||
group('JMAP setFlag / moveEmail / deleteEmail enqueue pending_changes', () {
|
||||
Future<void> seedJmapEmail(
|
||||
AppDatabase db, AccountRepositoryImpl accounts) async {
|
||||
await accounts.addAccount(_jmapAccount, 'pw');
|
||||
await db.into(db.emails).insert(EmailsCompanion.insert(
|
||||
id: 'jmap-1:e1',
|
||||
accountId: 'jmap-1',
|
||||
mailboxPath: 'mbx1',
|
||||
uid: 0,
|
||||
receivedAt: DateTime(2024),
|
||||
));
|
||||
}
|
||||
|
||||
test('setFlag seen enqueues flag_seen change and updates local DB', () async {
|
||||
final r = _makeRepos();
|
||||
await seedJmapEmail(r.db, r.accounts);
|
||||
|
||||
await r.emails.setFlag('jmap-1:e1', seen: true);
|
||||
|
||||
final changes = await r.db.select(r.db.pendingChanges).get();
|
||||
expect(changes, hasLength(1));
|
||||
expect(changes.first.changeType, 'flag_seen');
|
||||
expect(changes.first.payload, contains('true'));
|
||||
|
||||
final email = await r.emails.getEmail('jmap-1:e1');
|
||||
expect(email?.isSeen, isTrue);
|
||||
});
|
||||
|
||||
test('setFlag flagged enqueues flag_flagged change', () async {
|
||||
final r = _makeRepos();
|
||||
await seedJmapEmail(r.db, r.accounts);
|
||||
|
||||
await r.emails.setFlag('jmap-1:e1', flagged: true);
|
||||
|
||||
final changes = await r.db.select(r.db.pendingChanges).get();
|
||||
expect(changes.first.changeType, 'flag_flagged');
|
||||
});
|
||||
|
||||
test('moveEmail enqueues move change and removes email from local DB', () async {
|
||||
final r = _makeRepos();
|
||||
await seedJmapEmail(r.db, r.accounts);
|
||||
|
||||
await r.emails.moveEmail('jmap-1:e1', 'mbx2');
|
||||
|
||||
final changes = await r.db.select(r.db.pendingChanges).get();
|
||||
expect(changes.first.changeType, 'move');
|
||||
expect(changes.first.payload, contains('mbx2'));
|
||||
|
||||
expect(await r.emails.getEmail('jmap-1:e1'), isNull);
|
||||
});
|
||||
|
||||
test('deleteEmail enqueues delete change and removes email from local DB', () async {
|
||||
final r = _makeRepos();
|
||||
await seedJmapEmail(r.db, r.accounts);
|
||||
|
||||
await r.emails.deleteEmail('jmap-1:e1');
|
||||
|
||||
final changes = await r.db.select(r.db.pendingChanges).get();
|
||||
expect(changes.first.changeType, 'delete');
|
||||
expect(await r.emails.getEmail('jmap-1:e1'), isNull);
|
||||
});
|
||||
});
|
||||
|
||||
group('JMAP flushPendingChanges', () {
|
||||
http.Client mockFlush(int apiStatus) {
|
||||
return MockClient((req) async {
|
||||
if (req.url.path.contains('well-known')) {
|
||||
return http.Response(
|
||||
jsonEncode({
|
||||
'apiUrl': 'https://jmap.example.com/api/',
|
||||
'accounts': {'acct1': {'name': 'alice@example.com', 'isPersonal': true}},
|
||||
'primaryAccounts': {
|
||||
'urn:ietf:params:jmap:core': 'acct1',
|
||||
'urn:ietf:params:jmap:mail': 'acct1',
|
||||
},
|
||||
'capabilities': {},
|
||||
'username': 'alice@example.com',
|
||||
'state': 'sess1',
|
||||
}),
|
||||
200,
|
||||
);
|
||||
}
|
||||
return http.Response(
|
||||
jsonEncode({'sessionState': 's1', 'methodResponses': [
|
||||
['Email/set', {'accountId': 'acct1', 'updated': {}, 'destroyed': []}, '0'],
|
||||
]}),
|
||||
apiStatus,
|
||||
);
|
||||
});
|
||||
}
|
||||
|
||||
Future<void> seedChange(AppDatabase db, AccountRepositoryImpl accounts,
|
||||
{String changeType = 'flag_seen', String payload = '{"seen":true}'}) async {
|
||||
await accounts.addAccount(_jmapAccount, 'pw');
|
||||
await db.into(db.pendingChanges).insert(PendingChangesCompanion.insert(
|
||||
accountId: 'jmap-1',
|
||||
resourceType: 'Email',
|
||||
resourceId: 'jmap-1:e1',
|
||||
changeType: changeType,
|
||||
payload: payload,
|
||||
createdAt: DateTime.now(),
|
||||
));
|
||||
}
|
||||
|
||||
test('no-op when no pending changes', () async {
|
||||
final r = _makeRepos(httpClient: mockFlush(200));
|
||||
await r.accounts.addAccount(_jmapAccount, 'pw');
|
||||
await r.emails.flushPendingChanges('jmap-1', 'pw');
|
||||
expect(await r.db.select(r.db.pendingChanges).get(), isEmpty);
|
||||
});
|
||||
|
||||
test('sends flag_seen and removes change on success', () async {
|
||||
final r = _makeRepos(httpClient: mockFlush(200));
|
||||
await seedChange(r.db, r.accounts);
|
||||
|
||||
await r.emails.flushPendingChanges('jmap-1', 'pw');
|
||||
|
||||
expect(await r.db.select(r.db.pendingChanges).get(), isEmpty);
|
||||
});
|
||||
|
||||
test('sends flag_flagged and removes change on success', () async {
|
||||
final r = _makeRepos(httpClient: mockFlush(200));
|
||||
await seedChange(r.db, r.accounts,
|
||||
changeType: 'flag_flagged', payload: '{"flagged":true}');
|
||||
|
||||
await r.emails.flushPendingChanges('jmap-1', 'pw');
|
||||
|
||||
expect(await r.db.select(r.db.pendingChanges).get(), isEmpty);
|
||||
});
|
||||
|
||||
test('sends move and removes change on success', () async {
|
||||
final r = _makeRepos(httpClient: mockFlush(200));
|
||||
await seedChange(r.db, r.accounts,
|
||||
changeType: 'move', payload: '{"dest":"mbx2"}');
|
||||
|
||||
await r.emails.flushPendingChanges('jmap-1', 'pw');
|
||||
|
||||
expect(await r.db.select(r.db.pendingChanges).get(), isEmpty);
|
||||
});
|
||||
|
||||
test('sends delete and removes change on success', () async {
|
||||
final r = _makeRepos(httpClient: mockFlush(200));
|
||||
await seedChange(r.db, r.accounts,
|
||||
changeType: 'delete', payload: '{}');
|
||||
|
||||
await r.emails.flushPendingChanges('jmap-1', 'pw');
|
||||
|
||||
expect(await r.db.select(r.db.pendingChanges).get(), isEmpty);
|
||||
});
|
||||
|
||||
test('records attempt count and error on API failure', () async {
|
||||
final r = _makeRepos(httpClient: mockFlush(500));
|
||||
await seedChange(r.db, r.accounts);
|
||||
|
||||
await r.emails.flushPendingChanges('jmap-1', 'pw');
|
||||
|
||||
final changes = await r.db.select(r.db.pendingChanges).get();
|
||||
expect(changes, hasLength(1));
|
||||
expect(changes.first.attempts, 1);
|
||||
expect(changes.first.lastError, isNotNull);
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
@@ -168,6 +168,9 @@ class FakeEmailRepository implements EmailRepository {
|
||||
@override
|
||||
Future<void> deleteEmail(String emailId) async {}
|
||||
|
||||
@override
|
||||
Future<void> flushPendingChanges(String accountId, String password) async {}
|
||||
|
||||
@override
|
||||
Future<void> sendEmail(String accountId, EmailDraft draft) async {}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user