feat: JMAP background sync worker (Step 6)

AccountSyncManager now starts a _JmapAccountSync loop for JMAP accounts
alongside the existing _AccountSync for IMAP accounts.

_JmapAccountSync:
- Syncs mailboxes then emails for each known mailbox per cycle.
- Polls every 30 seconds (no IDLE for JMAP; EventSource deferred).
- Reuses the same exponential backoff (5–300 s) on failure.
- stop() interrupts the poll wait immediately via a Completer.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Thomas Güttler
2026-04-19 16:22:09 +02:00
co-authored by Claude Sonnet 4.6
parent 2efeba9d2e
commit 559eb9a467
3 changed files with 195 additions and 17 deletions
+1 -1
View File
@@ -95,7 +95,7 @@ Implement `syncEmails(accountId, mailboxId)` for JMAP:
No new DB columns needed beyond `sync_state`.
### Step 6 — JMAP background sync worker `[ ]`
### Step 6 — JMAP background sync worker `[x]`
Add JMAP handling to `AccountSyncManager`:
+106 -16
View File
@@ -9,8 +9,10 @@ import '../repositories/mailbox_repository.dart';
import '../utils/logger.dart';
import '../../data/imap/imap_client_factory.dart';
/// Manages one IMAP IDLE connection per account.
/// On a new-message notification it triggers a re-sync then goes back to IDLE.
/// Manages background sync for all accounts.
///
/// IMAP accounts get an IDLE-based sync loop (_AccountSync).
/// JMAP accounts get a polling-based sync loop (_JmapAccountSync).
class AccountSyncManager {
AccountSyncManager(this._accounts, this._mailboxes, this._emails);
@@ -18,24 +20,25 @@ class AccountSyncManager {
final MailboxRepository _mailboxes;
final EmailRepository _emails;
final Map<String, _AccountSync> _active = {};
final Map<String, _SyncLoop> _active = {};
StreamSubscription<List<Account>>? _accountsSub;
void start() {
_accountsSub = _accounts.observeAccounts().listen((accounts) {
final currentIds = accounts.map((a) => a.id).toSet();
// Start sync for newly added IMAP accounts only.
for (final account in accounts) {
if (account.type != AccountType.imap) continue;
if (!_active.containsKey(account.id)) {
final sync = _AccountSync(account, _accounts, _mailboxes, _emails);
_active[account.id] = sync;
sync.start();
}
if (_active.containsKey(account.id)) continue;
final loop = switch (account.type) {
AccountType.imap =>
_AccountSync(account, _accounts, _mailboxes, _emails),
AccountType.jmap =>
_JmapAccountSync(account, _mailboxes, _emails),
};
_active[account.id] = loop;
loop.start();
}
// Stop sync for removed accounts.
for (final id in _active.keys.toList()) {
if (!currentIds.contains(id)) {
_active.remove(id)?.stop();
@@ -53,7 +56,16 @@ class AccountSyncManager {
}
}
class _AccountSync {
// ── Shared interface ──────────────────────────────────────────────────────────
abstract class _SyncLoop {
void start();
void stop();
}
// ── IMAP ──────────────────────────────────────────────────────────────────────
class _AccountSync implements _SyncLoop {
_AccountSync(this.account, this._accounts, this._mailboxes, this._emails);
final Account account;
@@ -64,15 +76,15 @@ class _AccountSync {
imap.ImapClient? _idleClient;
bool _running = false;
int _backoffSeconds = 5;
// Completed by stop() to wake up _idle() immediately rather than waiting
// for the 25-minute cap or the next incoming message.
Completer<void>? _stopSignal;
@override
void start() {
_running = true;
_loop();
}
@override
void stop() {
_running = false;
if (_stopSignal != null && !_stopSignal!.isCompleted) {
@@ -118,7 +130,6 @@ class _AccountSync {
final newMessageCompleter = Completer<void>();
// Wake up when new messages arrive or messages are expunged.
final sub = client.eventBus
.on<imap.ImapEvent>()
.where(
@@ -132,7 +143,7 @@ class _AccountSync {
await client.idleStart();
// Cap IDLE at 25 minutes (RFC 2177). Also wakes up when stop() is
// Cap IDLE at 25 minutes (RFC 2177). Also wakes up when stop() is
// called or a new message / expunge event arrives.
await Future.any([
newMessageCompleter.future,
@@ -149,3 +160,82 @@ class _AccountSync {
}
}
}
// ── JMAP ──────────────────────────────────────────────────────────────────────
class _JmapAccountSync implements _SyncLoop {
_JmapAccountSync(this.account, this._mailboxes, this._emails);
final Account account;
final MailboxRepository _mailboxes;
final EmailRepository _emails;
bool _running = false;
int _backoffSeconds = 5;
Completer<void>? _stopSignal;
static const _pollInterval = Duration(seconds: 30);
@override
void start() {
_running = true;
_loop();
}
@override
void stop() {
_running = false;
if (_stopSignal != null && !_stopSignal!.isCompleted) {
_stopSignal!.complete();
}
}
Future<void> _loop() async {
while (_running) {
try {
await _sync();
_backoffSeconds = 5;
await _wait();
} catch (e, st) {
log(
'JMAP sync failed for ${account.email}, retrying in ${_backoffSeconds}s',
error: e,
stackTrace: st,
);
await _waitSeconds(_backoffSeconds);
_backoffSeconds = (_backoffSeconds * 2).clamp(5, 300);
}
}
}
Future<void> _sync() async {
await _mailboxes.syncMailboxes(account.id);
// Sync emails for each known mailbox.
final mailboxes = await _mailboxes.observeMailboxes(account.id).first;
for (final mailbox in mailboxes) {
if (!_running) break;
await _emails.syncEmails(account.id, mailbox.path);
}
}
Future<void> _wait() async {
if (!_running) return;
_stopSignal = Completer<void>();
await Future.any([
Future.delayed(_pollInterval),
_stopSignal!.future,
]);
_stopSignal = null;
}
Future<void> _waitSeconds(int seconds) async {
if (!_running) return;
_stopSignal = Completer<void>();
await Future.any([
Future.delayed(Duration(seconds: seconds)),
_stopSignal!.future,
]);
_stopSignal = null;
}
}
+88
View File
@@ -109,6 +109,41 @@ const _account = Account(
smtpHost: 'smtp.example.com',
);
const _jmapAccount = Account(
id: 'jmap-account',
displayName: 'Test JMAP',
email: 'test@example.com',
type: AccountType.jmap,
jmapUrl: 'https://jmap.example.com/.well-known/jmap',
);
class FakeMailboxRepositoryWithInbox implements MailboxRepository {
@override
Stream<List<Mailbox>> observeMailboxes(String accountId) => Stream.value([
const Mailbox(
id: 'jmap-account:mbx1',
accountId: 'jmap-account',
path: 'mbx1',
name: 'Inbox',
unreadCount: 0,
totalCount: 0,
),
]);
@override
Future<void> syncMailboxes(String accountId) async {}
}
class FailingJmapEmailRepository extends FakeEmailRepository {
int syncCount = 0;
@override
Future<void> syncEmails(String accountId, String mailboxPath) async {
syncCount++;
if (syncCount == 1) throw Exception('simulated JMAP failure');
}
}
// ── Tests ─────────────────────────────────────────────────────────────────────
void main() {
@@ -172,6 +207,59 @@ void main() {
await pumpEventQueue(times: 10);
});
group('JMAP accounts', () {
test('starts a JMAP sync loop when a JMAP account is pushed', () async {
final accounts = FakeAccountRepository();
final emails = FakeEmailRepository();
final mgr = AccountSyncManager(
accounts,
FakeMailboxRepositoryWithInbox(),
emails,
);
mgr.start();
accounts.push([_jmapAccount]);
mgr.dispose();
await pumpEventQueue();
});
test('JMAP stop() interrupts the poll wait', () {
fakeAsync((async) {
final accounts = FakeAccountRepository();
final mgr = AccountSyncManager(
accounts,
FakeMailboxRepositoryWithInbox(),
FakeEmailRepository(),
);
mgr.start();
accounts.push([_jmapAccount]);
async.flushMicrotasks();
// Dispose before the 30-second poll interval elapses.
mgr.dispose();
async.elapse(const Duration(seconds: 35));
async.flushMicrotasks();
});
});
test('JMAP backoff on sync failure', () {
fakeAsync((async) {
final accounts = FakeAccountRepository();
final mgr = AccountSyncManager(
accounts,
FakeMailboxRepositoryWithInbox(),
FailingJmapEmailRepository(),
);
mgr.start();
accounts.push([_jmapAccount]);
async.flushMicrotasks();
mgr.dispose();
async.elapse(const Duration(seconds: 10));
async.flushMicrotasks();
});
});
});
test('logs error and applies backoff when sync fails', () {
fakeAsync((async) {
final accounts = FakeAccountRepository();