feat: JMAP push via EventSource instead of polling

- Add watchJmapPush(accountId, password) to EmailRepository; IMAP and
  JMAP-without-push return Stream.empty() so callers fall through to polling
- EmailRepositoryImpl opens an SSE (text/event-stream) connection to the
  server's eventSourceUrl; yields void on each StateChange event; properly
  cancellable via StreamController.onCancel
- _JmapAccountSync._wait() subscribes to watchJmapPush and races it against
  the 30 s poll timer and the stop signal — whichever fires first unblocks
  the next sync cycle

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Thomas Güttler
2026-04-19 17:48:40 +02:00
co-authored by Claude Sonnet 4.6
parent 8d8dbc33db
commit 795001d268
7 changed files with 243 additions and 0 deletions
@@ -30,4 +30,12 @@ abstract class EmailRepository {
/// Sends any queued local mutations for [accountId] to the server.
/// No-op for IMAP accounts (mutations are applied synchronously).
Future<void> flushPendingChanges(String accountId, String password);
/// Returns a stream that emits once for each JMAP push event (RFC 8887
/// `StateChange`) received from the server's EventSource URL.
///
/// Completes immediately — emitting nothing — if the account does not
/// support push (IMAP accounts, or JMAP servers without an eventSourceUrl).
/// Callers should fall back to polling when the stream ends.
Stream<void> watchJmapPush(String accountId, String password);
}
+14
View File
@@ -276,10 +276,24 @@ class _JmapAccountSync implements _SyncLoop {
Future<void> _wait() async {
if (!_running) return;
_stopSignal = Completer<void>();
final password = await _accounts.getPassword(account.id);
// Try JMAP push (RFC 8887 EventSource). Falls back to poll timer when
// the server doesn't advertise an eventSourceUrl or the connection fails.
final pushReady = Completer<void>();
final pushSub = _emails
.watchJmapPush(account.id, password)
.listen((_) {
if (!pushReady.isCompleted) pushReady.complete();
}, onDone: () {}, onError: (_) {});
await Future.any([
pushReady.future,
Future.delayed(_pollInterval),
_stopSignal!.future,
]);
await pushSub.cancel();
_stopSignal = null;
}
@@ -1,3 +1,4 @@
import 'dart:async';
import 'dart:convert';
import 'dart:io';
import 'dart:math' as math;
@@ -581,6 +582,103 @@ class EmailRepositoryImpl implements EmailRepository {
);
}
// ── JMAP push ────────────────────────────────────────────────────────────
@override
Stream<void> watchJmapPush(String accountId, String password) {
final controller = StreamController<void>();
StreamSubscription<String>? innerSub;
controller.onCancel = () => innerSub?.cancel();
() async {
try {
final account = await _accounts.getAccount(accountId);
if (account == null ||
account.type != account_model.AccountType.jmap) {
await controller.close();
return;
}
final jmapUrl = account.jmapUrl;
if (jmapUrl == null || jmapUrl.isEmpty) {
await controller.close();
return;
}
final JmapClient jmap;
try {
jmap = await JmapClient.connect(
httpClient: _httpClient,
jmapUrl: Uri.parse(jmapUrl),
username: _effectiveUsername(account),
password: password,
);
} catch (_) {
await controller.close();
return;
}
final sseUrl = jmap.eventSourceUrl;
if (sseUrl == null) {
await controller.close();
return;
}
final credentials = base64
.encode(utf8.encode('${_effectiveUsername(account)}:$password'));
http.StreamedResponse response;
try {
final request = http.Request('GET', Uri.parse(sseUrl));
request.headers['Accept'] = 'text/event-stream';
request.headers['Authorization'] = 'Basic $credentials';
response = await _httpClient
.send(request)
.timeout(const Duration(seconds: 10));
if (response.statusCode != 200) {
await controller.close();
return;
}
} catch (_) {
await controller.close();
return;
}
var buffer = '';
innerSub = response.stream
.transform(utf8.decoder)
.timeout(const Duration(minutes: 25))
.listen(
(chunk) {
buffer += chunk;
final lines = buffer.split('\n');
buffer = lines.removeLast();
for (final line in lines) {
if (!line.startsWith('data:')) continue;
final data = line.substring(5).trim();
try {
final decoded = jsonDecode(data) as Map<String, dynamic>;
if (decoded['@type'] == 'StateChange') {
controller.add(null);
}
} catch (_) {
// Malformed JSON — ignore line
}
}
},
onDone: () => controller.close(),
onError: (_) => controller.close(),
cancelOnError: true,
);
} catch (_) {
await controller.close();
}
}();
return controller.stream;
}
// ── JMAP helpers ─────────────────────────────────────────────────────────
Map<String, dynamic> _responseArgs(
@@ -95,6 +95,10 @@ class _FakeEmails implements EmailRepository {
@override
Future<List<Email>> searchEmails(String a, String m, String q) async => [];
@override
Stream<void> watchJmapPush(String accountId, String password) =>
const Stream.empty();
}
// ── Tests ─────────────────────────────────────────────────────────────────────
+4
View File
@@ -100,6 +100,10 @@ class FakeEmailRepository implements EmailRepository {
String query,
) async =>
[];
@override
Stream<void> watchJmapPush(String accountId, String password) =>
const Stream.empty();
}
// ── Helpers ───────────────────────────────────────────────────────────────────
+111
View File
@@ -1,3 +1,4 @@
import 'dart:async';
import 'dart:convert';
import 'dart:io';
@@ -1635,4 +1636,114 @@ void main() {
expect(em1Create['mailboxIds'], {'sentMbxJmapId': true});
});
});
group('JMAP watchJmapPush', () {
// A custom BaseClient that serves session JSON for well-known requests
// and an SSE stream for all other GET requests.
http.Client makeSseClient({
String? eventSourceUrl,
Stream<List<int>>? sseStream,
}) {
return _SseTestClient(
eventSourceUrl: eventSourceUrl,
sseStream: sseStream ?? const Stream.empty(),
);
}
test('returns empty stream when server has no eventSourceUrl', () async {
final r = _makeRepos(httpClient: makeSseClient());
await r.accounts.addAccount(_jmapAccount, 'pw');
final events = await r.emails.watchJmapPush('jmap-1', 'pw').toList();
expect(events, isEmpty);
});
test('yields on StateChange event', () async {
final sseController = StreamController<List<int>>();
final r = _makeRepos(
httpClient: makeSseClient(
eventSourceUrl: 'https://jmap.example.com/events/',
sseStream: sseController.stream,
),
);
await r.accounts.addAccount(_jmapAccount, 'pw');
final emitted = <void>[];
final sub = r.emails
.watchJmapPush('jmap-1', 'pw')
.listen(emitted.add);
// Push a StateChange event
const event = 'data: {"@type":"StateChange","changed":{}}\n\n';
sseController.add(utf8.encode(event));
await Future.delayed(const Duration(milliseconds: 50));
expect(emitted, hasLength(1));
await sub.cancel();
await sseController.close();
});
test('ignores non-StateChange SSE data lines', () async {
final sseController = StreamController<List<int>>();
final r = _makeRepos(
httpClient: makeSseClient(
eventSourceUrl: 'https://jmap.example.com/events/',
sseStream: sseController.stream,
),
);
await r.accounts.addAccount(_jmapAccount, 'pw');
final emitted = <void>[];
final sub = r.emails
.watchJmapPush('jmap-1', 'pw')
.listen(emitted.add);
const keepalive = ': keepalive\n\n';
const other = 'data: {"@type":"Something"}\n\n';
sseController.add(utf8.encode(keepalive + other));
await Future.delayed(const Duration(milliseconds: 50));
expect(emitted, isEmpty);
await sub.cancel();
await sseController.close();
});
});
}
// ── SSE test helper ──────────────────────────────────────────────────────────
class _SseTestClient extends http.BaseClient {
_SseTestClient({required this.eventSourceUrl, required this.sseStream});
final String? eventSourceUrl;
final Stream<List<int>> sseStream;
@override
Future<http.StreamedResponse> send(http.BaseRequest request) async {
if (request.url.path.contains('well-known')) {
final session = jsonEncode({
'apiUrl': 'https://jmap.example.com/api/',
'accounts': {'acct1': {}},
'primaryAccounts': {
'urn:ietf:params:jmap:core': 'acct1',
'urn:ietf:params:jmap:mail': 'acct1',
},
'capabilities': {
'urn:ietf:params:jmap:core': {},
'urn:ietf:params:jmap:mail': {},
},
'username': 'alice@example.com',
'state': 'sess1',
if (eventSourceUrl != null) 'eventSourceUrl': eventSourceUrl,
});
return http.StreamedResponse(
Stream.value(utf8.encode(session)), 200);
}
if (request.headers['Accept'] == 'text/event-stream') {
return http.StreamedResponse(sseStream, 200);
}
return http.StreamedResponse(Stream.value(utf8.encode('{}') ), 200);
}
}
+4
View File
@@ -186,6 +186,10 @@ class FakeEmailRepository implements EmailRepository {
String query,
) async =>
_searchResults;
@override
Stream<void> watchJmapPush(String accountId, String password) =>
const Stream.empty();
}
// ---------------------------------------------------------------------------