Skipping CI for Draft Pull Request.
[APPROVALNOTIFIER] This PR is NOT APPROVED.
📝 Walkthrough
Subscription checkpoint updates are made thread-safe via …
Changes: Subscription Checkpoint Race Condition and Iterator Boundary Fix
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~50 minutes
🚥 Pre-merge checks: ✅ 2 passed | ❌ 3 failed (3 warnings)
Code Review
This pull request optimizes the event store by implementing SST file filtering based on transaction commit timestamp ranges, allowing Pebble to skip irrelevant files during scans. It refactors key encoding and decoding logic, introduces shared Pebble caches with proper lifecycle management, and improves concurrency by using atomic types for checkpoint timestamps. Additionally, it refines the handling of subscription state updates to ensure monotonic advancement. Feedback was provided to improve the clarity of a log message regarding stale state updates.
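The file-skipping idea in the summary above can be illustrated with a minimal sketch. It does not use Pebble's API and is not the PR's implementation: `tsRange`, `sstFile`, and `filesToScan` are hypothetical names, and the sketch assumes each file records the closed range of commit timestamps it contains.

```go
package main

import "fmt"

// tsRange is a hypothetical closed interval [Min, Max] of commit timestamps.
type tsRange struct{ Min, Max uint64 }

// sstFile is a hypothetical stand-in for per-file metadata; a real store
// would attach the range to the file when it is written.
type sstFile struct {
	Name  string
	Range tsRange
}

// overlaps reports whether two closed intervals share at least one timestamp.
func overlaps(a, b tsRange) bool {
	return a.Min <= b.Max && b.Min <= a.Max
}

// filesToScan keeps only the files whose timestamp range overlaps the scan
// range; every other file can be skipped without being opened.
func filesToScan(files []sstFile, scan tsRange) []string {
	var out []string
	for _, f := range files {
		if overlaps(f.Range, scan) {
			out = append(out, f.Name)
		}
	}
	return out
}

func main() {
	files := []sstFile{
		{"a.sst", tsRange{100, 200}},
		{"b.sst", tsRange{201, 300}},
		{"c.sst", tsRange{301, 400}},
	}
	fmt.Println(filesToScan(files, tsRange{250, 320})) // prints [b.sst c.sst]
}
```

A scan over timestamps 250 to 320 never touches `a.sst`, which is the saving the summary describes.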
log.Panic("should not happen",
subState := tableState.Subscriptions[targetIndex]
if change.CheckpointTs < subState.CheckpointTs || change.ResolvedTs < subState.ResolvedTs {
	log.Warn("ignore stale subscription state update",
The log message "ignore stale subscription state update" is slightly misleading because the code does not ignore the update. It proceeds to update any timestamps that are advancing monotonically. A more accurate log message would be "received stale subscription state update", which correctly indicates that a stale value was detected while still processing the valid parts of the update. This would be clearer for anyone debugging potential message ordering issues.
- log.Warn("ignore stale subscription state update",
+ log.Warn("received stale subscription state update",
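The behavior this comment describes (warn about a stale field, yet still apply whichever timestamps advance) might look like the following sketch. `subscriptionState` and `applyChange` are hypothetical names that mirror the two fields in the quoted snippet; the real types in the PR are not shown here.

```go
package main

import "fmt"

// subscriptionState is a hypothetical type holding the two timestamps
// compared in the quoted snippet.
type subscriptionState struct {
	CheckpointTs uint64
	ResolvedTs   uint64
}

// applyChange advances each timestamp independently and only forward.
// It returns true when at least one incoming field was older than the
// stored value, which is the case the log message reports.
func applyChange(cur *subscriptionState, change subscriptionState) bool {
	stale := change.CheckpointTs < cur.CheckpointTs || change.ResolvedTs < cur.ResolvedTs
	if change.CheckpointTs > cur.CheckpointTs {
		cur.CheckpointTs = change.CheckpointTs
	}
	if change.ResolvedTs > cur.ResolvedTs {
		cur.ResolvedTs = change.ResolvedTs
	}
	return stale
}

func main() {
	cur := subscriptionState{CheckpointTs: 100, ResolvedTs: 200}
	// The checkpoint is stale (90 < 100) but the resolved ts advances.
	stale := applyChange(&cur, subscriptionState{CheckpointTs: 90, ResolvedTs: 250})
	fmt.Println(stale, cur.CheckpointTs, cur.ResolvedTs) // prints true 100 250
}
```

Because the update is partly applied, "received" describes the situation more accurately than "ignore".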
926bb42 to 4bf750b
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@logservice/eventstore/event_store.go`:
- Around line 715-716: The dispatcher checkpoint write at
dispatcherStat.checkpointTs.Store(checkpointTs) can regress a newer
per-dispatcher value; change the update to only advance the per-dispatcher
checkpoint if checkpointTs is greater than the currently stored value by reading
dispatcherStat.checkpointTs (via its Load or atomic read) and using an atomic
compare-and-swap loop to store the new checkpoint only when it is strictly
larger (i.e., retry CAS until success or current >= checkpointTs) so
dispatcherStat.checkpointTs never moves backward and min recomputation reflects
true progress.
📒 Files selected for processing (2)
- logservice/eventstore/event_store.go
- logservice/eventstore/event_store_test.go
dispatcherStat.checkpointTs.Store(checkpointTs)
Keep the per-dispatcher checkpoint monotonic too.
Line 715 can still overwrite a newer dispatcher checkpoint with a stale one. After that, the min recomputation can stay pinned below the real progress, so the subscription checkpoint and GC stop advancing until this dispatcher reports again.
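A small worked example of the pinning effect described above, using made-up numbers. `minCheckpoint` and the dispatcher names are hypothetical; the sketch only assumes that the subscription checkpoint is the minimum over the per-dispatcher checkpoints.

```go
package main

import "fmt"

// minCheckpoint returns the smallest per-dispatcher checkpoint, or 0 when
// the map is empty. It stands in for the "min recomputation" in the review.
func minCheckpoint(checkpoints map[string]uint64) uint64 {
	var lowest uint64
	first := true
	for _, ts := range checkpoints {
		if first || ts < lowest {
			lowest = ts
			first = false
		}
	}
	return lowest
}

func main() {
	checkpoints := map[string]uint64{"d1": 150, "d2": 180}
	fmt.Println(minCheckpoint(checkpoints)) // prints 150

	// A stale report for d2 arrives late and blindly overwrites 180.
	checkpoints["d2"] = 120
	fmt.Println(minCheckpoint(checkpoints)) // prints 120
}
```

After the stale write the minimum reads 120 even though both dispatchers had already progressed past 150, and it stays there until `d2` reports again.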
Suggested fix
- dispatcherStat.checkpointTs.Store(checkpointTs)
+ util.CompareAndMonotonicIncrease(&dispatcherStat.checkpointTs, checkpointTs)
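For reference, the compare-and-swap loop the review asks for can be sketched with the standard library alone. This is not the source of `util.CompareAndMonotonicIncrease`, whose signature is not shown in this thread; the `checkpoint` type and its `Advance` method are hypothetical, and the sketch assumes the timestamp is held in an `atomic.Uint64` (Go 1.19 or later).

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// checkpoint is a hypothetical wrapper around an atomically updated timestamp.
type checkpoint struct {
	ts atomic.Uint64
}

// Advance stores next only if it is strictly greater than the current value.
// It retries the compare-and-swap until it succeeds or another writer has
// already stored a value that is at least next. It reports whether this call
// moved the checkpoint forward.
func (c *checkpoint) Advance(next uint64) bool {
	for {
		cur := c.ts.Load()
		if cur >= next {
			return false // stale or duplicate update: never move backward
		}
		if c.ts.CompareAndSwap(cur, next) {
			return true
		}
		// Another goroutine changed the value in between; reload and retry.
	}
}

// Load returns the current checkpoint.
func (c *checkpoint) Load() uint64 { return c.ts.Load() }

func main() {
	var c checkpoint
	var wg sync.WaitGroup
	for _, v := range []uint64{120, 150, 90, 140} {
		wg.Add(1)
		go func(v uint64) {
			defer wg.Done()
			c.Advance(v)
		}(v)
	}
	wg.Wait()
	fmt.Println(c.Load()) // prints 150 regardless of goroutine ordering
}
```

A plain `Store` would leave whichever value was written last; the loop guarantees the largest value wins, so a late stale report cannot pull the per-dispatcher checkpoint backward.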
What problem does this PR solve?
Issue Number: close #4992
What is changed and how it works?
Check List
Tests
Questions
Will it cause performance regression or break compatibility?
Do you need to update user documentation, design documentation or monitoring documentation?
Release note
Summary by CodeRabbit
Release Notes
Bug Fixes
Changes