feat: IMAP durable outbound queue via pending_changes

setFlag/moveEmail/deleteEmail for IMAP accounts now enqueue to
pending_changes (with uid + mailboxPath in the payload) and apply an
optimistic local update, instead of calling the IMAP server directly.

flushPendingChanges dispatches on account type: JMAP uses the existing
Email/set path; IMAP opens one connection and drains all queued changes.
Connection failure marks every queued row with an incremented attempt
count so retries work correctly.

_AccountSync._sync() now calls flushPendingChanges before syncing so
queued mutations are delivered on the next poll.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Thomas Güttler
2026-04-19 16:48:13 +02:00
co-authored by Claude Sonnet 4.6
parent 4bf157c550
commit 091c848d0e
3 changed files with 228 additions and 77 deletions
+2
View File
@@ -121,6 +121,8 @@ class _AccountSync implements _SyncLoop {
}
Future<void> _sync() async {
final password = await _accounts.getPassword(account.id);
await _emails.flushPendingChanges(account.id, password);
await _mailboxes.syncMailboxes(account.id);
final mailboxes = await _mailboxes.observeMailboxes(account.id).first;
for (final mailbox in mailboxes) {
@@ -410,32 +410,20 @@ class EmailRepositoryImpl implements EmailRepository {
return;
}
final password = await _accounts.getPassword(account.id);
final client =
await _imapConnect(account, _effectiveUsername(account), password);
try {
await client.selectMailboxByPath(row.mailboxPath);
final seq = imap.MessageSequence.fromId(row.uid, isUid: true);
if (seen != null) {
seen
? await client.uidMarkSeen(seq)
: await client.uidMarkUnseen(seq);
}
if (flagged != null) {
flagged
? await client.uidMarkFlagged(seq)
: await client.uidMarkUnflagged(seq);
}
await (_db.update(_db.emails)..where((t) => t.id.equals(emailId)))
.write(
EmailsCompanion(
isSeen: seen != null ? Value(seen) : const Value.absent(),
isFlagged: flagged != null ? Value(flagged) : const Value.absent(),
),
);
} finally {
await client.logout();
if (seen != null) {
await _enqueueChange(account.id, emailId, 'flag_seen',
jsonEncode({'uid': row.uid, 'mailboxPath': row.mailboxPath, 'seen': seen}));
}
if (flagged != null) {
await _enqueueChange(account.id, emailId, 'flag_flagged',
jsonEncode({'uid': row.uid, 'mailboxPath': row.mailboxPath, 'flagged': flagged}));
}
await (_db.update(_db.emails)..where((t) => t.id.equals(emailId))).write(
EmailsCompanion(
isSeen: seen != null ? Value(seen) : const Value.absent(),
isFlagged: flagged != null ? Value(flagged) : const Value.absent(),
),
);
}
@override
@@ -453,19 +441,9 @@ class EmailRepositoryImpl implements EmailRepository {
return;
}
final password = await _accounts.getPassword(account.id);
final client =
await _imapConnect(account, _effectiveUsername(account), password);
try {
await client.selectMailboxByPath(row.mailboxPath);
await client.uidMove(
imap.MessageSequence.fromId(row.uid, isUid: true),
targetMailboxPath: destMailboxPath,
);
await (_db.delete(_db.emails)..where((t) => t.id.equals(emailId))).go();
} finally {
await client.logout();
}
await _enqueueChange(account.id, emailId, 'move',
jsonEncode({'uid': row.uid, 'mailboxPath': row.mailboxPath, 'dest': destMailboxPath}));
await (_db.delete(_db.emails)..where((t) => t.id.equals(emailId))).go();
}
@override
@@ -482,18 +460,9 @@ class EmailRepositoryImpl implements EmailRepository {
return;
}
final password = await _accounts.getPassword(account.id);
final client =
await _imapConnect(account, _effectiveUsername(account), password);
try {
await client.selectMailboxByPath(row.mailboxPath);
final seq = imap.MessageSequence.fromId(row.uid, isUid: true);
await client.uidMarkDeleted(seq);
await client.uidExpunge(seq);
await (_db.delete(_db.emails)..where((t) => t.id.equals(emailId))).go();
} finally {
await client.logout();
}
await _enqueueChange(account.id, emailId, 'delete',
jsonEncode({'uid': row.uid, 'mailboxPath': row.mailboxPath}));
await (_db.delete(_db.emails)..where((t) => t.id.equals(emailId))).go();
}
// ── pending_changes queue ──────────────────────────────────────────────────
@@ -516,8 +485,8 @@ class EmailRepositoryImpl implements EmailRepository {
);
}
/// Drains pending changes for [accountId] via JMAP Email/set.
/// Called at the start of each JMAP sync cycle.
/// Drains pending changes for [accountId] via the appropriate protocol.
/// Called at the start of each sync cycle.
@override
Future<void> flushPendingChanges(
String accountId, String password) async {
@@ -528,6 +497,16 @@ class EmailRepositoryImpl implements EmailRepository {
if (rows.isEmpty) return;
final account = (await _accounts.getAccount(accountId))!;
switch (account.type) {
case account_model.AccountType.imap:
await _flushPendingChangesImap(account, password, rows);
case account_model.AccountType.jmap:
await _flushPendingChangesJmap(account, password, rows);
}
}
Future<void> _flushPendingChangesJmap(account_model.Account account,
String password, List<PendingChangeRow> rows) async {
final jmapUrl = account.jmapUrl;
if (jmapUrl == null || jmapUrl.isEmpty) return;
@@ -540,7 +519,7 @@ class EmailRepositoryImpl implements EmailRepository {
for (final row in rows) {
try {
await _applyPendingChange(jmap, row);
await _applyPendingChangeJmap(jmap, row);
await (_db.delete(_db.pendingChanges)
..where((t) => t.id.equals(row.id)))
.go();
@@ -555,7 +534,73 @@ class EmailRepositoryImpl implements EmailRepository {
}
}
Future<void> _applyPendingChange(
Future<void> _flushPendingChangesImap(account_model.Account account,
String password, List<PendingChangeRow> rows) async {
imap.ImapClient? client;
try {
client =
await _imapConnect(account, _effectiveUsername(account), password);
} catch (e) {
for (final row in rows) {
await (_db.update(_db.pendingChanges)
..where((t) => t.id.equals(row.id)))
.write(PendingChangesCompanion(
attempts: Value(row.attempts + 1),
lastError: Value(e.toString()),
));
}
return;
}
try {
for (final row in rows) {
try {
await _applyPendingChangeImap(client, row);
await (_db.delete(_db.pendingChanges)
..where((t) => t.id.equals(row.id)))
.go();
} catch (e) {
await (_db.update(_db.pendingChanges)
..where((t) => t.id.equals(row.id)))
.write(PendingChangesCompanion(
attempts: Value(row.attempts + 1),
lastError: Value(e.toString()),
));
}
}
} finally {
await client.logout();
}
}
Future<void> _applyPendingChangeImap(
imap.ImapClient client, PendingChangeRow row) async {
final payload = jsonDecode(row.payload) as Map<String, dynamic>;
final uid = payload['uid'] as int;
final mailboxPath = payload['mailboxPath'] as String;
final seq = imap.MessageSequence.fromId(uid, isUid: true);
await client.selectMailboxByPath(mailboxPath);
switch (row.changeType) {
case 'flag_seen':
final seen = payload['seen'] as bool;
seen
? await client.uidMarkSeen(seq)
: await client.uidMarkUnseen(seq);
case 'flag_flagged':
final flagged = payload['flagged'] as bool;
flagged
? await client.uidMarkFlagged(seq)
: await client.uidMarkUnflagged(seq);
case 'move':
await client.uidMove(seq,
targetMailboxPath: payload['dest'] as String);
case 'delete':
await client.uidMarkDeleted(seq);
await client.uidExpunge(seq);
}
}
Future<void> _applyPendingChangeJmap(
JmapClient jmap, PendingChangeRow row) async {
final payload = jsonDecode(row.payload) as Map<String, dynamic>;
// Extract the JMAP email ID from the DB id (format: "accountId:jmapId").
+127 -23
View File
@@ -366,8 +366,8 @@ void main() {
expect(r.fakeImap.logoutCalled, isTrue);
});
test('setFlag seen=true calls uidMarkSeen and updates DB', () async {
final r = _makeReposWithFakes();
test('setFlag seen=true enqueues flag_seen change and updates local DB', () async {
final r = _makeRepos();
await r.accounts.addAccount(_account, 'pw');
await r.db.into(r.db.emails).insert(EmailsCompanion.insert(
id: 'acc-1:5',
@@ -379,15 +379,16 @@ void main() {
await r.emails.setFlag('acc-1:5', seen: true);
expect(r.fakeImap.markSeenCalls, 1);
expect(r.fakeImap.markUnseenCalls, 0);
final changes = await r.db.select(r.db.pendingChanges).get();
expect(changes, hasLength(1));
expect(changes.first.changeType, 'flag_seen');
expect(changes.first.payload, contains('"seen":true'));
final email = await r.emails.getEmail('acc-1:5');
expect(email!.isSeen, isTrue);
expect(r.fakeImap.logoutCalled, isTrue);
});
test('setFlag seen=false calls uidMarkUnseen', () async {
final r = _makeReposWithFakes();
test('setFlag seen=false enqueues flag_seen change with seen=false', () async {
final r = _makeRepos();
await r.accounts.addAccount(_account, 'pw');
await r.db.into(r.db.emails).insert(EmailsCompanion.insert(
id: 'acc-1:5',
@@ -395,17 +396,20 @@ void main() {
mailboxPath: 'INBOX',
uid: 5,
receivedAt: DateTime(2024),
isSeen: const Value(true),
));
await r.emails.setFlag('acc-1:5', seen: false);
expect(r.fakeImap.markUnseenCalls, 1);
final changes = await r.db.select(r.db.pendingChanges).get();
expect(changes.first.changeType, 'flag_seen');
expect(changes.first.payload, contains('"seen":false'));
final email = await r.emails.getEmail('acc-1:5');
expect(email!.isSeen, isFalse);
});
test('setFlag flagged=true calls uidMarkFlagged', () async {
final r = _makeReposWithFakes();
test('setFlag flagged=true enqueues flag_flagged change', () async {
final r = _makeRepos();
await r.accounts.addAccount(_account, 'pw');
await r.db.into(r.db.emails).insert(EmailsCompanion.insert(
id: 'acc-1:5',
@@ -417,13 +421,14 @@ void main() {
await r.emails.setFlag('acc-1:5', flagged: true);
expect(r.fakeImap.markFlaggedCalls, 1);
final changes = await r.db.select(r.db.pendingChanges).get();
expect(changes.first.changeType, 'flag_flagged');
final email = await r.emails.getEmail('acc-1:5');
expect(email!.isFlagged, isTrue);
});
test('setFlag flagged=false calls uidMarkUnflagged', () async {
final r = _makeReposWithFakes();
test('setFlag flagged=false enqueues flag_flagged change with flagged=false', () async {
final r = _makeRepos();
await r.accounts.addAccount(_account, 'pw');
await r.db.into(r.db.emails).insert(EmailsCompanion.insert(
id: 'acc-1:5',
@@ -431,15 +436,18 @@ void main() {
mailboxPath: 'INBOX',
uid: 5,
receivedAt: DateTime(2024),
isFlagged: const Value(true),
));
await r.emails.setFlag('acc-1:5', flagged: false);
expect(r.fakeImap.markUnflaggedCalls, 1);
final changes = await r.db.select(r.db.pendingChanges).get();
expect(changes.first.changeType, 'flag_flagged');
expect(changes.first.payload, contains('"flagged":false'));
});
test('moveEmail removes email from DB and calls uidMove', () async {
final r = _makeReposWithFakes();
test('moveEmail enqueues move change and removes email from local DB', () async {
final r = _makeRepos();
await r.accounts.addAccount(_account, 'pw');
await r.db.into(r.db.emails).insert(EmailsCompanion.insert(
id: 'acc-1:5',
@@ -451,13 +459,14 @@ void main() {
await r.emails.moveEmail('acc-1:5', 'Archive');
expect(r.fakeImap.moveEmailCalls, 1);
final changes = await r.db.select(r.db.pendingChanges).get();
expect(changes.first.changeType, 'move');
expect(changes.first.payload, contains('Archive'));
expect(await r.emails.getEmail('acc-1:5'), isNull);
expect(r.fakeImap.logoutCalled, isTrue);
});
test('deleteEmail removes email from DB and marks deleted', () async {
final r = _makeReposWithFakes();
test('deleteEmail enqueues delete change and removes email from local DB', () async {
final r = _makeRepos();
await r.accounts.addAccount(_account, 'pw');
await r.db.into(r.db.emails).insert(EmailsCompanion.insert(
id: 'acc-1:5',
@@ -469,10 +478,9 @@ void main() {
await r.emails.deleteEmail('acc-1:5');
expect(r.fakeImap.markDeletedCalls, 1);
expect(r.fakeImap.expungeCalls, 1);
final changes = await r.db.select(r.db.pendingChanges).get();
expect(changes.first.changeType, 'delete');
expect(await r.emails.getEmail('acc-1:5'), isNull);
expect(r.fakeImap.logoutCalled, isTrue);
});
test('sendEmail sends via SMTP and appends copy to Sent folder', () async {
@@ -666,6 +674,102 @@ void main() {
});
});
group('IMAP flushPendingChanges', () {
Future<void> seedImapChange(
AppDatabase db,
AccountRepositoryImpl accounts, {
String changeType = 'flag_seen',
String payload = '{"uid":5,"mailboxPath":"INBOX","seen":true}',
}) async {
await accounts.addAccount(_account, 'pw');
await db.into(db.pendingChanges).insert(PendingChangesCompanion.insert(
accountId: 'acc-1',
resourceType: 'Email',
resourceId: 'acc-1:5',
changeType: changeType,
payload: payload,
createdAt: DateTime.now(),
));
}
test('flag_seen sends uidMarkSeen and removes change', () async {
final r = _makeReposWithFakes();
await seedImapChange(r.db, r.accounts);
await r.emails.flushPendingChanges('acc-1', 'pw');
expect(r.fakeImap.markSeenCalls, 1);
expect(await r.db.select(r.db.pendingChanges).get(), isEmpty);
expect(r.fakeImap.logoutCalled, isTrue);
});
test('flag_seen false sends uidMarkUnseen and removes change', () async {
final r = _makeReposWithFakes();
await seedImapChange(r.db, r.accounts,
payload: '{"uid":5,"mailboxPath":"INBOX","seen":false}');
await r.emails.flushPendingChanges('acc-1', 'pw');
expect(r.fakeImap.markUnseenCalls, 1);
expect(await r.db.select(r.db.pendingChanges).get(), isEmpty);
});
test('flag_flagged sends uidMarkFlagged and removes change', () async {
final r = _makeReposWithFakes();
await seedImapChange(r.db, r.accounts,
changeType: 'flag_flagged',
payload: '{"uid":5,"mailboxPath":"INBOX","flagged":true}');
await r.emails.flushPendingChanges('acc-1', 'pw');
expect(r.fakeImap.markFlaggedCalls, 1);
expect(await r.db.select(r.db.pendingChanges).get(), isEmpty);
});
test('flag_flagged false sends uidMarkUnflagged', () async {
final r = _makeReposWithFakes();
await seedImapChange(r.db, r.accounts,
changeType: 'flag_flagged',
payload: '{"uid":5,"mailboxPath":"INBOX","flagged":false}');
await r.emails.flushPendingChanges('acc-1', 'pw');
expect(r.fakeImap.markUnflaggedCalls, 1);
});
test('move sends uidMove and removes change', () async {
final r = _makeReposWithFakes();
await seedImapChange(r.db, r.accounts,
changeType: 'move',
payload: '{"uid":5,"mailboxPath":"INBOX","dest":"Archive"}');
await r.emails.flushPendingChanges('acc-1', 'pw');
expect(r.fakeImap.moveEmailCalls, 1);
expect(await r.db.select(r.db.pendingChanges).get(), isEmpty);
});
test('delete sends uidMarkDeleted + uidExpunge and removes change', () async {
final r = _makeReposWithFakes();
await seedImapChange(r.db, r.accounts,
changeType: 'delete',
payload: '{"uid":5,"mailboxPath":"INBOX"}');
await r.emails.flushPendingChanges('acc-1', 'pw');
expect(r.fakeImap.markDeletedCalls, 1);
expect(r.fakeImap.expungeCalls, 1);
expect(await r.db.select(r.db.pendingChanges).get(), isEmpty);
});
test('records attempt and error when IMAP throws', () async {
final r = _makeRepos();
// _makeRepos uses _noImapConnect which throws UnsupportedError
await r.accounts.addAccount(_account, 'pw');
await r.db.into(r.db.pendingChanges).insert(PendingChangesCompanion.insert(
accountId: 'acc-1',
resourceType: 'Email',
resourceId: 'acc-1:5',
changeType: 'flag_seen',
payload: '{"uid":5,"mailboxPath":"INBOX","seen":true}',
createdAt: DateTime.now(),
));
await r.emails.flushPendingChanges('acc-1', 'pw');
final changes = await r.db.select(r.db.pendingChanges).get();
expect(changes, hasLength(1));
expect(changes.first.attempts, 1);
expect(changes.first.lastError, isNotNull);
});
});
group('JMAP syncEmails', () {
test('full sync upserts emails and persists state', () async {
final r = _makeRepos(