Skip to content

perf(llc): Reduce the number of read message per channel from DB when paginating (part 1)#2679

Open
VelikovPetar wants to merge 8 commits into
masterfrom
feature/FLU-485_optimize_read_message_from_db
Open

perf(llc): Reduce the number of read message per channel from DB when paginating (part 1)#2679
VelikovPetar wants to merge 8 commits into
masterfrom
feature/FLU-485_optimize_read_message_from_db

Conversation

@VelikovPetar
Copy link
Copy Markdown
Contributor

@VelikovPetar VelikovPetar commented May 21, 2026

Submit a pull request

Linear: Part one of: FLU-485

CLA

  • I have signed the Stream CLA (required).
  • The code changes follow best practices
  • Code changes are tested (add some information if not applicable)

Description of the pull request

  • Read only the messages matching the PaginationParams from DB when calling MessageDao.getMessagesByCid instead of reading all messages for the channel and applying pagination in memory.
  • Read only the reactions matching the userId from DB when calling ReactionDao.getReactionsByUserId instead of reading all reactions for the message and filtering in memory.
  • Read only the reactions matching the userId from DB when calling PinnedMessageReactionDao.getReactionsByUserId instead of reading all reactions for the message and filtering in memory.

Screenshots / Videos

NA

Testing

Apply the given patch and run the test suites:

  • get_messages_by_cid_parity.dart - parity test check affirming no regressions in the new SQL first implementation against the old implementation
  • get_messages_by_cid_bench_test.dart - bench test comparing the performance of the new vs old implementations
