feat(shard): add v2 consumer-offset commands with AckLevel#3151
krishvishal wants to merge 5 commits into apache:master
Codecov Report
❌ Patch coverage is …
@@             Coverage Diff              @@
##             master    #3151     +/-   ##
============================================
+ Coverage       73.60%   73.65%   +0.04%
  Complexity        943      943
============================================
  Files            1141     1144       +3
  Lines          100863   101211     +348
  Branches        78044    78409     +365
============================================
+ Hits            74243    74549     +306
- Misses          23960    23975      +15
- Partials         2660     2687      +27
This command is supposed to be implemented only in the components used by the cluster, which is still in development, not in our core server.
bbb3eee to a3b9e66
  fn parse_consumer_offset_payload(
      operation: Operation,
      body: &[u8],
  ) -> Result<(ConsumerKind, u32, Option<u64>), IggyError> {
      let consumer_kind = *body.first().ok_or(IggyError::InvalidCommand)?;
      let consumer_id = body
          .get(1..5)
          .ok_or(IggyError::InvalidCommand)
          .and_then(|bytes| {
              <[u8; 4]>::try_from(bytes)
                  .map(u32::from_le_bytes)
                  .map_err(|_| IggyError::InvalidCommand)
          })?;
      let kind = ConsumerKind::from_code(consumer_kind)?;
      match operation {
-         Operation::StoreConsumerOffset => {
+         Operation::StoreConsumerOffset | Operation::StoreConsumerOffset2 => {
              let offset =
                  body.get(5..13)
                      .ok_or(IggyError::InvalidCommand)
                      .and_then(|bytes| {
                          <[u8; 8]>::try_from(bytes)
                              .map(u64::from_le_bytes)
                              .map_err(|_| IggyError::InvalidCommand)
                      })?;
              Ok((kind, consumer_id, Some(offset)))
          }
-         Operation::DeleteConsumerOffset => Ok((kind, consumer_id, None)),
+         Operation::DeleteConsumerOffset | Operation::DeleteConsumerOffset2 => {
+             Ok((kind, consumer_id, None))
+         }
          _ => Err(IggyError::InvalidCommand),
      }
  }
This is the only place that actually processes v2 commands (both parse_consumer_offset_request and parse_staged_consumer_offset_commit delegate here), and it handles StoreConsumerOffset2 in the same branch as v1 without ever reading the trailing ack byte. For store, parsing stops at body[5..13] (byte 12). For delete, it stops at body[1..5] (byte 4). The ack byte just falls off the end.
Two concerns here. First, AckLevel has literally no effect on behavior right now, but the PR description says it "selects the policy." I think the description should be explicit that the ack byte is wire-reserved only and behavioral branching comes later. Otherwise future contributors might assume this already works.
Second, and more importantly from a protocol correctness angle: an invalid ack value (e.g. 0xFF) would be silently accepted with no validation error. Wouldn't it be better to at least validate the byte here, even if you don't use it yet? Something like reading the ack byte and calling AckLevel::from_code() to reject garbage early. That way you get fail-fast behavior for malformed messages instead of silently swallowing bad data through the pipeline.
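To make the suggestion concrete, here is a minimal sketch of the fail-fast check. The `AckLevel` and `IggyError` definitions below are hypothetical stand-ins so the snippet compiles on its own, the `0`/`1` encoding is an assumption (the real codes may differ), and `parse_ack_byte` is an illustrative helper name, not something in the PR. The ack index would be 13 for store and 5 for delete, matching the payload layout the simulator builders write.

```rust
// Hypothetical stand-ins for the real types. Only the names AckLevel, NoAck,
// Quorum, from_code and IggyError::InvalidCommand come from the PR discussion.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AckLevel {
    NoAck,
    Quorum,
}

#[derive(Debug, PartialEq, Eq)]
pub enum IggyError {
    InvalidCommand,
}

impl AckLevel {
    /// Assumed encoding: 0 = NoAck, 1 = Quorum. The real codes may differ.
    pub fn from_code(code: u8) -> Result<Self, IggyError> {
        match code {
            0 => Ok(AckLevel::NoAck),
            1 => Ok(AckLevel::Quorum),
            _ => Err(IggyError::InvalidCommand),
        }
    }
}

/// Illustrative helper: reads the ack byte at `index` and rejects both a
/// missing byte and an unknown value instead of letting it fall off the end.
pub fn parse_ack_byte(body: &[u8], index: usize) -> Result<AckLevel, IggyError> {
    let code = *body.get(index).ok_or(IggyError::InvalidCommand)?;
    AckLevel::from_code(code)
}
```

With something like this in the v2 arms, a payload ending in 0xFF, or one that is a byte short, would be rejected with `InvalidCommand` at parse time even before any behavior depends on the ack level.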
Yeah, we need to make sure we actually use the AckLevel, as that is the main difference between the old request and the new one.
pub fn store_consumer_offset_v2(
    &self,
    namespace: IggyNamespace,
    consumer_kind: u8,
    consumer_id: u32,
    offset: u64,
    ack: AckLevel,
) -> Message<RequestHeader> {
    let mut payload = Vec::with_capacity(14);
    payload.push(consumer_kind);
    payload.extend_from_slice(&consumer_id.to_le_bytes());
    payload.extend_from_slice(&offset.to_le_bytes());
    payload.push(ack.as_u8());

    self.build_request_with_namespace(Operation::StoreConsumerOffset2, &payload, namespace)
}

/// v2 of `delete_consumer_offset` carrying an explicit `AckLevel` byte.
pub fn delete_consumer_offset_v2(
    &self,
    namespace: IggyNamespace,
    consumer_kind: u8,
    consumer_id: u32,
    ack: AckLevel,
) -> Message<RequestHeader> {
    let mut payload = Vec::with_capacity(6);
    payload.push(consumer_kind);
    payload.extend_from_slice(&consumer_id.to_le_bytes());
    payload.push(ack.as_u8());

    self.build_request_with_namespace(Operation::DeleteConsumerOffset2, &payload, namespace)
}
store_consumer_offset_v2 and delete_consumer_offset_v2 are defined here but never called anywhere in the codebase. Similarly, StoreConsumerOffset2Request / DeleteConsumerOffset2Request in binary_protocol are only exercised in their own unit tests.
This means the v2 path has zero end-to-end coverage. The wire types have roundtrip tests which is great, but there's nothing proving a v2 message built by the simulator actually flows through the consensus pipeline and gets parsed correctly by parse_consumer_offset_payload. I think at minimum there should be a simulator-level test (or even just a unit test in iggy_partition.rs) that builds a v2 store/delete message and runs it through the partition parser to confirm the bytes land in the right places. Otherwise this is a lot of scaffolding with no integration confidence.
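As a rough illustration of the kind of check meant here, the sketch below builds a v2 store payload with the same byte layout as `store_consumer_offset_v2` and parses it back. Both functions are hypothetical stand-ins written so the snippet is self-contained; a real test would presumably call the simulator builder and `parse_consumer_offset_payload` directly, and the ack value is passed as a raw `u8` here rather than as `AckLevel`.

```rust
/// Mirrors the layout written by `store_consumer_offset_v2` in the diff:
/// kind (1 byte) | consumer_id (4 bytes, LE) | offset (8 bytes, LE) | ack (1 byte).
pub fn build_store_v2_payload(kind: u8, consumer_id: u32, offset: u64, ack: u8) -> Vec<u8> {
    let mut payload = Vec::with_capacity(14);
    payload.push(kind);
    payload.extend_from_slice(&consumer_id.to_le_bytes());
    payload.extend_from_slice(&offset.to_le_bytes());
    payload.push(ack);
    payload
}

/// Hypothetical stand-in for the partition parser. Returns None when the
/// payload is too short for any of the four fields.
pub fn parse_store_v2_payload(body: &[u8]) -> Option<(u8, u32, u64, u8)> {
    let kind = *body.first()?;

    let mut id_bytes = [0u8; 4];
    id_bytes.copy_from_slice(body.get(1..5)?);
    let consumer_id = u32::from_le_bytes(id_bytes);

    let mut offset_bytes = [0u8; 8];
    offset_bytes.copy_from_slice(body.get(5..13)?);
    let offset = u64::from_le_bytes(offset_bytes);

    let ack = *body.get(13)?;
    Some((kind, consumer_id, offset, ack))
}
```

A round trip such as `parse_store_v2_payload(&build_store_v2_payload(1, 42, 1_000, 1))` should hand back the same four values, and truncating the payload by one byte should fail. The delete variant would follow the same pattern with the ack byte at index 5.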
// v2 consumer-offset ops are registered in the dispatch table for the
// consensus/simulator pathway but are not wired into the legacy binary
// server's dispatch. They'll move into server-ng alongside the rest of
// the v2 surface; re-enable these codes here once that lands.
if matches!(
    code,
    STORE_CONSUMER_OFFSET_2_CODE | DELETE_CONSUMER_OFFSET_2_CODE
) {
    continue;
}
The comment says "re-enable these codes here once that lands" but there's no issue/ticket reference to track it. In my experience these continue skips tend to get forgotten once the PR is merged. Would it be worth adding a // TODO(#XXXX) with an issue number so this shows up in tracking? Or even a compile_error! behind a feature flag that fires when server-ng v2 dispatch lands, forcing someone to come back here.
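For illustration, the two options could look roughly like this. The feature name `server-ng-v2-dispatch`, the helper `is_unwired_v2_code`, and the numeric values of the two code constants are all made up for the sketch; `#XXXX` is left as a placeholder for whatever issue gets filed.

```rust
// Hypothetical feature name. If server-ng v2 dispatch were gated behind a
// Cargo feature, enabling it would make the build fail here until the skip
// below is removed.
#[cfg(feature = "server-ng-v2-dispatch")]
compile_error!("server-ng v2 dispatch is enabled: remove the v2 consumer-offset skip");

// Placeholder values for illustration only; the real codes live elsewhere.
pub const STORE_CONSUMER_OFFSET_2_CODE: u32 = 1001;
pub const DELETE_CONSUMER_OFFSET_2_CODE: u32 = 1002;

/// TODO(#XXXX): drop this once v2 consumer-offset ops are wired into server-ng.
pub fn is_unwired_v2_code(code: u32) -> bool {
    matches!(
        code,
        STORE_CONSUMER_OFFSET_2_CODE | DELETE_CONSUMER_OFFSET_2_CODE
    )
}
```

Pulling the check into a named helper with the TODO attached also gives the skip a single, greppable home instead of an inline `continue`.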
Summary
AckLevel { NoAck, Quorum } and v2 wire commands StoreConsumerOffset2 / DeleteConsumerOffset2 that carry an explicit ack byte; v1 commands and wire format are untouched.
PollMessages auto-commit and explicit stores share a single shard entry point, with AckLevel selecting the policy.