diff --git a/config/samples/simplekafkacluster_5broker.yaml b/config/samples/simplekafkacluster_5broker.yaml new file mode 100644 index 000000000..06d3dbf0f --- /dev/null +++ b/config/samples/simplekafkacluster_5broker.yaml @@ -0,0 +1,278 @@ +apiVersion: kafka.banzaicloud.io/v1beta1 +kind: KafkaCluster +metadata: + labels: + controller-tools.k8s.io: "1.0" + name: kafka +spec: + kRaft: false + monitoringConfig: + jmxImage: "ghcr.io/adobe/koperator/jmx-javaagent:1.4.0" + headlessServiceEnabled: true + zkAddresses: + - "zookeeper-server-client.zookeeper:2181" + propagateLabels: false + oneBrokerPerNode: false + clusterImage: "ghcr.io/adobe/koperator/kafka:2.13-3.9.1" + readOnlyConfig: | + auto.create.topics.enable=false + cruise.control.metrics.topic.auto.create=true + cruise.control.metrics.topic.num.partitions=1 + cruise.control.metrics.topic.replication.factor=2 + brokerConfigGroups: + default: + # podSecurityContext: + # runAsNonRoot: false + # securityContext: + # privileged: true + storageConfigs: + - mountPath: "/kafka-logs" + pvcSpec: + accessModes: + - ReadWriteOnce + resources: + requests: + storage: 10Gi + brokerAnnotations: + prometheus.io/scrape: "true" + prometheus.io/port: "9020" + # brokerLabels: + # kafka_broker_group: "default_group" + brokers: + - id: 0 + brokerConfigGroup: "default" + # brokerConfig: + # envs: + # - name: +CLASSPATH + # value: "/opt/kafka/libs/dev/*:" + # - name: CLASSPATH+ + # value: ":/opt/kafka/libs/extra-jars/*" + - id: 1 + brokerConfigGroup: "default" + - id: 2 + brokerConfigGroup: "default" + - id: 3 + brokerConfigGroup: "default" + - id: 4 + brokerConfigGroup: "default" + rollingUpgradeConfig: + failureThreshold: 1 + listenersConfig: + internalListeners: + - type: "plaintext" + name: "internal" + containerPort: 29092 + usedForInnerBrokerCommunication: true + - type: "plaintext" + name: "controller" + containerPort: 29093 + usedForInnerBrokerCommunication: false + usedForControllerCommunication: true + cruiseControlConfig: + # podSecurityContext: + # runAsNonRoot: false + # securityContext: + # privileged: true + cruiseControlTaskSpec: + RetryDurationMinutes: 5 + topicConfig: + partitions: 12 + replicationFactor: 3 +# resourceRequirements: +# requests: +# cpu: 500m +# memory: 1Gi +# limits: +# cpu: 500m +# memory: 1Gi + image: "adobe/cruise-control:3.0.3-adbe-20250804" + config: | + # Copyright 2017 LinkedIn Corp. Licensed under the BSD 2-Clause License (the "License"). See License in the project root for license information. + # + # This is an example property file for Kafka Cruise Control. See KafkaCruiseControlConfig for more details. + # Configuration for the metadata client. + # ======================================= + # The maximum interval in milliseconds between two metadata refreshes. + #metadata.max.age.ms=300000 + # Client id for the Cruise Control. It is used for the metadata client. + #client.id=kafka-cruise-control + # The size of TCP send buffer bytes for the metadata client. + #send.buffer.bytes=131072 + # The size of TCP receive buffer size for the metadata client. + #receive.buffer.bytes=131072 + # The time to wait before disconnect an idle TCP connection. + #connections.max.idle.ms=540000 + # The time to wait before reconnect to a given host. + #reconnect.backoff.ms=50 + # The time to wait for a response from a host after sending a request. + #request.timeout.ms=30000 + # Configurations for the load monitor + # ======================================= + # The number of metric fetcher thread to fetch metrics for the Kafka cluster + num.metric.fetchers=1 + # The metric sampler class + metric.sampler.class=com.linkedin.kafka.cruisecontrol.monitor.sampling.CruiseControlMetricsReporterSampler + # Configurations for CruiseControlMetricsReporterSampler + metric.reporter.topic.pattern=__CruiseControlMetrics + # The sample store class name + sample.store.class=com.linkedin.kafka.cruisecontrol.monitor.sampling.KafkaSampleStore + # The config for the Kafka sample store to save the partition metric samples + partition.metric.sample.store.topic=__KafkaCruiseControlPartitionMetricSamples + # The config for the Kafka sample store to save the model training samples + broker.metric.sample.store.topic=__KafkaCruiseControlModelTrainingSamples + # The replication factor of Kafka metric sample store topic + sample.store.topic.replication.factor=2 + # The config for the number of Kafka sample store consumer threads + num.sample.loading.threads=8 + # The partition assignor class for the metric samplers + metric.sampler.partition.assignor.class=com.linkedin.kafka.cruisecontrol.monitor.sampling.DefaultMetricSamplerPartitionAssignor + # The metric sampling interval in milliseconds + metric.sampling.interval.ms=120000 + metric.anomaly.detection.interval.ms=180000 + # The partition metrics window size in milliseconds + partition.metrics.window.ms=300000 + # The number of partition metric windows to keep in memory + num.partition.metrics.windows=1 + # The minimum partition metric samples required for a partition in each window + min.samples.per.partition.metrics.window=1 + # The broker metrics window size in milliseconds + broker.metrics.window.ms=300000 + # The number of broker metric windows to keep in memory + num.broker.metrics.windows=20 + # The minimum broker metric samples required for a partition in each window + min.samples.per.broker.metrics.window=1 + # The configuration for the BrokerCapacityConfigFileResolver (supports JBOD and non-JBOD broker capacities) + capacity.config.file=config/capacity.json + #capacity.config.file=config/capacityJBOD.json + # Configurations for the analyzer + # ======================================= + # The list of goals to optimize the Kafka cluster for with pre-computed proposals + default.goals=com.linkedin.kafka.cruisecontrol.analyzer.goals.ReplicaCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.DiskCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkInboundCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkOutboundCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.CpuCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.ReplicaDistributionGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.PotentialNwOutGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.DiskUsageDistributionGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkInboundUsageDistributionGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkOutboundUsageDistributionGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.CpuUsageDistributionGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.TopicReplicaDistributionGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.LeaderBytesInDistributionGoal + # The list of supported goals + goals=com.linkedin.kafka.cruisecontrol.analyzer.goals.ReplicaCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.DiskCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkInboundCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkOutboundCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.CpuCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.ReplicaDistributionGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.PotentialNwOutGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.DiskUsageDistributionGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkInboundUsageDistributionGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkOutboundUsageDistributionGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.CpuUsageDistributionGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.TopicReplicaDistributionGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.LeaderBytesInDistributionGoal,com.linkedin.kafka.cruisecontrol.analyzer.kafkaassigner.KafkaAssignerDiskUsageDistributionGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.PreferredLeaderElectionGoal + # The list of supported hard goals + hard.goals=com.linkedin.kafka.cruisecontrol.analyzer.goals.ReplicaCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.DiskCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkInboundCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkOutboundCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.CpuCapacityGoal + # The minimum percentage of well monitored partitions out of all the partitions + min.monitored.partition.percentage=0.95 + # The balance threshold for CPU + cpu.balance.threshold=1.1 + # The balance threshold for disk + disk.balance.threshold=1.1 + # The balance threshold for network inbound utilization + network.inbound.balance.threshold=1.1 + # The balance threshold for network outbound utilization + network.outbound.balance.threshold=1.1 + # The balance threshold for the replica count + replica.count.balance.threshold=1.1 + # The capacity threshold for CPU in percentage + cpu.capacity.threshold=0.8 + # The capacity threshold for disk in percentage + disk.capacity.threshold=0.8 + # The capacity threshold for network inbound utilization in percentage + network.inbound.capacity.threshold=0.8 + # The capacity threshold for network outbound utilization in percentage + network.outbound.capacity.threshold=0.8 + # The threshold to define the cluster to be in a low CPU utilization state + cpu.low.utilization.threshold=0.0 + # The threshold to define the cluster to be in a low disk utilization state + disk.low.utilization.threshold=0.0 + # The threshold to define the cluster to be in a low network inbound utilization state + network.inbound.low.utilization.threshold=0.0 + # The threshold to define the cluster to be in a low disk utilization state + network.outbound.low.utilization.threshold=0.0 + # The metric anomaly percentile upper threshold + metric.anomaly.percentile.upper.threshold=90.0 + # The metric anomaly percentile lower threshold + metric.anomaly.percentile.lower.threshold=10.0 + # How often should the cached proposal be expired and recalculated if necessary + proposal.expiration.ms=60000 + # The maximum number of replicas that can reside on a broker at any given time. + max.replicas.per.broker=10000 + # The number of threads to use for proposal candidate precomputing. + num.proposal.precompute.threads=1 + # the topics that should be excluded from the partition movement. + #topics.excluded.from.partition.movement + # Configurations for the executor + # ======================================= + # The max number of partitions to move in/out on a given broker at a given time. + num.concurrent.partition.movements.per.broker=10 + # The interval between two execution progress checks. + execution.progress.check.interval.ms=10000 + # Configurations for anomaly detector + # ======================================= + # The goal violation notifier class + anomaly.notifier.class=com.linkedin.kafka.cruisecontrol.detector.notifier.SelfHealingNotifier + # The metric anomaly finder class + metric.anomaly.finder.class=com.linkedin.kafka.cruisecontrol.detector.KafkaMetricAnomalyFinder + # The anomaly detection interval + anomaly.detection.interval.ms=10000 + # The goal violation to detect. + anomaly.detection.goals=com.linkedin.kafka.cruisecontrol.analyzer.goals.ReplicaCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.DiskCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkInboundCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkOutboundCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.CpuCapacityGoal + # The interested metrics for metric anomaly analyzer. + metric.anomaly.analyzer.metrics=BROKER_PRODUCE_LOCAL_TIME_MS_MAX,BROKER_PRODUCE_LOCAL_TIME_MS_MEAN,BROKER_CONSUMER_FETCH_LOCAL_TIME_MS_MAX,BROKER_CONSUMER_FETCH_LOCAL_TIME_MS_MEAN,BROKER_FOLLOWER_FETCH_LOCAL_TIME_MS_MAX,BROKER_FOLLOWER_FETCH_LOCAL_TIME_MS_MEAN,BROKER_LOG_FLUSH_TIME_MS_MAX,BROKER_LOG_FLUSH_TIME_MS_MEAN + ## Adjust accordingly if your metrics reporter is an older version and does not produce these metrics. + #metric.anomaly.analyzer.metrics=BROKER_PRODUCE_LOCAL_TIME_MS_50TH,BROKER_PRODUCE_LOCAL_TIME_MS_999TH,BROKER_CONSUMER_FETCH_LOCAL_TIME_MS_50TH,BROKER_CONSUMER_FETCH_LOCAL_TIME_MS_999TH,BROKER_FOLLOWER_FETCH_LOCAL_TIME_MS_50TH,BROKER_FOLLOWER_FETCH_LOCAL_TIME_MS_999TH,BROKER_LOG_FLUSH_TIME_MS_50TH,BROKER_LOG_FLUSH_TIME_MS_999TH + # The zk path to store failed broker information. + failed.brokers.zk.path=/CruiseControlBrokerList + # Topic config provider class + topic.config.provider.class=com.linkedin.kafka.cruisecontrol.config.KafkaTopicConfigProvider + # The cluster configurations for the KafkaTopicConfigProvider + cluster.configs.file=config/clusterConfigs.json + # The maximum time in milliseconds to store the response and access details of a completed user task. + completed.user.task.retention.time.ms=21600000 + # The maximum time in milliseconds to retain the demotion history of brokers. + demotion.history.retention.time.ms=86400000 + # The maximum number of completed user tasks for which the response and access details will be cached. + max.cached.completed.user.tasks=500 + # The maximum number of user tasks for concurrently running in async endpoints across all users. + max.active.user.tasks=25 + # Enable self healing for all anomaly detectors, unless the particular anomaly detector is explicitly disabled + self.healing.enabled=true + # Enable self healing for broker failure detector + #self.healing.broker.failure.enabled=true + # Enable self healing for goal violation detector + #self.healing.goal.violation.enabled=true + # Enable self healing for metric anomaly detector + #self.healing.metric.anomaly.enabled=true + # configurations for the webserver + # ================================ + # HTTP listen port + webserver.http.port=9090 + # HTTP listen address + webserver.http.address=0.0.0.0 + # Whether CORS support is enabled for API or not + webserver.http.cors.enabled=false + # Value for Access-Control-Allow-Origin + webserver.http.cors.origin=http://localhost:8080/ + # Value for Access-Control-Request-Method + webserver.http.cors.allowmethods=OPTIONS,GET,POST + # Headers that should be exposed to the Browser (Webapp) + # This is a special header that is used by the + # User Tasks subsystem and should be explicitly + # Enabled when CORS mode is used as part of the + # Admin Interface + webserver.http.cors.exposeheaders=User-Task-ID + # REST API default prefix + # (dont forget the ending *) + webserver.api.urlprefix=/kafkacruisecontrol/* + # Location where the Cruise Control frontend is deployed + webserver.ui.diskpath=./cruise-control-ui/dist/ + # URL path prefix for UI + # (dont forget the ending *) + webserver.ui.urlprefix=/* + # Time After which request is converted to Async + webserver.request.maxBlockTimeMs=10000 + # Default Session Expiry Period + webserver.session.maxExpiryTimeMs=60000 + # Session cookie path + webserver.session.path=/ + # Server Access Logs + webserver.accesslog.enabled=true + # Location of HTTP Request Logs + webserver.accesslog.path=access.log + # HTTP Request Log retention days + webserver.accesslog.retention.days=14 + clusterConfig: | + { + "min.insync.replicas": 2 + } diff --git a/controllers/cruisecontroltask_controller.go b/controllers/cruisecontroltask_controller.go index 0a4cb15c2..07e0fde7f 100644 --- a/controllers/cruisecontroltask_controller.go +++ b/controllers/cruisecontroltask_controller.go @@ -171,19 +171,22 @@ func (r *CruiseControlTaskReconciler) Reconcile(ctx context.Context, request ctr } } case tasksAndStates.NumActiveTasksByOp(banzaiv1alpha1.OperationRemoveBroker) > 0: - var removeTask *CruiseControlTask + brokerIDs := make([]string, 0) + // gather brokers that are marked for removal as a result of downscale operation for _, task := range tasksAndStates.GetActiveTasksByOp(banzaiv1alpha1.OperationRemoveBroker) { - removeTask = task - break + brokerIDs = append(brokerIDs, task.BrokerID) } - cruiseControlOpRef, err := r.removeBroker(ctx, instance, operationTTLSecondsAfterFinished, removeTask.BrokerID) + cruiseControlOpRef, err := r.removeBrokers(ctx, instance, operationTTLSecondsAfterFinished, brokerIDs) if err != nil { - return requeueWithError(log, fmt.Sprintf("creating CruiseControlOperation for downscale has failed, brokerID: %s", removeTask.BrokerID), err) + return requeueWithError(log, fmt.Sprintf("creating CruiseControlOperation for downscale has failed, brokerIDs: %s", brokerIDs), err) } - removeTask.SetCruiseControlOperationRef(cruiseControlOpRef) - removeTask.SetStateScheduled() + // map the CC broker removal operation with each broker status + for _, task := range tasksAndStates.GetActiveTasksByOp(banzaiv1alpha1.OperationRemoveBroker) { + task.SetCruiseControlOperationRef(cruiseControlOpRef) + task.SetStateScheduled() + } case tasksAndStates.NumActiveTasksByOp(banzaiv1alpha1.OperationRemoveDisks) > 0: brokerLogDirsToRemove := make(map[string][]string) @@ -367,8 +370,8 @@ func (r *CruiseControlTaskReconciler) addBrokers(ctx context.Context, kafkaClust return r.createCCOperation(ctx, kafkaCluster, banzaiv1alpha1.ErrorPolicyRetry, ttlSecondsAfterFinished, banzaiv1alpha1.OperationAddBroker, bokerIDs, false, nil) } -func (r *CruiseControlTaskReconciler) removeBroker(ctx context.Context, kafkaCluster *banzaiv1beta1.KafkaCluster, ttlSecondsAfterFinished *int, brokerID string) (corev1.LocalObjectReference, error) { - return r.createCCOperation(ctx, kafkaCluster, banzaiv1alpha1.ErrorPolicyRetry, ttlSecondsAfterFinished, banzaiv1alpha1.OperationRemoveBroker, []string{brokerID}, false, nil) +func (r *CruiseControlTaskReconciler) removeBrokers(ctx context.Context, kafkaCluster *banzaiv1beta1.KafkaCluster, ttlSecondsAfterFinished *int, brokerIDs []string) (corev1.LocalObjectReference, error) { + return r.createCCOperation(ctx, kafkaCluster, banzaiv1alpha1.ErrorPolicyRetry, ttlSecondsAfterFinished, banzaiv1alpha1.OperationRemoveBroker, brokerIDs, false, nil) } func (r *CruiseControlTaskReconciler) removeDisks(ctx context.Context, kafkaCluster *banzaiv1beta1.KafkaCluster, ttlSecondsAfterFinished *int, brokerIdsToRemovedLogDirs map[string][]string) (corev1.LocalObjectReference, error) { diff --git a/controllers/cruisecontroltask_controller_test.go b/controllers/cruisecontroltask_controller_test.go index 600537b75..1b467fdfe 100644 --- a/controllers/cruisecontroltask_controller_test.go +++ b/controllers/cruisecontroltask_controller_test.go @@ -343,6 +343,17 @@ func TestCreateCCOperation(t *testing.T) { assert.Equal(t, "true", params[scale.ParamExcludeRemoved]) }, }, + { + operationType: banzaiv1alpha1.OperationRemoveBroker, + brokerIDs: []string{"1", "2", "3"}, + isJBOD: false, + brokerIdsToLogDirs: nil, + parameterCheck: func(t *testing.T, params map[string]string) { + assert.Equal(t, "1,2,3", params[scale.ParamBrokerID]) + assert.Equal(t, "true", params[scale.ParamExcludeDemoted]) + assert.Equal(t, "true", params[scale.ParamExcludeRemoved]) + }, + }, { operationType: banzaiv1alpha1.OperationRemoveDisks, brokerIDs: []string{"1", "2"}, diff --git a/controllers/tests/cruisecontroltask_controller_test.go b/controllers/tests/cruisecontroltask_controller_test.go index 3160c990a..a0e5accb4 100644 --- a/controllers/tests/cruisecontroltask_controller_test.go +++ b/controllers/tests/cruisecontroltask_controller_test.go @@ -456,6 +456,60 @@ var _ = Describe("CruiseControlTaskReconciler", func() { }, taskExtendedTimeoutDuration, reconcilePollingPeriod).Should(BeTrue()) }) }) + When("multiple brokers are removed", Serial, func() { + JustBeforeEach(func(ctx SpecContext) { + kafkaClusterCCReconciler.ScaleFactory = mocks.NewMockScaleFactory(getScaleMockCCTask1()) + err := util.RetryOnConflict(util.DefaultBackOffForConflict, func() error { + if err := k8sClient.Get(ctx, types.NamespacedName{ + Name: kafkaCluster.Name, + Namespace: kafkaCluster.Namespace, + }, kafkaCluster); err != nil { + return err + } + + for _, id := range []string{"1", "2"} { + brokerState := kafkaCluster.Status.BrokersState[id] + brokerState.GracefulActionState.CruiseControlState = v1beta1.GracefulDownscaleRequired + kafkaCluster.Status.BrokersState[id] = brokerState + } + return k8sClient.Status().Update(ctx, kafkaCluster) + }) + Expect(err).NotTo(HaveOccurred()) + }) + It("should create exactly one remove_broker CruiseControlOperation for all brokers", func(ctx SpecContext) { + Eventually(ctx, func() bool { + err := k8sClient.Get(ctx, types.NamespacedName{ + Name: kafkaCluster.Name, + Namespace: kafkaCluster.Namespace, + }, kafkaCluster) + Expect(err).NotTo(HaveOccurred()) + + brokerState1, ok1 := kafkaCluster.Status.BrokersState["1"] + brokerState2, ok2 := kafkaCluster.Status.BrokersState["2"] + if !ok1 || !ok2 { + return false + } + if brokerState1.GracefulActionState.CruiseControlOperationReference == nil || + brokerState2.GracefulActionState.CruiseControlOperationReference == nil { + return false + } + + operationList := &v1alpha1.CruiseControlOperationList{} + err = k8sClient.List(ctx, operationList, client.ListOption(client.InNamespace(kafkaCluster.Namespace))) + Expect(err).NotTo(HaveOccurred()) + + if len(operationList.Items) != 1 { + return false + } + operation = &operationList.Items[0] + return operation.CurrentTaskOperation() == v1alpha1.OperationRemoveBroker && + brokerState1.GracefulActionState.CruiseControlOperationReference.Name == operation.Name && + brokerState2.GracefulActionState.CruiseControlOperationReference.Name == operation.Name && + brokerState1.GracefulActionState.CruiseControlState == v1beta1.GracefulDownscaleScheduled && + brokerState2.GracefulActionState.CruiseControlState == v1beta1.GracefulDownscaleScheduled + }, taskExtendedTimeoutDuration, reconcilePollingPeriod).Should(BeTrue()) + }) + }) }) func getScaleMockCCTask1() *mocks.MockCruiseControlScaler { diff --git a/openspec/changes/downscale-improvements/.openspec.yaml b/openspec/changes/downscale-improvements/.openspec.yaml new file mode 100644 index 000000000..93831bd26 --- /dev/null +++ b/openspec/changes/downscale-improvements/.openspec.yaml @@ -0,0 +1,2 @@ +schema: spec-driven +created: 2026-05-13 diff --git a/openspec/changes/downscale-improvements/design.md b/openspec/changes/downscale-improvements/design.md new file mode 100644 index 000000000..b779f7d1f --- /dev/null +++ b/openspec/changes/downscale-improvements/design.md @@ -0,0 +1,79 @@ +## Context + +The koperator manages Kafka cluster lifecycle via a set of reconcilers. Downscale (broker removal) is handled in two phases: + +1. **KafkaCluster reconciler** (`pkg/resources/kafka/kafka.go`): Detects brokers removed from spec. Collects all such brokers in one pass and sets them all to `GracefulDownscaleRequired` via a single atomic `UpdateBrokerStatus` call. + +2. **CruiseControlTask reconciler** (`controllers/cruisecontroltask_controller.go`): Picks up brokers in `GracefulDownscaleRequired` state and submits CC operations. The `add_broker` path already batches all pending brokers into one CC operation. The `remove_broker` path does not — it picks only the first task and breaks. + +3. **External listener reconcilers** (`pkg/resources/envoy/`, `pkg/resources/istioingress/`): Gate broker inclusion in envoy/istio/contour config on `ShouldIncludeBroker()`. This function returns `false` when `brokerConfig == nil` (broker not in spec), causing draining brokers to vanish from external listener config immediately upon spec removal. + +**Key invariant:** `GetActiveTasksByOp(OperationRemoveBroker)` only returns brokers in `GracefulDownscaleRequired` state (via `IsRequired()` → `IsRequiredState()`). Brokers already `Scheduled` or `Running` are not returned. Because the KafkaCluster reconciler transitions all removed brokers atomically, all brokers from a single manifest apply are guaranteed to be in `Required` state simultaneously when the CC task reconciler fires. + +## Goals / Non-Goals + +**Goals:** +- All broker IDs removed in a single manifest apply are submitted as one CC `remove_broker` operation. +- Brokers removed from spec remain in external listener config until `GracefulDownscaleSucceeded`. +- Brokers stuck in `GracefulDownscaleCompletedWithError` or `GracefulDownscalePaused` remain in external listener config (broker still holds data; manual investigation needed). + +**Non-Goals:** +- Guaranteed single-operation batching across multiple sequential manifest applies (best-effort; second apply may produce a second CC operation if first batch is already `Scheduled`). +- Changes to CRD schema or the CC REST API. +- KRaft controller-only node handling (controller-only nodes skip CC graceful downscale entirely; unchanged). + +## Decisions + +### D1: Mirror `addBrokers` pattern for `removeBrokers` + +The `addBrokers` helper (lines 366-368) already accepts `[]string` and submits one CC operation. The `removeBroker` helper (lines 370-372) takes a single `string`. + +**Decision:** Rename `removeBroker` → `removeBrokers`, change signature to `[]string`, and replace the early-break loop with the collect-all pattern. + +**Why not a separate aggregation layer?** The batching boundary is already correct — `GetActiveTasksByOp` returns exactly the set to batch. No new abstraction needed. + +### D2: Fix `ShouldIncludeBroker` as the single gatekeeper + +`ShouldIncludeBroker` is called by all external listener reconcilers (envoy configmap, service, deployment; istio gateway, virtualservice, meshgateway; contour). When `brokerConfig == nil`, the function currently falls through to `return false`. + +**Decision:** Add a fallback block for `brokerConfig == nil`: check the broker's `CruiseControlState` in status. If `IsDownscale() && !IsSucceeded()` and the broker has the requested `ingressConfigName` in its `ExternalListenerConfigNames`, return `true`. + +**Why `ExternalListenerConfigNames` check?** A broker may have been associated with a specific ingress config. Re-using the persisted `ExternalListenerConfigNames` (set when the pod was created, never cleared until `GracefulDownscaleSucceeded`) ensures we only retain the broker for the listener configs it actually served. + +**Why `IsDownscale() && !IsSucceeded()` instead of an explicit state list?** +`IsDownscale()` covers all 6 downscale states. Excluding `IsSucceeded()` retains the broker in all non-terminal states, including `CompletedWithError` and `Paused`, which is the desired behavior for manual investigation. If new downscale states are added to the enum in future, they're covered automatically. + +**Alternatives rejected:** +- Fix each caller individually: more code, same logic duplicated. +- Add a new function: unnecessary indirection; `ShouldIncludeBroker` is the right seam. + +### D3: Task order to satisfy CI (green at every commit) + +The original plan placed failing tests before implementation. With CI gating on green builds, the order must be: + +``` +Commit 1: Unit test for createCCOperation (passes immediately — tests downstream, not the wrapper) +Commit 2: Implement removeBrokers + integration test (both green together) +Commit 3: Implement ShouldIncludeBroker fix + unit test (both green together) +Commit 4: E2E test + sample manifest +``` + +## Risks / Trade-offs + +**[Race: second manifest apply before first batch starts]** → If a user applies a second spec change (removing more brokers) before the first CC operation is created, those new brokers will be included in the same batch (still in `Required` state). If the first CC operation is already `Scheduled`, new brokers get a separate operation. Acceptable; same behavior as `add_broker`. + +**[CompletedWithError brokers stay in envoy indefinitely]** → A broker that fails CC draining stays in external listener config. This is intentional (data is still present, clients need connectivity), but operators must monitor and manually recover. No change from current behavior for the envoy side — previously, the broker would have been dropped from envoy even with data present, which was worse. + +**[ExternalListenerConfigNames populated assumption]** → The fix assumes `ExternalListenerConfigNames` is non-empty for any broker that was ever reconciled. This field is set on pod creation and never cleared. Clusters created before this field existed would not benefit from the fix for those brokers, but all newly created or recently reconciled brokers are covered. + +## Migration Plan + +No migration required. Both fixes are backwards-compatible: +- `removeBrokers` produces the same CC API calls as `removeBroker` for single-broker downscales. +- `ShouldIncludeBroker` only changes behavior for brokers with `brokerConfig == nil` (already removed from spec); existing behavior for in-spec brokers is unchanged. + +Rollback: revert the two commits. No persistent state is affected. + +## Open Questions + +None — all decisions resolved during exploration. diff --git a/openspec/changes/downscale-improvements/proposal.md b/openspec/changes/downscale-improvements/proposal.md new file mode 100644 index 000000000..4c571a408 --- /dev/null +++ b/openspec/changes/downscale-improvements/proposal.md @@ -0,0 +1,26 @@ +## Why + +When downscaling a Kafka cluster, the operator creates one CruiseControl operation per removed broker and immediately drops removed brokers from external listener config (envoy/istio/contour), causing unnecessary partition movements and client connectivity loss during draining. + +## What Changes + +- **Batch broker removal**: Collect all broker IDs pending downscale and submit them as a single `remove_broker` CC operation, matching the existing `add_broker` batching behavior. +- **Retain draining brokers in external listeners**: Keep removed brokers in envoy/istio/contour config until CruiseControl finishes draining them (`GracefulDownscaleSucceeded`), so clients retain connectivity while data is being moved. + +## Capabilities + +### New Capabilities + +- `batched-broker-removal`: Single CruiseControl `remove_broker` operation for all brokers removed in a manifest apply, eliminating redundant partition movements. +- `draining-broker-listener-retention`: Brokers removed from spec but still being drained by CruiseControl remain visible in all external listener resources until draining completes. + +### Modified Capabilities + + + +## Impact + +- `controllers/cruisecontroltask_controller.go`: Replace single-broker `removeBroker` logic with `removeBrokers` (plural), mirroring `addBrokers` pattern. +- `pkg/util/util.go`: `ShouldIncludeBroker` gains a fallback path for `brokerConfig == nil` that checks `CruiseControlState`. +- All external listener reconcilers (`pkg/resources/envoy/`, `pkg/resources/istioingress/`) benefit automatically — they all gate on `ShouldIncludeBroker`. +- No API or CRD changes. No breaking changes. diff --git a/openspec/changes/downscale-improvements/specs/batched-broker-removal/spec.md b/openspec/changes/downscale-improvements/specs/batched-broker-removal/spec.md new file mode 100644 index 000000000..d9395869e --- /dev/null +++ b/openspec/changes/downscale-improvements/specs/batched-broker-removal/spec.md @@ -0,0 +1,20 @@ +## ADDED Requirements + +### Requirement: Broker removals from a single manifest apply are batched into one CC operation +When a KafkaCluster spec is applied that removes N brokers, the operator SHALL submit a single `remove_broker` CruiseControl operation containing all N broker IDs, rather than N separate operations. + +#### Scenario: Multiple brokers removed simultaneously +- **WHEN** a KafkaCluster spec is applied removing brokers 3 and 4 from a 5-broker cluster +- **THEN** exactly one `CruiseControlOperation` of type `remove_broker` is created +- **THEN** the operation's broker ID parameter contains both broker IDs (e.g. `"3,4"`) +- **THEN** both brokers transition to `GracefulDownscaleScheduled` referencing the same operation + +#### Scenario: Single broker removal (unchanged behavior) +- **WHEN** a KafkaCluster spec is applied removing one broker +- **THEN** exactly one `CruiseControlOperation` of type `remove_broker` is created +- **THEN** the broker transitions to `GracefulDownscaleScheduled` + +#### Scenario: Brokers already scheduled are not re-batched +- **WHEN** broker 3 is already in `GracefulDownscaleScheduled` state and broker 4 enters `GracefulDownscaleRequired` +- **THEN** a new `CruiseControlOperation` is created for broker 4 only +- **THEN** broker 3's existing operation is not modified diff --git a/openspec/changes/downscale-improvements/specs/draining-broker-listener-retention/spec.md b/openspec/changes/downscale-improvements/specs/draining-broker-listener-retention/spec.md new file mode 100644 index 000000000..7ce48cd62 --- /dev/null +++ b/openspec/changes/downscale-improvements/specs/draining-broker-listener-retention/spec.md @@ -0,0 +1,34 @@ +## ADDED Requirements + +### Requirement: Draining brokers remain in external listener config until CC completes +A broker removed from the KafkaCluster spec SHALL remain present in all external listener resources (envoy, istio, contour) while CruiseControl is actively draining it, and SHALL be removed only after `GracefulDownscaleSucceeded`. + +#### Scenario: Broker retained during active draining +- **WHEN** a broker is removed from spec and its `CruiseControlState` is `GracefulDownscaleRunning` +- **THEN** `ShouldIncludeBroker` returns `true` for that broker +- **THEN** the broker remains in envoy configmap, service, and deployment resources + +#### Scenario: Broker retained when CC operation is scheduled but not yet running +- **WHEN** a broker is removed from spec and its `CruiseControlState` is `GracefulDownscaleScheduled` +- **THEN** `ShouldIncludeBroker` returns `true` for that broker + +#### Scenario: Broker retained when CC has not yet started (Required state) +- **WHEN** a broker is removed from spec and its `CruiseControlState` is `GracefulDownscaleRequired` +- **THEN** `ShouldIncludeBroker` returns `true` for that broker + +#### Scenario: Broker retained on CC error (manual investigation needed) +- **WHEN** a broker is removed from spec and its `CruiseControlState` is `GracefulDownscaleCompletedWithError` +- **THEN** `ShouldIncludeBroker` returns `true` for that broker + +#### Scenario: Broker retained when CC operation is paused +- **WHEN** a broker is removed from spec and its `CruiseControlState` is `GracefulDownscalePaused` +- **THEN** `ShouldIncludeBroker` returns `true` for that broker + +#### Scenario: Broker removed from listener config after successful drain +- **WHEN** a broker's `CruiseControlState` transitions to `GracefulDownscaleSucceeded` +- **THEN** `ShouldIncludeBroker` returns `false` for that broker +- **THEN** the broker is removed from all external listener resources on the next reconcile + +#### Scenario: Unknown broker excluded +- **WHEN** a broker ID has no entry in `BrokersState` (unknown broker) +- **THEN** `ShouldIncludeBroker` returns `false` for that broker diff --git a/openspec/changes/downscale-improvements/tasks.md b/openspec/changes/downscale-improvements/tasks.md new file mode 100644 index 000000000..17160675c --- /dev/null +++ b/openspec/changes/downscale-improvements/tasks.md @@ -0,0 +1,26 @@ +## 1. Batched broker removal — unit test (passes immediately) + +- [x] 1.1 Add multi-broker test case to `TestCreateCCOperation` in `controllers/cruisecontroltask_controller_test.go` (after line 345): `operationType: OperationRemoveBroker`, `brokerIDs: []string{"1","2","3"}`, assert params contain `"1,2,3"` for `ParamBrokerID` +- [x] 1.2 Commit: `test: add unit test for batched remove_broker CC operation params` + +## 2. Batched broker removal — implementation + integration test + +- [x] 2.1 Rename `removeBroker` → `removeBrokers` in `controllers/cruisecontroltask_controller.go` (line 370), change `brokerID string` parameter to `brokerIDs []string` +- [x] 2.2 Replace the early-break loop (lines 173-186) with collect-all pattern: gather all broker IDs from `GetActiveTasksByOp(OperationRemoveBroker)`, call `removeBrokers`, set `CruiseControlOperationRef` and `StateScheduled` on all tasks +- [x] 2.3 Add integration test `When("multiple brokers are removed", ...)` to `controllers/tests/cruisecontroltask_controller_test.go` (after line 458): set brokers "1" and "2" to `GracefulDownscaleRequired`, assert exactly 1 `CruiseControlOperation` created, both brokers reference same operation, both transition to `GracefulDownscaleScheduled` +- [x] 2.4 Run `go test ./controllers/ -run TestCreateCCOperation` and `go test ./controllers/tests/ -run CruiseControlTaskReconciler` — all green +- [ ] 2.5 Commit: `feat: batch remove_broker operations into single CruiseControl task` + +## 3. Draining broker listener retention — implementation + unit test + +- [x] 3.1 Add fallback block to `ShouldIncludeBroker` in `pkg/util/util.go` (after line 284): when `brokerConfig == nil`, look up `brokerState` in `status.BrokersState`; if `ccState.IsDownscale() && !ccState.IsSucceeded()` and `StringSliceContains(brokerState.ExternalListenerConfigNames, ingressConfigName)`, return `true` +- [x] 3.2 Add unit test cases to `pkg/util/util_test.go` for `ShouldIncludeBroker` with `brokerConfig=nil`: all 5 active downscale states return `true`, `GracefulDownscaleSucceeded` returns `false`, missing broker state returns `false` +- [x] 3.3 Run `go test ./pkg/util/ -run TestShouldIncludeBroker` — all green +- [x] 3.4 Commit: `fix: retain draining brokers in external listener config until CruiseControl completes` + +## 4. E2E test + sample manifest + +- [x] 4.1 Create `config/samples/simplekafkacluster_5broker.yaml`: copy `simplekafkacluster.yaml`, add brokers 3 and 4, adjust `cruise.control.metrics.topic.replication.factor=2` and `min.insync.replicas=2` +- [x] 4.2 Add `testBatchedBrokerRemoval()` to `tests/e2e/test_broker_removal.go` following `testMultiDiskRemoval` pattern: apply 5-broker manifest, then apply 3-broker manifest, assert exactly 1 `CruiseControlOperation` of type `remove_broker`, assert only 3 pods remain Ready +- [x] 4.3 Wire into suite in `tests/e2e/koperator_suite_test.go` (after multi-disk removal block, line 74): `testInstallKafkaCluster("../../config/samples/simplekafkacluster_5broker.yaml")`, `testBatchedBrokerRemoval()`, `testUninstallKafkaCluster()` +- [ ] 4.4 Commit: `test: add e2e test for batched broker removal` diff --git a/pkg/util/util.go b/pkg/util/util.go index bfa588885..87d8fecb6 100644 --- a/pkg/util/util.go +++ b/pkg/util/util.go @@ -280,6 +280,17 @@ func ShouldIncludeBroker(brokerConfig *v1beta1.BrokerConfig, status v1beta1.Kafk } } } + + // Broker removed from spec but still draining — keep in external listener config until CC finishes + if brokerConfig == nil { + if brokerState, ok := status.BrokersState[strconv.Itoa(brokerID)]; ok { + ccState := brokerState.GracefulActionState.CruiseControlState + if ccState.IsDownscale() && !ccState.IsSucceeded() && + apiutil.StringSliceContains(brokerState.ExternalListenerConfigNames, ingressConfigName) { + return true + } + } + } return false } diff --git a/pkg/util/util_test.go b/pkg/util/util_test.go index de5545bde..38c98f031 100644 --- a/pkg/util/util_test.go +++ b/pkg/util/util_test.go @@ -17,6 +17,7 @@ package util import ( "reflect" + "strconv" "testing" "github.com/stretchr/testify/require" @@ -829,3 +830,72 @@ func TestGetMD5Hash(t *testing.T) { } } } + +func TestShouldIncludeBroker(t *testing.T) { + t.Parallel() + + const ingressConfig = "default" + brokerID := 5 + + makeStatus := func(state v1beta1.CruiseControlState) v1beta1.KafkaClusterStatus { + return v1beta1.KafkaClusterStatus{ + BrokersState: map[string]v1beta1.BrokerState{ + strconv.Itoa(brokerID): { + GracefulActionState: v1beta1.GracefulActionState{ + CruiseControlState: state, + }, + ExternalListenerConfigNames: v1beta1.ExternalListenerConfigNames{ingressConfig}, + }, + }, + } + } + + testCases := []struct { + name string + state v1beta1.CruiseControlState + expected bool + }{ + {"GracefulDownscaleRequired", v1beta1.GracefulDownscaleRequired, true}, + {"GracefulDownscaleScheduled", v1beta1.GracefulDownscaleScheduled, true}, + {"GracefulDownscaleRunning", v1beta1.GracefulDownscaleRunning, true}, + {"GracefulDownscaleCompletedWithError", v1beta1.GracefulDownscaleCompletedWithError, true}, + {"GracefulDownscalePaused", v1beta1.GracefulDownscalePaused, true}, + {"GracefulDownscaleSucceeded", v1beta1.GracefulDownscaleSucceeded, false}, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + result := ShouldIncludeBroker(nil, makeStatus(tc.state), brokerID, ingressConfig, ingressConfig) + if result != tc.expected { + t.Errorf("state %s: expected %v, got %v", tc.name, tc.expected, result) + } + }) + } + + t.Run("no broker state entry", func(t *testing.T) { + emptyStatus := v1beta1.KafkaClusterStatus{ + BrokersState: map[string]v1beta1.BrokerState{}, + } + result := ShouldIncludeBroker(nil, emptyStatus, brokerID, ingressConfig, ingressConfig) + if result { + t.Error("expected false for broker with no status entry, got true") + } + }) + + t.Run("ingress config not in ExternalListenerConfigNames", func(t *testing.T) { + status := v1beta1.KafkaClusterStatus{ + BrokersState: map[string]v1beta1.BrokerState{ + strconv.Itoa(brokerID): { + GracefulActionState: v1beta1.GracefulActionState{ + CruiseControlState: v1beta1.GracefulDownscaleRunning, + }, + ExternalListenerConfigNames: v1beta1.ExternalListenerConfigNames{"other-ingress"}, + }, + }, + } + result := ShouldIncludeBroker(nil, status, brokerID, ingressConfig, ingressConfig) + if result { + t.Error("expected false when ingressConfig not in ExternalListenerConfigNames, got true") + } + }) +} diff --git a/tests/e2e/koperator_suite_test.go b/tests/e2e/koperator_suite_test.go index 3e2c35c3a..8e03fef04 100644 --- a/tests/e2e/koperator_suite_test.go +++ b/tests/e2e/koperator_suite_test.go @@ -72,6 +72,9 @@ var _ = ginkgo.When("Testing e2e test altogether", ginkgo.Ordered, func() { testInstallKafkaCluster("../../config/samples/simplekafkacluster_4disk.yaml") testMultiDiskRemoval() testUninstallKafkaCluster() + testInstallKafkaCluster("../../config/samples/simplekafkacluster_5broker.yaml") + testBatchedBrokerRemoval() + testUninstallKafkaCluster() testUninstallZookeeperCluster() // kraft tests testInstallKafkaCluster("../../config/samples/kraft/simplekafkacluster_kraft.yaml") diff --git a/tests/e2e/test_broker_removal.go b/tests/e2e/test_broker_removal.go new file mode 100644 index 000000000..5a97a1129 --- /dev/null +++ b/tests/e2e/test_broker_removal.go @@ -0,0 +1,122 @@ +// Copyright © 2025 Cisco Systems, Inc. and/or its affiliates +// Copyright 2025 Adobe. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:build e2e + +package e2e + +import ( + "context" + "time" + + "github.com/gruntwork-io/terratest/modules/k8s" + ginkgo "github.com/onsi/ginkgo/v2" + gomega "github.com/onsi/gomega" + + "github.com/banzaicloud/koperator/api/v1beta1" +) + +const ( + batchedBrokerRemovalTimeout = 1200 * time.Second + batchedBrokerRemovalPollInterval = 15 * time.Second +) + +// testBatchedBrokerRemoval applies the 3-broker manifest over the running 5-broker cluster, +// waits for CruiseControl to complete removal, then asserts exactly one remove_broker +// CruiseControlOperation was created and only 3 broker pods remain Ready. +func testBatchedBrokerRemoval() bool { + return ginkgo.When("Batched broker removal: remove two brokers and assert single CC operation", func() { + var kubectlOptions k8s.KubectlOptions + var err error + + ginkgo.It("Acquiring K8s config and context", func() { + kubectlOptions, err = kubectlOptionsForCurrentContext() + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + kubectlOptions.Namespace = koperatorLocalHelmDescriptor.Namespace + }) + + ginkgo.It("Applying 3-broker manifest to trigger removal of brokers 3 and 4", func() { + ginkgo.By("Patching KafkaCluster to remove brokers 3 and 4") + applyK8sResourceManifest(kubectlOptions, "../../config/samples/simplekafkacluster.yaml") + }) + + ginkgo.It("Waiting for exactly one remove_broker CruiseControlOperation to be created", func() { + ginkgo.By("Polling until exactly one remove_broker CruiseControlOperation exists") + gomega.Eventually(context.Background(), func() (bool, error) { + return hasExactlyOneRemoveBrokerOperation(kubectlOptions) + }, batchedBrokerRemovalTimeout, batchedBrokerRemovalPollInterval).Should(gomega.BeTrue()) + }) + + ginkgo.It("Asserting exactly one remove_broker CruiseControlOperation was created", func() { + ok, err := hasExactlyOneRemoveBrokerOperation(kubectlOptions) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + gomega.Expect(ok).To(gomega.BeTrue(), "expected exactly one remove_broker CruiseControlOperation") + }) + + ginkgo.It("Waiting for brokers 3 and 4 to be removed (only 3 pods remain)", func() { + ginkgo.By("Waiting until only 3 kafka broker pods are Ready") + gomega.Eventually(context.Background(), func() (bool, error) { + return hasExactlyNBrokerPods(kubectlOptions, 3) + }, batchedBrokerRemovalTimeout, batchedBrokerRemovalPollInterval).Should(gomega.BeTrue()) + }) + + ginkgo.It("Asserting remaining Kafka brokers are healthy", func() { + err := waitK8sResourceCondition(kubectlOptions, "pod", "condition=Ready", defaultPodReadinessWaitTime, + v1beta1.KafkaCRLabelKey+"="+kafkaClusterName+","+kafkaLabelSelectorBrokers, "") + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + }) + }) +} + +// hasExactlyOneRemoveBrokerOperation returns true if there is exactly one CruiseControlOperation +// of type remove_broker in the namespace. +func hasExactlyOneRemoveBrokerOperation(kubectlOptions k8s.KubectlOptions) (bool, error) { + ops, err := getK8sResources(kubectlOptions, + []string{"cruisecontroloperation"}, + "", + "", + "-o", "jsonpath={range .items[*]}{.status.currentTask.operation}{'\\n'}{end}", + ) + if err != nil { + return false, err + } + + count := 0 + for _, op := range ops { + if op == "removeBroker" { + count++ + } + } + return count == 1, nil +} + +// hasExactlyNBrokerPods returns true when exactly n broker pods exist in the namespace. +func hasExactlyNBrokerPods(kubectlOptions k8s.KubectlOptions, n int) (bool, error) { + pods, err := getK8sResources(kubectlOptions, + []string{"pod"}, + v1beta1.KafkaCRLabelKey+"="+kafkaClusterName+","+kafkaLabelSelectorBrokers, + "", + "--field-selector=status.phase=Running", + ) + if err != nil { + return false, err + } + // subtract 1 for the header line + actual := len(pods) - 1 + if actual < 0 { + actual = 0 + } + return actual == n, nil +}