Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
278 changes: 278 additions & 0 deletions config/samples/simplekafkacluster_5broker.yaml

Large diffs are not rendered by default.

21 changes: 12 additions & 9 deletions controllers/cruisecontroltask_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,19 +171,22 @@ func (r *CruiseControlTaskReconciler) Reconcile(ctx context.Context, request ctr
}
}
case tasksAndStates.NumActiveTasksByOp(banzaiv1alpha1.OperationRemoveBroker) > 0:
var removeTask *CruiseControlTask
brokerIDs := make([]string, 0)
// gather brokers that are marked for removal as a result of downscale operation
for _, task := range tasksAndStates.GetActiveTasksByOp(banzaiv1alpha1.OperationRemoveBroker) {
removeTask = task
break
brokerIDs = append(brokerIDs, task.BrokerID)
}

cruiseControlOpRef, err := r.removeBroker(ctx, instance, operationTTLSecondsAfterFinished, removeTask.BrokerID)
cruiseControlOpRef, err := r.removeBrokers(ctx, instance, operationTTLSecondsAfterFinished, brokerIDs)
if err != nil {
return requeueWithError(log, fmt.Sprintf("creating CruiseControlOperation for downscale has failed, brokerID: %s", removeTask.BrokerID), err)
return requeueWithError(log, fmt.Sprintf("creating CruiseControlOperation for downscale has failed, brokerIDs: %s", brokerIDs), err)
}

removeTask.SetCruiseControlOperationRef(cruiseControlOpRef)
removeTask.SetStateScheduled()
// map the CC broker removal operation with each broker status
for _, task := range tasksAndStates.GetActiveTasksByOp(banzaiv1alpha1.OperationRemoveBroker) {
task.SetCruiseControlOperationRef(cruiseControlOpRef)
task.SetStateScheduled()
}

case tasksAndStates.NumActiveTasksByOp(banzaiv1alpha1.OperationRemoveDisks) > 0:
brokerLogDirsToRemove := make(map[string][]string)
Expand Down Expand Up @@ -367,8 +370,8 @@ func (r *CruiseControlTaskReconciler) addBrokers(ctx context.Context, kafkaClust
return r.createCCOperation(ctx, kafkaCluster, banzaiv1alpha1.ErrorPolicyRetry, ttlSecondsAfterFinished, banzaiv1alpha1.OperationAddBroker, bokerIDs, false, nil)
}

