diff --git a/lib/core/repositories/email_repository.dart b/lib/core/repositories/email_repository.dart index 3cdd468..0f76adc 100644 --- a/lib/core/repositories/email_repository.dart +++ b/lib/core/repositories/email_repository.dart @@ -30,4 +30,12 @@ abstract class EmailRepository { /// Sends any queued local mutations for [accountId] to the server. /// No-op for IMAP accounts (mutations are applied synchronously). Future flushPendingChanges(String accountId, String password); + + /// Returns a stream that emits once for each JMAP push event (RFC 8887 + /// `StateChange`) received from the server's EventSource URL. + /// + /// Completes immediately — emitting nothing — if the account does not + /// support push (IMAP accounts, or JMAP servers without an eventSourceUrl). + /// Callers should fall back to polling when the stream ends. + Stream watchJmapPush(String accountId, String password); } diff --git a/lib/core/sync/account_sync_manager.dart b/lib/core/sync/account_sync_manager.dart index d59cf81..09c099e 100644 --- a/lib/core/sync/account_sync_manager.dart +++ b/lib/core/sync/account_sync_manager.dart @@ -276,10 +276,24 @@ class _JmapAccountSync implements _SyncLoop { Future _wait() async { if (!_running) return; _stopSignal = Completer(); + final password = await _accounts.getPassword(account.id); + + // Try JMAP push (RFC 8887 EventSource). Falls back to poll timer when + // the server doesn't advertise an eventSourceUrl or the connection fails. + final pushReady = Completer(); + final pushSub = _emails + .watchJmapPush(account.id, password) + .listen((_) { + if (!pushReady.isCompleted) pushReady.complete(); + }, onDone: () {}, onError: (_) {}); + await Future.any([ + pushReady.future, Future.delayed(_pollInterval), _stopSignal!.future, ]); + + await pushSub.cancel(); _stopSignal = null; } diff --git a/lib/data/repositories/email_repository_impl.dart b/lib/data/repositories/email_repository_impl.dart index 5a2ddd2..ced4983 100644 --- a/lib/data/repositories/email_repository_impl.dart +++ b/lib/data/repositories/email_repository_impl.dart @@ -1,3 +1,4 @@ +import 'dart:async'; import 'dart:convert'; import 'dart:io'; import 'dart:math' as math; @@ -581,6 +582,103 @@ class EmailRepositoryImpl implements EmailRepository { ); } + // ── JMAP push ──────────────────────────────────────────────────────────── + + @override + Stream watchJmapPush(String accountId, String password) { + final controller = StreamController(); + StreamSubscription? innerSub; + + controller.onCancel = () => innerSub?.cancel(); + + () async { + try { + final account = await _accounts.getAccount(accountId); + if (account == null || + account.type != account_model.AccountType.jmap) { + await controller.close(); + return; + } + + final jmapUrl = account.jmapUrl; + if (jmapUrl == null || jmapUrl.isEmpty) { + await controller.close(); + return; + } + + final JmapClient jmap; + try { + jmap = await JmapClient.connect( + httpClient: _httpClient, + jmapUrl: Uri.parse(jmapUrl), + username: _effectiveUsername(account), + password: password, + ); + } catch (_) { + await controller.close(); + return; + } + + final sseUrl = jmap.eventSourceUrl; + if (sseUrl == null) { + await controller.close(); + return; + } + + final credentials = base64 + .encode(utf8.encode('${_effectiveUsername(account)}:$password')); + + http.StreamedResponse response; + try { + final request = http.Request('GET', Uri.parse(sseUrl)); + request.headers['Accept'] = 'text/event-stream'; + request.headers['Authorization'] = 'Basic $credentials'; + response = await _httpClient + .send(request) + .timeout(const Duration(seconds: 10)); + if (response.statusCode != 200) { + await controller.close(); + return; + } + } catch (_) { + await controller.close(); + return; + } + + var buffer = ''; + innerSub = response.stream + .transform(utf8.decoder) + .timeout(const Duration(minutes: 25)) + .listen( + (chunk) { + buffer += chunk; + final lines = buffer.split('\n'); + buffer = lines.removeLast(); + for (final line in lines) { + if (!line.startsWith('data:')) continue; + final data = line.substring(5).trim(); + try { + final decoded = jsonDecode(data) as Map; + if (decoded['@type'] == 'StateChange') { + controller.add(null); + } + } catch (_) { + // Malformed JSON — ignore line + } + } + }, + onDone: () => controller.close(), + onError: (_) => controller.close(), + cancelOnError: true, + ); + } catch (_) { + await controller.close(); + } + }(); + + return controller.stream; + } + // ── JMAP helpers ───────────────────────────────────────────────────────── Map _responseArgs( diff --git a/test/integration/account_sync_manager_test.dart b/test/integration/account_sync_manager_test.dart index 2abe95b..d0801ce 100644 --- a/test/integration/account_sync_manager_test.dart +++ b/test/integration/account_sync_manager_test.dart @@ -95,6 +95,10 @@ class _FakeEmails implements EmailRepository { @override Future> searchEmails(String a, String m, String q) async => []; + + @override + Stream watchJmapPush(String accountId, String password) => + const Stream.empty(); } // ── Tests ───────────────────────────────────────────────────────────────────── diff --git a/test/unit/account_sync_manager_test.dart b/test/unit/account_sync_manager_test.dart index c50c39f..e39c24e 100644 --- a/test/unit/account_sync_manager_test.dart +++ b/test/unit/account_sync_manager_test.dart @@ -100,6 +100,10 @@ class FakeEmailRepository implements EmailRepository { String query, ) async => []; + + @override + Stream watchJmapPush(String accountId, String password) => + const Stream.empty(); } // ── Helpers ─────────────────────────────────────────────────────────────────── diff --git a/test/unit/email_repository_impl_test.dart b/test/unit/email_repository_impl_test.dart index 2021c97..c91cb8f 100644 --- a/test/unit/email_repository_impl_test.dart +++ b/test/unit/email_repository_impl_test.dart @@ -1,3 +1,4 @@ +import 'dart:async'; import 'dart:convert'; import 'dart:io'; @@ -1635,4 +1636,114 @@ void main() { expect(em1Create['mailboxIds'], {'sentMbxJmapId': true}); }); }); + + group('JMAP watchJmapPush', () { + // A custom BaseClient that serves session JSON for well-known requests + // and an SSE stream for all other GET requests. + http.Client makeSseClient({ + String? eventSourceUrl, + Stream>? sseStream, + }) { + return _SseTestClient( + eventSourceUrl: eventSourceUrl, + sseStream: sseStream ?? const Stream.empty(), + ); + } + + test('returns empty stream when server has no eventSourceUrl', () async { + final r = _makeRepos(httpClient: makeSseClient()); + await r.accounts.addAccount(_jmapAccount, 'pw'); + + final events = await r.emails.watchJmapPush('jmap-1', 'pw').toList(); + expect(events, isEmpty); + }); + + test('yields on StateChange event', () async { + final sseController = StreamController>(); + final r = _makeRepos( + httpClient: makeSseClient( + eventSourceUrl: 'https://jmap.example.com/events/', + sseStream: sseController.stream, + ), + ); + await r.accounts.addAccount(_jmapAccount, 'pw'); + + final emitted = []; + final sub = r.emails + .watchJmapPush('jmap-1', 'pw') + .listen(emitted.add); + + // Push a StateChange event + const event = 'data: {"@type":"StateChange","changed":{}}\n\n'; + sseController.add(utf8.encode(event)); + + await Future.delayed(const Duration(milliseconds: 50)); + expect(emitted, hasLength(1)); + + await sub.cancel(); + await sseController.close(); + }); + + test('ignores non-StateChange SSE data lines', () async { + final sseController = StreamController>(); + final r = _makeRepos( + httpClient: makeSseClient( + eventSourceUrl: 'https://jmap.example.com/events/', + sseStream: sseController.stream, + ), + ); + await r.accounts.addAccount(_jmapAccount, 'pw'); + + final emitted = []; + final sub = r.emails + .watchJmapPush('jmap-1', 'pw') + .listen(emitted.add); + + const keepalive = ': keepalive\n\n'; + const other = 'data: {"@type":"Something"}\n\n'; + sseController.add(utf8.encode(keepalive + other)); + + await Future.delayed(const Duration(milliseconds: 50)); + expect(emitted, isEmpty); + + await sub.cancel(); + await sseController.close(); + }); + }); +} + +// ── SSE test helper ────────────────────────────────────────────────────────── + +class _SseTestClient extends http.BaseClient { + _SseTestClient({required this.eventSourceUrl, required this.sseStream}); + + final String? eventSourceUrl; + final Stream> sseStream; + + @override + Future send(http.BaseRequest request) async { + if (request.url.path.contains('well-known')) { + final session = jsonEncode({ + 'apiUrl': 'https://jmap.example.com/api/', + 'accounts': {'acct1': {}}, + 'primaryAccounts': { + 'urn:ietf:params:jmap:core': 'acct1', + 'urn:ietf:params:jmap:mail': 'acct1', + }, + 'capabilities': { + 'urn:ietf:params:jmap:core': {}, + 'urn:ietf:params:jmap:mail': {}, + }, + 'username': 'alice@example.com', + 'state': 'sess1', + if (eventSourceUrl != null) 'eventSourceUrl': eventSourceUrl, + }); + return http.StreamedResponse( + Stream.value(utf8.encode(session)), 200); + } + if (request.headers['Accept'] == 'text/event-stream') { + return http.StreamedResponse(sseStream, 200); + } + return http.StreamedResponse(Stream.value(utf8.encode('{}') ), 200); + } } diff --git a/test/widget/helpers.dart b/test/widget/helpers.dart index db189a5..c00f76e 100644 --- a/test/widget/helpers.dart +++ b/test/widget/helpers.dart @@ -186,6 +186,10 @@ class FakeEmailRepository implements EmailRepository { String query, ) async => _searchResults; + + @override + Stream watchJmapPush(String accountId, String password) => + const Stream.empty(); } // ---------------------------------------------------------------------------