Patch
Subject: [PATCH] refactor(dao): Update CHANGELOG.md
---
Index: packages/stream_chat_persistence/test/src/benchmark/get_messages_by_cid_bench_test.dart
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
diff --git a/packages/stream_chat_persistence/test/src/benchmark/get_messages_by_cid_bench_test.dart b/packages/stream_chat_persistence/test/src/benchmark/get_messages_by_cid_bench_test.dart
new file mode 100644
--- /dev/null	(date 1779371365664)
+++ b/packages/stream_chat_persistence/test/src/benchmark/get_messages_by_cid_bench_test.dart	(date 1779371365664)
@@ -0,0 +1,237 @@
+import 'package:drift/drift.dart';
+import 'package:drift/native.dart';
+import 'package:flutter_test/flutter_test.dart';
+import 'package:stream_chat/stream_chat.dart';
+import 'package:stream_chat_persistence/src/dao/dao.dart';
+import 'package:stream_chat_persistence/src/db/drift_chat_database.dart';
+
+/// Counts SELECT statements and rows returned through them. Used to
+/// instrument the legacy and SQL-pushdown implementations of
+/// `getMessagesByCid` for head-to-head comparison.
+class _CountingInterceptor extends QueryInterceptor {
+  int selectCount = 0;
+  int rowsReturned = 0;
+
+  void reset() {
+    selectCount = 0;
+    rowsReturned = 0;
+  }
+
+  @override
+  Future<List<Map<String, Object?>>> runSelect(
+    QueryExecutor executor,
+    String statement,
+    List<Object?> args,
+  ) async {
+    final result = await executor.runSelect(statement, args);
+    selectCount++;
+    rowsReturned += result.length;
+    return result;
+  }
+}
+
+typedef _BenchResult = ({
+  List<String> messageIds,
+  int selectCount,
+  int rowsReturned,
+  int medianMicros,
+});
+
+void main() {
+  late DriftChatDatabase database;
+  late MessageDao messageDao;
+  late _CountingInterceptor interceptor;
+
+  setUp(() {
+    interceptor = _CountingInterceptor();
+    final executor = NativeDatabase.memory().interceptWith(interceptor);
+    database = DriftChatDatabase('testUserId', executor);
+    messageDao = database.messageDao;
+  });
+
+  tearDown(() async {
+    await database.disconnect();
+  });
+
+  // Seeds `count` messages with strictly monotonic `createdAt`. Drift stores
+  // DateTime as integer Unix seconds by default, so the offset must be at
+  // least 1 second per row to avoid tie collapses.
+  Future<void> seedMessages(String cid, int count) async {
+    final channels = [ChannelModel(cid: cid)];
+    final users = List.generate(count, (i) => User(id: 'user$i'));
+    final baseTime = DateTime.now();
+    final messages = List.generate(
+      count,
+      (i) => Message(
+        id: 'msg$i',
+        type: 'regular',
+        user: users[i],
+        text: 'Hello $i',
+        createdAt: baseTime.add(Duration(seconds: i)),
+        updatedAt: baseTime.add(Duration(seconds: i)),
+      ),
+    );
+    await database.userDao.updateUsers(users);
+    await database.channelDao.updateChannels(channels);
+    await messageDao.updateMessages(cid, messages);
+  }
+
+  Future<_BenchResult> runBench(
+    Future<List<Message>> Function() fn, {
+    int warmups = 2,
+    int iterations = 10,
+  }) async {
+    // Warmup runs prime the JIT / SQLite query cache. Their interceptor
+    // counts are discarded by the reset before each timed iteration below.
+    for (var i = 0; i < warmups; i++) {
+      await fn();
+    }
+
+    final timings = <int>[];
+    List<Message>? lastResult;
+    for (var i = 0; i < iterations; i++) {
+      interceptor.reset();
+      final sw = Stopwatch()..start();
+      lastResult = await fn();
+      sw.stop();
+      timings.add(sw.elapsedMicroseconds);
+    }
+    timings.sort();
+    final median = timings[timings.length ~/ 2];
+
+    return (
+      messageIds: lastResult!.map((m) => m.id).toList(),
+      selectCount: interceptor.selectCount,
+      rowsReturned: interceptor.rowsReturned,
+      medianMicros: median,
+    );
+  }
+
+  void printTable(
+    String scenario,
+    int n,
+    _BenchResult legacy,
+    _BenchResult pushdown,
+  ) {
+    String improvement(int oldV, int newV) =>
+        newV == 0 ? '—' : '${(oldV / newV).toStringAsFixed(2)}×';
+
+    String pad(Object v, [int width = 10]) => v.toString().padRight(width);
+
+    final parity = pushdown.messageIds.toString() ==
+            legacy.messageIds.toString()
+        ? 'OK'
+        : 'MISMATCH';
+
+    // ignore: avoid_print
+    print('''
+
+Scenario: $scenario  (N=$n, P=${pushdown.messageIds.length})
+                 ${pad('OLD')}${pad('NEW')}Improvement (old/new)
+SELECT calls     ${pad(legacy.selectCount)}${pad(pushdown.selectCount)}${improvement(legacy.selectCount, pushdown.selectCount)}
+Rows fetched     ${pad(legacy.rowsReturned)}${pad(pushdown.rowsReturned)}${improvement(legacy.rowsReturned, pushdown.rowsReturned)}
+Time (us, med)   ${pad(legacy.medianMicros)}${pad(pushdown.medianMicros)}${improvement(legacy.medianMicros, pushdown.medianMicros)}
+Result parity    $parity
+''');
+  }
+
+  group('getMessagesByCid: legacy vs SQL pushdown', () {
+    const cid = 'test:Cid';
+    const n = 200;
+
+    Future<void> runScenario(String label, PaginationParams? p) async {
+      await seedMessages(cid, n);
+
+      final legacy = await runBench(
+        () => messageDao.getMessagesByCidLegacy(cid, messagePagination: p),
+      );
+      final pushdown = await runBench(
+        () => messageDao.getMessagesByCid(cid, messagePagination: p),
+      );
+
+      expect(
+        pushdown.messageIds,
+        equals(legacy.messageIds),
+        reason: 'parity broken for "$label"',
+      );
+      expect(
+        pushdown.selectCount,
+        lessThanOrEqualTo(legacy.selectCount),
+        reason: 'SQL pushdown issued more SELECTs than legacy for "$label"',
+      );
+      expect(
+        pushdown.rowsReturned,
+        lessThanOrEqualTo(legacy.rowsReturned),
+        reason: 'SQL pushdown materialized more rows than legacy for "$label"',
+      );
+
+      printTable(label, n, legacy, pushdown);
+    }
+
+    test('no pagination', () => runScenario('no pagination', null));
+
+    test(
+      'limit: 30',
+      () => runScenario('limit: 30', const PaginationParams(limit: 30)),
+    );
+
+    test(
+      'lessThan + limit (scroll up)',
+      () => runScenario(
+        'lessThan: msg175, limit: 30',
+        const PaginationParams(limit: 30, lessThan: 'msg175'),
+      ),
+    );
+
+    test(
+      'greaterThan + limit (scroll down)',
+      () => runScenario(
+        'greaterThan: msg25, limit: 30',
+        const PaginationParams(limit: 30, greaterThan: 'msg25'),
+      ),
+    );
+
+    test(
+      'lessThan + greaterThan + limit',
+      () => runScenario(
+        'lessThan: msg175, greaterThan: msg25, limit: 30',
+        const PaginationParams(
+          limit: 30,
+          lessThan: 'msg175',
+          greaterThan: 'msg25',
+        ),
+      ),
+    );
+  });
+
+  group('getMessagesByCid: stress', () {
+    const cid = 'test:Cid';
+
+    test(
+      '1000 messages, limit: 30',
+      () async {
+        await seedMessages(cid, 1000);
+
+        final legacy = await runBench(
+          () => messageDao.getMessagesByCidLegacy(
+            cid,
+            messagePagination: const PaginationParams(limit: 30),
+          ),
+        );
+        final pushdown = await runBench(
+          () => messageDao.getMessagesByCid(
+            cid,
+            messagePagination: const PaginationParams(limit: 30),
+          ),
+        );
+
+        expect(pushdown.messageIds, equals(legacy.messageIds));
+        expect(pushdown.selectCount, lessThanOrEqualTo(legacy.selectCount));
+        expect(pushdown.rowsReturned, lessThanOrEqualTo(legacy.rowsReturned));
+
+        printTable('stress: 1000 messages, limit 30', 1000, legacy, pushdown);
+      },
+      // skip: 'Stress test — slow, runnable on demand by removing this skip',
+    );
+  });
+}
Index: packages/stream_chat_persistence/lib/src/dao/message_dao.dart
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
diff --git a/packages/stream_chat_persistence/lib/src/dao/message_dao.dart b/packages/stream_chat_persistence/lib/src/dao/message_dao.dart
--- a/packages/stream_chat_persistence/lib/src/dao/message_dao.dart	(revision 468960622a1d24f0c0619cd3453a2687d1e9537c)
+++ b/packages/stream_chat_persistence/lib/src/dao/message_dao.dart	(date 1779371587795)
@@ -1,4 +1,7 @@
+import 'dart:math';
+
 import 'package:drift/drift.dart';
