feat: IMAP incremental sync via sync_state checkpoints
_syncEmailsImap now stores {uidValidity, lastUid} per mailbox in the
sync_state table after each full sync. Subsequent syncs only fetch
UIDs newer than lastUid (UID N+1:*) and then do an ALL search to
reconcile remote deletions — avoiding a full re-download on every poll.
When UID validity changes the stale local emails are discarded and a
full re-sync is performed automatically.
fake_imap: add uidValidityResult + searchCallQueue so tests can feed
distinct responses to consecutive uidSearchMessages calls.
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
co-authored by
Claude Sonnet 4.6
parent
091c848d0e
commit
bf66b2118e
@@ -1,5 +1,6 @@
|
||||
import 'dart:convert';
|
||||
import 'dart:io';
|
||||
import 'dart:math' as math;
|
||||
|
||||
import 'package:drift/drift.dart';
|
||||
import 'package:enough_mail/enough_mail.dart' as imap;
|
||||
@@ -146,41 +147,127 @@ class EmailRepositoryImpl implements EmailRepository {
|
||||
final client =
|
||||
await _imapConnect(account, _effectiveUsername(account), password);
|
||||
try {
|
||||
await client.selectMailboxByPath(mailboxPath);
|
||||
final fetch = await client.fetchMessages(
|
||||
imap.MessageSequence.fromAll(),
|
||||
'(UID FLAGS ENVELOPE BODYSTRUCTURE)',
|
||||
);
|
||||
for (final msg in fetch.messages) {
|
||||
final envelope = msg.envelope;
|
||||
if (envelope == null) continue;
|
||||
final uid = msg.uid;
|
||||
if (uid == null) continue;
|
||||
final emailId = '${account.id}:$uid';
|
||||
final selectedMailbox = await client.selectMailboxByPath(mailboxPath);
|
||||
final uidValidity = selectedMailbox.uidValidity ?? 0;
|
||||
final resourceType = 'IMAP:$mailboxPath';
|
||||
final checkpoint = await _loadImapCheckpoint(account.id, resourceType);
|
||||
|
||||
await _db.into(_db.emails).insertOnConflictUpdate(
|
||||
EmailsCompanion.insert(
|
||||
id: emailId,
|
||||
accountId: account.id,
|
||||
mailboxPath: mailboxPath,
|
||||
uid: uid,
|
||||
subject: Value(envelope.subject),
|
||||
sentAt: Value(envelope.date),
|
||||
receivedAt: envelope.date ?? DateTime.now(),
|
||||
fromJson: Value(_encodeAddresses(envelope.from)),
|
||||
toAddresses: Value(_encodeAddresses(envelope.to)),
|
||||
ccJson: Value(_encodeAddresses(envelope.cc)),
|
||||
isSeen: Value(msg.flags?.contains(r'\Seen') ?? false),
|
||||
isFlagged: Value(msg.flags?.contains(r'\Flagged') ?? false),
|
||||
hasAttachment: Value(msg.hasAttachments()),
|
||||
),
|
||||
);
|
||||
if (checkpoint == null || checkpoint['uidValidity'] != uidValidity) {
|
||||
// First run or UID validity changed — full sync.
|
||||
if (checkpoint != null) {
|
||||
// UID validity changed: remove stale local emails for this mailbox.
|
||||
await (_db.delete(_db.emails)
|
||||
..where((t) =>
|
||||
t.accountId.equals(account.id) &
|
||||
t.mailboxPath.equals(mailboxPath)))
|
||||
.go();
|
||||
}
|
||||
await _fetchAndUpsertImap(
|
||||
client, account, mailboxPath, imap.MessageSequence.fromAll());
|
||||
final maxUid = await _maxLocalUid(account.id, mailboxPath);
|
||||
await _saveImapCheckpoint(
|
||||
account.id, resourceType, uidValidity, maxUid);
|
||||
} else {
|
||||
// Incremental sync.
|
||||
final lastUid = checkpoint['lastUid'] as int;
|
||||
final newUids =
|
||||
(await client.uidSearchMessages(searchCriteria: 'UID ${lastUid + 1}:*'))
|
||||
.matchingSequence
|
||||
?.toList() ??
|
||||
[];
|
||||
if (newUids.isNotEmpty) {
|
||||
await _fetchAndUpsertImap(client, account, mailboxPath,
|
||||
imap.MessageSequence.fromIds(newUids, isUid: true));
|
||||
}
|
||||
// Detect remote deletions.
|
||||
final serverUids =
|
||||
(await client.uidSearchMessages(searchCriteria: 'ALL'))
|
||||
.matchingSequence
|
||||
?.toList() ??
|
||||
[];
|
||||
await _reconcileDeletedImap(account.id, mailboxPath, serverUids);
|
||||
final maxUid =
|
||||
serverUids.isEmpty ? lastUid : serverUids.reduce(math.max);
|
||||
await _saveImapCheckpoint(
|
||||
account.id, resourceType, uidValidity, maxUid);
|
||||
}
|
||||
} finally {
|
||||
await client.logout();
|
||||
}
|
||||
}
|
||||
|
||||
Future<void> _fetchAndUpsertImap(
|
||||
imap.ImapClient client,
|
||||
account_model.Account account,
|
||||
String mailboxPath,
|
||||
imap.MessageSequence sequence,
|
||||
) async {
|
||||
final fetch = await client.fetchMessages(
|
||||
sequence, '(UID FLAGS ENVELOPE BODYSTRUCTURE)');
|
||||
for (final msg in fetch.messages) {
|
||||
final envelope = msg.envelope;
|
||||
if (envelope == null) continue;
|
||||
final uid = msg.uid;
|
||||
if (uid == null) continue;
|
||||
final emailId = '${account.id}:$uid';
|
||||
await _db.into(_db.emails).insertOnConflictUpdate(
|
||||
EmailsCompanion.insert(
|
||||
id: emailId,
|
||||
accountId: account.id,
|
||||
mailboxPath: mailboxPath,
|
||||
uid: uid,
|
||||
subject: Value(envelope.subject),
|
||||
sentAt: Value(envelope.date),
|
||||
receivedAt: envelope.date ?? DateTime.now(),
|
||||
fromJson: Value(_encodeAddresses(envelope.from)),
|
||||
toAddresses: Value(_encodeAddresses(envelope.to)),
|
||||
ccJson: Value(_encodeAddresses(envelope.cc)),
|
||||
isSeen: Value(msg.flags?.contains(r'\Seen') ?? false),
|
||||
isFlagged: Value(msg.flags?.contains(r'\Flagged') ?? false),
|
||||
hasAttachment: Value(msg.hasAttachments()),
|
||||
),
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
Future<int> _maxLocalUid(String accountId, String mailboxPath) async {
|
||||
final rows = await (_db.select(_db.emails)
|
||||
..where((t) =>
|
||||
t.accountId.equals(accountId) &
|
||||
t.mailboxPath.equals(mailboxPath)))
|
||||
.get();
|
||||
if (rows.isEmpty) return 0;
|
||||
return rows.map((r) => r.uid).reduce(math.max);
|
||||
}
|
||||
|
||||
Future<Map<String, dynamic>?> _loadImapCheckpoint(
|
||||
String accountId, String resourceType) async {
|
||||
final raw = await _loadSyncState(accountId, resourceType);
|
||||
if (raw == null) return null;
|
||||
return jsonDecode(raw) as Map<String, dynamic>;
|
||||
}
|
||||
|
||||
Future<void> _saveImapCheckpoint(String accountId, String resourceType,
|
||||
int uidValidity, int lastUid) async {
|
||||
await _saveSyncState(accountId, resourceType,
|
||||
jsonEncode({'uidValidity': uidValidity, 'lastUid': lastUid}));
|
||||
}
|
||||
|
||||
Future<void> _reconcileDeletedImap(
|
||||
String accountId, String mailboxPath, List<int> serverUids) async {
|
||||
final serverUidSet = serverUids.toSet();
|
||||
final localRows = await (_db.select(_db.emails)
|
||||
..where((t) =>
|
||||
t.accountId.equals(accountId) &
|
||||
t.mailboxPath.equals(mailboxPath)))
|
||||
.get();
|
||||
for (final row in localRows) {
|
||||
if (!serverUidSet.contains(row.uid)) {
|
||||
await (_db.delete(_db.emails)..where((t) => t.id.equals(row.id))).go();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// ── JMAP email sync ────────────────────────────────────────────────────────
|
||||
|
||||
static const _emailProperties = [
|
||||
|
||||
@@ -533,6 +533,131 @@ void main() {
|
||||
expect(results, isEmpty);
|
||||
});
|
||||
|
||||
test('syncEmails saves IMAP checkpoint after full sync', () async {
|
||||
final r = _makeReposWithFakes();
|
||||
await r.accounts.addAccount(_account, 'pw');
|
||||
r.fakeImap.uidValidityResult = 1000;
|
||||
r.fakeImap.fetchResults = [
|
||||
buildEnvelopeMessage(uid: 10, subject: 'First'),
|
||||
buildEnvelopeMessage(uid: 20, subject: 'Second'),
|
||||
];
|
||||
|
||||
await r.emails.syncEmails('acc-1', 'INBOX');
|
||||
|
||||
final states = await r.db.select(r.db.syncStates).get();
|
||||
expect(states, hasLength(1));
|
||||
final checkpoint =
|
||||
jsonDecode(states.first.state) as Map<String, dynamic>;
|
||||
expect(checkpoint['uidValidity'], 1000);
|
||||
expect(checkpoint['lastUid'], 20);
|
||||
});
|
||||
|
||||
test('syncEmails incremental sync fetches only messages newer than checkpoint',
|
||||
() async {
|
||||
final r = _makeReposWithFakes();
|
||||
await r.accounts.addAccount(_account, 'pw');
|
||||
r.fakeImap.uidValidityResult = 1000;
|
||||
await r.db.into(r.db.syncStates).insertOnConflictUpdate(
|
||||
SyncStatesCompanion.insert(
|
||||
accountId: 'acc-1',
|
||||
resourceType: 'IMAP:INBOX',
|
||||
state: jsonEncode({'uidValidity': 1000, 'lastUid': 10}),
|
||||
syncedAt: DateTime.now(),
|
||||
),
|
||||
);
|
||||
await r.db.into(r.db.emails).insert(EmailsCompanion.insert(
|
||||
id: 'acc-1:10',
|
||||
accountId: 'acc-1',
|
||||
mailboxPath: 'INBOX',
|
||||
uid: 10,
|
||||
receivedAt: DateTime(2024),
|
||||
));
|
||||
// Call 1 (UID 11:*): returns uid 20; call 2 (ALL): returns [10, 20]
|
||||
r.fakeImap.searchCallQueue = [
|
||||
[20],
|
||||
[10, 20]
|
||||
];
|
||||
r.fakeImap.fetchResults = [buildEnvelopeMessage(uid: 20, subject: 'New')];
|
||||
|
||||
await r.emails.syncEmails('acc-1', 'INBOX');
|
||||
|
||||
final emails =
|
||||
await r.emails.observeEmails('acc-1', 'INBOX').first;
|
||||
expect(emails.map((e) => e.uid).toSet(), {10, 20});
|
||||
final state = jsonDecode(
|
||||
(await r.db.select(r.db.syncStates).get()).first.state)
|
||||
as Map<String, dynamic>;
|
||||
expect(state['lastUid'], 20);
|
||||
});
|
||||
|
||||
test('syncEmails reconciliation removes emails deleted on server', () async {
|
||||
final r = _makeReposWithFakes();
|
||||
await r.accounts.addAccount(_account, 'pw');
|
||||
r.fakeImap.uidValidityResult = 1000;
|
||||
await r.db.into(r.db.syncStates).insertOnConflictUpdate(
|
||||
SyncStatesCompanion.insert(
|
||||
accountId: 'acc-1',
|
||||
resourceType: 'IMAP:INBOX',
|
||||
state: jsonEncode({'uidValidity': 1000, 'lastUid': 20}),
|
||||
syncedAt: DateTime.now(),
|
||||
),
|
||||
);
|
||||
for (final uid in [10, 20]) {
|
||||
await r.db.into(r.db.emails).insert(EmailsCompanion.insert(
|
||||
id: 'acc-1:$uid',
|
||||
accountId: 'acc-1',
|
||||
mailboxPath: 'INBOX',
|
||||
uid: uid,
|
||||
receivedAt: DateTime(2024),
|
||||
));
|
||||
}
|
||||
// No new UIDs; server only has uid=10 (uid=20 was deleted)
|
||||
r.fakeImap.searchCallQueue = [[], [10]];
|
||||
|
||||
await r.emails.syncEmails('acc-1', 'INBOX');
|
||||
|
||||
final emails =
|
||||
await r.emails.observeEmails('acc-1', 'INBOX').first;
|
||||
expect(emails, hasLength(1));
|
||||
expect(emails.first.uid, 10);
|
||||
});
|
||||
|
||||
test('syncEmails full re-sync when UID validity changes', () async {
|
||||
final r = _makeReposWithFakes();
|
||||
await r.accounts.addAccount(_account, 'pw');
|
||||
r.fakeImap.uidValidityResult = 9999;
|
||||
await r.db.into(r.db.syncStates).insertOnConflictUpdate(
|
||||
SyncStatesCompanion.insert(
|
||||
accountId: 'acc-1',
|
||||
resourceType: 'IMAP:INBOX',
|
||||
state: jsonEncode({'uidValidity': 1000, 'lastUid': 50}),
|
||||
syncedAt: DateTime.now(),
|
||||
),
|
||||
);
|
||||
await r.db.into(r.db.emails).insert(EmailsCompanion.insert(
|
||||
id: 'acc-1:50',
|
||||
accountId: 'acc-1',
|
||||
mailboxPath: 'INBOX',
|
||||
uid: 50,
|
||||
receivedAt: DateTime(2024),
|
||||
));
|
||||
r.fakeImap.fetchResults = [
|
||||
buildEnvelopeMessage(uid: 1, subject: 'Fresh start'),
|
||||
];
|
||||
|
||||
await r.emails.syncEmails('acc-1', 'INBOX');
|
||||
|
||||
final emails =
|
||||
await r.emails.observeEmails('acc-1', 'INBOX').first;
|
||||
expect(emails, hasLength(1));
|
||||
expect(emails.first.uid, 1);
|
||||
final state = jsonDecode(
|
||||
(await r.db.select(r.db.syncStates).get()).first.state)
|
||||
as Map<String, dynamic>;
|
||||
expect(state['uidValidity'], 9999);
|
||||
expect(state['lastUid'], 1);
|
||||
});
|
||||
|
||||
test('syncEmails skips messages with no envelope or no uid', () async {
|
||||
final r = _makeReposWithFakes();
|
||||
await r.accounts.addAccount(_account, 'pw');
|
||||
|
||||
@@ -8,6 +8,10 @@ class FakeImapClient extends imap.ImapClient {
|
||||
List<imap.MimeMessage> fetchResults = [];
|
||||
List<imap.Mailbox> listMailboxesResult = [];
|
||||
List<int> searchUids = [];
|
||||
/// If set, each [uidSearchMessages] call pops the first element.
|
||||
/// Falls back to [searchUids] when the queue is empty or null.
|
||||
List<List<int>>? searchCallQueue;
|
||||
int uidValidityResult = 0;
|
||||
bool logoutCalled = false;
|
||||
bool throwOnStatus = false;
|
||||
int markSeenCalls = 0;
|
||||
@@ -32,6 +36,7 @@ class FakeImapClient extends imap.ImapClient {
|
||||
encodedPath: path,
|
||||
flags: [],
|
||||
pathSeparator: '/',
|
||||
uidValidity: uidValidityResult,
|
||||
);
|
||||
|
||||
@override
|
||||
@@ -150,10 +155,13 @@ class FakeImapClient extends imap.ImapClient {
|
||||
List<imap.ReturnOption>? returnOptions,
|
||||
Duration? responseTimeout,
|
||||
}) async {
|
||||
final uids = (searchCallQueue != null && searchCallQueue!.isNotEmpty)
|
||||
? searchCallQueue!.removeAt(0)
|
||||
: searchUids;
|
||||
final result = imap.SearchImapResult();
|
||||
if (searchUids.isNotEmpty) {
|
||||
if (uids.isNotEmpty) {
|
||||
result.matchingSequence =
|
||||
imap.MessageSequence.fromIds(searchUids, isUid: true);
|
||||
imap.MessageSequence.fromIds(uids, isUid: true);
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user