Optimize deployment, fix E2E flakiness, and implement database-backed threading
- Optimize task deploy-android with marker files and source/generate tracking. - Fix flaky Android E2E test with pumpAndSettle and safety delays. - Implement global CrashScreen and error handlers in main.dart. - Refactor threading to use a persistent Threads table for performance. - Add database indexes and migration for schema v18. - Enhance coverage gate with ghost path checks and increased coverage (82%).
This commit is contained in:
@@ -76,55 +76,110 @@ class EmailRepositoryImpl implements EmailRepository {
|
||||
String accountId,
|
||||
String mailboxPath,
|
||||
) {
|
||||
return observeEmails(accountId, mailboxPath).map(_groupIntoThreads);
|
||||
return (_db.select(_db.threads)
|
||||
..where(
|
||||
(t) =>
|
||||
t.accountId.equals(accountId) &
|
||||
t.mailboxPath.equals(mailboxPath),
|
||||
)
|
||||
..orderBy([(t) => OrderingTerm.desc(t.latestDate)]))
|
||||
.watch()
|
||||
.map((rows) => rows.map(_threadRowToModel).toList());
|
||||
}
|
||||
|
||||
static List<model.EmailThread> _groupIntoThreads(List<model.Email> emails) {
|
||||
// Group emails by threadId, falling back to email id for unthreaded mail.
|
||||
final groups = <String, List<model.Email>>{};
|
||||
for (final email in emails) {
|
||||
final key = email.threadId ?? email.id;
|
||||
groups.putIfAbsent(key, () => []).add(email);
|
||||
model.EmailThread _threadRowToModel(ThreadRow row) {
|
||||
List<model.EmailAddress> parseAddresses(String json) {
|
||||
final list = jsonDecode(json) as List<dynamic>;
|
||||
return list
|
||||
.map(
|
||||
(e) => model.EmailAddress(
|
||||
name: (e as Map<String, dynamic>)['name'] as String?,
|
||||
email: e['email'] as String,
|
||||
),
|
||||
)
|
||||
.toList();
|
||||
}
|
||||
|
||||
final threads = groups.values.map((threadEmails) {
|
||||
// Sort within thread oldest-first so latest is last.
|
||||
threadEmails.sort((a, b) {
|
||||
final da = a.sentAt ?? a.receivedAt;
|
||||
final db = b.sentAt ?? b.receivedAt;
|
||||
return da.compareTo(db);
|
||||
});
|
||||
return model.EmailThread(
|
||||
threadId: row.id,
|
||||
accountId: row.accountId,
|
||||
mailboxPath: row.mailboxPath,
|
||||
subject: row.subject,
|
||||
latestDate: row.latestDate,
|
||||
messageCount: row.messageCount,
|
||||
hasUnread: row.hasUnread,
|
||||
isFlagged: row.isFlagged,
|
||||
participants: parseAddresses(row.participantsJson),
|
||||
preview: row.preview,
|
||||
latestEmailId: row.latestEmailId,
|
||||
emailIds: List<String>.from(jsonDecode(row.emailIdsJson) as List),
|
||||
);
|
||||
}
|
||||
|
||||
final latest = threadEmails.last;
|
||||
/// Recalculates and updates the [Threads] table for [threadId].
|
||||
/// Called after any change to the [Emails] table.
|
||||
Future<void> _updateThread(
|
||||
String accountId,
|
||||
String mailboxPath,
|
||||
String threadId,
|
||||
) async {
|
||||
final threadEmails = await (_db.select(_db.emails)
|
||||
..where(
|
||||
(t) =>
|
||||
t.accountId.equals(accountId) &
|
||||
t.mailboxPath.equals(mailboxPath) &
|
||||
t.threadId.equals(threadId),
|
||||
)
|
||||
..orderBy([
|
||||
(t) => OrderingTerm.asc(t.sentAt),
|
||||
(t) => OrderingTerm.asc(t.receivedAt),
|
||||
]))
|
||||
.get();
|
||||
|
||||
// Collect unique participants across the whole thread.
|
||||
final seen = <String>{};
|
||||
final participants = <model.EmailAddress>[];
|
||||
for (final e in threadEmails) {
|
||||
for (final a in e.from) {
|
||||
if (seen.add(a.email)) participants.add(a);
|
||||
if (threadEmails.isEmpty) {
|
||||
await (_db.delete(_db.threads)
|
||||
..where(
|
||||
(t) =>
|
||||
t.accountId.equals(accountId) &
|
||||
t.mailboxPath.equals(mailboxPath) &
|
||||
t.id.equals(threadId),
|
||||
))
|
||||
.go();
|
||||
return;
|
||||
}
|
||||
|
||||
final latest = threadEmails.last;
|
||||
|
||||
// Collect unique participants across the whole thread.
|
||||
final seen = <String>{};
|
||||
final participants = <Map<String, dynamic>>[];
|
||||
for (final e in threadEmails) {
|
||||
final from = jsonDecode(e.fromJson) as List<dynamic>;
|
||||
for (final a in from.cast<Map<String, dynamic>>()) {
|
||||
final email = a['email'] as String;
|
||||
if (seen.add(email)) {
|
||||
participants.add({'name': a['name'], 'email': email});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return model.EmailThread(
|
||||
threadId: latest.threadId ?? latest.id,
|
||||
subject: latest.subject,
|
||||
participants: participants,
|
||||
latestDate: latest.sentAt ?? latest.receivedAt,
|
||||
messageCount: threadEmails.length,
|
||||
hasUnread: threadEmails.any((e) => !e.isSeen),
|
||||
isFlagged: threadEmails.any((e) => e.isFlagged),
|
||||
latestEmailId: latest.id,
|
||||
preview: latest.preview,
|
||||
emailIds: threadEmails.map((e) => e.id).toList(),
|
||||
accountId: latest.accountId,
|
||||
mailboxPath: latest.mailboxPath,
|
||||
);
|
||||
}).toList();
|
||||
|
||||
// Sort threads by latest message descending.
|
||||
threads.sort((a, b) => b.latestDate.compareTo(a.latestDate));
|
||||
return threads;
|
||||
await _db.into(_db.threads).insertOnConflictUpdate(
|
||||
ThreadsCompanion.insert(
|
||||
id: threadId,
|
||||
accountId: accountId,
|
||||
mailboxPath: mailboxPath,
|
||||
subject: Value(latest.subject),
|
||||
latestDate: latest.sentAt ?? latest.receivedAt,
|
||||
messageCount: Value(threadEmails.length),
|
||||
hasUnread: Value(threadEmails.any((e) => !e.isSeen)),
|
||||
isFlagged: Value(threadEmails.any((e) => e.isFlagged)),
|
||||
participantsJson: Value(jsonEncode(participants)),
|
||||
preview: Value(latest.preview),
|
||||
latestEmailId: latest.id,
|
||||
emailIdsJson:
|
||||
Value(jsonEncode(threadEmails.map((e) => e.id).toList())),
|
||||
),
|
||||
);
|
||||
}
|
||||
|
||||
@override
|
||||
@@ -448,6 +503,7 @@ class EmailRepositoryImpl implements EmailRepository {
|
||||
final pendingByUid =
|
||||
await _pendingDeleteOrMoveUids(account.id, mailboxPath);
|
||||
var bytes = 0;
|
||||
final affectedThreads = <String>{};
|
||||
await _db.transaction(() async {
|
||||
for (final msg in fetch.messages) {
|
||||
final envelope = msg.envelope;
|
||||
@@ -478,11 +534,13 @@ class EmailRepositoryImpl implements EmailRepository {
|
||||
final inReplyTo = envelope.inReplyTo?.trim();
|
||||
final refs = msg.getHeaderValue('References')?.trim();
|
||||
final threadId = _computeThreadId(
|
||||
emailId: emailId,
|
||||
messageId: msgId,
|
||||
inReplyTo: inReplyTo,
|
||||
references: refs,
|
||||
);
|
||||
emailId: emailId,
|
||||
messageId: msgId,
|
||||
inReplyTo: inReplyTo,
|
||||
references: refs,
|
||||
) ??
|
||||
emailId;
|
||||
affectedThreads.add(threadId);
|
||||
await _db.into(_db.emails).insertOnConflictUpdate(
|
||||
EmailsCompanion.insert(
|
||||
id: emailId,
|
||||
@@ -506,6 +564,9 @@ class EmailRepositoryImpl implements EmailRepository {
|
||||
);
|
||||
}
|
||||
});
|
||||
for (final tid in affectedThreads) {
|
||||
await _updateThread(account.id, mailboxPath, tid);
|
||||
}
|
||||
return bytes;
|
||||
}
|
||||
|
||||
@@ -587,11 +648,16 @@ class EmailRepositoryImpl implements EmailRepository {
|
||||
}
|
||||
|
||||
final serverUidSet = serverUids.toSet();
|
||||
final affectedThreads = <String>{};
|
||||
for (final row in localRows) {
|
||||
if (!serverUidSet.contains(row.uid)) {
|
||||
affectedThreads.add(row.threadId ?? row.id);
|
||||
await (_db.delete(_db.emails)..where((t) => t.id.equals(row.id))).go();
|
||||
}
|
||||
}
|
||||
for (final tid in affectedThreads) {
|
||||
await _updateThread(accountId, mailboxPath, tid);
|
||||
}
|
||||
}
|
||||
|
||||
// ── JMAP email sync ────────────────────────────────────────────────────────
|
||||
@@ -757,9 +823,14 @@ class EmailRepositoryImpl implements EmailRepository {
|
||||
}
|
||||
|
||||
for (final jmapId in destroyed) {
|
||||
await (_db.delete(_db.emails)
|
||||
..where((t) => t.id.equals('$accountId:$jmapId')))
|
||||
.go();
|
||||
final dbId = '$accountId:$jmapId';
|
||||
final email = await getEmail(dbId);
|
||||
if (email != null) {
|
||||
final tid = email.threadId ?? dbId;
|
||||
final mailbox = email.mailboxPath;
|
||||
await (_db.delete(_db.emails)..where((t) => t.id.equals(dbId))).go();
|
||||
await _updateThread(accountId, mailbox, tid);
|
||||
}
|
||||
}
|
||||
|
||||
await _saveSyncState(accountId, 'Email', newState);
|
||||
@@ -773,6 +844,7 @@ class EmailRepositoryImpl implements EmailRepository {
|
||||
// Returns total bytes transferred (sum of JMAP `size` fields).
|
||||
Future<int> _upsertJmapEmails(String accountId, List<dynamic> emails) async {
|
||||
var bytes = 0;
|
||||
final affectedByMailbox = <String, Set<String>>{};
|
||||
for (final e in emails) {
|
||||
final m = e as Map<String, dynamic>;
|
||||
final jmapId = m['id'] as String;
|
||||
@@ -791,7 +863,9 @@ class EmailRepositoryImpl implements EmailRepository {
|
||||
final receivedAt =
|
||||
_parseDate(m['receivedAt'] as String?) ?? DateTime.now();
|
||||
|
||||
final jmapThreadId = m['threadId'] as String?;
|
||||
final jmapThreadId = m['threadId'] as String? ?? dbId;
|
||||
affectedByMailbox.putIfAbsent(mailboxPath, () => {}).add(jmapThreadId);
|
||||
|
||||
// JMAP messageId/inReplyTo/references are arrays; join to space-separated.
|
||||
final jmapMessageId =
|
||||
_joinJmapStringList(m['messageId'] as List<dynamic>?);
|
||||
@@ -837,6 +911,12 @@ class EmailRepositoryImpl implements EmailRepository {
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
for (final mailboxPath in affectedByMailbox.keys) {
|
||||
for (final tid in affectedByMailbox[mailboxPath]!) {
|
||||
await _updateThread(accountId, mailboxPath, tid);
|
||||
}
|
||||
}
|
||||
return bytes;
|
||||
}
|
||||
|
||||
@@ -1104,6 +1184,11 @@ class EmailRepositoryImpl implements EmailRepository {
|
||||
isFlagged: flagged != null ? Value(flagged) : const Value.absent(),
|
||||
),
|
||||
);
|
||||
await _updateThread(
|
||||
row.accountId,
|
||||
row.mailboxPath,
|
||||
row.threadId ?? emailId,
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -1135,6 +1220,11 @@ class EmailRepositoryImpl implements EmailRepository {
|
||||
isFlagged: flagged != null ? Value(flagged) : const Value.absent(),
|
||||
),
|
||||
);
|
||||
await _updateThread(
|
||||
row.accountId,
|
||||
row.mailboxPath,
|
||||
row.threadId ?? emailId,
|
||||
);
|
||||
}
|
||||
|
||||
@override
|
||||
@@ -1156,6 +1246,16 @@ class EmailRepositoryImpl implements EmailRepository {
|
||||
await (_db.update(_db.emails)..where((t) => t.id.equals(emailId))).write(
|
||||
EmailsCompanion(mailboxPath: Value(destMailboxPath)),
|
||||
);
|
||||
await _updateThread(
|
||||
row.accountId,
|
||||
row.mailboxPath,
|
||||
row.threadId ?? emailId,
|
||||
);
|
||||
await _updateThread(
|
||||
row.accountId,
|
||||
destMailboxPath,
|
||||
row.threadId ?? emailId,
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -1170,6 +1270,12 @@ class EmailRepositoryImpl implements EmailRepository {
|
||||
}),
|
||||
);
|
||||
await (_db.delete(_db.emails)..where((t) => t.id.equals(emailId))).go();
|
||||
await _updateThread(
|
||||
row.accountId,
|
||||
row.mailboxPath,
|
||||
row.threadId ?? emailId,
|
||||
);
|
||||
// Destination will be updated when synced (IMAP move is a delete + copy).
|
||||
}
|
||||
|
||||
@override
|
||||
@@ -1200,6 +1306,11 @@ class EmailRepositoryImpl implements EmailRepository {
|
||||
jsonEncode(<String, dynamic>{}),
|
||||
);
|
||||
await (_db.delete(_db.emails)..where((t) => t.id.equals(emailId))).go();
|
||||
await _updateThread(
|
||||
row.accountId,
|
||||
row.mailboxPath,
|
||||
row.threadId ?? emailId,
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -1210,6 +1321,11 @@ class EmailRepositoryImpl implements EmailRepository {
|
||||
jsonEncode({'uid': row.uid, 'mailboxPath': row.mailboxPath}),
|
||||
);
|
||||
await (_db.delete(_db.emails)..where((t) => t.id.equals(emailId))).go();
|
||||
await _updateThread(
|
||||
row.accountId,
|
||||
row.mailboxPath,
|
||||
row.threadId ?? emailId,
|
||||
);
|
||||
}
|
||||
|
||||
// ── pending_changes queue ──────────────────────────────────────────────────
|
||||
|
||||
Reference in New Issue
Block a user