+import 'package:flutter/foundation.dart';
 import 'package:stream_chat/stream_chat.dart';
 import 'package:stream_chat_persistence/src/db/drift_chat_database.dart';
 import 'package:stream_chat_persistence/src/entity/messages.dart';
@@ -161,6 +164,66 @@
     }
     return msgList;
   }
+
+  /// Pre-SQL-pushdown reference implementation of [getMessagesByCid]. Fetches
+  /// every cached message for the channel, hydrates each row, then trims the
+  /// result in Dart. Kept only as the head-to-head baseline for the
+  /// `get_messages_by_cid_bench_test.dart` benchmark — remove once we no
+  /// longer need behavioral parity proof.
+  @visibleForTesting
+  Future<List<Message>> getMessagesByCidLegacy(
+    String cid, {
+    bool fetchDraft = true,
+    PaginationParams? messagePagination,
+  }) async {
+    final query = select(messages).join([
+      leftOuterJoin(_users, messages.userId.equalsExp(_users.id)),
+      leftOuterJoin(
+        _pinnedByUsers,
+        messages.pinnedByUserId.equalsExp(_pinnedByUsers.id),
+      ),
+    ])
+      ..where(messages.channelCid.equals(cid))
+      ..where(messages.parentId.isNull() | messages.showInChannel.equals(true))
+      ..orderBy([OrderingTerm.asc(messages.createdAt)]);
+
+    final result = await query.get();
+    if (result.isEmpty) return [];
+
+    final msgList = await Future.wait(
+      result.map(
+            (row) => _messageFromJoinRow(
+          row,
+          fetchDraft: fetchDraft,
+        ),
+      ),
+    );
+
+    if (msgList.isNotEmpty) {
+      if (messagePagination?.lessThan != null) {
+        final lessThanIndex = msgList.indexWhere(
+              (m) => m.id == messagePagination!.lessThan,
+        );
+        if (lessThanIndex != -1) {
+          msgList.removeRange(lessThanIndex, msgList.length);
+        }
+      }
+      if (messagePagination?.greaterThan != null) {
+        final greaterThanIndex = msgList.indexWhere(
+              (m) => m.id == messagePagination!.greaterThan,
+        );
+        if (greaterThanIndex != -1) {
+          msgList.removeRange(0, greaterThanIndex);
+        }
+      }
+      if (messagePagination?.limit != null) {
+        return msgList
+            .skip(max(0, msgList.length - messagePagination!.limit))
+            .toList();
+      }
+    }
+    return msgList;
+  }
 
   /// Returns all the messages of a channel by matching
   /// [Messages.channelCid] with [parentId]
