feat: JMAP Email sync — full and incremental (Step 5)

EmailRepositoryImpl.syncEmails now dispatches on account type.
For JMAP accounts:
- First run: Email/query (filtered by mailbox, limit 500) + Email/get
  via back-reference → upsert emails, persist state.
- Subsequent runs: Email/changes → fetch new/updated via Email/get,
  delete destroyed rows, update state in sync_state.

Maps JMAP keywords ($seen, $flagged), mailboxIds, addresses, and
hasAttachment to the existing Emails table. uid stored as 0 for
JMAP emails (unused; JMAP operations go through Email/set in Step 7).

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Thomas Güttler
2026-04-19 16:19:03 +02:00
co-authored by Claude Sonnet 4.6
parent f580cd0197
commit 2efeba9d2e
3 changed files with 406 additions and 6 deletions
+1 -1
View File
@@ -84,7 +84,7 @@ Implement `syncMailboxes(accountId)` for JMAP:
Reuse the existing `Mailboxes` table. No new DB columns needed.
### Step 5 — JMAP Email sync `[ ]`
### Step 5 — JMAP Email sync `[x]`
Implement `syncEmails(accountId, mailboxId)` for JMAP:
@@ -6,12 +6,15 @@ import 'package:enough_mail/enough_mail.dart' as imap;
import 'package:path/path.dart' as p;
import 'package:path_provider/path_provider.dart';
import 'package:http/http.dart' as http;
import '../../core/models/account.dart' as account_model;
import '../../core/models/email.dart' as model;
import '../../core/repositories/account_repository.dart';
import '../../core/repositories/email_repository.dart';
import '../db/database.dart';
import '../imap/imap_client_factory.dart';
import '../jmap/jmap_client.dart';
typedef ImapConnectFn = Future<imap.ImapClient> Function(
account_model.Account account, String username, String password);
@@ -26,15 +29,18 @@ class EmailRepositoryImpl implements EmailRepository {
ImapConnectFn imapConnect = connectImap,
SmtpConnectFn smtpConnect = connectSmtp,
GetCacheDirFn getCacheDir = getTemporaryDirectory,
http.Client? httpClient,
}) : _imapConnect = imapConnect,
_smtpConnect = smtpConnect,
_getCacheDir = getCacheDir;
_getCacheDir = getCacheDir,
_httpClient = httpClient ?? http.Client();
final AppDatabase _db;
final AccountRepository _accounts;
final ImapConnectFn _imapConnect;
final SmtpConnectFn _smtpConnect;
final GetCacheDirFn _getCacheDir;
final http.Client _httpClient;
String _effectiveUsername(account_model.Account account) =>
account.username.isNotEmpty ? account.username : account.email;
@@ -126,7 +132,21 @@ class EmailRepositoryImpl implements EmailRepository {
Future<void> syncEmails(String accountId, String mailboxPath) async {
final account = (await _accounts.getAccount(accountId))!;
final password = await _accounts.getPassword(accountId);
final client = await _imapConnect(account, _effectiveUsername(account), password);
switch (account.type) {
case account_model.AccountType.imap:
await _syncEmailsImap(account, password, mailboxPath);
case account_model.AccountType.jmap:
await _syncEmailsJmap(account, password, mailboxPath);
}
}
Future<void> _syncEmailsImap(
account_model.Account account,
String password,
String mailboxPath,
) async {
final client =
await _imapConnect(account, _effectiveUsername(account), password);
try {
await client.selectMailboxByPath(mailboxPath);
final fetch = await client.fetchMessages(
@@ -138,12 +158,12 @@ class EmailRepositoryImpl implements EmailRepository {
if (envelope == null) continue;
final uid = msg.uid;
if (uid == null) continue;
final emailId = '$accountId:$uid';
final emailId = '${account.id}:$uid';
await _db.into(_db.emails).insertOnConflictUpdate(
EmailsCompanion.insert(
id: emailId,
accountId: accountId,
accountId: account.id,
mailboxPath: mailboxPath,
uid: uid,
subject: Value(envelope.subject),
@@ -163,6 +183,203 @@ class EmailRepositoryImpl implements EmailRepository {
}
}
// ── JMAP email sync ────────────────────────────────────────────────────────
static const _emailProperties = [
'id', 'mailboxIds', 'subject', 'sentAt', 'receivedAt',
'from', 'to', 'cc', 'keywords', 'hasAttachment', 'preview',
];
Future<void> _syncEmailsJmap(
account_model.Account account,
String password,
String mailboxJmapId,
) async {
final jmapUrl = account.jmapUrl;
if (jmapUrl == null || jmapUrl.isEmpty) {
throw Exception('JMAP account ${account.id} has no jmapUrl');
}
final jmap = await JmapClient.connect(
httpClient: _httpClient,
jmapUrl: Uri.parse(jmapUrl),
username: _effectiveUsername(account),
password: password,
);
final storedState = await _loadSyncState(account.id, 'Email');
if (storedState == null) {
await _jmapFullEmailSync(account.id, jmap, mailboxJmapId);
} else {
await _jmapIncrementalEmailSync(account.id, jmap, storedState);
}
}
Future<void> _jmapFullEmailSync(
String accountId, JmapClient jmap, String mailboxJmapId) async {
// Query IDs in this mailbox, newest first, up to 500.
final responses = await jmap.call([
[
'Email/query',
{
'accountId': jmap.accountId,
'filter': {'inMailbox': mailboxJmapId},
'sort': [{'property': 'receivedAt', 'isAscending': false}],
'limit': 500,
},
'0',
],
[
'Email/get',
{
'accountId': jmap.accountId,
'#ids': {'resultOf': '0', 'name': 'Email/query', 'path': '/ids'},
'properties': _emailProperties,
},
'1',
],
]);
final getResult = _responseArgs(responses, 1, 'Email/get');
final emails = getResult['list'] as List<dynamic>;
final newState = getResult['state'] as String;
await _upsertJmapEmails(accountId, emails);
await _saveSyncState(accountId, 'Email', newState);
}
Future<void> _jmapIncrementalEmailSync(
String accountId, JmapClient jmap, String sinceState) async {
final responses = await jmap.call([
[
'Email/changes',
{'accountId': jmap.accountId, 'sinceState': sinceState},
'0',
]
]);
final changes = _responseArgs(responses, 0, 'Email/changes');
final newState = changes['newState'] as String;
final created = List<String>.from(changes['created'] as List? ?? []);
final updated = List<String>.from(changes['updated'] as List? ?? []);
final destroyed = List<String>.from(changes['destroyed'] as List? ?? []);
final toFetch = [...created, ...updated];
if (toFetch.isNotEmpty) {
final getResponses = await jmap.call([
[
'Email/get',
{
'accountId': jmap.accountId,
'ids': toFetch,
'properties': _emailProperties,
},
'1',
]
]);
final getResult = _responseArgs(getResponses, 0, 'Email/get');
await _upsertJmapEmails(accountId, getResult['list'] as List<dynamic>);
}
for (final jmapId in destroyed) {
await (_db.delete(_db.emails)
..where((t) => t.id.equals('$accountId:$jmapId')))
.go();
}
await _saveSyncState(accountId, 'Email', newState);
}
Future<void> _upsertJmapEmails(
String accountId, List<dynamic> emails) async {
for (final e in emails) {
final m = e as Map<String, dynamic>;
final jmapId = m['id'] as String;
final dbId = '$accountId:$jmapId';
// Use first mailbox ID as the primary mailboxPath.
final mailboxIds = m['mailboxIds'] as Map<String, dynamic>?;
final mailboxPath = mailboxIds?.keys.firstOrNull ?? '';
final keywords = m['keywords'] as Map<String, dynamic>? ?? {};
final from = _encodeJmapAddresses(m['from']);
final to = _encodeJmapAddresses(m['to']);
final cc = _encodeJmapAddresses(m['cc']);
final sentAt = _parseDate(m['sentAt'] as String?);
final receivedAt = _parseDate(m['receivedAt'] as String?) ?? DateTime.now();
await _db.into(_db.emails).insertOnConflictUpdate(
EmailsCompanion.insert(
id: dbId,
accountId: accountId,
mailboxPath: mailboxPath,
uid: 0, // not used for JMAP accounts
subject: Value(m['subject'] as String?),
sentAt: Value(sentAt),
receivedAt: receivedAt,
fromJson: Value(from),
toAddresses: Value(to),
ccJson: Value(cc),
preview: Value(m['preview'] as String?),
isSeen: Value(keywords.containsKey(r'$seen')),
isFlagged: Value(keywords.containsKey(r'$flagged')),
hasAttachment: Value((m['hasAttachment'] as bool?) ?? false),
),
);
}
}
// ── sync_state helpers ────────────────────────────────────────────────────
Future<String?> _loadSyncState(String accountId, String resourceType) async {
final row = await (_db.select(_db.syncStates)
..where((t) =>
t.accountId.equals(accountId) &
t.resourceType.equals(resourceType)))
.getSingleOrNull();
return row?.state;
}
Future<void> _saveSyncState(
String accountId, String resourceType, String state) async {
await _db.into(_db.syncStates).insertOnConflictUpdate(
SyncStatesCompanion.insert(
accountId: accountId,
resourceType: resourceType,
state: state,
syncedAt: DateTime.now(),
),
);
}
// ── JMAP helpers ─────────────────────────────────────────────────────────
Map<String, dynamic> _responseArgs(
List<dynamic> responses, int index, String expectedMethod) {
final triple = responses[index] as List<dynamic>;
final method = triple[0] as String;
if (method == 'error') {
final err = triple[1] as Map<String, dynamic>;
throw JmapException('$expectedMethod error: ${err['type']}');
}
return triple[1] as Map<String, dynamic>;
}
String _encodeJmapAddresses(dynamic addressList) {
if (addressList == null) return '[]';
final list = addressList as List<dynamic>;
return jsonEncode(list
.map((a) => {
'name': (a as Map<String, dynamic>)['name'],
'email': a['email'],
})
.toList());
}
DateTime? _parseDate(String? iso) =>
iso == null ? null : DateTime.tryParse(iso);
// ── Mutations ──────────────────────────────────────────────────────────────
@override
+184 -1
View File
@@ -4,6 +4,8 @@ import 'dart:io';
import 'package:drift/drift.dart' show Value;
import 'package:enough_mail/enough_mail.dart' as imap;
import 'package:flutter_test/flutter_test.dart';
import 'package:http/http.dart' as http;
import 'package:http/testing.dart';
import 'package:sharedinbox/core/models/account.dart';
import 'package:sharedinbox/core/models/email.dart';
@@ -25,6 +27,104 @@ const _account = Account(
smtpHost: 'smtp.example.com',
);
const _jmapAccount = Account(
id: 'jmap-1',
displayName: 'Alice',
email: 'alice@example.com',
type: AccountType.jmap,
jmapUrl: 'https://jmap.example.com/.well-known/jmap',
);
http.Client _mockJmapEmails({required List<Map<String, dynamic>> apiResponses}) {
var callIndex = 0;
return MockClient((req) async {
if (req.url.path.contains('well-known')) {
return http.Response(
jsonEncode({
'apiUrl': 'https://jmap.example.com/api/',
'accounts': {'acct1': {'name': 'alice@example.com', 'isPersonal': true}},
'primaryAccounts': {
'urn:ietf:params:jmap:core': 'acct1',
'urn:ietf:params:jmap:mail': 'acct1',
},
'capabilities': {},
'username': 'alice@example.com',
'state': 'sess1',
}),
200,
);
}
final resp = apiResponses[callIndex % apiResponses.length];
callIndex++;
return http.Response(jsonEncode(resp), 200);
});
}
Map<String, dynamic> _emailGetResponse(
{required String state, required List<Map<String, dynamic>> list}) =>
{
'sessionState': 'sess1',
'methodResponses': [
['Email/query', {'accountId': 'acct1', 'ids': list.map((e) => e['id']).toList()}, '0'],
['Email/get', {'accountId': 'acct1', 'state': state, 'list': list}, '1'],
],
};
Map<String, dynamic> _emailChangesResponse({
required String oldState,
required String newState,
List<String> created = const [],
List<String> updated = const [],
List<String> destroyed = const [],
}) =>
{
'sessionState': 'sess1',
'methodResponses': [
[
'Email/changes',
{
'accountId': 'acct1',
'oldState': oldState,
'newState': newState,
'hasMoreChanges': false,
'created': created,
'updated': updated,
'destroyed': destroyed,
},
'0',
],
],
};
Map<String, dynamic> _emailGetOnly(
{required String state, required List<Map<String, dynamic>> list}) =>
{
'sessionState': 'sess1',
'methodResponses': [
['Email/get', {'accountId': 'acct1', 'state': state, 'list': list}, '1'],
],
};
Map<String, dynamic> _jmapEmail({
required String id,
required String mailboxId,
String subject = 'Hello',
bool seen = false,
}) =>
{
'id': id,
'mailboxIds': {mailboxId: true},
'subject': subject,
'sentAt': '2024-01-01T10:00:00Z',
'receivedAt': '2024-01-01T10:00:01Z',
'from': [{'name': 'Sender', 'email': 'sender@example.com'}],
'to': [{'name': 'Alice', 'email': 'alice@example.com'}],
'cc': [],
'keywords': seen ? {r'$seen': true} : <String, dynamic>{},
'hasAttachment': false,
'preview': 'Hello world',
};
Future<imap.ImapClient> _noImapConnect(Account a, String u, String p) =>
Future.error(UnsupportedError('IMAP unavailable in unit tests'));
@@ -35,7 +135,7 @@ Future<imap.SmtpClient> _noSmtpConnect(Account a, String u, String p) =>
AppDatabase db,
AccountRepositoryImpl accounts,
EmailRepositoryImpl emails,
}) _makeRepos() {
}) _makeRepos({http.Client? httpClient}) {
final db = openTestDatabase();
final storage = MapSecureStorage();
final accounts = AccountRepositoryImpl(db, storage);
@@ -44,6 +144,7 @@ Future<imap.SmtpClient> _noSmtpConnect(Account a, String u, String p) =>
accounts,
imapConnect: _noImapConnect,
smtpConnect: _noSmtpConnect,
httpClient: httpClient,
);
return (db: db, accounts: accounts, emails: emails);
}
@@ -564,4 +665,86 @@ void main() {
expect(r.fakeImap.logoutCalled, isFalse);
});
});
group('JMAP syncEmails', () {
test('full sync upserts emails and persists state', () async {
final r = _makeRepos(
httpClient: _mockJmapEmails(apiResponses: [
_emailGetResponse(state: 'est1', list: [
_jmapEmail(id: 'e1', mailboxId: 'mbx1', subject: 'First'),
_jmapEmail(id: 'e2', mailboxId: 'mbx1', subject: 'Second', seen: true),
]),
]),
);
await r.accounts.addAccount(_jmapAccount, 'pw');
await r.emails.syncEmails('jmap-1', 'mbx1');
final emails = await r.emails.observeEmails('jmap-1', 'mbx1').first;
expect(emails, hasLength(2));
expect(emails.map((e) => e.subject).toSet(), {'First', 'Second'});
expect(emails.firstWhere((e) => e.subject == 'Second').isSeen, isTrue);
final states = await r.db.select(r.db.syncStates).get();
expect(states, hasLength(1));
expect(states.first.state, 'est1');
});
test('incremental sync applies created, updated, destroyed', () async {
final r = _makeRepos(
httpClient: _mockJmapEmails(apiResponses: [
// Call 1: Email/changes
_emailChangesResponse(
oldState: 'est1', newState: 'est2',
created: ['e3'], updated: ['e1'], destroyed: ['e2'],
),
// Call 2: Email/get for created + updated
_emailGetOnly(state: 'est2', list: [
_jmapEmail(id: 'e1', mailboxId: 'mbx1', subject: 'First updated'),
_jmapEmail(id: 'e3', mailboxId: 'mbx1', subject: 'Third'),
]),
]),
);
await r.accounts.addAccount(_jmapAccount, 'pw');
// Pre-populate
await r.db.into(r.db.emails).insertOnConflictUpdate(EmailsCompanion.insert(
id: 'jmap-1:e1', accountId: 'jmap-1', mailboxPath: 'mbx1', uid: 0,
subject: const Value('First'), receivedAt: DateTime(2024),
));
await r.db.into(r.db.emails).insertOnConflictUpdate(EmailsCompanion.insert(
id: 'jmap-1:e2', accountId: 'jmap-1', mailboxPath: 'mbx1', uid: 0,
subject: const Value('Second'), receivedAt: DateTime(2024),
));
await r.db.into(r.db.syncStates).insertOnConflictUpdate(SyncStatesCompanion.insert(
accountId: 'jmap-1', resourceType: 'Email',
state: 'est1', syncedAt: DateTime.now(),
));
await r.emails.syncEmails('jmap-1', 'mbx1');
final emails = await r.emails.observeEmails('jmap-1', 'mbx1').first;
expect(emails.map((e) => e.subject).toSet(), {'First updated', 'Third'});
final states = await r.db.select(r.db.syncStates).get();
expect(states.first.state, 'est2');
});
test('incremental sync with no changes updates state only', () async {
final r = _makeRepos(
httpClient: _mockJmapEmails(apiResponses: [
_emailChangesResponse(oldState: 'est1', newState: 'est1'),
]),
);
await r.accounts.addAccount(_jmapAccount, 'pw');
await r.db.into(r.db.syncStates).insertOnConflictUpdate(SyncStatesCompanion.insert(
accountId: 'jmap-1', resourceType: 'Email',
state: 'est1', syncedAt: DateTime.now(),
));
await r.emails.syncEmails('jmap-1', 'mbx1');
final states = await r.db.select(r.db.syncStates).get();
expect(states.first.state, 'est1');
});
});
}