[AURON #2090] Convert CoalesceExec to native implementation #2294
guixiaowen wants to merge 7 commits into
override protected def getPartitions: Array[Partition] = partitions

override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
Is the coalesce meant to run natively, or as JVM iterator concatenation? The PR ships two implementations, and as written the native one never runs.
This compute override concatenates the parent partition iterators directly in the JVM. The base NativeRDD.compute (NativeRDD.scala:67-80) is the only thing that actually executes a native plan — it calls NativeHelper.executeNativePlan(...) — and this override never does. So the .setCoalesce(...) plan built in NativeCoalesceBase.doExecuteNative (:60-64 and :73-76) is constructed, serialized into the RDD, and then never invoked. That leaves the whole native chain off the execution path: coalesce_exec.rs, the CoalesceExecNode proto message, and the PhysicalPlanType::Coalesce arm in planner.rs:580-586. The coalesce ends up performed purely by JVM iterator concatenation, the same way Spark's own CoalescedRDD would.
And even if the native node were reached, CoalesceExec::execute (coalesce_exec.rs:73-113) is a pass-through — it streams input batches out unchanged, properties() returns the input's partitioning untouched, and num_partitions is stored but only ever read in fmt_as. So it neither repartitions nor does anything the input stream wouldn't already do. For contrast, NativeUnionBase.doExecuteNative wraps a real UnionExecNode in a plain NativeRDD with no compute override, so union genuinely executes natively.
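To make the bypass concrete, here is a minimal toy model in plain Scala. It is a sketch under stated assumptions, not Auron's code: `ToyNativeRDD`, `ToyCoalesceRDD`, and `pathsTaken` are hypothetical names, and the `trace` buffer only stands in for the call to `NativeHelper.executeNativePlan(...)`. It shows that a subclass which overrides `compute` without calling `super.compute` never reaches the base class's execution path, while a plain instance (the union case) does.

```scala
import scala.collection.mutable.ArrayBuffer

// Toy model only: these classes are hypothetical stand-ins, not Auron's NativeRDD.
object ComputeOverrideSketch {

  // Stand-in for NativeRDD: the base compute is the only place the plan runs.
  class ToyNativeRDD(plan: String, trace: ArrayBuffer[String]) {
    def compute(split: Int): Iterator[Int] = {
      trace += s"native:$plan" // stands in for NativeHelper.executeNativePlan(...)
      Iterator(split)
    }
  }

  // Stand-in for the PR's CoalesceNativeRDD: the override never calls super.compute,
  // so the plan handed to the constructor is carried around but never executed.
  class ToyCoalesceRDD(plan: String, trace: ArrayBuffer[String], groups: Seq[Seq[Int]])
      extends ToyNativeRDD(plan, trace) {
    override def compute(split: Int): Iterator[Int] = {
      trace += "jvm-concat"
      groups(split).iterator
    }
  }

  // Runs one partition of each toy RDD and reports which path each one took.
  def pathsTaken(): List[String] = {
    val trace = ArrayBuffer.empty[String]
    new ToyNativeRDD("union", trace).compute(0).foreach(_ => ())
    new ToyCoalesceRDD("coalesce", trace, Seq(Seq(0, 1), Seq(2))).compute(0).foreach(_ => ())
    trace.toList
  }

  def main(args: Array[String]): Unit =
    println(pathsTaken().mkString(", "))
}
```

In this model the trace contains `native:union` but no `native:coalesce` entry, which is the shape of the problem described above: the plan is built and stored, yet nothing on the execution path ever runs it.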
The two designs are mutually exclusive, and right now the PR carries both scaffolds and lands neither cleanly:
- If the intent is native (coalesce N input partitions into one task, then run the rest of the native pipeline over the concatenated stream), then this override severs that, and CoalesceExec::execute needs to actually do the coalescing rather than pass through.
- If the intent is JVM concatenation, then coalesce_exec.rs, the proto field, and the planner arm are dead code that could be dropped, and the ~245-line vendored DefaultPartitionCoalescer (NativeCoalesceBase.scala:90-335) is doing partition layout for a coalesce that Spark's own CoalescedRDD could also provide. It is worth weighing whether the copy earns its maintenance cost in that case.
Which direction were you aiming for? The inline questions below are the correctness issues that hold regardless of which way it goes.
} else {
  new CoalesceNativeRDD(
    sparkContext,
    inputRDD.dependencies,
The two branches wire the dependency differently. The empty branch (:58) passes Seq(new OneToOneDependency(inputRDD)), so firstParent resolves to inputRDD. This non-empty branch passes inputRDD.dependencies, so firstParent (= dependencies.head.rdd) resolves to the grandparent, not inputRDD. Meanwhile the CoalescedRDDPartition objects (:47) carry parents resolved against inputRDD, so compute ends up calling grandparent.iterator(p) with an inputRDD partition — at best skipping inputRDD's own computation (and the native plan it carries), at worst handing the grandparent a foreign partition.
The empty branch looks like the correct pattern. Should this branch use Seq(new OneToOneDependency(inputRDD)) too?
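A small toy model of the lineage may help show why the two wirings diverge. This is only a sketch: `ToyRDD` is a hypothetical stand-in for an RDD, and the one behaviour it borrows from Spark is that `firstParent` reads the head of `dependencies`.

```scala
// Toy model only: ToyRDD is a hypothetical stand-in, not Spark's RDD.
object FirstParentSketch {

  // An RDD reduced to a name plus the RDDs it depends on.
  final case class ToyRDD(name: String, dependencies: Seq[ToyRDD]) {
    // Mirrors RDD.firstParent, which resolves to dependencies.head.rdd.
    def firstParent: ToyRDD = dependencies.head
  }

  val grandparent: ToyRDD = ToyRDD("grandparent", Nil)
  val inputRDD: ToyRDD = ToyRDD("inputRDD", Seq(grandparent))

  // Empty branch: the new RDD depends on inputRDD itself.
  val wiredToInput: ToyRDD = ToyRDD("coalesce", Seq(inputRDD))

  // Non-empty branch: the new RDD reuses inputRDD's own dependencies.
  val wiredToInputDeps: ToyRDD = ToyRDD("coalesce", inputRDD.dependencies)

  def main(args: Array[String]): Unit = {
    println(wiredToInput.firstParent.name)     // inputRDD
    println(wiredToInputDeps.firstParent.name) // grandparent
  }
}
```

With the second wiring, anything that walks `firstParent` skips `inputRDD` entirely, which is the mismatch with the partitions resolved against `inputRDD`.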
inputRDD.dependencies,
partitions,
(partition, taskContext) => {
  val inputPartition = inputRDD.partitions(partition.index)
partition here is a CoalescedRDDPartition whose index is the coalesced ordinal i (set at :47). A coalesced partition can group many input partitions — that's what the parents/parentsIndices array is for — but inputRDD.partitions(partition.index) picks a single input partition by the coalesced ordinal and drops the rest. That contradicts compute, which iterates all parents. When numPartitions < inputRDD.getNumPartitions, this builds a native plan over one input partition while the grouping intent says several.
It produces no wrong rows today only because the native plan isn't executed (per the top-level question) — but the index error is latent the moment it is wired in. Was the intent to fold all of a group's input partitions in here rather than index by the coalesced ordinal?
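The gap between the two lookups can be sketched with a toy model. Everything here is assumed for illustration: `ToyCoalescedPartition` is a hypothetical stand-in for CoalescedRDDPartition, the 8-to-2 grouping is made up, and each input partition holds a single row equal to its own index so the dropped partitions are easy to see.

```scala
// Toy model only: the names and the grouping are hypothetical, not the PR's code.
object CoalescedIndexSketch {

  // The coalesced ordinal plus the input partitions that ordinal groups.
  final case class ToyCoalescedPartition(index: Int, parentsIndices: Seq[Int])

  // Eight input partitions, one row each; the row equals the partition index.
  val inputPartitions: Vector[Seq[Int]] = Vector.tabulate(8)(i => Seq(i))

  // coalesce(2): ordinal 0 groups inputs 0-3, ordinal 1 groups inputs 4-7.
  val coalesced: Seq[ToyCoalescedPartition] = Seq(
    ToyCoalescedPartition(0, 0 to 3),
    ToyCoalescedPartition(1, 4 to 7))

  // What the flagged line does: index the input by the coalesced ordinal.
  def byOrdinal(p: ToyCoalescedPartition): Seq[Int] =
    inputPartitions(p.index)

  // What the grouping implies: visit every parent in the group.
  def byParents(p: ToyCoalescedPartition): Seq[Int] =
    p.parentsIndices.flatMap(i => inputPartitions(i))

  def main(args: Array[String]): Unit = {
    println(coalesced.flatMap(byOrdinal).mkString(",")) // 0,1
    println(coalesced.flatMap(byParents).mkString(",")) // 0,1,2,3,4,5,6,7
  }
}
```

In this model the ordinal lookup covers only input partitions 0 and 1, so six of the eight inputs never reach the plan, whereas iterating `parentsIndices` covers all of them.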
}
println(test.get)

assert(collectFirst(df.queryExecution.executedPlan) {
This asserts that a NativeCoalesceExec node appears in the plan, but never checks the one thing coalesce actually does — the output partition count. With a single-row, single-partition input, coalesce(2) collapses to a trivial case, so the firstParent and index issues above (and any future coalescer bug) would still pass green here.
Could the test cover the cases that exercise the grouping? Two that would catch the bugs above:
// many input partitions coalesced down, full row set preserved
val df = spark.range(0, 100, 1, numPartitions = 8).coalesce(2)
assert(df.rdd.getNumPartitions == 2)
checkAnswer(df.toDF(), (0 until 100).map(Row(_)))
// empty input still yields numPartitions partitions (the contract the :51 comment describes)
assert(spark.emptyDataFrame.coalesce(1).rdd.getNumPartitions == 1)
Which issue does this PR close?
Closes #2090
Rationale for this change
Convert CoalesceExec to native.
What changes are included in this PR?
Identify the CoalesceExec scenario and convert it to native execution.
Are there any user-facing changes?
Nothing.
How was this patch tested?
Unit tests (UT).