Index: packages/stream_chat_persistence/test/src/dao/get_messages_by_cid_parity_test.dart
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
diff --git a/packages/stream_chat_persistence/test/src/dao/get_messages_by_cid_parity_test.dart b/packages/stream_chat_persistence/test/src/dao/get_messages_by_cid_parity_test.dart
new file mode 100644
--- /dev/null	(date 1779371365667)
+++ b/packages/stream_chat_persistence/test/src/dao/get_messages_by_cid_parity_test.dart	(date 1779371365667)
@@ -0,0 +1,187 @@
+import 'package:flutter_test/flutter_test.dart';
+import 'package:stream_chat/stream_chat.dart';
+import 'package:stream_chat_persistence/src/dao/dao.dart';
+import 'package:stream_chat_persistence/src/db/drift_chat_database.dart';
+
+import '../../stream_chat_persistence_client_test.dart';
+
+void main() {
+  late DriftChatDatabase database;
+  late MessageDao messageDao;
+
+  setUp(() {
+    database = testDatabaseProvider('testUserId');
+    messageDao = database.messageDao;
+  });
+
+  tearDown(() async {
+    await database.disconnect();
+  });
+
+  // Seeds a channel with `count` messages that exercise every hydration path
+  // `_messageFromJoinRow` touches: user attribution, latest/own reactions,
+  // quoted messages, polls, and a channel-level draft. Monotonic 1-second
+  // offsets on `createdAt` because Drift stores DateTime as integer Unix
+  // seconds — sub-second offsets collapse onto the same tick.
+  Future<void> seedRichMessages(String cid, int count) async {
+    final channels = [ChannelModel(cid: cid)];
+    final dbUser = User(id: 'testUserId'); // matches the DB's _userId
+    final otherUsers = List.generate(count, (i) => User(id: 'otherUser$i'));
+    final allUsers = [dbUser, ...otherUsers];
+    final baseTime = DateTime.now();
+
+    final poll = Poll(
+      id: 'poll0',
+      name: 'Pick one',
+      options: [
+        PollOption(id: 'opt0', text: 'A'),
+        PollOption(id: 'opt1', text: 'B'),
+      ],
+      // `createdById` must reference an existing user — `PollDao._pollFromJoinRow`
+      // uses `readTable(users)` (not `readTableOrNull`) on a LEFT JOIN, which
+      // throws if there's no matching row. Pre-existing PollDao quirk; out of
+      // scope for the pagination-pushdown work.
+      createdById: dbUser.id,
+    );
+
+    final messages = List.generate(
+      count,
+      (i) => Message(
+        id: 'msg$i',
+        type: 'regular',
+        user: allUsers[i % allUsers.length],
+        text: 'Hello $i',
+        createdAt: baseTime.add(Duration(seconds: i)),
+        updatedAt: baseTime.add(Duration(seconds: i)),
+        // Every 3rd message (i ≥ 3) quotes the message 2 positions earlier.
+        quotedMessageId: (i >= 3 && i % 3 == 0) ? 'msg${i - 2}' : null,
+        // Every 5th message attaches to a poll.
+        pollId: (i % 5 == 0) ? 'poll0' : null,
+      ),
+    );
+
+    // Reactions populate both `latestReactions` (any user) and
+    // `ownReactions` (matching the DB user). Mix on each message so the two
+    // lists end up non-identical, which surfaces any divergence in hydration.
+    final reactions = <Reaction>[
+      for (var i = 0; i < count; i++) ...[
+        if (i.isEven)
+          Reaction(
+            type: 'like',
+            messageId: 'msg$i',
+            user: dbUser,
+            createdAt: baseTime.add(Duration(seconds: i)),
+          ),
+        Reaction(
+          type: 'love',
+          messageId: 'msg$i',
+          user: otherUsers[i % otherUsers.length],
+          createdAt: baseTime.add(Duration(seconds: i)),
+        ),
+      ],
+    ];
+
+    final draft = Draft(
+      channelCid: cid,
+      createdAt: baseTime,
+      message: DraftMessage(
+        id: 'draft0',
+        text: 'Unsent reply',
+      ),
+    );
+
+    await database.userDao.updateUsers(allUsers);
+    await database.channelDao.updateChannels(channels);
+    await database.pollDao.updatePolls([poll]);
+    await messageDao.updateMessages(cid, messages);
+    await database.reactionDao.updateReactions(reactions);
+    await database.draftMessageDao.updateDraftMessages([draft]);
+  }
+
+  // Builds a structural fingerprint of a Message that captures every field
+  // the two implementations should agree on after hydration. Used in place
+  // of `==` because `Reaction` (and friends) don't extend `Equatable`, so
+  // identity-based equality on nested lists fails for instances built by
+  // separate calls.
+  String fingerprintMessage(Message m) {
+    String reactionFp(Reaction r) => '${r.type}@${r.user?.id ?? "-"}';
+    String reactionListFp(List<Reaction>? rs) =>
+        '[${(rs ?? const []).map(reactionFp).join(",")}]';
+
+    return [
+      'id=${m.id}',
+      'text=${m.text ?? ""}',
+      'user=${m.user?.id ?? "-"}',
+      'createdAt=${m.createdAt.toUtc().toIso8601String()}',
+      'latest=${reactionListFp(m.latestReactions)}',
+      'own=${reactionListFp(m.ownReactions)}',
+      'quoted=${m.quotedMessage?.id ?? "-"}',
+      'poll=${m.poll?.id ?? "-"}',
+      'draft=${m.draft?.message.id ?? "-"}',
+    ].join(' | ');
+  }
+
+  group('getMessagesByCid: full data parity (legacy vs SQL pushdown)', () {
+    const cid = 'test:Cid';
+    const n = 30;
+
+    Future<void> assertParity(String label, PaginationParams? p) async {
+      await seedRichMessages(cid, n);
+
+      final legacy = await messageDao.getMessagesByCidLegacy(
+        cid,
+        messagePagination: p,
+      );
+      final pushdown = await messageDao.getMessagesByCid(
+        cid,
+        messagePagination: p,
+      );
+
+      expect(
+        pushdown.length,
+        legacy.length,
+        reason: 'list lengths differ for "$label"',
+      );
+      expect(
+        pushdown.map(fingerprintMessage).toList(),
+        equals(legacy.map(fingerprintMessage).toList()),
+        reason: 'message data parity broken for "$label"',
+      );
+    }
+
+    test('no pagination', () => assertParity('no pagination', null));
+
+    test(
+      'limit only',
+      () => assertParity('limit: 10', const PaginationParams(limit: 10)),
+    );
+
+    test(
+      'lessThan + limit',
+      () => assertParity(
+        'lessThan: msg25, limit: 10',
+        const PaginationParams(limit: 10, lessThan: 'msg25'),
+      ),
+    );
+
+    test(
+      'greaterThan + limit',
+      () => assertParity(
+        'greaterThan: msg5, limit: 10',
+        const PaginationParams(limit: 10, greaterThan: 'msg5'),
+      ),
+    );
+
+    test(
+      'lessThan + greaterThan + limit',
+      () => assertParity(
+        'lessThan: msg25, greaterThan: msg5, limit: 10',
+        const PaginationParams(
+          limit: 10,
+          lessThan: 'msg25',
+          greaterThan: 'msg5',
+        ),
+      ),
+    );
+  });
+}

