diff --git a/.github/workflows/paimon.yml b/.github/workflows/paimon.yml
index 25188e81c..25c32f6fa 100644
--- a/.github/workflows/paimon.yml
+++ b/.github/workflows/paimon.yml
@@ -56,12 +56,29 @@ jobs:
java-version: ${{ matrix.javaver }}
cache: 'maven'
+ # Install the selected module together with the modules it depends on (-am), without
+ # running tests, so that the following test step can target the module alone with -pl.
+ - name: Build dependencies (skip tests)
+ run: >
+ ./build/mvn -B install
+ -pl ${{ matrix.module }}
+ -am
+ -Pscala-${{ matrix.scalaver }}
+ -Ppaimon-${{ matrix.paimon }}
+ -P${{ matrix.sparkver }}
+ -Prelease
+ -DskipTests
+
- name: Test Paimon Module
- run: ./build/mvn -B test -pl ${{ matrix.module }} -am -Pscala-${{ matrix.scalaver }} -Ppaimon-${{ matrix.paimon }} -P${{ matrix.sparkver }} -Prelease
+ run: >
+ ./build/mvn -B test
+ -pl ${{ matrix.module }}
+ -Pscala-${{ matrix.scalaver }}
+ -Ppaimon-${{ matrix.paimon }}
+ -P${{ matrix.sparkver }}
+ -Prelease
- name: Upload reports
if: failure()
uses: actions/upload-artifact@v7
with:
- name: ${{ matrix.module }}-test-report
+ # Artifact names must be unique within a workflow run, so include the matrix coordinates.
+ name: auron-paimon-test-report-${{ matrix.sparkver }}-scala-${{ matrix.scalaver }}-paimon-${{ matrix.paimon }}-jdk-${{ matrix.javaver }}
path: ${{ matrix.module }}/target/surefire-reports
\ No newline at end of file
diff --git a/thirdparty/auron-paimon/pom.xml b/thirdparty/auron-paimon/pom.xml
index 2f93a035b..d5fb2b0fa 100644
--- a/thirdparty/auron-paimon/pom.xml
+++ b/thirdparty/auron-paimon/pom.xml
@@ -50,5 +50,37 @@
spark-hive_${scalaVersion}
provided
+
+
+
+ org.apache.paimon
+ paimon-spark-${shortSparkVersion}
+ ${paimonVersion}
+ test
+
+
+ org.apache.spark
+ spark-core_${scalaVersion}
+ test-jar
+ test
+
+
+ org.apache.spark
+ spark-catalyst_${scalaVersion}
+ test-jar
+ test
+
+
+ org.apache.spark
+ spark-sql_${scalaVersion}
+ test-jar
+ test
+
+
+ org.apache.auron
+ spark-extension-shims-spark_${scalaVersion}
+ ${project.version}
+ test
+
diff --git a/thirdparty/auron-paimon/src/main/scala/org/apache/spark/sql/auron/paimon/PaimonScanSupport.scala b/thirdparty/auron-paimon/src/main/scala/org/apache/spark/sql/auron/paimon/PaimonScanSupport.scala
new file mode 100644
index 000000000..c0c9b5b14
--- /dev/null
+++ b/thirdparty/auron-paimon/src/main/scala/org/apache/spark/sql/auron/paimon/PaimonScanSupport.scala
@@ -0,0 +1,295 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.auron.paimon
+
+import scala.collection.JavaConverters._
+import scala.util.control.NonFatal
+
+import org.apache.paimon.table.FileStoreTable
+import org.apache.paimon.table.source.{DataSplit, Split}
+import org.apache.paimon.utils.RowDataToObjectArrayConverter
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.auron.NativeConverters
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Cast, Literal}
+import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
+import org.apache.spark.sql.connector.read.InputPartition
+import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
+import org.apache.spark.sql.hive.auron.paimon.PaimonUtil
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.StructType
+
+/**
+ * One Paimon data file selected for a native scan.
+ *
+ * @param filePath path of the file, built as `<bucketPath>/<fileName>` of the owning split
+ * @param fileSize size of the file as reported by Paimon's data file metadata
+ * @param partitionValues partition values of the owning split, or `InternalRow.empty` when
+ * no partition column is read
+ */
+final case class PaimonFile(filePath: String, fileSize: Long, partitionValues: InternalRow)
+
+/**
+ * Result of planning a Paimon DSv2 scan for native execution.
+ *
+ * @param table the resolved Paimon table
+ * @param files data files to read; empty when the scan has no input partitions
+ * @param fileFormat file format name of the table (parquet or orc when planning succeeds)
+ * @param readSchema the scan's full read schema
+ * @param fileSchema read-schema fields that are not partition keys
+ * @param partitionSchema read-schema fields that are partition keys
+ */
+final case class PaimonScanPlan(
+ table: FileStoreTable,
+ files: Seq[PaimonFile],
+ fileFormat: String,
+ readSchema: StructType,
+ fileSchema: StructType,
+ partitionSchema: StructType)
+
+object PaimonScanSupport extends Logging {
+
+ // Fully-qualified Paimon class names. They are matched by name through isInstanceOfClass
+ // instead of being referenced as compile-time types.
+ private val PaimonBaseScanClassName = "org.apache.paimon.spark.PaimonBaseScan"
+ private val PaimonInputPartitionClassName = "org.apache.paimon.spark.PaimonInputPartition"
+
+ /** Returns true when the scan wrapped by `exec` is, by class name, a Paimon base scan. */
+ def isPaimonScan(exec: BatchScanExec): Boolean = isPaimonScan(exec.scan)
+
+ // Name-based check of `scan` against PaimonBaseScanClassName.
+ private def isPaimonScan(scan: AnyRef): Boolean = {
+ isInstanceOfClass(scan, PaimonBaseScanClassName)
+ }
+
+ /**
+ * Plans a native scan for `exec`, or returns None when the scan must stay on Spark.
+ *
+ * None is returned when: the scan is not a Paimon scan; its FileStoreTable cannot be
+ * resolved; PaimonUtil.isPaimonCowTable rejects the table; the file format is neither
+ * parquet nor orc; a read-schema column type is not supported by NativeConverters; the
+ * input partitions or their splits cannot be obtained; or any split is not raw-convertible
+ * or carries deletion files. An empty partition list yields a plan without files.
+ *
+ * @param exec the DSv2 batch scan node to inspect
+ * @return the planned files and schemas, or None to signal fallback
+ */
+ def plan(exec: BatchScanExec): Option[PaimonScanPlan] = {
+ val scan = exec.scan
+ if (!isPaimonScan(scan)) {
+ return None
+ }
+
+ val table = paimonTable(scan) match {
+ case Some(t) => t
+ case None =>
+ logDebug("Skip native Paimon scan: cannot resolve FileStoreTable from PaimonScan.")
+ return None
+ }
+
+ if (!PaimonUtil.isPaimonCowTable(table)) {
+ logDebug("Skip native Paimon scan: only Paimon COW tables are supported.")
+ return None
+ }
+
+ val fileFormat = PaimonUtil.paimonFileFormat(table)
+ if (!fileFormat.equalsIgnoreCase(PaimonUtil.parquetFormat) &&
+ !fileFormat.equalsIgnoreCase(PaimonUtil.orcFormat)) {
+ logDebug(s"Skip native Paimon scan: unsupported file format $fileFormat.")
+ return None
+ }
+
+ val readSchema = scan.readSchema()
+ if (!readSchema.fields.forall(f => NativeConverters.isTypeSupported(f.dataType))) {
+ logDebug("Skip native Paimon scan: unsupported column data type in read schema.")
+ return None
+ }
+
+ // Split the read schema into partition-key columns and the remaining (file) columns.
+ val partitionKeys = table.schema().partitionKeys().asScala.toSet
+ val partitionFields = readSchema.fields.filter(f => containsName(partitionKeys, f.name))
+ val fileFields = readSchema.fields.filterNot(f => containsName(partitionKeys, f.name))
+ val partitionSchema = StructType(partitionFields)
+ val fileSchema = StructType(fileFields)
+
+ val partitions = inputPartitions(exec) match {
+ case Some(p) => p
+ case None =>
+ logDebug("Skip native Paimon scan: failed to obtain input partitions.")
+ return None
+ }
+ if (partitions.isEmpty) {
+ logDebug("Paimon scan planned with empty input partitions.")
+ return Some(
+ PaimonScanPlan(table, Seq.empty, fileFormat, readSchema, fileSchema, partitionSchema))
+ }
+
+ val splitsOpt = collectSplits(partitions)
+ val splits = splitsOpt match {
+ case Some(s) => s
+ case None =>
+ logDebug("Skip native Paimon scan: cannot extract splits from input partitions.")
+ return None
+ }
+
+ // Only allow COW-style raw-readable splits; reject MOR/MOW or splits with deletion vectors.
+ val unsupported = splits.find { s =>
+ !s.rawConvertible() ||
+ (s.deletionFiles().isPresent && {
+ val list = s.deletionFiles().get()
+ list != null && list.asScala.exists(_ != null)
+ })
+ }
+ if (unsupported.isDefined) {
+ logDebug("Skip native Paimon scan: split is not raw-convertible or has deletion files.")
+ return None
+ }
+
+ val partitionConverter = new RowDataToObjectArrayConverter(
+ table.schema().logicalPartitionType())
+ val sessionLocalTimeZone = SQLConf.get.sessionLocalTimeZone
+ // A time zone option set on the table, when present, takes precedence over the session's.
+ val tzOption: String = {
+ val props = CaseInsensitiveMap(table.options().asScala.toMap)
+ props.getOrElse(DateTimeUtils.TIMEZONE_OPTION, sessionLocalTimeZone)
+ }
+
+ // Partition values are decoded once per split and shared by every data file of that split.
+ val files = splits.flatMap { split =>
+ val partitionValues = if (partitionSchema.isEmpty) {
+ InternalRow.empty
+ } else {
+ toPartitionRow(
+ partitionConverter.convert(split.partition()),
+ partitionSchema,
+ table.schema().partitionKeys().asScala.toSeq,
+ tzOption)
+ }
+ split.dataFiles().asScala.map { dataFile =>
+ // Data files are addressed as <bucketPath>/<fileName>.
+ val filePath = s"${split.bucketPath()}/${dataFile.fileName()}"
+ PaimonFile(filePath, dataFile.fileSize(), partitionValues)
+ }
+ }
+
+ Some(PaimonScanPlan(table, files, fileFormat, readSchema, fileSchema, partitionSchema))
+ }
+
+ // True when at least one entry of `names` equals `target` under the session's name resolver.
+ private def containsName(names: Set[String], target: String): Boolean = {
+   val sameName = SQLConf.get.resolver
+   names.find(candidate => sameName(candidate, target)).isDefined
+ }
+
+ // Build a Spark InternalRow for partition values matching partitionSchema's data types.
+ // Partition values from Paimon are returned in the table's partition-key order; we reorder
+ // them to match partitionSchema and cast them into the requested types.
+ //
+ // paimonValues:    converted partition values, positionally aligned with partitionKeys
+ // partitionSchema: partition columns requested by the scan, in output order
+ // partitionKeys:   the table's partition key names, in declaration order
+ // timeZoneId:      time zone handed to the Cast expression
+ //
+ // A field with no matching key, or whose index lies outside paimonValues, becomes a null of
+ // the field's type. Non-null values are converted through their toString form.
+ // NOTE(review): this assumes toString yields text that Spark's Cast parses correctly for
+ // the target type - confirm for date/timestamp partition columns.
+ private def toPartitionRow(
+     paimonValues: Array[AnyRef],
+     partitionSchema: StructType,
+     partitionKeys: Seq[String],
+     timeZoneId: String): InternalRow = {
+   val resolver = SQLConf.get.resolver
+   InternalRow.fromSeq(partitionSchema.fields.map { field =>
+     // Scan the keys in declaration order so the first match wins deterministically; a Map
+     // would be iterated linearly anyway because matching goes through the resolver.
+     val idx = partitionKeys.indexWhere(key => resolver(key, field.name))
+     val raw = if (idx >= 0 && idx < paimonValues.length) paimonValues(idx) else null
+     val literal: Literal = raw match {
+       case null => Literal(null, field.dataType)
+       case v => Literal(v.toString)
+     }
+     Cast(literal, field.dataType, Option(timeZoneId)).eval()
+   })
+ }
+
+ // Gather the DataSplits carried by every input partition, in partition order.
+ // Yields None as soon as a partition is not a Paimon input partition, its `splits` accessor
+ // does not return a Scala Seq, or one of its elements is not a DataSplit.
+ private def collectSplits(partitions: Seq[InputPartition]): Option[Seq[DataSplit]] = {
+   val collected = scala.collection.mutable.ArrayBuffer.empty[DataSplit]
+   val partitionIter = partitions.iterator
+   while (partitionIter.hasNext) {
+     val partition = partitionIter.next()
+     if (!isInstanceOfClass(partition, PaimonInputPartitionClassName)) {
+       return None
+     }
+     val splitIter = invokeMethod(partition, "splits") match {
+       case Some(found: scala.collection.Seq[_]) => found.toSeq.iterator
+       case _ => return None
+     }
+     while (splitIter.hasNext) {
+       splitIter.next() match {
+         case dataSplit: DataSplit => collected += dataSplit
+         case _: Split => return None
+         case _ => return None
+       }
+     }
+   }
+   Some(collected.toSeq)
+ }
+
+ // Read the scan's `table` member reflectively and keep it only when it is a FileStoreTable.
+ private def paimonTable(scan: AnyRef): Option[FileStoreTable] = {
+   val resolved = invokeMethod(scan, "table")
+   resolved.flatMap {
+     case fileStoreTable: FileStoreTable => Some(fileStoreTable)
+     case other =>
+       logDebug(s"Unexpected Paimon table type: ${other.getClass.getName}")
+       None
+   }
+ }
+
+ // Obtain the scan's input partitions.
+ // The primary path is Scan.toBatch.planInputPartitions(). Reflection over a zero-argument
+ // `inputPartitions` / `partitions` accessor on the exec node (its name and shape vary across
+ // Spark versions) is attempted only when toBatch itself returns null; an exception or a
+ // null partition array on the primary path ends planning immediately.
+ // Returns Some(partitions) on success (possibly empty if the table is empty), or None when
+ // partition planning fails - the caller falls back to Spark execution on None.
+ private def inputPartitions(exec: BatchScanExec): Option[Seq[InputPartition]] = {
+   try {
+     val batch = exec.scan.toBatch
+     if (batch != null) {
+       val parts = batch.planInputPartitions()
+       if (parts != null) return Some(parts.toSeq)
+       logWarning("Paimon Scan.toBatch.planInputPartitions() returned null.")
+       return None
+     }
+     logWarning("Paimon Scan.toBatch returned null.")
+   } catch {
+     case NonFatal(t) =>
+       logWarning("Failed to plan Paimon input partitions via Scan.toBatch.", t)
+       return None
+   }
+
+   // Only zero-argument methods can be invoked as accessors; ignore same-named overloads.
+   val accessors = exec.getClass.getMethods.filter(_.getParameterCount == 0)
+   val accessor = accessors
+     .find(_.getName == "inputPartitions")
+     .orElse(accessors.find(_.getName == "partitions")) match {
+     case Some(found) => found
+     case None =>
+       logWarning(
+         "BatchScanExec exposes no inputPartitions/partitions method; cannot plan Paimon scan.")
+       return None
+   }
+   try {
+     accessor.invoke(exec) match {
+       // Some Spark versions expose the partitions grouped as a Seq of Seqs; flatten those.
+       case s: scala.collection.Seq[_]
+           if s.nonEmpty && s.head.isInstanceOf[scala.collection.Seq[_]] =>
+         Some(
+           s.asInstanceOf[scala.collection.Seq[scala.collection.Seq[InputPartition]]]
+             .flatten
+             .toSeq)
+       case s: scala.collection.Seq[_] =>
+         Some(s.asInstanceOf[scala.collection.Seq[InputPartition]].toSeq)
+       case other =>
+         // Report the class of the returned value itself (or null), not of a wrapper.
+         val returned = if (other == null) "null" else other.getClass.getName
+         logWarning(
+           s"Unexpected return type from BatchScanExec partitions method: $returned.")
+         None
+     }
+   } catch {
+     case NonFatal(t) =>
+       logWarning("Failed to read Paimon input partitions via reflection.", t)
+       None
+   }
+ }
+
+ // Name-based instance check: true when `obj`'s class, any of its superclasses, or any
+ // interface reachable from them (including interfaces extended by other interfaces) has the
+ // fully-qualified name `className`. Returns false for a null `obj`.
+ // Comparing names avoids referencing the target class as a compile-time type.
+ private def isInstanceOfClass(obj: AnyRef, className: String): Boolean = {
+   // Class.getInterfaces lists only directly declared interfaces, so recurse into each one
+   // to reach inherited super-interfaces as well. Interfaces have a null superclass.
+   def matches(c: Class[_]): Boolean =
+     c != null && (c.getName == className ||
+       c.getInterfaces.exists(matches) ||
+       matches(c.getSuperclass))
+
+   obj != null && matches(obj.getClass)
+ }
+
+ // Reflectively invoke the first zero-argument method named `methodName` that is declared on
+ // the target's class or on one of its superclasses (interfaces are not searched).
+ // Returns Some(result) - the result itself may be null - or None when no such method is
+ // found or when the lookup/invocation fails with a non-fatal error, which is logged at
+ // debug level.
+ private def invokeMethod(target: AnyRef, methodName: String): Option[Any] = {
+ try {
+ var cls: Class[_] = target.getClass
+ while (cls != null) {
+ cls.getDeclaredMethods.find(m =>
+ m.getName == methodName && m.getParameterCount == 0) match {
+ case Some(m) =>
+ // Declared methods may be non-public; make the method callable before invoking it.
+ m.setAccessible(true)
+ return Some(m.invoke(target))
+ case None =>
+ cls = cls.getSuperclass
+ }
+ }
+ None
+ } catch {
+ case NonFatal(t) =>
+ logDebug(s"Failed to invoke $methodName on ${target.getClass.getName}", t)
+ None
+ }
+ }
+}
diff --git a/thirdparty/auron-paimon/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativePaimonV2TableScanExec.scala b/thirdparty/auron-paimon/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativePaimonV2TableScanExec.scala
new file mode 100644
index 000000000..b1cefcb26
--- /dev/null
+++ b/thirdparty/auron-paimon/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativePaimonV2TableScanExec.scala
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.auron.plan
+
+import java.net.URI
+import java.security.PrivilegedExceptionAction
+import java.util.Locale
+import java.util.UUID
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.fs.FileSystem
+import org.apache.spark.Partition
+import org.apache.spark.TaskContext
+import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.auron.{EmptyNativeRDD, NativeConverters, NativeHelper, NativeRDD, NativeSupports, Shims}
+import org.apache.spark.sql.auron.paimon.{PaimonFile, PaimonScanPlan}
+import org.apache.spark.sql.execution.{LeafExecNode, SparkPlan, SQLExecution}
+import org.apache.spark.sql.execution.datasources.{FilePartition, PartitionedFile}
+import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
+import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
+import org.apache.spark.sql.hive.auron.paimon.PaimonUtil
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.SerializableConfiguration
+
+import org.apache.auron.{protobuf => pb}
+import org.apache.auron.jni.JniBridge
+import org.apache.auron.metric.SparkMetricNode
+
+case class NativePaimonV2TableScanExec(basedScan: BatchScanExec, plan: PaimonScanPlan)
+ extends LeafExecNode
+ with NativeSupports
+ with Logging {
+
+ // Native file-scan metrics plus two driver-side counters (partitions and files read).
+ override lazy val metrics: Map[String, SQLMetric] =
+ NativeHelper.getNativeFileScanMetrics(sparkContext) ++ Seq(
+ "numPartitions" -> SQLMetrics.createMetric(sparkContext, "Native.partitions_read"),
+ "numFiles" -> SQLMetrics.createMetric(sparkContext, "Native.files_read"))
+
+ // Output attributes and partitioning are taken unchanged from the wrapped Spark scan.
+ override val output = basedScan.output
+ override val outputPartitioning = basedScan.outputPartitioning
+
+ private lazy val fileSchema: StructType = plan.fileSchema
+ private lazy val partitionSchema: StructType = plan.partitionSchema
+ private lazy val files: Seq[PaimonFile] = plan.files
+
+ // File partitions are built on first access, which also posts the driver-side metrics.
+ private lazy val partitions: Array[FilePartition] = {
+ val filePartitions = buildFilePartitions()
+ postDriverMetrics(filePartitions)
+ filePartitions
+ }
+ // Full size of every planned file, keyed by file path.
+ private lazy val fileSizes: Map[String, Long] =
+ files.map(f => f.filePath -> f.fileSize).toMap
+
+ private lazy val nativeFileSchema: pb.Schema = NativeConverters.convertSchema(fileSchema)
+ private lazy val nativePartitionSchema: pb.Schema =
+ NativeConverters.convertSchema(partitionSchema)
+
+ // The native scan lays columns out as file columns followed by partition columns. Each
+ // output attribute is mapped to its position in that layout; name lookup honours
+ // SQLConf.caseSensitiveAnalysis so that the projection stays correct under
+ // case-insensitive analysis (mirrors NativeIcebergTableScanExec).
+ private lazy val combinedSchema: StructType =
+   StructType(fileSchema.fields ++ partitionSchema.fields)
+
+ private lazy val caseSensitive: Boolean = SQLConf.get.caseSensitiveAnalysis
+
+ // Position of every combined-schema field, keyed by its name (lower-cased when analysis is
+ // case-insensitive).
+ private lazy val fieldIndexByName: Map[String, Int] = {
+   val keys =
+     if (caseSensitive) combinedSchema.fieldNames
+     else combinedSchema.fieldNames.map(_.toLowerCase(Locale.ROOT))
+   keys.zipWithIndex.toMap
+ }
+
+ // Index of `name` in the combined schema; defers to StructType.fieldIndex when the name is
+ // not present in the lookup table.
+ private def fieldIndexFor(name: String): Int = {
+   val key = if (caseSensitive) name else name.toLowerCase(Locale.ROOT)
+   fieldIndexByName.getOrElse(key, combinedSchema.fieldIndex(name))
+ }
+
+ private lazy val projection: Seq[Integer] =
+   output.map { attribute =>
+     val position = fieldIndexFor(attribute.name)
+     Integer.valueOf(position)
+   }
+
+ /**
+ * Builds the NativeRDD for this scan.
+ *
+ * Returns an EmptyNativeRDD when there are no file partitions. Otherwise every RDD partition
+ * yields a protobuf plan node: an ORC scan when the plan's file format is orc, a Parquet
+ * scan in all other cases.
+ */
+ override def doExecuteNative(): NativeRDD = {
+ if (partitions.isEmpty) {
+ return new EmptyNativeRDD(sparkContext)
+ }
+
+ // Mirror the native bytes_scanned / output_rows values into the task's input metrics.
+ val nativeMetrics = SparkMetricNode(
+ metrics,
+ Nil,
+ Some({
+ case ("bytes_scanned", v) =>
+ val inputMetric = TaskContext.get.taskMetrics().inputMetrics
+ inputMetric.incBytesRead(v)
+ case ("output_rows", v) =>
+ val inputMetric = TaskContext.get.taskMetrics().inputMetrics
+ inputMetric.incRecordsRead(v)
+ case _ =>
+ }))
+
+ // Local copies of the members referenced by the per-partition function below.
+ val fileFormat = plan.fileFormat
+ val broadcastedHadoopConf = this.broadcastedHadoopConf
+ val numPartitions = partitions.length
+ val nativeFileGroups = this.nativeFileGroups
+ val nativeFileSchema = this.nativeFileSchema
+ val nativePartitionSchema = this.nativePartitionSchema
+ val projection = this.projection
+
+ new NativeRDD(
+ sparkContext,
+ nativeMetrics,
+ partitions.asInstanceOf[Array[Partition]],
+ None,
+ Nil,
+ rddShuffleReadFull = true,
+ (partition, _) => {
+ // Register a filesystem provider under a fresh id; the scan node refers to it by id.
+ val resourceId = s"NativePaimonV2TableScan:${UUID.randomUUID().toString}"
+ putJniBridgeResource(resourceId, broadcastedHadoopConf)
+
+ val nativeFileGroup = nativeFileGroups(partition.asInstanceOf[FilePartition])
+ val nativeFileScanConf = pb.FileScanExecConf
+ .newBuilder()
+ .setNumPartitions(numPartitions)
+ .setPartitionIndex(partition.index)
+ .setStatistics(pb.Statistics.getDefaultInstance)
+ .setSchema(nativeFileSchema)
+ .setFileGroup(nativeFileGroup)
+ .addAllProjection(projection.asJava)
+ .setPartitionSchema(nativePartitionSchema)
+ .build()
+
+ // Both branches pass an empty list of pruning predicates to the native scan.
+ if (fileFormat.equalsIgnoreCase(PaimonUtil.orcFormat)) {
+ val nativeOrcScanExecBuilder = pb.OrcScanExecNode
+ .newBuilder()
+ .setBaseConf(nativeFileScanConf)
+ .setFsResourceId(resourceId)
+ .addAllPruningPredicates(new java.util.ArrayList())
+
+ pb.PhysicalPlanNode
+ .newBuilder()
+ .setOrcScan(nativeOrcScanExecBuilder.build())
+ .build()
+ } else {
+ val nativeParquetScanExecBuilder = pb.ParquetScanExecNode
+ .newBuilder()
+ .setBaseConf(nativeFileScanConf)
+ .setFsResourceId(resourceId)
+ .addAllPruningPredicates(new java.util.ArrayList())
+
+ pb.PhysicalPlanNode
+ .newBuilder()
+ .setParquetScan(nativeParquetScanExecBuilder.build())
+ .build()
+ }
+ },
+ friendlyName = "NativeRDD.PaimonV2Scan")
+ }
+
+ override val nodeName: String = "NativePaimonV2TableScan"
+
+ // Canonicalization is delegated to the wrapped Spark scan.
+ override protected def doCanonicalize(): SparkPlan = basedScan.canonicalized
+
+ // Converts a Spark FilePartition into a protobuf FileGroup holding one pb.PartitionedFile
+ // (path, size, partition values and byte range) per file split of the partition.
+ private lazy val nativeFileGroups: FilePartition => pb.FileGroup = (partition: FilePartition) =>
+ {
+ val nativePartitionedFile = (file: PartitionedFile) => {
+ val filePath = file.filePath.toString
+ // Use the full file size recorded by the plan; fall back to the split length when the
+ // path is not among the planned files.
+ val size = fileSizes.getOrElse(filePath, file.length)
+ val nativePartitionValues = partitionSchema.zipWithIndex.map { case (field, index) =>
+ NativeConverters
+ .convertExpr(
+ org.apache.spark.sql.catalyst.expressions
+ .Literal(file.partitionValues.get(index, field.dataType), field.dataType))
+ .getLiteral
+ }
+ pb.PartitionedFile
+ .newBuilder()
+ .setPath(filePath)
+ .setSize(size)
+ // The last-modified time is always sent as 0.
+ .setLastModifiedNs(0)
+ .addAllPartitionValues(nativePartitionValues.asJava)
+ .setRange(
+ pb.FileRange
+ .newBuilder()
+ .setStart(file.start)
+ .setEnd(file.start + file.length)
+ .build())
+ .build()
+ }
+ pb.FileGroup
+ .newBuilder()
+ .addAllFiles(partition.files.map(nativePartitionedFile).toList.asJava)
+ .build()
+ }
+
+ // Record how many partitions and file splits were planned, then publish both counters as
+ // driver-side metric updates for the current SQL execution.
+ private def postDriverMetrics(filePartitions: Array[FilePartition]): Unit = {
+   val partitionMetric = metrics("numPartitions")
+   partitionMetric.add(filePartitions.length)
+
+   val fileMetric = metrics("numFiles")
+   val fileCount = filePartitions.map(_.files.length.toLong).sum
+   fileMetric.add(fileCount)
+
+   SQLMetrics.postDriverMetricUpdates(
+     sparkContext,
+     sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY),
+     Seq(partitionMetric, fileMetric))
+ }
+
+ // Cuts every planned file into byte ranges of at most maxSplitBytes and packs the ranges
+ // into FilePartitions through FilePartition.getFilePartitions.
+ // Returns an empty array when the plan has no files.
+ private def buildFilePartitions(): Array[FilePartition] = {
+ if (files.isEmpty) {
+ return Array.empty
+ }
+
+ val sparkSession = Shims.get.getSqlContext(basedScan).sparkSession
+ val isSplitable =
+ plan.fileFormat.equalsIgnoreCase(PaimonUtil.parquetFormat) ||
+ plan.fileFormat.equalsIgnoreCase(PaimonUtil.orcFormat)
+ val maxSplitBytes = getMaxSplitBytes(sparkSession, files)
+ val partitionedFiles = files
+ .flatMap { f =>
+ if (isSplitable) {
+ // A zero-length file produces no range at all.
+ // NOTE(review): the range requires a non-zero step - confirm getMaxSplitBytes cannot
+ // return 0 (e.g. zero open cost combined with zero-length files).
+ (0L until f.fileSize by maxSplitBytes).map { offset =>
+ val remaining = f.fileSize - offset
+ val size = if (remaining > maxSplitBytes) maxSplitBytes else remaining
+ Shims.get.getPartitionedFile(f.partitionValues, f.filePath, offset, size)
+ }
+ } else {
+ Seq(Shims.get.getPartitionedFile(f.partitionValues, f.filePath, 0, f.fileSize))
+ }
+ }
+ // Largest ranges first, before they are packed into partitions.
+ .sortBy(_.length)(Ordering[Long].reverse)
+ .toSeq
+
+ FilePartition.getFilePartitions(sparkSession, partitionedFiles, maxSplitBytes).toArray
+ }
+
+ // Upper bound for a single file range: the configured maximum partition size, capped by
+ // the larger of the file open cost and the per-slot share of the total weighted bytes
+ // (every file counts as its size plus the open cost).
+ private def getMaxSplitBytes(sparkSession: SparkSession, fs: Seq[PaimonFile]): Long = {
+   val sessionConf = sparkSession.sessionState.conf
+   val configuredMax = sessionConf.filesMaxPartitionBytes
+   val openCost = sessionConf.filesOpenCostInBytes
+   val minPartitions = Shims.get.getMinPartitionNum(sparkSession)
+   val weightedTotal = fs.foldLeft(0L)((acc, file) => acc + (file.fileSize + openCost))
+   val perSlot =
+     if (minPartitions > 0) {
+       weightedTotal / minPartitions
+     } else {
+       weightedTotal
+     }
+
+   val lowerBounded = Math.max(openCost, perSlot)
+   Math.min(configuredMax, lowerBounded)
+ }
+
+ // Registers, under `resourceId`, a function that maps a location string to a Hadoop
+ // FileSystem. The FileSystem is obtained as NativeHelper.currentUser from the broadcast
+ // Hadoop configuration, and the time spent obtaining it is added to the `io_time_getfs`
+ // metric.
+ private def putJniBridgeResource(
+ resourceId: String,
+ broadcastedHadoopConf: Broadcast[SerializableConfiguration]): Unit = {
+ val sharedConf = broadcastedHadoopConf.value.value
+ JniBridge.putResource(
+ resourceId,
+ (location: String) => {
+ val getFsTimeMetric = metrics("io_time_getfs")
+ val currentTimeMillis = System.currentTimeMillis()
+ val fs = NativeHelper.currentUser.doAs(new PrivilegedExceptionAction[FileSystem] {
+ override def run(): FileSystem = FileSystem.get(new URI(location), sharedConf)
+ })
+ // Elapsed milliseconds scaled by 1,000,000, i.e. recorded as nanoseconds.
+ getFsTimeMetric.add((System.currentTimeMillis() - currentTimeMillis) * 1000000)
+ fs
+ })
+ }
+
+ // Broadcasts a Hadoop configuration created from the session state of the wrapped scan.
+ // This is a def: every call creates a new configuration and a new broadcast.
+ private def broadcastedHadoopConf: Broadcast[SerializableConfiguration] = {
+ val sparkSession = Shims.get.getSqlContext(basedScan).sparkSession
+ val hadoopConf = sparkSession.sessionState.newHadoopConfWithOptions(Map.empty)
+ sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
+ }
+}
diff --git a/thirdparty/auron-paimon/src/main/scala/org/apache/spark/sql/hive/auron/paimon/PaimonConvertProvider.scala b/thirdparty/auron-paimon/src/main/scala/org/apache/spark/sql/hive/auron/paimon/PaimonConvertProvider.scala
index 21b2ed03a..47a3f5e2e 100644
--- a/thirdparty/auron-paimon/src/main/scala/org/apache/spark/sql/hive/auron/paimon/PaimonConvertProvider.scala
+++ b/thirdparty/auron-paimon/src/main/scala/org/apache/spark/sql/hive/auron/paimon/PaimonConvertProvider.scala
@@ -19,8 +19,11 @@ package org.apache.spark.sql.hive.auron.paimon
import org.apache.spark.internal.Logging
import org.apache.spark.sql.auron.AuronConverters
import org.apache.spark.sql.auron.AuronConvertProvider
+import org.apache.spark.sql.auron.paimon.PaimonScanSupport
import org.apache.spark.sql.auron.util.AuronLogUtils
import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.auron.plan.NativePaimonV2TableScanExec
+import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
import org.apache.spark.sql.hive.execution.HiveTableScanExec
import org.apache.spark.sql.hive.execution.auron.plan.NativePaimonTableScanExec
@@ -32,6 +35,8 @@ class PaimonConvertProvider extends AuronConvertProvider with Logging {
exec match {
case _: HiveTableScanExec =>
SparkAuronConfiguration.ENABLE_PAIMON_SCAN.get()
+ case _: BatchScanExec =>
+ SparkAuronConfiguration.ENABLE_PAIMON_SCAN.get()
case _ => false
}
@@ -43,6 +48,8 @@ class PaimonConvertProvider extends AuronConvertProvider with Logging {
if e.relation.tableMeta.storage.serde.isDefined
&& e.relation.tableMeta.storage.serde.get.contains("Paimon") =>
true
+ case e: BatchScanExec =>
+ PaimonScanSupport.isPaimonScan(e) && PaimonScanSupport.plan(e).nonEmpty
case _ => false
}
}
@@ -50,6 +57,7 @@ class PaimonConvertProvider extends AuronConvertProvider with Logging {
override def convert(exec: SparkPlan): SparkPlan = {
exec match {
case hiveExec: HiveTableScanExec => convertPaimonTableScanExec(hiveExec)
+ case batchScan: BatchScanExec => convertPaimonV2BatchScanExec(batchScan)
case _ => exec
}
}
@@ -72,4 +80,19 @@ class PaimonConvertProvider extends AuronConvertProvider with Logging {
AuronConverters.addRenameColumnsExec(NativePaimonTableScanExec(exec))
}
+
+ // Replace a Paimon DSv2 batch scan with the native scan node when a native plan can be
+ // built for it; otherwise hand the original node back untouched.
+ private def convertPaimonV2BatchScanExec(exec: BatchScanExec): SparkPlan = {
+   PaimonScanSupport.plan(exec).fold[SparkPlan](exec) { scanPlan =>
+     AuronLogUtils.logDebugPlanConversion(
+       exec,
+       Seq(
+         "scan" -> exec.scan.getClass,
+         "output" -> exec.output,
+         "fileFormat" -> scanPlan.fileFormat,
+         "numFiles" -> scanPlan.files.size))
+     AuronConverters.addRenameColumnsExec(NativePaimonV2TableScanExec(exec, scanPlan))
+   }
+ }
}
diff --git a/thirdparty/auron-paimon/src/test/scala/org/apache/auron/paimon/AuronPaimonV2IntegrationSuite.scala b/thirdparty/auron-paimon/src/test/scala/org/apache/auron/paimon/AuronPaimonV2IntegrationSuite.scala
new file mode 100644
index 000000000..8f32b4f53
--- /dev/null
+++ b/thirdparty/auron-paimon/src/test/scala/org/apache/auron/paimon/AuronPaimonV2IntegrationSuite.scala
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.auron.paimon
+
+import java.util.concurrent.ConcurrentLinkedQueue
+import java.util.concurrent.CountDownLatch
+import java.util.concurrent.TimeUnit
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}
+import org.apache.spark.sql.{DataFrame, Row}
+import org.apache.spark.sql.execution.auron.plan.NativePaimonV2TableScanExec
+import org.apache.spark.sql.execution.ui.SparkListenerDriverAccumUpdates
+
+class AuronPaimonV2IntegrationSuite
+ extends org.apache.spark.sql.QueryTest
+ with BaseAuronPaimonSuite {
+
+ // Every test creates its tables in the `db` database of the `paimon` catalog.
+ override def beforeAll(): Unit = {
+ super.beforeAll()
+ sql("create database if not exists paimon.db")
+ }
+
+ // Unpartitioned table without a primary key: full select must be correct and native.
+ test("paimon v2 native scan runs simple COW select") {
+ withTable("paimon.db.t1") {
+ sql("create table paimon.db.t1 (id int, v string) using paimon")
+ sql("insert into paimon.db.t1 values (1, 'a'), (2, 'b')")
+ val df = sql("select * from paimon.db.t1")
+ checkAnswer(df, Seq(Row(1, "a"), Row(2, "b")))
+ assertNativePaimonScanApplied(df)
+ }
+ }
+
+ // Selecting a subset of the columns.
+ test("paimon v2 native scan supports projection") {
+ withTable("paimon.db.t_proj") {
+ sql("create table paimon.db.t_proj (id int, v string) using paimon")
+ sql("insert into paimon.db.t_proj values (1, 'a'), (2, 'b')")
+ val df = sql("select id from paimon.db.t_proj")
+ checkAnswer(df, Seq(Row(1), Row(2)))
+ assertNativePaimonScanApplied(df)
+ }
+ }
+
+ // Partitioned table read with a filter on the partition column.
+ test("paimon v2 native scan supports partitioned table with predicate") {
+ withTable("paimon.db.t_part") {
+ sql("""
+ |create table paimon.db.t_part (id int, v string, p string)
+ |using paimon
+ |partitioned by (p)
+ |""".stripMargin)
+ sql("insert into paimon.db.t_part values (1, 'a', 'p1'), (2, 'b', 'p2')")
+ val df = sql("select * from paimon.db.t_part where p = 'p1'")
+ checkAnswer(df, Seq(Row(1, "a", "p1")))
+ assertNativePaimonScanApplied(df)
+ }
+ }
+
+ // Same as the simple select, with the table's file format set to ORC.
+ test("paimon v2 native scan supports ORC COW table") {
+ withTable("paimon.db.t_orc") {
+ sql("""
+ |create table paimon.db.t_orc (id int, v string)
+ |using paimon
+ |tblproperties ('file.format' = 'orc')
+ |""".stripMargin)
+ sql("insert into paimon.db.t_orc values (1, 'a'), (2, 'b')")
+ val df = sql("select * from paimon.db.t_orc")
+ checkAnswer(df, Seq(Row(1, "a"), Row(2, "b")))
+ assertNativePaimonScanApplied(df)
+ }
+ }
+
+ // A table with no rows must still be planned natively and return nothing.
+ test("paimon v2 native scan handles empty table") {
+ withTable("paimon.db.t_empty") {
+ sql("create table paimon.db.t_empty (id int, v string) using paimon")
+ val df = sql("select * from paimon.db.t_empty")
+ checkAnswer(df, Seq.empty)
+ assertNativePaimonScanApplied(df)
+ }
+ }
+
+ // Verifies that running the scan posts driver-side updates for both the numPartitions and
+ // the numFiles metric, each with a positive total.
+ test("paimon v2 scan exposes file scan driver metrics") {
+ withTable("paimon.db.t_metrics") {
+ sql("create table paimon.db.t_metrics (id int, v string) using paimon")
+ sql("insert into paimon.db.t_metrics values (1, 'a')")
+ withSQLConf("spark.sql.adaptive.enabled" -> "false") {
+ val df = sql("select * from paimon.db.t_metrics")
+ val nativeScan = executedNativeScan(df)
+ val metricIds = Map(
+ "numPartitions" -> nativeScan.metrics("numPartitions").id,
+ "numFiles" -> nativeScan.metrics("numFiles").id)
+ // Listener events are collected in a concurrent queue; the latch is released once
+ // updates for both metric ids have been seen.
+ val driverMetricUpdates = new ConcurrentLinkedQueue[(Long, Long)]()
+ val driverMetricUpdatesPosted = new CountDownLatch(1)
+ val listener = new SparkListener {
+ override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
+ case SparkListenerDriverAccumUpdates(_, updates) =>
+ updates.foreach { case (metricId, value) =>
+ driverMetricUpdates.add(metricId -> value)
+ }
+ val updatedMetricIds = driverMetricUpdates.iterator().asScala.map(_._1).toSet
+ if (metricIds.values.forall(updatedMetricIds.contains)) {
+ driverMetricUpdatesPosted.countDown()
+ }
+ case _ =>
+ }
+ }
+
+ spark.sparkContext.addSparkListener(listener)
+ try {
+ checkAnswer(df, Seq(Row(1, "a")))
+ // Wait up to 30 seconds for the listener to observe both metrics.
+ assert(driverMetricUpdatesPosted.await(30, TimeUnit.SECONDS))
+ } finally {
+ spark.sparkContext.removeSparkListener(listener)
+ }
+
+ // Sum all posted values per metric id.
+ val driverMetricValues = driverMetricUpdates
+ .iterator()
+ .asScala
+ .toSeq
+ .groupBy(_._1)
+ .mapValues(_.map(_._2).sum)
+ .toMap
+ assert(driverMetricValues.getOrElse(metricIds("numPartitions"), 0L) > 0)
+ assert(driverMetricValues.getOrElse(metricIds("numFiles"), 0L) > 0)
+ }
+ }
+ }
+
+ // With the feature flag off, the executed plan must not contain the native scan node.
+ test("paimon v2 native scan falls back when spark.auron.enable.paimon.scan=false") {
+ withTable("paimon.db.t_disable") {
+ sql("create table paimon.db.t_disable (id int, v string) using paimon")
+ sql("insert into paimon.db.t_disable values (1, 'a')")
+ withSQLConf("spark.auron.enable.paimon.scan" -> "false") {
+ val df = sql("select * from paimon.db.t_disable")
+ df.collect()
+ val plan = df.queryExecution.executedPlan.toString()
+ assert(!plan.contains("NativePaimonV2TableScan"))
+ }
+ }
+ }
+
+ // A primary-key table must not be converted to the native scan node.
+ test("paimon v2 native scan falls back for MOR (merge-on-read) table") {
+ withTable("paimon.db.t_mor") {
+ sql("""
+ |create table paimon.db.t_mor (id int, v string)
+ |using paimon
+ |tblproperties (
+ | 'primary-key' = 'id',
+ | 'bucket' = '2'
+ |)
+ |""".stripMargin)
+ sql("insert into paimon.db.t_mor values (1, 'a'), (2, 'b')")
+ val df = sql("select * from paimon.db.t_mor")
+ df.collect()
+ val plan = df.queryExecution.executedPlan.toString()
+ assert(!plan.contains("NativePaimonV2TableScan"))
+ }
+ }
+
+ // Fails unless the rendered executed plan of `df` mentions the native Paimon V2 scan node.
+ private def assertNativePaimonScanApplied(df: DataFrame): Unit = {
+   val renderedPlan = df.queryExecution.executedPlan.toString()
+   assert(
+     renderedPlan.contains("NativePaimonV2TableScan"),
+     s"plan should use native paimon scan:\n$renderedPlan")
+ }
+
+ // Returns the first native Paimon V2 scan node of `df`'s executed plan, failing the test
+ // when the plan contains none.
+ private def executedNativeScan(df: DataFrame): NativePaimonV2TableScanExec = {
+   val executedPlan = df.queryExecution.executedPlan
+   val firstNativeScan = executedPlan.collectFirst { case scan: NativePaimonV2TableScanExec =>
+     scan
+   }
+   assert(firstNativeScan.nonEmpty, "expected NativePaimonV2TableScanExec in executed plan")
+   firstNativeScan.get
+ }
+}
diff --git a/thirdparty/auron-paimon/src/test/scala/org/apache/auron/paimon/BaseAuronPaimonSuite.scala b/thirdparty/auron-paimon/src/test/scala/org/apache/auron/paimon/BaseAuronPaimonSuite.scala
new file mode 100644
index 000000000..8f324d296
--- /dev/null
+++ b/thirdparty/auron-paimon/src/test/scala/org/apache/auron/paimon/BaseAuronPaimonSuite.scala
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.auron.paimon
+
+import java.io.File
+import java.nio.file.{Files, Paths}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.test.SharedSparkSession
+
+// Shared-session base for Paimon suites: a `paimon` catalog backed by a temp warehouse directory.
+trait BaseAuronPaimonSuite extends SharedSparkSession {
+
+  // Absolute path of the warehouse directory; the directory is created on first access.
+  protected lazy val paimonWarehouse: String = {
+    // java.io.tmpdir is set to target/tmp in the parent pom, which may not exist yet
+    val parent = Files.createDirectories(Paths.get(System.getProperty("java.io.tmpdir")))
+    val warehouseDir = Files.createTempDirectory(parent, "auron-paimon-warehouse-").toFile
+    warehouseDir.deleteOnExit()
+    warehouseDir.getAbsolutePath
+  }
+
+  // Parent conf with the extensions, the `paimon` catalog and the switches listed below applied.
+  override protected def sparkConf: SparkConf = {
+    val extraJavaOptions = Seq(
+      "--add-opens=java.base/java.lang=ALL-UNNAMED",
+      "--add-opens=java.base/java.nio=ALL-UNNAMED",
+      "--add-opens=java.base/sun.nio.ch=ALL-UNNAMED",
+      "-Dio.netty.tryReflectionSetAccessible=true").mkString(" ")
+    val sessionExtensions = "org.apache.spark.sql.auron.AuronSparkSessionExtension," +
+      "org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions"
+    val settings = Seq(
+      "spark.sql.extensions" -> sessionExtensions,
+      "spark.sql.catalog.paimon" -> "org.apache.paimon.spark.SparkCatalog",
+      "spark.sql.catalog.paimon.warehouse" -> s"file:$paimonWarehouse",
+      "spark.auron.enabled" -> "true",
+      "spark.auron.enable.paimon.scan" -> "true",
+      "spark.shuffle.manager" ->
+        "org.apache.spark.sql.execution.auron.shuffle.AuronShuffleManager",
+      "spark.auron.enable.shuffleExchange" -> "true",
+      "spark.auron.enable.project" -> "false",
+      "spark.auron.ui.enabled" -> "false",
+      "spark.ui.enabled" -> "false",
+      "spark.driver.extraJavaOptions" -> extraJavaOptions,
+      "spark.executor.extraJavaOptions" -> extraJavaOptions)
+    settings.foldLeft(super.sparkConf) { case (conf, (key, value)) => conf.set(key, value) }
+  }
+
+  // Runs the parent teardown first, then removes the warehouse even if that teardown throws.
+  override def afterAll(): Unit = {
+    try super.afterAll()
+    finally deleteRecursively(new File(paimonWarehouse))
+  }
+
+  // Deletes children before `file`; Option(...) guards a null listFiles(); delete() result ignored.
+  private def deleteRecursively(file: File): Unit = {
+    if (file.isDirectory) {
+      Option(file.listFiles()).foreach(_.foreach(deleteRecursively))
+    }
+    file.delete()
+  }
+}