import 'dart:async'; import 'dart:convert'; import 'dart:io'; import 'dart:math'; import 'package:drift/drift.dart'; import 'package:drift/native.dart'; import 'package:enough_mail/enough_mail.dart' as mail; import 'package:http/http.dart' as http; import 'package:path/path.dart' as p; import 'package:sharedinbox/core/models/account.dart' as model; import 'package:sharedinbox/core/storage/secure_storage.dart'; import 'package:sharedinbox/data/db/database.dart'; import 'package:sharedinbox/data/repositories/account_repository_impl.dart'; import 'package:sharedinbox/data/repositories/email_repository_impl.dart'; import 'package:sharedinbox/data/repositories/mailbox_repository_impl.dart'; Future main() async { final rawArgs = Platform.environment['SYNC_RELIABILITY_ARGS']; final args = rawArgs == null || rawArgs.isEmpty ? const [] : const LineSplitter().convert(rawArgs); await runSyncReliability(args); } Future runSyncReliability(List args) async { driftRuntimeOptions.dontWarnAboutMultipleDatabases = true; final options = _parseOptions(args); final random = Random(); stdout.writeln( 'sync-reliability: updates=${options.updates} cycles=${options.cycles} ' 'imap-dbs=${options.imapDbs} jmap-dbs=${options.jmapDbs}', ); final imapEnv = _StalwartEnv.fromEnvironment(); final jmapEnv = _StalwartEnv.fromEnvironment(); final protocolConfigs = <(_Protocol protocol, int dbCount)>[]; if (options.imapDbs > 0) { protocolConfigs.add((_Protocol.imap, options.imapDbs)); } if (options.jmapDbs > 0) { protocolConfigs.add((_Protocol.jmap, options.jmapDbs)); } if (protocolConfigs.isEmpty) { throw StateError( 'No DBs configured. Set --imap-dbs and/or --jmap-dbs > 0.', ); } for (final config in protocolConfigs) { final protocol = config.$1; final dbCount = config.$2; stdout.writeln('\n== protocol: ${protocol.name} dbs=$dbCount =='); final tempRoot = await Directory.systemTemp.createTemp( 'sharedinbox_sync_reliability_', ); final runners = <_Runner>[]; for (var i = 0; i < dbCount; i++) { final runner = await _createRunner( rootDir: Directory(p.join(tempRoot.path, 'db_${i + 1}')), accountId: 'sync-account', env: protocol == _Protocol.imap ? imapEnv : jmapEnv, protocol: protocol, ); runners.add(runner); } try { for (var cycle = 1; cycle <= options.cycles; cycle++) { stdout.writeln('cycle $cycle/${options.cycles}: sync start'); await _fullSyncAll(runners); final opPlan = await _buildOperationPlan( runners, updates: options.updates, random: random, ); stdout.writeln( 'cycle $cycle/${options.cycles}: running ${opPlan.length} concurrent mutations', ); await Future.wait(opPlan.map((op) => op.run())); await _waitForConvergence(runners, cycle: cycle); stdout.writeln('cycle $cycle/${options.cycles}: OK'); } } finally { for (final runner in runners) { await runner.close(); } await tempRoot.delete(recursive: true); } } stdout.writeln('\nAll sync reliability checks passed.'); } Future _waitForConvergence( List<_Runner> runners, { required int cycle, }) async { const maxAttempts = 20; const settleDelay = Duration(milliseconds: 250); for (var attempt = 1; attempt <= maxAttempts; attempt++) { await _fullSyncAll(runners); try { await _assertSnapshotsEqual(runners, cycle: cycle); if (attempt > 1) { stdout.writeln('cycle $cycle: converged after $attempt sync attempts'); } return; } catch (_) { if (attempt == maxAttempts) break; await Future.delayed(settleDelay); } } stdout.writeln( 'cycle $cycle: convergence not reached, forcing full resync on all DBs', ); await Future.wait(runners.map((r) => r.forceFullResync())); await _assertSnapshotsEqual(runners, cycle: cycle); } Future _fullSyncAll(List<_Runner> runners) async { await Future.wait(runners.map((r) => r.syncAll())); } Future> _buildOperationPlan( List<_Runner> runners, { required int updates, required Random random, }) async { final allIdsSet = {}; for (final runner in runners) { allIdsSet.addAll(await runner.emailIds()); } final allIds = allIdsSet.toList()..sort(); final deletableIds = {...allIds}; final ops = <_Op>[]; for (var i = 0; i < updates; i++) { final target = runners[random.nextInt(runners.length)]; final roll = random.nextInt(100); if (roll < 40 || allIds.isEmpty) { ops.add( _Op( label: 'create', run: () async => target.createMessage(), ), ); continue; } if (roll < 70) { final id = allIds[random.nextInt(allIds.length)]; final updateSeen = random.nextBool(); final flagValue = random.nextBool(); ops.add( _Op( label: 'update', run: () async { await target.setRandomFlag( id, useSeen: updateSeen, value: flagValue, ); }, ), ); continue; } if (deletableIds.isEmpty) { ops.add( _Op( label: 'create', run: () async => target.createMessage(), ), ); continue; } final id = deletableIds.elementAt(random.nextInt(deletableIds.length)); deletableIds.remove(id); ops.add( _Op( label: 'delete', run: () async => target.deleteIfPresent(id), ), ); } return ops; } Future _assertSnapshotsEqual( List<_Runner> runners, { required int cycle, }) async { final baseline = runners.first; final baselineSnapshot = await baseline.snapshot(); for (var i = 1; i < runners.length; i++) { final snapshot = await runners[i].snapshot(); if (baselineSnapshot != snapshot) { final leftLines = const LineSplitter().convert(baselineSnapshot); final rightLines = const LineSplitter().convert(snapshot); final max = leftLines.length > rightLines.length ? leftLines.length : rightLines.length; var diffLine = -1; for (var line = 0; line < max; line++) { final l = line < leftLines.length ? leftLines[line] : ''; final r = line < rightLines.length ? rightLines[line] : ''; if (l != r) { diffLine = line + 1; break; } } throw StateError( 'DB snapshots differ after cycle $cycle. ' 'First differing line: $diffLine between db1 and db${i + 1}\n' '--- db1 ---\n$baselineSnapshot\n' '--- db${i + 1} ---\n$snapshot', ); } } for (var i = 0; i < runners.length; i++) { final pending = await runners[i].pendingCount(); if (pending != 0) { throw StateError( 'Pending queue not empty after cycle $cycle: db${i + 1}=$pending', ); } } } Future<_Runner> _createRunner({ required Directory rootDir, required String accountId, required _StalwartEnv env, required _Protocol protocol, }) async { await rootDir.create(recursive: true); final dbFile = File(p.join(rootDir.path, 'db.sqlite')); final db = AppDatabase(NativeDatabase(dbFile)); final storage = _MemSecureStorage(); final accounts = AccountRepositoryImpl(db, storage); final mailboxRepo = MailboxRepositoryImpl( db, accounts, imapConnect: _connectImapPlaintext, httpClient: http.Client(), ); final emailRepo = EmailRepositoryImpl( db, accounts, imapConnect: _connectImapPlaintext, smtpConnect: _connectSmtpPlaintext, httpClient: http.Client(), ); final account = switch (protocol) { _Protocol.imap => model.Account( id: accountId, displayName: 'Sync Reliability IMAP', email: env.user, imapHost: env.imapHost, imapPort: env.imapPort, smtpHost: env.smtpHost, smtpPort: env.smtpPort, ), _Protocol.jmap => model.Account( id: accountId, displayName: 'Sync Reliability JMAP', email: env.user, type: model.AccountType.jmap, jmapUrl: '${env.baseUrl}/.well-known/jmap', imapHost: env.imapHost, imapPort: env.imapPort, smtpHost: env.smtpHost, smtpPort: env.smtpPort, ), }; await accounts.addAccount(account, env.password); return _Runner( protocol: protocol, accountId: accountId, accountEmail: env.user, imapHost: env.imapHost, imapPort: env.imapPort, accountPassword: env.password, db: db, mailboxes: mailboxRepo, emails: emailRepo, ); } Future _connectImapPlaintext( model.Account account, String username, String password, ) async { final client = mail.ImapClient( defaultResponseTimeout: const Duration(seconds: 20), ); await client.connectToServer( account.imapHost, account.imapPort, // ignore: avoid_redundant_argument_values isSecure: false, ); await client.login(username, password); return client; } Future _connectSmtpPlaintext( model.Account account, String username, String password, ) async { final at = account.email.lastIndexOf('@'); final domain = at == -1 ? account.smtpHost : account.email.substring(at + 1); final client = mail.SmtpClient(domain); await client.connectToServer( account.smtpHost, account.smtpPort, // ignore: avoid_redundant_argument_values isSecure: false, ); await client.ehlo(); await client.authenticate(username, password); return client; } class _Runner { _Runner({ required this.protocol, required this.accountId, required this.accountEmail, required this.imapHost, required this.imapPort, required this.accountPassword, required this.db, required this.mailboxes, required this.emails, }); final _Protocol protocol; final String accountId; final String accountEmail; final String imapHost; final int imapPort; final String accountPassword; final AppDatabase db; final MailboxRepositoryImpl mailboxes; final EmailRepositoryImpl emails; int _createCounter = 0; Future syncAll() async { await emails.flushPendingChanges(accountId, accountPassword); await mailboxes.syncMailboxes(accountId); final mailboxRows = await (db.select(db.mailboxes) ..where((t) => t.accountId.equals(accountId)) ..orderBy([(t) => OrderingTerm.asc(t.path)])) .get(); if (mailboxRows.isEmpty) { throw StateError('No mailboxes found for account $accountId after sync'); } for (final mailbox in mailboxRows) { await emails.syncEmails(accountId, mailbox.path); } } Future createMessage() async { _createCounter++; final now = DateTime.now().microsecondsSinceEpoch; final builder = mail.MessageBuilder() ..from = [mail.MailAddress('Sync Bot', accountEmail)] ..to = [mail.MailAddress('Sync Bot', accountEmail)] ..subject = 'sync-reliability-${protocol.name}-$now-$_createCounter' ..text = 'sync reliability body $now'; final client = mail.ImapClient( defaultResponseTimeout: const Duration(seconds: 20), ); await client.connectToServer( imapHost, imapPort, // ignore: avoid_redundant_argument_values isSecure: false, ); try { await client.login(accountEmail, accountPassword); await client.appendMessage( builder.buildMimeMessage(), targetMailboxPath: 'INBOX', ); } finally { await client.logout(); } } Future setRandomFlag( String emailId, { required bool useSeen, required bool value, }) async { if (!await _emailExists(emailId)) { return; } if (useSeen) { await emails.setFlag(emailId, seen: value); } else { await emails.setFlag(emailId, flagged: value); } } Future deleteIfPresent(String emailId) async { if (!await _emailExists(emailId)) { return; } await emails.deleteEmail(emailId); } Future _emailExists(String emailId) async { final row = await (db.select(db.emails)..where((t) => t.id.equals(emailId))) .getSingleOrNull(); return row != null; } Future> emailIds() async { final rows = await (db.select(db.emails) ..where((t) => t.accountId.equals(accountId)) ..orderBy([(t) => OrderingTerm.asc(t.id)])) .get(); return rows.map((e) => e.id).toList(growable: false); } Future pendingCount() async { final rows = await (db.select(db.pendingChanges) ..where((t) => t.accountId.equals(accountId))) .get(); return rows.length; } Future snapshot() async { final mailboxRows = await (db.select(db.mailboxes) ..where((t) => t.accountId.equals(accountId)) ..orderBy([(t) => OrderingTerm.asc(t.path)])) .get(); final emailRows = await (db.select(db.emails) ..where((t) => t.accountId.equals(accountId)) ..orderBy([(t) => OrderingTerm.asc(t.id)])) .get(); final pendingRows = await (db.select(db.pendingChanges) ..where((t) => t.accountId.equals(accountId)) ..orderBy([(t) => OrderingTerm.asc(t.id)])) .get(); final obj = { 'mailboxes': mailboxRows .map( (r) => { 'id': r.id, 'accountId': r.accountId, 'path': r.path, 'name': r.name, 'unreadCount': r.unreadCount, 'totalCount': r.totalCount, 'role': r.role, }, ) .toList(growable: false), 'emails': emailRows .map( (r) => { 'id': r.id, 'accountId': r.accountId, 'mailboxPath': r.mailboxPath, 'uid': r.uid, 'subject': r.subject, 'fromJson': r.fromJson, 'toAddresses': r.toAddresses, 'ccJson': r.ccJson, 'preview': r.preview, 'isSeen': r.isSeen, 'isFlagged': r.isFlagged, 'hasAttachment': r.hasAttachment, }, ) .toList(growable: false), 'pendingChanges': pendingRows .map( (r) => { 'accountId': r.accountId, 'resourceType': r.resourceType, 'resourceId': r.resourceId, 'changeType': r.changeType, 'payload': r.payload, 'attempts': r.attempts, 'lastError': r.lastError, }, ) .toList(growable: false), }; return const JsonEncoder.withIndent(' ').convert(obj); } Future close() async { await db.close(); } Future forceFullResync() async { await (db.delete(db.syncStates) ..where((t) => t.accountId.equals(accountId))) .go(); await (db.delete(db.emails)..where((t) => t.accountId.equals(accountId))) .go(); await (db.delete(db.mailboxes)..where((t) => t.accountId.equals(accountId))) .go(); await syncAll(); } } class _MemSecureStorage implements SecureStorage { final Map _values = {}; @override Future read({required String key}) async => _values[key]; @override Future write({required String key, required String? value}) async { if (value == null) { _values.remove(key); } else { _values[key] = value; } } @override Future delete({required String key}) async { _values.remove(key); } } class _Op { _Op({required this.label, required this.run}); final String label; final Future Function() run; } class _StalwartEnv { const _StalwartEnv({ required this.baseUrl, required this.imapHost, required this.imapPort, required this.smtpHost, required this.smtpPort, required this.user, required this.password, }); final String baseUrl; final String imapHost; final int imapPort; final String smtpHost; final int smtpPort; final String user; final String password; factory _StalwartEnv.fromEnvironment() { final baseUrl = Platform.environment['STALWART_URL'] ?? 'http://127.0.0.1:8080'; final imapHost = Platform.environment['STALWART_IMAP_HOST'] ?? '127.0.0.1'; final imapPort = int.tryParse(Platform.environment['STALWART_IMAP_PORT'] ?? '') ?? 1430; final smtpHost = Platform.environment['STALWART_SMTP_HOST'] ?? '127.0.0.1'; final smtpPort = int.tryParse(Platform.environment['STALWART_SMTP_PORT'] ?? '') ?? 1025; final user = Platform.environment['STALWART_USER_B'] ?? 'alice@example.com'; final password = Platform.environment['STALWART_PASS_B'] ?? 'secret'; return _StalwartEnv( baseUrl: baseUrl, imapHost: imapHost, imapPort: imapPort, smtpHost: smtpHost, smtpPort: smtpPort, user: user, password: password, ); } } class _Options { const _Options({ required this.updates, required this.cycles, required this.imapDbs, required this.jmapDbs, }); final int updates; final int cycles; final int imapDbs; final int jmapDbs; } enum _Protocol { imap, jmap } _Options _parseOptions(List args) { var updates = 10; var cycles = 3; var imapDbs = 1; var jmapDbs = 1; final positionals = []; for (final arg in args) { if (arg == '--help' || arg == '-h') { _printUsageAndExit(0); } if (arg.startsWith('--updates=')) { updates = _parsePositiveInt(arg.split('=').last, '--updates'); continue; } if (arg.startsWith('--cycles=')) { cycles = _parsePositiveInt(arg.split('=').last, '--cycles'); continue; } if (arg.startsWith('--imap-dbs=')) { imapDbs = _parseNonNegativeInt(arg.split('=').last, '--imap-dbs'); continue; } if (arg.startsWith('--jmap-dbs=')) { jmapDbs = _parseNonNegativeInt(arg.split('=').last, '--jmap-dbs'); continue; } if (arg.startsWith('-')) { throw StateError('Unknown option: $arg'); } positionals.add(arg); } if (positionals.isNotEmpty) { updates = _parsePositiveInt(positionals[0], 'updates'); } if (positionals.length > 1) { cycles = _parsePositiveInt(positionals[1], 'cycles'); } if (positionals.length > 2) { throw StateError('Too many positional args: $positionals'); } if (imapDbs == 0 && jmapDbs == 0) { throw StateError('At least one of --imap-dbs or --jmap-dbs must be > 0'); } return _Options( updates: updates, cycles: cycles, imapDbs: imapDbs, jmapDbs: jmapDbs, ); } int _parsePositiveInt(String value, String name) { final parsed = int.tryParse(value); if (parsed == null || parsed <= 0) { throw StateError('$name must be a positive integer, got "$value"'); } return parsed; } int _parseNonNegativeInt(String value, String name) { final parsed = int.tryParse(value); if (parsed == null || parsed < 0) { throw StateError('$name must be a non-negative integer, got "$value"'); } return parsed; } Never _printUsageAndExit(int code) { stdout.writeln( 'Usage: fvm flutter pub run scripts/sync_reliability.dart [updates] [cycles] ' '[--imap-dbs=N] [--jmap-dbs=N]\n\n' 'Defaults: updates=10 cycles=3 imap-dbs=1 jmap-dbs=1', ); exit(code); }