diff --git a/auron-flink-extension/auron-flink-planner/src/test/java/org/apache/auron/flink/table/kafka/AuronKafkaNoWatermarkITCase.java b/auron-flink-extension/auron-flink-planner/src/test/java/org/apache/auron/flink/table/kafka/AuronKafkaNoWatermarkITCase.java new file mode 100644 index 000000000..f5f2292e3 --- /dev/null +++ b/auron-flink-extension/auron-flink-planner/src/test/java/org/apache/auron/flink/table/kafka/AuronKafkaNoWatermarkITCase.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.auron.flink.table.kafka; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.util.List; +import org.apache.flink.types.Row; +import org.apache.flink.util.CollectionUtil; +import org.junit.jupiter.api.Test; + +/** + * Regression IT for an Auron Kafka source without an event-time watermark strategy. + * The source must emit all records on the no-watermark path; a source that only marks + * itself running when a watermark strategy is present would produce an empty result. + */ +public class AuronKafkaNoWatermarkITCase extends AuronKafkaSourceTestBase { + + @Test + public void testNoWatermarkSourceEmitsAllRows() { + environment.setParallelism(1); + List rows = CollectionUtil.iteratorToList( + tableEnvironment.executeSql("SELECT * FROM T5").collect()); + assertThat(rows).extracting(row -> row.getField(1)).containsExactlyInAnyOrder(20, 21, 22); + } +} diff --git a/auron-flink-extension/auron-flink-planner/src/test/java/org/apache/auron/flink/table/kafka/AuronKafkaSourceTestBase.java b/auron-flink-extension/auron-flink-planner/src/test/java/org/apache/auron/flink/table/kafka/AuronKafkaSourceTestBase.java index 494ab38b7..52720ec19 100644 --- a/auron-flink-extension/auron-flink-planner/src/test/java/org/apache/auron/flink/table/kafka/AuronKafkaSourceTestBase.java +++ b/auron-flink-extension/auron-flink-planner/src/test/java/org/apache/auron/flink/table/kafka/AuronKafkaSourceTestBase.java @@ -124,6 +124,21 @@ public void before() { + "\n 'properties.group.id' = 'flink-test-mock'," + "\n 'format' = 'JSON' " + "\n )"); + + // T5: single-partition source with NO watermark strategy. Exercises the + // no-watermark emit path, where the source must still produce all records. + tableEnvironment.executeSql(" CREATE TABLE T5 ( " + + "\n `event_time` BIGINT, " + + "\n `age` INT, " + + "\n `name` STRING " + + "\n ) WITH ( " + + "\n 'connector' = 'auron-kafka'," + + "\n 'kafka.mock.data' = '" + jsonArray + "'," + + "\n 'topic' = 'mock_topic'," + + "\n 'properties.bootstrap.servers' = '127.0.0.1:9092'," + + "\n 'properties.group.id' = 'flink-test-mock'," + + "\n 'format' = 'JSON' " + + "\n )"); } protected void assertRowsContains(List actualRows, Object[]... expectedRows) { diff --git a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/AuronKafkaSourceFunction.java b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/AuronKafkaSourceFunction.java index 178f611e2..d32ef73de 100644 --- a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/AuronKafkaSourceFunction.java +++ b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/AuronKafkaSourceFunction.java @@ -278,8 +278,12 @@ public void open(Configuration config) throws Exception { watermarkStrategy.createWatermarkGenerator(() -> metricGroup); partitionWatermarkTrackers.put(partitionId, new PartitionWatermarkTracker(generator)); } - this.isRunning = true; } + + // Mark the source as running only after initialization completes. The run() loop + // collects rows only while isRunning is true on both the watermark and no-watermark + // paths, so this must be set regardless of whether a watermark strategy is present. + this.isRunning = true; } @Override