func (r *CruiseControlTaskReconciler) removeBroker(ctx context.Context, kafkaCluster *banzaiv1beta1.KafkaCluster, ttlSecondsAfterFinished *int, brokerID string) (corev1.LocalObjectReference, error) {
return r.createCCOperation(ctx, kafkaCluster, banzaiv1alpha1.ErrorPolicyRetry, ttlSecondsAfterFinished, banzaiv1alpha1.OperationRemoveBroker, []string{brokerID}, false, nil)
func (r *CruiseControlTaskReconciler) removeBrokers(ctx context.Context, kafkaCluster *banzaiv1beta1.KafkaCluster, ttlSecondsAfterFinished *int, brokerIDs []string) (corev1.LocalObjectReference, error) {
return r.createCCOperation(ctx, kafkaCluster, banzaiv1alpha1.ErrorPolicyRetry, ttlSecondsAfterFinished, banzaiv1alpha1.OperationRemoveBroker, brokerIDs, false, nil)
}

func (r *CruiseControlTaskReconciler) removeDisks(ctx context.Context, kafkaCluster *banzaiv1beta1.KafkaCluster, ttlSecondsAfterFinished *int, brokerIdsToRemovedLogDirs map[string][]string) (corev1.LocalObjectReference, error) {
Expand Down
11 changes: 11 additions & 0 deletions controllers/cruisecontroltask_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,17 @@ func TestCreateCCOperation(t *testing.T) {
assert.Equal(t, "true", params[scale.ParamExcludeRemoved])
},
},
{
operationType: banzaiv1alpha1.OperationRemoveBroker,
brokerIDs: []string{"1", "2", "3"},
isJBOD: false,
brokerIdsToLogDirs: nil,
parameterCheck: func(t *testing.T, params map[string]string) {
assert.Equal(t, "1,2,3", params[scale.ParamBrokerID])
assert.Equal(t, "true", params[scale.ParamExcludeDemoted])
assert.Equal(t, "true", params[scale.ParamExcludeRemoved])
},
},
{
operationType: banzaiv1alpha1.OperationRemoveDisks,
brokerIDs: []string{"1", "2"},
Expand Down
54 changes: 54 additions & 0 deletions controllers/tests/cruisecontroltask_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -456,6 +456,60 @@ var _ = Describe("CruiseControlTaskReconciler", func() {
}, taskExtendedTimeoutDuration, reconcilePollingPeriod).Should(BeTrue())
})
})
When("multiple brokers are removed", Serial, func() {
JustBeforeEach(func(ctx SpecContext) {
kafkaClusterCCReconciler.ScaleFactory = mocks.NewMockScaleFactory(getScaleMockCCTask1())
err := util.RetryOnConflict(util.DefaultBackOffForConflict, func() error {
if err := k8sClient.Get(ctx, types.NamespacedName{
Name: kafkaCluster.Name,
Namespace: kafkaCluster.Namespace,
}, kafkaCluster); err != nil {
return err
}

for _, id := range []string{"1", "2"} {
brokerState := kafkaCluster.Status.BrokersState[id]
brokerState.GracefulActionState.CruiseControlState = v1beta1.GracefulDownscaleRequired
kafkaCluster.Status.BrokersState[id] = brokerState
}
return k8sClient.Status().Update(ctx, kafkaCluster)
})
Expect(err).NotTo(HaveOccurred())
})
It("should create exactly one remove_broker CruiseControlOperation for all brokers", func(ctx SpecContext) {
Eventually(ctx, func() bool {
err := k8sClient.Get(ctx, types.NamespacedName{
Name: kafkaCluster.Name,
Namespace: kafkaCluster.Namespace,
}, kafkaCluster)
Expect(err).NotTo(HaveOccurred())

brokerState1, ok1 := kafkaCluster.Status.BrokersState["1"]
brokerState2, ok2 := kafkaCluster.Status.BrokersState["2"]
if !ok1 || !ok2 {
return false
}
if brokerState1.GracefulActionState.CruiseControlOperationReference == nil ||
brokerState2.GracefulActionState.CruiseControlOperationReference == nil {
return false
}

operationList := &v1alpha1.CruiseControlOperationList{}
err = k8sClient.List(ctx, operationList, client.ListOption(client.InNamespace(kafkaCluster.Namespace)))
Expect(err).NotTo(HaveOccurred())

if len(operationList.Items) != 1 {
return false
}
operation = &operationList.Items[0]
return operation.CurrentTaskOperation() == v1alpha1.OperationRemoveBroker &&
brokerState1.GracefulActionState.CruiseControlOperationReference.Name == operation.Name &&
brokerState2.GracefulActionState.CruiseControlOperationReference.Name == operation.Name &&
brokerState1.GracefulActionState.CruiseControlState == v1beta1.GracefulDownscaleScheduled &&
brokerState2.GracefulActionState.CruiseControlState == v1beta1.GracefulDownscaleScheduled
}, taskExtendedTimeoutDuration, reconcilePollingPeriod).Should(BeTrue())
})
})
})