Summary by CodeRabbit

  • Bug Fixes

    • Reactions and pinned reactions now correctly filter by user ID at the database level instead of in-memory.
  • Performance

    • Message queries now apply pagination limits and cursor filters in the database, reducing data fetched and improving efficiency.
  • Tests

    • Expanded tests for pagination trimming and user-specific reaction filtering.
  • Documentation

    • Added an "Upcoming Changes" section to the package changelog.

Review Change Stack

@coderabbitai
Copy link
Copy Markdown
Contributor

coderabbitai Bot commented May 21, 2026

No actionable comments were generated in the recent review. 🎉

ℹ️ Recent review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: c0016199-90c3-428a-944c-7346f2a93878

📥 Commits

Reviewing files that changed from the base of the PR and between 4689606 and edb86f7.

📒 Files selected for processing (1)
  • packages/stream_chat_persistence/lib/src/dao/message_dao.dart

📝 Walkthrough

Walkthrough

This PR moves pagination and userId filtering into SQL: Message pagination resolves cursor ids to createdAt and applies cursor/limit in Drift queries; Reaction and PinnedMessageReaction DAOs centralize joined queries and apply messageId+userId predicates in SQL. Tests and changelog updated.

Changes

Database Query Filtering and Pagination Optimization

Layer / File(s) Summary
Message pagination refactored to SQL cursors and limits
packages/stream_chat_persistence/lib/src/dao/message_dao.dart, packages/stream_chat_persistence/test/src/dao/message_dao_test.dart
MessageDao.getMessagesByCid now resolves message-id pagination cursors to createdAt timestamps and applies cursor filters and limit in the Drift SQL query. Adds _lookupMessageCreatedAt. Query orders descending and reverses results before mapping. Tests use monotonic createdAt and expand pagination assertions for lessThan/greaterThan/limit and missing-cutoff cases.
Reaction and pinned reaction filtering refactored to SQL with shared helper
packages/stream_chat_persistence/lib/src/dao/reaction_dao.dart, packages/stream_chat_persistence/lib/src/dao/pinned_message_reaction_dao.dart, packages/stream_chat_persistence/test/src/dao/reaction_dao_test.dart, packages/stream_chat_persistence/test/src/dao/pinned_message_reaction_dao_test.dart
ReactionDao and PinnedMessageReactionDao introduce a private _selectReactions(where) helper that performs a left join with users, applies SQL predicates (including userId), orders by createdAt, and maps rows to Reaction. Public getReactions/getReactionsByUserId delegate to this helper so filtering occurs in SQL. Tests add other-user reactions and assert only the target user's reactions are returned.
Performance improvements changelog documentation
packages/stream_chat_persistence/CHANGELOG.md
Adds an "Upcoming Changes" section documenting the new SQL-level query optimizations for message pagination and reaction/pinned-reaction userId filtering.

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~25 minutes

