feat: apply local Sieve rules after sync (#119)

- Add LocalSieveApplied table (schema v32) keyed by (accountId, messageId)
  so each email is processed by Sieve at most once, even across restarts.
- Implement EmailRepository.applySieveRules(): loads the active local Sieve
  script, runs the interpreter against new INBOX emails, and queues pending
  move/delete/flag_seen changes for any matched rules.
- Wire applySieveRules() into both _AccountSync._sync() and
  _JmapAccountSync._sync() after the per-mailbox email sync loop.
- Make _flushPendingChangesImap() treat NONEXISTENT / not-found errors as
  silent no-ops (counts as flushed) so a second device racing on the same
  email does not accumulate retries.
- Add migration test assertions and a dedicated unit test suite covering
  rule matching, deduplication, discard, and multi-email processing.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Thomas SharedInbox
2026-05-17 10:34:21 +02:00
co-authored by Claude Sonnet 4.6
parent 8ef1c7c96f
commit 517f799b99
13 changed files with 600 additions and 4 deletions
+20 -1
View File
@@ -287,6 +287,21 @@ class UndoActions extends Table {
Set<Column> get primaryKey => {id};
}
/// Records which emails have already had local Sieve rules applied.
/// Keyed by (accountId, messageId) so the same email is never processed twice,
/// even across restarts or re-syncs.
@DataClassName('LocalSieveAppliedRow')
class LocalSieveApplied extends Table {
TextColumn get accountId =>
text().references(Accounts, #id, onDelete: KeyAction.cascade)();
// RFC 2822 Message-ID header value — stable across folder moves.
TextColumn get messageId => text()();
DateTimeColumn get appliedAt => dateTime()();
@override
Set<Column> get primaryKey => {accountId, messageId};
}
// ── Database ──────────────────────────────────────────────────────────────────
@DriftDatabase(
@@ -305,6 +320,7 @@ class UndoActions extends Table {
UndoActions,
SearchHistoryEntries,
LocalSieveScripts,
LocalSieveApplied,
ShareKeys,
],
)
@@ -312,7 +328,7 @@ class AppDatabase extends _$AppDatabase {
AppDatabase([QueryExecutor? executor]) : super(executor ?? _openConnection());
@override
int get schemaVersion => 31;
int get schemaVersion => 32;
Future<void> _createEmailFts() async {
await customStatement('''
@@ -550,6 +566,9 @@ class AppDatabase extends _$AppDatabase {
if (from < 31) {
await m.createTable(shareKeys);
}
if (from < 32) {
await m.createTable(localSieveApplied);
}
},
);
}
@@ -13,6 +13,9 @@ import 'package:sharedinbox/core/models/account.dart' as account_model;
import 'package:sharedinbox/core/models/email.dart' as model;
import 'package:sharedinbox/core/repositories/account_repository.dart';
import 'package:sharedinbox/core/repositories/email_repository.dart';
import 'package:sharedinbox/core/sieve/sieve_interpreter.dart';
import 'package:sharedinbox/core/sieve/sieve_parser.dart';
import 'package:sharedinbox/core/sieve/sieve_rule.dart';
import 'package:sharedinbox/core/utils/cid_utils.dart';
import 'package:sharedinbox/core/utils/logger.dart';
import 'package:sharedinbox/data/db/database.dart';
@@ -1917,6 +1920,218 @@ class EmailRepositoryImpl implements EmailRepository {
}
}
/// Applies locally stored active Sieve rules to INBOX emails that have not
/// been processed yet. See [EmailRepository.applySieveRules] for details.
@override
Future<int> applySieveRules(String accountId) async {
final scriptRow = await (_db.select(_db.localSieveScripts)
..where(
(t) => t.accountId.equals(accountId) & t.isActive.equals(true),
)
..limit(1))
.getSingleOrNull();
if (scriptRow == null) return 0;
List<SieveRule> rules;
try {
rules = SieveParser().parse(scriptRow.content);
} catch (e) {
log('Sieve parse error for account $accountId: $e');
return 0;
}
if (rules.isEmpty) return 0;
final inboxMailbox = await (_db.select(_db.mailboxes)
..where(
(t) => t.accountId.equals(accountId) & t.role.equals('inbox'),
)
..limit(1))
.getSingleOrNull();
final inboxPath = inboxMailbox?.path ?? 'INBOX';
final alreadyApplied = await (_db.select(_db.localSieveApplied)
..where((t) => t.accountId.equals(accountId)))
.get();
final appliedIds = alreadyApplied.map((r) => r.messageId).toSet();
final inboxEmails = await (_db.select(_db.emails)
..where(
(t) =>
t.accountId.equals(accountId) &
t.mailboxPath.equals(inboxPath) &
t.messageId.isNotNull(),
))
.get();
final account = (await _accounts.getAccount(accountId))!;
final interpreter = SieveInterpreter();
var matched = 0;
for (final row in inboxEmails) {
final msgId = row.messageId!;
if (appliedIds.contains(msgId)) continue;
final emailCtx = _buildSieveContext(row);
SieveExecutionContext result;
try {
result = interpreter.execute(rules, emailCtx);
} catch (e) {
log('Sieve interpreter error for message $msgId: $e');
await _markSieveApplied(accountId, msgId);
continue;
}
await _markSieveApplied(accountId, msgId);
if (result.isCancelled) {
await _enqueueSieveDelete(account, row);
matched++;
} else if (result.targetFolders.isNotEmpty) {
final dest = result.targetFolders.first;
await _enqueueSieveMove(account, row, dest);
matched++;
} else if (result.flagsToAdd.isNotEmpty) {
await _enqueueSieveFlagSeen(account, row);
matched++;
}
}
return matched;
}
SieveEmailContext _buildSieveContext(Email row) {
String formatAddrs(String json) {
try {
final list = jsonDecode(json) as List<dynamic>;
return list.map((e) {
final m = e as Map<String, dynamic>;
final name = m['name'] as String? ?? '';
final email = m['email'] as String? ?? '';
return name.isEmpty ? email : '$name <$email>';
}).join(', ');
} catch (_) {
return '';
}
}
return SieveEmailContext(
headers: {
if (row.subject != null && row.subject!.isNotEmpty)
'subject': [row.subject!],
'from': [formatAddrs(row.fromJson)],
'to': [formatAddrs(row.toAddresses)],
'cc': [formatAddrs(row.ccJson)],
if (row.messageId != null) 'message-id': [row.messageId!],
},
);
}
Future<void> _markSieveApplied(String accountId, String messageId) async {
await _db.into(_db.localSieveApplied).insertOnConflictUpdate(
LocalSieveAppliedCompanion.insert(
accountId: accountId,
messageId: messageId,
appliedAt: DateTime.now(),
),
);
}
Future<void> _enqueueSieveMove(
account_model.Account account,
Email row,
String folder,
) async {
String destPath;
if (account.type == account_model.AccountType.jmap) {
final destMailbox = await (_db.select(_db.mailboxes)
..where(
(t) => t.accountId.equals(account.id) & t.name.equals(folder),
)
..limit(1))
.getSingleOrNull();
if (destMailbox == null) {
log('Sieve: JMAP mailbox "$folder" not found for account ${account.id}');
return;
}
destPath = destMailbox.path;
await _enqueueChange(
account.id,
row.id,
'move',
jsonEncode({'src': row.mailboxPath, 'dest': destPath}),
);
} else {
destPath = folder;
await _enqueueChange(
account.id,
row.id,
'move',
jsonEncode({
'uid': row.uid,
'mailboxPath': row.mailboxPath,
'dest': destPath,
}),
);
}
await (_db.update(_db.emails)..where((t) => t.id.equals(row.id))).write(
EmailsCompanion(mailboxPath: Value(destPath)),
);
await _updateThread(account.id, row.mailboxPath, row.threadId ?? row.id);
await _updateThread(account.id, destPath, row.threadId ?? row.id);
}
Future<void> _enqueueSieveDelete(
account_model.Account account,
Email row,
) async {
if (account.type == account_model.AccountType.jmap) {
await _enqueueChange(
account.id,
row.id,
'delete',
jsonEncode(<String, dynamic>{}),
);
} else {
await _enqueueChange(
account.id,
row.id,
'delete',
jsonEncode({'uid': row.uid, 'mailboxPath': row.mailboxPath}),
);
}
await (_db.delete(_db.emails)..where((t) => t.id.equals(row.id))).go();
await _updateThread(account.id, row.mailboxPath, row.threadId ?? row.id);
}
Future<void> _enqueueSieveFlagSeen(
account_model.Account account,
Email row,
) async {
if (account.type == account_model.AccountType.jmap) {
await _enqueueChange(
account.id,
row.id,
'flag_seen',
jsonEncode({'seen': true}),
);
} else {
await _enqueueChange(
account.id,
row.id,
'flag_seen',
jsonEncode({
'uid': row.uid,
'mailboxPath': row.mailboxPath,
'seen': true,
}),
);
}
await (_db.update(_db.emails)..where((t) => t.id.equals(row.id))).write(
const EmailsCompanion(isSeen: Value(true)),
);
await _updateThread(account.id, row.mailboxPath, row.threadId ?? row.id);
}
/// Drains pending changes for [accountId] via the appropriate protocol.
/// Called at the start of each sync cycle. Returns count of applied changes.
@override
@@ -2032,7 +2247,18 @@ class EmailRepositoryImpl implements EmailRepository {
.go();
applied++;
} catch (e) {
await _recordChangeError(row, e);
if (_isImapNotFoundError(e)) {
// Email already gone on the server — treat as success so the
// pending change doesn't accumulate or block future changes.
await (_db.delete(
_db.pendingChanges,
)..where((t) => t.id.equals(row.id)))
.go();
applied++;
log('IMAP change ${row.id} skipped: message already gone ($e)');
} else {
await _recordChangeError(row, e);
}
}
}
} finally {
@@ -2041,6 +2267,11 @@ class EmailRepositoryImpl implements EmailRepository {
return applied;
}
bool _isImapNotFoundError(Object e) {
final s = e.toString().toLowerCase();
return s.contains('nonexistent') || s.contains('not found');
}
Future<void> _applyPendingChangeImap(
imap.ImapClient client,
PendingChangeRow row,