func getScaleMockCCTask1() *mocks.MockCruiseControlScaler {
Expand Down
2 changes: 2 additions & 0 deletions openspec/changes/downscale-improvements/.openspec.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
schema: spec-driven
created: 2026-05-13
79 changes: 79 additions & 0 deletions openspec/changes/downscale-improvements/design.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
## Context

The koperator manages Kafka cluster lifecycle via a set of reconcilers. Downscale (broker removal) is handled in two phases:

1. **KafkaCluster reconciler** (`pkg/resources/kafka/kafka.go`): Detects brokers removed from spec. Collects all such brokers in one pass and sets them all to `GracefulDownscaleRequired` via a single atomic `UpdateBrokerStatus` call.

2. **CruiseControlTask reconciler** (`controllers/cruisecontroltask_controller.go`): Picks up brokers in `GracefulDownscaleRequired` state and submits CC operations. The `add_broker` path already batches all pending brokers into one CC operation. The `remove_broker` path does not — it picks only the first task and breaks.

3. **External listener reconcilers** (`pkg/resources/envoy/`, `pkg/resources/istioingress/`): Gate broker inclusion in envoy/istio/contour config on `ShouldIncludeBroker()`. This function returns `false` when `brokerConfig == nil` (broker not in spec), causing draining brokers to vanish from external listener config immediately upon spec removal.

**Key invariant:** `GetActiveTasksByOp(OperationRemoveBroker)` only returns brokers in `GracefulDownscaleRequired` state (via `IsRequired()` → `IsRequiredState()`). Brokers already `Scheduled` or `Running` are not returned. Because the KafkaCluster reconciler transitions all removed brokers atomically, all brokers from a single manifest apply are guaranteed to be in `Required` state simultaneously when the CC task reconciler fires.

## Goals / Non-Goals

**Goals:**
- All broker IDs removed in a single manifest apply are submitted as one CC `remove_broker` operation.
- Brokers removed from spec remain in external listener config until `GracefulDownscaleSucceeded`.
- Brokers stuck in `GracefulDownscaleCompletedWithError` or `GracefulDownscalePaused` remain in external listener config (broker still holds data; manual investigation needed).

**Non-Goals:**
- Guaranteed single-operation batching across multiple sequential manifest applies (best-effort; second apply may produce a second CC operation if first batch is already `Scheduled`).
- Changes to CRD schema or the CC REST API.
- KRaft controller-only node handling (controller-only nodes skip CC graceful downscale entirely; unchanged).

## Decisions

### D1: Mirror `addBrokers` pattern for `removeBrokers`

The `addBrokers` helper (lines 366-368) already accepts `[]string` and submits one CC operation. The `removeBroker` helper (lines 370-372) takes a single `string`.

**Decision:** Rename `removeBroker` → `removeBrokers`, change signature to `[]string`, and replace the early-break loop with the collect-all pattern.

**Why not a separate aggregation layer?** The batching boundary is already correct — `GetActiveTasksByOp` returns exactly the set to batch. No new abstraction needed.

### D2: Fix `ShouldIncludeBroker` as the single gatekeeper

`ShouldIncludeBroker` is called by all external listener reconcilers (envoy configmap, service, deployment; istio gateway, virtualservice, meshgateway; contour). When `brokerConfig == nil`, the function currently falls through to `return false`.

**Decision:** Add a fallback block for `brokerConfig == nil`: check the broker's `CruiseControlState` in status. If `IsDownscale() && !IsSucceeded()` and the broker has the requested `ingressConfigName` in its `ExternalListenerConfigNames`, return `true`.

**Why `ExternalListenerConfigNames` check?** A broker may have been associated with a specific ingress config. Re-using the persisted `ExternalListenerConfigNames` (set when the pod was created, never cleared until `GracefulDownscaleSucceeded`) ensures we only retain the broker for the listener configs it actually served.

**Why `IsDownscale() && !IsSucceeded()` instead of an explicit state list?**
`IsDownscale()` covers all 6 downscale states. Excluding `IsSucceeded()` retains the broker in all non-terminal states, including `CompletedWithError` and `Paused`, which is the desired behavior for manual investigation. If new downscale states are added to the enum in future, they're covered automatically.

**Alternatives rejected:**
- Fix each caller individually: more code, same logic duplicated.
- Add a new function: unnecessary indirection; `ShouldIncludeBroker` is the right seam.

### D3: Task order to satisfy CI (green at every commit)

The original plan placed failing tests before implementation. With CI gating on green builds, the order must be:

```
Commit 1: Unit test for createCCOperation (passes immediately — tests downstream, not the wrapper)
Commit 2: Implement removeBrokers + integration test (both green together)
Commit 3: Implement ShouldIncludeBroker fix + unit test (both green together)
Commit 4: E2E test + sample manifest
```

## Risks / Trade-offs

**[Race: second manifest apply before first batch starts]** → If a user applies a second spec change (removing more brokers) before the first CC operation is created, those new brokers will be included in the same batch (still in `Required` state). If the first CC operation is already `Scheduled`, new brokers get a separate operation. Acceptable; same behavior as `add_broker`.

**[CompletedWithError brokers stay in envoy indefinitely]** → A broker that fails CC draining stays in external listener config. This is intentional (data is still present, clients need connectivity), but operators must monitor and manually recover. No change from current behavior for the envoy side — previously, the broker would have been dropped from envoy even with data present, which was worse.

**[ExternalListenerConfigNames populated assumption]** → The fix assumes `ExternalListenerConfigNames` is non-empty for any broker that was ever reconciled. This field is set on pod creation and never cleared. Clusters created before this field existed would not benefit from the fix for those brokers, but all newly created or recently reconciled brokers are covered.

## Migration Plan

No migration required. Both fixes are backwards-compatible:
- `removeBrokers` produces the same CC API calls as `removeBroker` for single-broker downscales.
- `ShouldIncludeBroker` only changes behavior for brokers with `brokerConfig == nil` (already removed from spec); existing behavior for in-spec brokers is unchanged.

Rollback: revert the two commits. No persistent state is affected.

## Open Questions

None — all decisions resolved during exploration.
26 changes: 26 additions & 0 deletions openspec/changes/downscale-improvements/proposal.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
## Why

When downscaling a Kafka cluster, the operator creates one CruiseControl operation per removed broker and immediately drops removed brokers from external listener config (envoy/istio/contour), causing unnecessary partition movements and client connectivity loss during draining.

## What Changes

- **Batch broker removal**: Collect all broker IDs pending downscale and submit them as a single `remove_broker` CC operation, matching the existing `add_broker` batching behavior.
- **Retain draining brokers in external listeners**: Keep removed brokers in envoy/istio/contour config until CruiseControl finishes draining them (`GracefulDownscaleSucceeded`), so clients retain connectivity while data is being moved.

## Capabilities

### New Capabilities

- `batched-broker-removal`: Single CruiseControl `remove_broker` operation for all brokers removed in a manifest apply, eliminating redundant partition movements.
- `draining-broker-listener-retention`: Brokers removed from spec but still being drained by CruiseControl remain visible in all external listener resources until draining completes.

### Modified Capabilities

<!-- none — no existing specs to delta -->

## Impact

- `controllers/cruisecontroltask_controller.go`: Replace single-broker `removeBroker` logic with `removeBrokers` (plural), mirroring `addBrokers` pattern.
- `pkg/util/util.go`: `ShouldIncludeBroker` gains a fallback path for `brokerConfig == nil` that checks `CruiseControlState`.
- All external listener reconcilers (`pkg/resources/envoy/`, `pkg/resources/istioingress/`) benefit automatically — they all gate on `ShouldIncludeBroker`.
- No API or CRD changes. No breaking changes.
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
## ADDED Requirements

### Requirement: Broker removals from a single manifest apply are batched into one CC operation
When a KafkaCluster spec is applied that removes N brokers, the operator SHALL submit a single `remove_broker` CruiseControl operation containing all N broker IDs, rather than N separate operations.

#### Scenario: Multiple brokers removed simultaneously
- **WHEN** a KafkaCluster spec is applied removing brokers 3 and 4 from a 5-broker cluster
- **THEN** exactly one `CruiseControlOperation` of type `remove_broker` is created
- **THEN** the operation's broker ID parameter contains both broker IDs (e.g. `"3,4"`)
- **THEN** both brokers transition to `GracefulDownscaleScheduled` referencing the same operation

#### Scenario: Single broker removal (unchanged behavior)
- **WHEN** a KafkaCluster spec is applied removing one broker
- **THEN** exactly one `CruiseControlOperation` of type `remove_broker` is created
- **THEN** the broker transitions to `GracefulDownscaleScheduled`

#### Scenario: Brokers already scheduled are not re-batched
- **WHEN** broker 3 is already in `GracefulDownscaleScheduled` state and broker 4 enters `GracefulDownscaleRequired`
- **THEN** a new `CruiseControlOperation` is created for broker 4 only
- **THEN** broker 3's existing operation is not modified
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
## ADDED Requirements

### Requirement: Draining brokers remain in external listener config until CC completes
A broker removed from the KafkaCluster spec SHALL remain present in all external listener resources (envoy, istio, contour) while CruiseControl is actively draining it, and SHALL be removed only after `GracefulDownscaleSucceeded`.

#### Scenario: Broker retained during active draining
- **WHEN** a broker is removed from spec and its `CruiseControlState` is `GracefulDownscaleRunning`
- **THEN** `ShouldIncludeBroker` returns `true` for that broker
- **THEN** the broker remains in envoy configmap, service, and deployment resources

#### Scenario: Broker retained when CC operation is scheduled but not yet running
- **WHEN** a broker is removed from spec and its `CruiseControlState` is `GracefulDownscaleScheduled`
- **THEN** `ShouldIncludeBroker` returns `true` for that broker

#### Scenario: Broker retained when CC has not yet started (Required state)
- **WHEN** a broker is removed from spec and its `CruiseControlState` is `GracefulDownscaleRequired`
- **THEN** `ShouldIncludeBroker` returns `true` for that broker

#### Scenario: Broker retained on CC error (manual investigation needed)
- **WHEN** a broker is removed from spec and its `CruiseControlState` is `GracefulDownscaleCompletedWithError`
- **THEN** `ShouldIncludeBroker` returns `true` for that broker

#### Scenario: Broker retained when CC operation is paused
- **WHEN** a broker is removed from spec and its `CruiseControlState` is `GracefulDownscalePaused`
- **THEN** `ShouldIncludeBroker` returns `true` for that broker

#### Scenario: Broker removed from listener config after successful drain
- **WHEN** a broker's `CruiseControlState` transitions to `GracefulDownscaleSucceeded`
- **THEN** `ShouldIncludeBroker` returns `false` for that broker
- **THEN** the broker is removed from all external listener resources on the next reconcile

#### Scenario: Unknown broker excluded
- **WHEN** a broker ID has no entry in `BrokersState` (unknown broker)
- **THEN** `ShouldIncludeBroker` returns `false` for that broker
Loading
Loading