Add MessageBufferConfig to allow custom back-pressure config for message.new events#6472
Add MessageBufferConfig to allow custom back-pressure config for message.new events#6472VelikovPetar wants to merge 2 commits into
MessageBufferConfig to allow custom back-pressure config for message.new events#6472Conversation
…nfig for message.new events Co-Authored-By: Claude <noreply@anthropic.com>
PR checklist ✅All required conditions are satisfied:
🎉 Great job! This PR is ready for review. |
SDK Size Comparison 📏
|
WalkthroughThis PR introduces configurable bounded buffering for socket ChangesMessage Buffering and Overflow Handling
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~25 minutes Possibly related PRs
Suggested reviewers
Poem
🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches📝 Generate docstrings
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 2
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In
`@stream-chat-android-client/src/main/java/io/getstream/chat/android/client/api/ChatClientConfig.kt`:
- Around line 133-136: Revert MessageLimitConfig to keep the original
primary-constructor shape (single parameter channelMessageLimits) and make
messageBufferConfig an additive property declared in the class body (public val
messageBufferConfig: MessageBufferConfig = MessageBufferConfig()) instead of
adding it to the primary constructor; this preserves the existing generated
one-arg constructor, copy and componentN signatures for binary compatibility
while still exposing the new messageBufferConfig field.
- Around line 218-221: Add an API-level guard to reject non-positive buffer
sizes: in the MessageBufferConfig data class (symbol MessageBufferConfig)
validate the capacity property on construction and throw an
IllegalArgumentException if capacity <= 0 (except when using sentinel values you
intentionally allow), since capacity is used as extraBufferCapacity for
MutableSharedFlow with onBufferOverflow (MessageBufferOverflow) and
MutableSharedFlow requires extraBufferCapacity > 0 for DROP_* policies; ensure
the error message clearly names capacity and MessageBufferConfig so callers see
what's wrong.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Repository UI
Review profile: CHILL
Plan: Pro
Run ID: c56d6018-28d0-4d38-a129-e44964a672b9
📒 Files selected for processing (7)
stream-chat-android-client/api/stream-chat-android-client.apistream-chat-android-client/src/main/java/io/getstream/chat/android/client/api/ChatClientConfig.ktstream-chat-android-client/src/main/java/io/getstream/chat/android/client/internal/state/event/handler/internal/EventHandlerSequential.ktstream-chat-android-client/src/main/java/io/getstream/chat/android/client/internal/state/plugin/factory/StreamStatePluginFactory.ktstream-chat-android-client/src/test/java/io/getstream/chat/android/client/internal/state/event/TotalUnreadCountTest.ktstream-chat-android-client/src/test/java/io/getstream/chat/android/client/internal/state/event/handler/internal/EventHandlerSequentialTest.ktstream-chat-android-client/src/test/java/io/getstream/chat/android/client/internal/state/event/handler/internal/EventHandlerSequentialUserMessagesDeletedTest.kt
| public data class MessageLimitConfig( | ||
| public val channelMessageLimits: Set<ChannelMessageLimit> = setOf(), | ||
| public val messageBufferConfig: MessageBufferConfig = MessageBufferConfig(), | ||
| ) |
There was a problem hiding this comment.
Keep MessageLimitConfig binary compatible.
Adding messageBufferConfig to the primary constructor changes the generated public data-class ABI: the existing one-arg constructor disappears and the copy / componentN signatures change as well. That will break already-compiled SDK consumers on upgrade, so this needs an additive shape instead of a primary-constructor change.
As per coding guidelines, "Favour additive API changes and mark deprecations with clear migration paths; validate public APIs and maintain binary compatibility".
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In
`@stream-chat-android-client/src/main/java/io/getstream/chat/android/client/api/ChatClientConfig.kt`
around lines 133 - 136, Revert MessageLimitConfig to keep the original
primary-constructor shape (single parameter channelMessageLimits) and make
messageBufferConfig an additive property declared in the class body (public val
messageBufferConfig: MessageBufferConfig = MessageBufferConfig()) instead of
adding it to the primary constructor; this preserves the existing generated
one-arg constructor, copy and componentN signatures for binary compatibility
while still exposing the new messageBufferConfig field.
| public data class MessageBufferConfig( | ||
| public val channelTypes: Set<String> = emptySet(), | ||
| public val capacity: Int = Int.MAX_VALUE, | ||
| public val overflow: MessageBufferOverflow = MessageBufferOverflow.DROP_OLDEST, |
There was a problem hiding this comment.
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
set -euo pipefail
rg -nC3 'MessageBufferConfig|capacity|MutableSharedFlow|extraBufferCapacity|onBufferOverflow|require\s*\(' stream-chat-android-client/src/mainRepository: GetStream/stream-chat-android
Length of output: 27801
🏁 Script executed:
#!/bin/bash
set -euo pipefail
FILE="stream-chat-android-client/src/main/java/io/getstream/chat/android/client/api/ChatClientConfig.kt"
# show MessageBufferConfig + its KDoc
nl -ba "$FILE" | sed -n '150,270p'
FILE2="stream-chat-android-client/src/main/java/io/getstream/chat/android/client/internal/state/event/handler/internal/EventHandlerSequential.kt"
# show where capacity is used
nl -ba "$FILE2" | sed -n '140,230p'
# find all usages of bufferConfig.capacity
rg -n "bufferConfig\.capacity" stream-chat-android-client/src/main/java/io/getstream/chat/android/client -SRepository: GetStream/stream-chat-android
Length of output: 116
🏁 Script executed:
#!/bin/bash
set -euo pipefail
FILE="stream-chat-android-client/src/main/java/io/getstream/chat/android/client/api/ChatClientConfig.kt"
# show MessageBufferConfig KDoc and data class
cat -n "$FILE" | sed -n '150,260p'
FILE2="stream-chat-android-client/src/main/java/io/getstream/chat/android/client/internal/state/event/handler/internal/EventHandlerSequential.kt"
# show where capacity is used
cat -n "$FILE2" | sed -n '150,230p'
# find all usages of bufferConfig.capacity
rg -n "bufferConfig\.capacity" stream-chat-android-client/src/main/java/io/getstream/chat/android/client -SRepository: GetStream/stream-chat-android
Length of output: 9460
🏁 Script executed:
#!/bin/bash
set -euo pipefail
FILE="stream-chat-android-client/src/main/java/io/getstream/chat/android/client/api/ChatClientConfig.kt"
cat -n "$FILE" | sed -n '150,260p'
FILE2="stream-chat-android-client/src/main/java/io/getstream/chat/android/client/internal/state/event/handler/internal/EventHandlerSequential.kt"
cat -n "$FILE2" | sed -n '150,230p'
rg -n "bufferConfig\.capacity" stream-chat-android-client/src/main/java/io/getstream/chat/android/client -SRepository: GetStream/stream-chat-android
Length of output: 9460
🏁 Script executed:
#!/bin/bash
set -euo pipefail
# 1) Look for any require/validation mentioning MessageBufferConfig or capacity
rg -n --hidden --no-ignore-vcs \
"MessageBufferConfig|bufferConfig\.capacity|capacity must|capacity must be|require\s*\(.*capacity|capacity\s*[><=]\s*\d" \
stream-chat-android-client/src/main/java/io/getstream/chat/android/client -S
# 2) Find where MessageBufferConfig is instantiated/propagated
rg -n "MessageBufferConfig\(" stream-chat-android-client/src/main/java/io/getstream/chat/android/client -SRepository: GetStream/stream-chat-android
Length of output: 2527
🏁 Script executed:
#!/bin/bash
set -euo pipefail
rg -n --hidden --no-ignore-vcs \
"(MessageBufferConfig\s*\.?\s*capacity|bufferConfig\.capacity|capacity must be|require\s*\(.*capacity|capacity\s*[><]=?\s*[0-9]+)" \
. -SRepository: GetStream/stream-chat-android
Length of output: 274
🌐 Web query:
Kotlinx coroutines MutableSharedFlow extraBufferCapacity negative value behavior
💡 Result:
In kotlinx.coroutines, MutableSharedFlow(extraBufferCapacity=…) does not have any runtime “negative value” behavior: it validates inputs and throws IllegalArgumentException if extraBufferCapacity is negative. Specifically, the MutableSharedFlow(replay, extraBufferCapacity, …) constructor function contains a check require(extraBufferCapacity >= 0) { "extraBufferCapacity cannot be negative, but was $extraBufferCapacity" }, so passing a negative extraBufferCapacity will fail immediately with IllegalArgumentException (before any flow emissions). [1] Related notes: - The documentation for MutableSharedFlow also states that extraBufferCapacity “cannot be negative”. [2] - There is additional validation logic for the combination of (replay, extraBufferCapacity, onBufferOverflow): if you choose a non-default onBufferOverflow strategy other than BufferOverflow.SUSPEND, then replay or extraBufferCapacity must be > 0; otherwise it will also throw. [1][2] So for your query “MutableSharedFlow extraBufferCapacity negative value behavior”: the behavior is “constructor rejects the value and throws IllegalArgumentException,” not a special buffering/overflow semantics. [1][2]
Citations:
- 1: https://github.com/kotlin/kotlinx.coroutines/blob/master/kotlinx-coroutines-core/common/src/flow/SharedFlow.kt
- 2: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/-mutable-shared-flow.html
Reject non-positive capacity values at the API boundary.
MessageBufferConfig.capacity is passed directly as extraBufferCapacity when constructing the buffered MutableSharedFlow with onBufferOverflow = DROP_OLDEST/DROP_LATEST. MutableSharedFlow rejects negative values and also requires extraBufferCapacity > 0 for DROP_* (replay defaults to 0), so capacity <= 0 will surface as an IllegalArgumentException at runtime when buffering becomes active.
Suggested guard
public data class MessageBufferConfig(
public val channelTypes: Set<String> = emptySet(),
public val capacity: Int = Int.MAX_VALUE,
public val overflow: MessageBufferOverflow = MessageBufferOverflow.DROP_OLDEST,
-)
+) {
+ init {
+ require(capacity > 0) { "capacity must be > 0" }
+ }
+}📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| public data class MessageBufferConfig( | |
| public val channelTypes: Set<String> = emptySet(), | |
| public val capacity: Int = Int.MAX_VALUE, | |
| public val overflow: MessageBufferOverflow = MessageBufferOverflow.DROP_OLDEST, | |
| public data class MessageBufferConfig( | |
| public val channelTypes: Set<String> = emptySet(), | |
| public val capacity: Int = Int.MAX_VALUE, | |
| public val overflow: MessageBufferOverflow = MessageBufferOverflow.DROP_OLDEST, | |
| ) { | |
| init { | |
| require(capacity > 0) { "capacity must be > 0" } | |
| } | |
| } |
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In
`@stream-chat-android-client/src/main/java/io/getstream/chat/android/client/api/ChatClientConfig.kt`
around lines 218 - 221, Add an API-level guard to reject non-positive buffer
sizes: in the MessageBufferConfig data class (symbol MessageBufferConfig)
validate the capacity property on construction and throw an
IllegalArgumentException if capacity <= 0 (except when using sentinel values you
intentionally allow), since capacity is used as extraBufferCapacity for
MutableSharedFlow with onBufferOverflow (MessageBufferOverflow) and
MutableSharedFlow requires extraBufferCapacity > 0 for DROP_* policies; ensure
the error message clearly names capacity and MessageBufferConfig so callers see
what's wrong.
MessageBufferConfig to allow custom back-pressure config for message.new events
|


Goal
Ports V6 PR #6406 (commit
3b6c6546418d010ba4dc29847459bece845bd878) todevelop.Linear issue: AND-1166
Manual port (not a clean cherry-pick) — v7 restructured
stream-chat-android-stateintostream-chat-android-clientand mergedStatePluginConfigfields intoChatClientConfig. As a consequence the file paths, packages, and theMessageLimitConfiglocation all moved:stream-chat-android-statemodulestream-chat-android-clientmoduleio.getstream.chat.android.state.plugin.config.MessageLimitConfigio.getstream.chat.android.client.api.MessageLimitConfigStatePluginConfig.messageLimitConfigChatClientConfig.messageLimitConfigSame intent as the V6 PR — high-traffic channel types (e.g. livestreams) can produce a flood of
message.newevents that arrive faster than the sequential event-handling pipeline can process them. The current implementation funnels every socket event through a singleMutableSharedFlowwithextraBufferCapacity = Int.MAX_VALUE, which means there is no back-pressure: a burst of new-message events queues up unbounded memory and starves more important signals (reads, bans, member updates) of timely processing.This PR introduces a
MessageBufferConfig(underMessageLimitConfig.messageBufferConfig) that lets integrators opt specific channel types into a bounded buffer forNewMessageEvents, with a configurable overflow strategy (DROP_OLDEST/DROP_LATEST). Signal-critical events and events for non-opted-in channel types are unaffected. Default is a no-op.Implementation
MessageBufferConfig(underMessageLimitConfig.messageBufferConfig) exposing:channelTypes: Set<String>— channel types whoseNewMessageEvents go through the bounded buffer (empty by default → feature is a no-op).capacity: Int— buffer capacity (defaults toInt.MAX_VALUE).overflow: MessageBufferOverflow— overflow strategy (DROP_OLDEST/DROP_LATEST, defaults toDROP_OLDEST).EventHandlerSequentialnow allocates a secondaryMutableSharedFlow(bufferedNewMessageEvents) lazily, only when buffering is enabled, so the default configuration pays no cost for it.defaultSocketEventListener— the existing unbuffered path; used when no channel types are opted in.bufferedSocketEventListener— routesNewMessageEvents for opted-in channel types to the bounded flow, and everything else (including non-opted-inNewMessageEvents and all other event types) to the unbuffered flow.startListening()picks the listener based onbufferConfig.channelTypes.isNotEmpty()and only collects frombufferedNewMessageEventswhen buffering is enabled.StreamStatePluginFactorywires the config fromChatClientConfig.messageLimitConfig.messageBufferConfigintoEventHandlerSequential.The bounded flow shares the same downstream pipeline (
socketEventCollector→handleBatchEvent) as the unbuffered flow, so ordering inside each flow is preserved and back-pressure is applied independently per flow.Files changed (7)
stream-chat-android-client/api/stream-chat-android-client.api— auto-regenerated viaapiDumpstream-chat-android-client/.../api/ChatClientConfig.ktstream-chat-android-client/.../internal/state/event/handler/internal/EventHandlerSequential.ktstream-chat-android-client/.../internal/state/plugin/factory/StreamStatePluginFactory.ktstream-chat-android-client/.../test/.../internal/state/event/TotalUnreadCountTest.ktstream-chat-android-client/.../test/.../internal/state/event/handler/internal/EventHandlerSequentialTest.ktstream-chat-android-client/.../test/.../internal/state/event/handler/internal/EventHandlerSequentialUserMessagesDeletedTest.ktStats match V6 exactly: +392 / −26.
UI Changes
No UI changes.
Testing
EventHandlerSequentialTestcovering:NewMessageEvents for opted-in channel types are routed through the bounded buffer.NewMessageEvents for non-opted-in channel types and all non-NewMessageEventevents keep using the unbuffered path.DROP_OLDEST/DROP_LATESToverflow strategies behave as expected when the buffer is full.TotalUnreadCountTest,EventHandlerSequentialUserMessagesDeletedTest) updated to pass the newbufferConfigargument.Verification on develop:
EventHandlerSequentialTest,EventHandlerSequentialUserMessagesDeletedTest,TotalUnreadCountTest,StateRegistryTest)Summary by CodeRabbit