Suggested reviewers

  • renefloor
  • Brazol

Poem

🐰 I hopped through rows and chased each timestamp bright,
Cursors now point to times, not IDs in flight,
Reactions filtered early, snug in SQL's nest,
Tests steady as clocks—pagination at its best,
A tiny rabbit cheers the database — swift and light.

🚥 Pre-merge checks | ✅ 5
✅ Passed checks (5 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The PR title directly and specifically describes the main optimization: pushing pagination filtering down to the database level for message retrieval, reducing the number of messages materialized in memory.
Docstring Coverage ✅ Passed No functions found in the changed files to evaluate docstring coverage. Skipping docstring coverage check.
Linked Issues check ✅ Passed Check skipped because no linked issues were found for this pull request.
Out of Scope Changes check ✅ Passed Check skipped because no linked issues were found for this pull request.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch feature/FLU-485_optimize_read_message_from_db

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@VelikovPetar VelikovPetar marked this pull request as ready for review May 21, 2026 13:58
Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick comments (2)
packages/stream_chat_persistence/lib/src/dao/message_dao.dart (1)

195-200: 💤 Low value

Asymmetric cursor semantics preserved — worth documenting.

lessThan translates to a strict < (isSmallerThanValue) while greaterThan translates to >= (isBiggerOrEqualValue). This mirrors the legacy in-memory behavior in getThreadMessagesByParentId (lines 141–156: removeRange(lessThanIndex, ...) excludes the cutoff, removeRange(0, greaterThanIndex) keeps it), so parity is preserved — but the SQL now makes the inconsistency very explicit, and the PaginationParams.greaterThan name reads as strict.

Consider a short doc comment on getMessagesByCid clarifying that greaterThan is inclusive of the cursor message and lessThan is exclusive, so future readers don't "fix" one side and break parity.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@packages/stream_chat_persistence/lib/src/dao/message_dao.dart` around lines
195 - 200, Add a short doc comment to getMessagesByCid explaining the asymmetric
cursor semantics: lessThan (mapped to messages.createdAt.isSmallerThanValue) is
exclusive while greaterThan (mapped to messages.createdAt.isBiggerOrEqualValue)
is inclusive of the cursor message, matching the legacy in-memory behavior in
getThreadMessagesByParentId (removeRange usage); mention
PaginationParams.greaterThan and lessThan explicitly so future maintainers
understand why one side is strict and the other is inclusive and don't
accidentally change parity.
packages/stream_chat_persistence/test/src/dao/message_dao_test.dart (1)

28-94: 💤 Low value

Nit: DateTime.now() in baseTime still gives non-deterministic absolute times.

The monotonic 1-second spacing fixes ordering ties as the comment explains, but baseTime = DateTime.now() is still wall-clock dependent — tests that compare against absolute timestamps (or run near a DST/leap boundary in some environments) could become flaky in the future. Consider pinning baseTime to a fixed DateTime.utc(...) constant to make the suite fully deterministic.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@packages/stream_chat_persistence/test/src/dao/message_dao_test.dart` around
lines 28 - 94, Replace the wall-clock dependent baseTime assignment with a
deterministic fixed UTC timestamp so generated Message createdAt values are
stable; specifically change the baseTime variable (used to set createdAt in the
Message/quotedMessages/threadMessages lists) from DateTime.now() to a constant
UTC datetime (for example 2020-01-01T00:00:00Z) so tests no longer depend on
current time or local DST/leap boundaries.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Nitpick comments:
In `@packages/stream_chat_persistence/lib/src/dao/message_dao.dart`:
- Around line 195-200: Add a short doc comment to getMessagesByCid explaining
the asymmetric cursor semantics: lessThan (mapped to
messages.createdAt.isSmallerThanValue) is exclusive while greaterThan (mapped to
messages.createdAt.isBiggerOrEqualValue) is inclusive of the cursor message,
matching the legacy in-memory behavior in getThreadMessagesByParentId
(removeRange usage); mention PaginationParams.greaterThan and lessThan
explicitly so future maintainers understand why one side is strict and the other
is inclusive and don't accidentally change parity.

In `@packages/stream_chat_persistence/test/src/dao/message_dao_test.dart`:
- Around line 28-94: Replace the wall-clock dependent baseTime assignment with a
deterministic fixed UTC timestamp so generated Message createdAt values are
stable; specifically change the baseTime variable (used to set createdAt in the
Message/quotedMessages/threadMessages lists) from DateTime.now() to a constant
UTC datetime (for example 2020-01-01T00:00:00Z) so tests no longer depend on
current time or local DST/leap boundaries.

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: beebe991-8702-448d-9aab-7f3181f9e824

📥 Commits

Reviewing files that changed from the base of the PR and between 4103da3 and 4689606.

📒 Files selected for processing (7)
  • packages/stream_chat_persistence/CHANGELOG.md
  • packages/stream_chat_persistence/lib/src/dao/message_dao.dart
  • packages/stream_chat_persistence/lib/src/dao/pinned_message_reaction_dao.dart
  • packages/stream_chat_persistence/lib/src/dao/reaction_dao.dart
  • packages/stream_chat_persistence/test/src/dao/message_dao_test.dart
  • packages/stream_chat_persistence/test/src/dao/pinned_message_reaction_dao_test.dart
  • packages/stream_chat_persistence/test/src/dao/reaction_dao_test.dart

@codecov
Copy link
Copy Markdown

codecov Bot commented May 21, 2026

Codecov Report

✅ All modified and coverable lines are covered by tests.
✅ Project coverage is 65.31%. Comparing base (4103da3) to head (edb86f7).

Additional details and impacted files
@@            Coverage Diff             @@
##           master    #2679      +/-   ##
==========================================
+ Coverage   65.27%   65.31%   +0.03%     
==========================================
  Files         423      423              
  Lines       26622    26629       +7     
==========================================
+ Hits        17377    17392      +15     
+ Misses       9245     9237       -8     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

Comment on lines +202 to 204
if (messagePagination != null) {
query.limit(messagePagination.limit);
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't we only limit this when either lessThanCutoff or greaterThanCutoff is not null? I don't know if it can happen that the target message is not in the cache, but in that case limiting the amount of messages might give issues?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This actually matches the previous behaviour of the method. The limit was always applied, regardless of whether we have a lessThan/greaterThan ID supplied, and regardless of whether the corresponding message was found in the cache.

if (msgList.isNotEmpty) {
      if (messagePagination?.lessThan != null) {
        final lessThanIndex = msgList.indexWhere(
          (m) => m.id == messagePagination!.lessThan,
        );
        if (lessThanIndex != -1) {
          msgList.removeRange(lessThanIndex, msgList.length);
        }
      }
      if (messagePagination?.greaterThan != null) {
        final greaterThanIndex = msgList.indexWhere(
          (m) => m.id == messagePagination!.greaterThan,
        );
        if (greaterThanIndex != -1) {
          msgList.removeRange(0, greaterThanIndex);
        }
      }
      if (messagePagination?.limit != null) {
        return msgList
            .skip(max(0, msgList.length - messagePagination!.limit))
            .toList();
      }
    }

But now that you mentioned it, we actually have several smaller issues that neither the 'old' nor the 'new' implementation cover:

  1. Limit is used wrongly in the greaterThan path: We always take the last items from the fetch, which means that we skip the first messages which are after the greaterThan ID. IMO, we should retrieve the limit amount of messages right after the cursor
  2. We are missing handling for greaterThanOrEqual/lessThanOrEqual in the method
  3. greaterThan case returns the cursor (the pivot message) as well, when it shouldn't

Should we fix these things in this PR? Or do you think it is better to handle them in a follow up?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think they can be fixed in this PR. You already change the method quite a bit, so would be good to make it correct.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, I will address this!

Comment on lines +28 to +32
// Strictly monotonic `createdAt` per message so SQL-side pagination
// filters (`WHERE createdAt < cutoff`, `ORDER BY createdAt ASC`) can't be
// confused by ties. Drift stores `DateTime` as integer Unix seconds by
// default, so the offset must be at least 1 second per row — otherwise
// sub-second offsets all round-trip onto the same second.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if we should migrate to storing in milliseconds instead of a datetime.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is actually something I already noted down to check after we do the offline storage optimisations.
In my opinion we should do this, as currently it seems like we might be losing some precision on dates (probably not that much of a big deal, but IMO it would be better to be precise as possible).

I think we should evaluate this separately, because I assume there would be other consequences of this change (maybe more storage consumed?)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, better to do this separately

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants