feat(shard): add v2 consumer-offset commands with AckLevel #3151

Open
krishvishal wants to merge 5 commits into apache:master from krishvishal:extend-cons-offset

Conversation

@krishvishal
Contributor

Summary

  • Introduce AckLevel { NoAck, Quorum } and v2 wire commands StoreConsumerOffset2 / DeleteConsumerOffset2 that carry an explicit ack byte; v1 commands and wire format are untouched (a sketch of the new primitive follows below this list).
  • Unify the server's two offset-write paths so PollMessages auto-commit and explicit stores share a single shard entry point, with AckLevel selecting the policy.
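For reference, a minimal sketch of the new primitive; the concrete byte codes and the error type here are illustrative only, `core/binary_protocol/src/primitives/ack_level.rs` is authoritative:

```rust
// Sketch: variant names and the as_u8 / from_code methods come from this PR;
// the 0/1 wire codes and the use of IggyError are assumptions for illustration.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AckLevel {
    /// Fire-and-forget: the shard applies the offset write without waiting.
    NoAck,
    /// The write is acknowledged only after a quorum of replicas accepts it.
    Quorum,
}

impl AckLevel {
    pub fn as_u8(&self) -> u8 {
        match self {
            AckLevel::NoAck => 0,
            AckLevel::Quorum => 1,
        }
    }

    pub fn from_code(code: u8) -> Result<Self, IggyError> {
        match code {
            0 => Ok(AckLevel::NoAck),
            1 => Ok(AckLevel::Quorum),
            _ => Err(IggyError::InvalidCommand),
        }
    }
}
```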

@codecov

codecov Bot commented Apr 21, 2026

Codecov Report

❌ Patch coverage is 91.25964% with 34 lines in your changes missing coverage. Please review.
✅ Project coverage is 73.65%. Comparing base (e1b1b22) to head (80165a5).

| Files with missing lines | Patch % | Lines |
|---|---|---|
| core/simulator/src/client.rs | 0.00% | 28 Missing ⚠️ |
| core/partitions/src/iggy_partition.rs | 40.00% | 3 Missing ⚠️ |
| core/consensus/src/observability.rs | 0.00% | 2 Missing ⚠️ |
| core/binary_protocol/src/primitives/ack_level.rs | 97.61% | 1 Missing ⚠️ |
Additional details and impacted files
@@             Coverage Diff              @@
##             master    #3151      +/-   ##
============================================
+ Coverage     73.60%   73.65%   +0.04%     
  Complexity      943      943              
============================================
  Files          1141     1144       +3     
  Lines        100863   101211     +348     
  Branches      78044    78409     +365     
============================================
+ Hits          74243    74549     +306     
- Misses        23960    23975      +15     
- Partials       2660     2687      +27     
| Components | Coverage Δ |
|---|---|
| Rust Core | 74.65% <91.25%> (+0.09%) ⬆️ |
| Java SDK | 62.30% <ø> (ø) |
| C# SDK | 69.09% <ø> (-0.29%) ⬇️ |
| Python SDK | 81.43% <ø> (ø) |
| Node SDK | 91.40% <ø> (-0.13%) ⬇️ |
| Go SDK | 39.43% <ø> (ø) |
| Files with missing lines | Coverage Δ |
|---|---|
| core/binary_protocol/src/codes.rs | 100.00% <ø> (ø) |
| core/binary_protocol/src/consensus/operation.rs | 96.11% <100.00%> (+0.15%) ⬆️ |
| core/binary_protocol/src/dispatch.rs | 91.45% <100.00%> (+0.30%) ⬆️ |
| ...uests/consumer_offsets/delete_consumer_offset_2.rs | 100.00% <100.00%> (ø) |
| ...quests/consumer_offsets/store_consumer_offset_2.rs | 100.00% <100.00%> (ø) |
| core/consensus/src/client_table.rs | 93.72% <100.00%> (ø) |
| core/binary_protocol/src/primitives/ack_level.rs | 97.61% <97.61%> (ø) |
| core/consensus/src/observability.rs | 37.26% <0.00%> (-0.18%) ⬇️ |
| core/partitions/src/iggy_partition.rs | 36.45% <40.00%> (+0.05%) ⬆️ |
| core/simulator/src/client.rs | 45.27% <0.00%> (-7.33%) ⬇️ |

... and 32 files with indirect coverage changes


Comment thread on core/common/src/traits/consumer_offset_client.rs (Outdated)
@numinnex
Contributor

This command is supposed to be implemented only in the components used by the cluster, which is still in development, not in our core server.

Comment on lines 1407 to 1438
fn parse_consumer_offset_payload(
    operation: Operation,
    body: &[u8],
) -> Result<(ConsumerKind, u32, Option<u64>), IggyError> {
    let consumer_kind = *body.first().ok_or(IggyError::InvalidCommand)?;
    let consumer_id = body
        .get(1..5)
        .ok_or(IggyError::InvalidCommand)
        .and_then(|bytes| {
            <[u8; 4]>::try_from(bytes)
                .map(u32::from_le_bytes)
                .map_err(|_| IggyError::InvalidCommand)
        })?;
    let kind = ConsumerKind::from_code(consumer_kind)?;
    match operation {
-       Operation::StoreConsumerOffset => {
+       Operation::StoreConsumerOffset | Operation::StoreConsumerOffset2 => {
            let offset =
                body.get(5..13)
                    .ok_or(IggyError::InvalidCommand)
                    .and_then(|bytes| {
                        <[u8; 8]>::try_from(bytes)
                            .map(u64::from_le_bytes)
                            .map_err(|_| IggyError::InvalidCommand)
                    })?;
            Ok((kind, consumer_id, Some(offset)))
        }
-       Operation::DeleteConsumerOffset => Ok((kind, consumer_id, None)),
+       Operation::DeleteConsumerOffset | Operation::DeleteConsumerOffset2 => {
+           Ok((kind, consumer_id, None))
+       }
        _ => Err(IggyError::InvalidCommand),
    }
}
Contributor

This is the only place that actually processes v2 commands (both parse_consumer_offset_request and parse_staged_consumer_offset_commit delegate here), and it handles StoreConsumerOffset2 in the same branch as v1 without ever reading the trailing ack byte. For store, parsing stops at body[5..13] (byte 12). For delete, it stops at body[1..5] (byte 4). The ack byte just falls off the end.

Two concerns here. First, AckLevel has literally no effect on behavior right now, but the PR description says it "selects the policy." I think the description should be explicit that the ack byte is wire-reserved only and behavioral branching comes later. Otherwise future contributors might assume this already works.

Second, and more importantly from a protocol correctness angle: an invalid ack value (e.g. 0xFF) would be silently accepted with no validation error. Wouldn't it be better to at least validate the byte here, even if you don't use it yet? Something like reading the ack byte and calling AckLevel::from_code() to reject garbage early. That way you get fail-fast behavior for malformed messages instead of silently swallowing bad data through the pipeline.
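A hedged sketch of what that fail-fast check could look like inside parse_consumer_offset_payload, assuming the ack byte sits at offset 13 for store (kind 1 + id 4 + offset 8) and at offset 5 for delete (kind 1 + id 4), matching the payloads the v2 simulator helpers build:

```rust
// Hypothetical addition: read and validate the trailing ack byte on the v2
// variants even though it isn't acted on yet. Offsets assume the v2 layouts.
let _ack = match operation {
    Operation::StoreConsumerOffset2 => {
        let code = *body.get(13).ok_or(IggyError::InvalidCommand)?;
        Some(AckLevel::from_code(code)?) // rejects garbage like 0xFF up front
    }
    Operation::DeleteConsumerOffset2 => {
        let code = *body.get(5).ok_or(IggyError::InvalidCommand)?;
        Some(AckLevel::from_code(code)?)
    }
    _ => None,
};
```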

Contributor

Yeah, we need to make sure we actually use the AckLevel, as that's the main difference between the old request and the new one.

Comment on lines +189 to +220
pub fn store_consumer_offset_v2(
    &self,
    namespace: IggyNamespace,
    consumer_kind: u8,
    consumer_id: u32,
    offset: u64,
    ack: AckLevel,
) -> Message<RequestHeader> {
    let mut payload = Vec::with_capacity(14);
    payload.push(consumer_kind);
    payload.extend_from_slice(&consumer_id.to_le_bytes());
    payload.extend_from_slice(&offset.to_le_bytes());
    payload.push(ack.as_u8());

    self.build_request_with_namespace(Operation::StoreConsumerOffset2, &payload, namespace)
}

/// v2 of `delete_consumer_offset` carrying an explicit `AckLevel` byte.
pub fn delete_consumer_offset_v2(
    &self,
    namespace: IggyNamespace,
    consumer_kind: u8,
    consumer_id: u32,
    ack: AckLevel,
) -> Message<RequestHeader> {
    let mut payload = Vec::with_capacity(6);
    payload.push(consumer_kind);
    payload.extend_from_slice(&consumer_id.to_le_bytes());
    payload.push(ack.as_u8());

    self.build_request_with_namespace(Operation::DeleteConsumerOffset2, &payload, namespace)
}
Contributor

store_consumer_offset_v2 and delete_consumer_offset_v2 are defined here but never called anywhere in the codebase. Similarly, StoreConsumerOffset2Request / DeleteConsumerOffset2Request in binary_protocol are only exercised in their own unit tests.

This means the v2 path has zero end-to-end coverage. The wire types have roundtrip tests which is great, but there's nothing proving a v2 message built by the simulator actually flows through the consensus pipeline and gets parsed correctly by parse_consumer_offset_payload. I think at minimum there should be a simulator-level test (or even just a unit test in iggy_partition.rs) that builds a v2 store/delete message and runs it through the partition parser to confirm the bytes land in the right places. Otherwise this is a lot of scaffolding with no integration confidence.
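For concreteness, a sketch of a minimal unit test that could sit next to parse_consumer_offset_payload in iggy_partition.rs; the consumer-kind code 1 and the fixture values are assumptions:

```rust
#[test]
fn v2_store_payload_parses_through_partition_parser() {
    // Mirror the layout built by store_consumer_offset_v2:
    // kind (1 byte) + consumer_id (4 bytes LE) + offset (8 bytes LE) + ack (1 byte).
    let mut payload = Vec::with_capacity(14);
    payload.push(1); // assumed to be a valid ConsumerKind code
    payload.extend_from_slice(&42u32.to_le_bytes());
    payload.extend_from_slice(&1337u64.to_le_bytes());
    payload.push(AckLevel::Quorum.as_u8());

    let (_kind, consumer_id, offset) =
        parse_consumer_offset_payload(Operation::StoreConsumerOffset2, &payload)
            .expect("v2 store payload should parse");

    assert_eq!(consumer_id, 42);
    assert_eq!(offset, Some(1337));
}
```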

Comment on lines +142 to +151
// v2 consumer-offset ops are registered in the dispatch table for the
// consensus/simulator pathway but are not wired into the legacy binary
// server's dispatch. They'll move into server-ng alongside the rest of
// the v2 surface; re-enable these codes here once that lands.
if matches!(
    code,
    STORE_CONSUMER_OFFSET_2_CODE | DELETE_CONSUMER_OFFSET_2_CODE
) {
    continue;
}
Contributor

The comment says "re-enable these codes here once that lands" but there's no issue/ticket reference to track it. In my experience these continue skips tend to get forgotten once the PR is merged. Would it be worth adding a // TODO(#XXXX) with an issue number so this shows up in tracking? Or even a compile_error! behind a feature flag that fires when server-ng v2 dispatch lands, forcing someone to come back here.
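To make that concrete, one possible shape for the guard; the feature name is hypothetical and #XXXX is a placeholder for the tracking issue:

```rust
// TODO(#XXXX): drop this skip once server-ng dispatches the v2 surface.
#[cfg(feature = "server-ng-v2-dispatch")]
compile_error!(
    "v2 consumer-offset codes are still skipped in the legacy dispatch table; \
     remove the skip before enabling server-ng v2 dispatch"
);
```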
