Skip to content

[AURON #1865] Merge native operators: fuse Flink source and Calc into one native plan#2327

Open
weiqingy wants to merge 4 commits into
apache:masterfrom
weiqingy:AURON-1865-impl-2
Open

[AURON #1865] Merge native operators: fuse Flink source and Calc into one native plan#2327
weiqingy wants to merge 4 commits into
apache:masterfrom
weiqingy:AURON-1865-impl-2

Conversation

@weiqingy

Copy link
Copy Markdown
Contributor

Which issue does this PR close?

Closes #1865

Rationale for this change

A SELECT proj … FROM kafka WHERE pred plan runs today as two independent native operators: the native Kafka source converts its columnar output to RowData, then the shadowed StreamExecCalc operator converts it back to Arrow, runs Project[Filter] natively, and converts back to RowData — paying multiple row/column conversions where one would do. This PR fuses the source and a convertible Calc into a single native plan Project[Filter?[KafkaScan]] that runs inside the source, so the data stays columnar end-to-end and converts to RowData once, at the chain tail. This is the whole-stage native operator merging direction for the Flink integration, mirroring the plan-merge already used on the Spark side.

What changes are included in this PR?

Strict gating in the shadowed StreamExecCalc: a Calc converts to native only when it can fuse into a native upstream chain — its input is the native Auron Kafka source, the source has no event-time watermark, and every expression converts. When fusible, the Calc hands its logical Project[Filter?] sub-plan to the source and returns the source transformation re-typed to the projected output, eliminating the Calc operator. Every other case (non-native upstream, watermarked source, any unconvertible expression) runs as a stock Flink Calc.

The source splices its KafkaScan into the handed-off sub-plan and prepends the partition/offset/timestamp metadata columns, so its per-record offset-commit and timestamp bookkeeping are unchanged; native column resolution is by name, so no column-index rewriting is needed.

The standalone FFI-reader native-Calc path is no longer emitted from the Calc decision. Detection and hand-off go through the source function, so no source-operator type is introduced; operator-level native identity is left to the future ExecNode-graph merge that would consume it. Event-time-watermark fusion is out of scope and tracked as a follow-up.

This PR is stacked on #2325 (no-watermark source emit fix), which it requires; that commit drops out of the diff once #2325 merges and this branch is rebased.

Are there any user-facing changes?

Performance: a source → Calc chain over the native Kafka source with no event-time watermark now runs as a single native pipeline with one columnar-to-row conversion. A Calc that cannot fuse (non-native source, watermarked source, or unsupported expression) runs as Flink's codegen Calc.

How was this patch tested?

Unit tests in StreamExecCalcTest assert the fusion decision (native source + no watermark + convertible → merged source transformation with no separate Calc operator; each negative case → stock Flink Calc). An end-to-end AuronKafkaSourceMergeITCase runs fused SQL over a no-watermark auron-kafka table and asserts row-set correctness; a native boolean-to-string cast renders lowercase, proving the fused path executes natively (Flink codegen renders uppercase). The full auron-flink-planner (130) and auron-flink-runtime (137) module test suites pass.

weiqingy added 4 commits June 10, 2026 19:43
…t a watermark

AuronKafkaSourceFunction.open() set isRunning=true only inside the
watermarkStrategy != null branch, but run() collects records only while
isRunning is true on both the watermark and no-watermark paths. A source
configured without an event-time watermark therefore emitted nothing. Set
isRunning=true unconditionally once open() completes so no-watermark sources
emit records (and snapshot offsets and discover partitions) like watermarked
ones, while a partial-init failure still leaves the source not-running.

Add a no-watermark mock table and an end-to-end test asserting the source
emits its records.
…AuronKafkaSourceFunction

Enable AuronKafkaSourceFunction to run a fused Project[Filter?[KafkaScan]]
native plan when given a logical Calc sub-plan via setMergedCalcPlan, splicing
the KafkaScan into the FFIReader-placeholder leaf and prepending the three
metadata passthrough columns so the fused output stays metadata-prefixed. The
path is dormant until the planner wires it: with no merged plan set, open(),
run(), and getOutputType() behave exactly as before.
Apply strict gating in the shadowed StreamExecCalc: a Calc converts to native
only when it can fuse into a native upstream chain -- its input is the Auron
native Kafka source, the source has no event-time watermark, and every Rex
converts. When fusible, the Calc's logical Project[Filter?] sub-plan is handed
to the source via setMergedCalcPlan and the source transformation is returned
re-typed to the Calc's projected output, eliminating the Calc operator so the
source -> Calc chain runs as one native pipeline. Every other case (non-native
upstream, watermarked source, any unconvertible Rex) runs as a stock Flink
Calc; the standalone FFIReader native-Calc emission is removed.

A Calc over a non-native source (e.g. the values connector) now runs as a
Flink codegen Calc, so testCastBooleanToString asserts Flink's uppercase
boolean-to-string rendering.
…c fusion

Add AuronKafkaSourceMergeITCase exercising the fused path end-to-end over the
no-watermark auron-kafka table T5: filter + projection fusion, projection-only
fusion, and a native boolean-to-string cast whose lowercase rendering proves
the fused Project[Filter?[KafkaScan]] ran natively (Flink codegen renders
uppercase). A control case over the watermarked source T2 confirms the
non-fused path stays correct.

Operator elimination is covered by the StreamExecCalc unit tests; the
integration tests assert row-set correctness and native execution.
@github-actions github-actions Bot added the flink label Jun 11, 2026
@weiqingy

Copy link
Copy Markdown
Contributor Author

Hi @Tartarus0zm, could you please help review this PR when you get a chance? Thanks!

@SteNicholas SteNicholas left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Auron StreamExecCalc → Kafka source fusion review

Nice direction. I verified the core mechanism is sound: native resolves projection/filter columns by name (auron-planner/src/planner.rs:876, proto index ignored), so prepending the 3 meta columns and the resulting ordinal shift are harmless; meta columns stay first so offset-commit bookkeeping is unchanged; meta names/types/nullability match the KafkaScan schema; and the isRunning move is a genuine no-watermark emit fix. The new non-transient fields serialize fine.

The findings cluster around one root cause: the fused Calc returns the upstream source Transformation directly and mutates the shared AuronKafkaSourceFunction, with no guard against the same source being reached more than once (ExecNodeBase.translateToPlan memoizes it). The top three correctness items — shared-source plan overwrite, double setOutputType, and stacked-Calc double-fuse — are all closed by only fusing when the source has a single consumer and isn't already merged. Details inline.

Cleared (not issues): meta column order/types/reader offsets, run() projected-type sizing, field serialization, and the boolean-cast lowercase/uppercase test flips.

// failure, so it goes to Flink's Calc regardless of the fallback flag.
final AuronKafkaSourceFunction source = asNativeKafkaSource(upstream);
if (source != null && !source.hasWatermark()) {
source.setMergedCalcPlan(plan.get(), outputRowType);

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correctness (high): a source reached by two Calcs gets its merged plan overwritten.

setMergedCalcPlan mutates the shared AuronKafkaSourceFunction, but ExecNodeBase.translateToPlan memoizes the source transformation, so one source instance can be handed to more than one Calc:

  • Source reuse (default table.optimizer.reuse-source-enabled=true): this source declares no projection/filter push-down, so two scans of the same table collapse into one shared source node. Two fusible Calcs above it both call setMergedCalcPlan — last write wins, and since both return the same re-typed upstream, the branches collapse to one stream. e.g. SELECT age FROM T5 WHERE age>20 UNION ALL SELECT age+1 FROM T5 WHERE age>10 → silently wrong results.
  • Stacked Calcs (source→Calc1→Calc2 left unmerged by CalcMergeRule): Calc1 returns the source transformation, so Calc2 re-matches the same source, overwrites Calc1's plan, and builds exprs against Calc1's projected columns the KafkaScan never emits → native name-resolution failure.

Suggest fusing only when the source has exactly one consumer/output edge, and refusing to re-fuse an already-merged source (guard on mergedCalcPlan != null).

// the source and this Calc has read getOutputType() yet — Flink's Transformation locks
// the type on first read. An ExecNode inserted between source and Calc that reads the
// type before this point would lock the original RowType and make this re-type a no-op.
upstream.setOutputType(InternalTypeInfo.of(outputRowType));

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correctness: this is a second setOutputType on the source transformation and can throw.

CommonExecTableSourceScan already typed this transformation. Transformation.setOutputType throws IllegalStateException once typeUsed is set (any prior getOutputType() read). The single-consumer happy path works because nothing reads the type in between — but in the shared/stacked-source case (see the setMergedCalcPlan comment), a downstream of the first consumer locks the type and this re-type aborts job translation instead of falling back. A single-consumer guard also resolves this.

// KafkaScan when the sub-plan is fused into the Auron Kafka source.
final FFIReaderExecNode ffiReader = FFIReaderExecNode.newBuilder()
.setNumPartitions(1)
.setSchema(SchemaConverters.convertToAuronSchema(inputRowType, false))

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cleanup (efficiency): this builds a full Arrow input schema for the FFIReader leaf, but the whole leaf (schema included) is discarded by spliceScanIntoLeaf and replaced with the KafkaScan on the fused path. The conversion is wasted on every fused Calc, and the surviving FFIReader carries a real schema the splice silently drops — inviting a future change to rely on it. A bare marker leaf would express intent and save the work.

// timestamps the downstream Calc must not strip, so fusion is disabled there. Any other
// upstream has no source to fuse into. A convertible-but-unfusible Calc is not a conversion
// failure, so it goes to Flink's Calc regardless of the fallback flag.
final AuronKafkaSourceFunction source = asNativeKafkaSource(upstream);

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Altitude (follow-up): detecting the fuse target via instanceof LegacySourceTransformation + unwrapping getOperator()/getUserFunction(), then mutating the source function from the planner, is a Kafka-specific point solution. Any future wrapper between source and Calc (Source-API migration off LegacySourceTransformation, a partitioner/rebalance, a second native source type) silently fails the instanceof and falls back with no signal — an invisible perf regression. The PR notes a general ExecNode-merge is the intended direction; worth tracking.

// Set at plan time, so these must survive operator serialization to the TaskManager
// (non-transient, like watermarkStrategy below).
private PhysicalPlanNode mergedCalcPlan;
private RowType mergedProjectedOutputType;

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Simplification: mergedProjectedOutputType is derivable from mergedCalcPlan's outer ProjectionExecNode (field names + arrow types) and drives a duplicated mergedProjectedOutputType != null ? … : (RowType) outputType ternary in both getOutputType() (L435) and run() (L329). Deriving it on demand removes the field and collapses both ternaries to one source of truth.

*
* @return {@code true} if a {@link WatermarkStrategy} was set
*/
public boolean hasWatermark() {

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correctness (latent): this gate depends on AuronKafkaDynamicTableSource.copy() ordering.

copy() does not carry over watermarkStrategy (resets it to null). It's safe today because Flink's PushWatermarkIntoTableSourceScanRuleBase applies the watermark spec to the copy after copy(). But the gate that blocks fusing over a watermarked source (fusion would strip the per-record kafka timestamps event-time needs) now silently relies on that exact ordering — any path that copies after applyWatermark without re-applying drops the watermark, lets the source fuse, and produces wrong event-time results. Preserving watermarkStrategy in copy() would make the gate self-contained.

* {@code Project[FFIReader]} and {@code Project[Filter[FFIReader]]} (the Calc always wraps its
* output in a projection).
*/
private static PhysicalPlanNode spliceScanIntoLeaf(PhysicalPlanNode node, PhysicalPlanNode kafkaScan) {

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cleanup (reuse + dead code): spliceScanIntoLeaf is a near-verbatim duplicate of FlinkAuronCalcOperator.injectFfiReaderLeaf (same recursive Project/Filter/FFIReader walk and error). After this PR, FlinkAuronCalcOperator and its (large) unit test are orphaned — nothing emits the standalone FFIReader native-Calc path anymore. Either share one parameterized tree-rewriter or remove the now-dead operator + test, so the two walkers can't drift apart.

* data_type} lists, so the fused output stays {@code [partition, offset, timestamp,
* ...projected-logical]}.
*/
private static PhysicalPlanNode prependMetadataPassthrough(PhysicalPlanNode merged) {

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Robustness: prependMetadataPassthrough adds meta columns to the outermost projection only and blindly does merged.getProjection()/setProjection(). Correct for the single Project[Filter?[FFIReader]] shape tryBuildAuronPlan emits today, but a nested Project[Project[…]] would leave the inner projection without the meta columns (outer meta lookup fails by name), and a non-Projection root would be silently overwritten by a 3-column meta-only projection (dropping the Filter) rather than failing fast. An explicit shape assertion would harden this if the Calc plan shape ever broadens.

return merged.toBuilder().setProjection(rebuilt.build()).build();
}

private static void addMetadataColumn(ProjectionExecNode.Builder builder, String name, LogicalType type) {

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cleanup + minor correctness:

  • (reuse) This hand-encodes the 3 Kafka meta columns a third time — SchemaConverters.convertToAuronSchema(.., true) and run() (L304-306) already define the same name/order/type/nullability set. The contract now lives in three spots; adding/renaming a meta column needs all three edited in lockstep.
  • (correctness, low-likelihood) Native resolves these passthroughs by name (planner.rs:876). A user logical column literally named serialized_kafka_records_partition/offset/timestamp would collide and resolve to the meta column. Unguarded.

* so it runs as a stock Flink Calc above the native source. The row set must still be correct:
* only age 21 and 22 survive the filter. */
@Test
public void testCalcOverWatermarkedSourceDoesNotFuseButIsCorrect() {

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Test validity: this claims to cover the hasWatermark() gate, but T2's watermark is on a computed column, so Flink inserts a watermark-assigner/projection between the source and the Calc — the Calc's upstream is no longer a LegacySourceTransformation, so asNativeKafkaSource() returns null and fusion is blocked by the non-native-upstream branch, not the watermark branch. If hasWatermark() were deleted this test would still pass (false confidence — and it's the only guard against the copy() watermark-drop risk noted on hasWatermark). A directly-watermarked native source (no computed column) would actually exercise the gate.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Support merge native operators

2 participants