diff --git a/common/src/main/scala/org/apache/comet/CometConf.scala b/common/src/main/scala/org/apache/comet/CometConf.scala index 9b376837f7..174992eb34 100644 --- a/common/src/main/scala/org/apache/comet/CometConf.scala +++ b/common/src/main/scala/org/apache/comet/CometConf.scala @@ -802,6 +802,23 @@ object CometConf extends ShimCometConf { .booleanConf .createWithDefault(false) + val DATETIME_ENGINE_RUST = "rust" + val DATETIME_ENGINE_JAVA = "java" + + val COMET_DATETIME_ENGINE: ConfigEntry[String] = + conf("spark.comet.exec.datetime.engine") + .category(CATEGORY_EXEC) + .doc( + "Selects the engine used to evaluate supported date/time expressions. " + + s"`$DATETIME_ENGINE_RUST` uses the native DataFusion datetime engine. " + + s"`$DATETIME_ENGINE_JAVA` is experimental and routes through a JVM-side " + + "UDF for bit-exact Spark semantics, at the cost of JNI roundtrips per " + + "batch. Expressions routed when set to java: hour, minute, and second.") + .stringConf + .transform(_.toLowerCase(Locale.ROOT)) + .checkValues(Set(DATETIME_ENGINE_RUST, DATETIME_ENGINE_JAVA)) + .createWithDefault(DATETIME_ENGINE_RUST) + val COMET_METRICS_UPDATE_INTERVAL: ConfigEntry[Long] = conf("spark.comet.metrics.updateInterval") .category(CATEGORY_EXEC) diff --git a/common/src/main/scala/org/apache/comet/udf/DateTimeFieldUDF.scala b/common/src/main/scala/org/apache/comet/udf/DateTimeFieldUDF.scala new file mode 100644 index 0000000000..dc97f0c20c --- /dev/null +++ b/common/src/main/scala/org/apache/comet/udf/DateTimeFieldUDF.scala @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.comet.udf + +import java.nio.charset.StandardCharsets +import java.time.{Instant, LocalDateTime, ZoneId, ZoneOffset} + +import org.apache.arrow.vector.{IntVector, TimeStampMicroTZVector, TimeStampMicroVector, ValueVector, VarCharVector} + +import org.apache.comet.CometArrowAllocator + +/** + * Shared logic for date/time field-extraction UDFs (hour/minute/second). + * + * Inputs: + * - inputs(0): TimeStampMicroTZVector (for TimestampType) or TimeStampMicroVector (for + * TimestampNTZType) holding the timestamp column. + * - inputs(1): VarCharVector, length-1, holding the session timezone id. Used only for + * TimestampType. NTZ ignores it (wall-clock semantics). + * + * Output: IntVector of length `numRows` holding the extracted field. Null timestamps produce null + * output. + */ +abstract class DateTimeFieldUDF extends CometUDF { + + protected def extract(dt: LocalDateTime): Int + + override def evaluate(inputs: Array[ValueVector], numRows: Int): ValueVector = { + require( + inputs.length == 2, + s"${getClass.getSimpleName} expects 2 inputs, got ${inputs.length}") + val tsCol = inputs(0) + val tzVec = inputs(1).asInstanceOf[VarCharVector] + require( + tzVec.getValueCount >= 1 && !tzVec.isNull(0), + s"${getClass.getSimpleName} requires a non-null scalar timezone") + val zone = ZoneId.of(new String(tzVec.get(0), StandardCharsets.UTF_8)) + + val out = new IntVector(s"${getClass.getSimpleName}_result", CometArrowAllocator) + out.allocateNew(numRows) + + tsCol match { + case tz: TimeStampMicroTZVector => + var i = 0 + while (i < numRows) { + if (tz.isNull(i)) { + out.setNull(i) + } else { + val micros = tz.get(i) + val instant = Instant.ofEpochSecond( + Math.floorDiv(micros, 1000000L), + Math.floorMod(micros, 1000000L) * 1000L) + out.set(i, extract(LocalDateTime.ofInstant(instant, zone))) + } + i += 1 + } + case ntz: TimeStampMicroVector => + var i = 0 + while (i < numRows) { + if (ntz.isNull(i)) { + out.setNull(i) + } else { + val micros = ntz.get(i) + val seconds = Math.floorDiv(micros, 1000000L) + val nanos = (Math.floorMod(micros, 1000000L) * 1000L).toInt + val dt = LocalDateTime.ofEpochSecond(seconds, nanos, ZoneOffset.UTC) + out.set(i, extract(dt)) + } + i += 1 + } + case other => + throw new IllegalArgumentException( + s"${getClass.getSimpleName}: unsupported timestamp vector type: " + + other.getClass.getName) + } + out.setValueCount(numRows) + out + } +} diff --git a/common/src/main/scala/org/apache/comet/udf/HourUDF.scala b/common/src/main/scala/org/apache/comet/udf/HourUDF.scala new file mode 100644 index 0000000000..16519f72e2 --- /dev/null +++ b/common/src/main/scala/org/apache/comet/udf/HourUDF.scala @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.comet.udf + +import java.time.LocalDateTime + +class HourUDF extends DateTimeFieldUDF { + override protected def extract(dt: LocalDateTime): Int = dt.getHour +} diff --git a/common/src/main/scala/org/apache/comet/udf/MinuteUDF.scala b/common/src/main/scala/org/apache/comet/udf/MinuteUDF.scala new file mode 100644 index 0000000000..ffaeae59f3 --- /dev/null +++ b/common/src/main/scala/org/apache/comet/udf/MinuteUDF.scala @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.comet.udf + +import java.time.LocalDateTime + +class MinuteUDF extends DateTimeFieldUDF { + override protected def extract(dt: LocalDateTime): Int = dt.getMinute +} diff --git a/common/src/main/scala/org/apache/comet/udf/SecondUDF.scala b/common/src/main/scala/org/apache/comet/udf/SecondUDF.scala new file mode 100644 index 0000000000..42ff4d452d --- /dev/null +++ b/common/src/main/scala/org/apache/comet/udf/SecondUDF.scala @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.comet.udf + +import java.time.LocalDateTime + +class SecondUDF extends DateTimeFieldUDF { + override protected def extract(dt: LocalDateTime): Int = dt.getSecond +} diff --git a/docs/source/user-guide/latest/compatibility/expressions/datetime.md b/docs/source/user-guide/latest/compatibility/expressions/datetime.md index afd934dc04..14f975cb33 100644 --- a/docs/source/user-guide/latest/compatibility/expressions/datetime.md +++ b/docs/source/user-guide/latest/compatibility/expressions/datetime.md @@ -26,6 +26,24 @@ under the License. timezone is UTC. TimestampNTZ inputs are handled correctly (timezone-independent truncation). [#2649](https://github.com/apache/datafusion-comet/issues/2649) +## Engine Selection (experimental) + +Comet supports two engines for evaluating date/time field-extraction expressions +(`hour`, `minute`, `second`). The choice is governed by +`spark.comet.exec.datetime.engine`: + +- `rust` (default): native DataFusion implementations. Fastest, but applies timezone + conversion to `TimestampNTZ` inputs, which differs from Spark's semantics + ([#3180](https://github.com/apache/datafusion-comet/issues/3180)). Comet falls back + to Spark for incompatible cases by default. +- `java` (experimental): routes the affected expressions through a JVM-side UDF that + uses `java.time` directly, producing bit-exact Spark results for all input types. + Incurs a JNI round-trip per native batch. + +This is a prototype scoped to `hour`, `minute`, and `second`. The model is expected +to extend to other date/time expressions in follow-up work +([#4311](https://github.com/apache/datafusion-comet/issues/4311)). + ## Date and Time Functions Comet's native implementation of date and time functions may produce different results than Spark for dates diff --git a/spark/src/main/scala/org/apache/comet/serde/datetime.scala b/spark/src/main/scala/org/apache/comet/serde/datetime.scala index cb3be75717..7158a323c1 100644 --- a/spark/src/main/scala/org/apache/comet/serde/datetime.scala +++ b/spark/src/main/scala/org/apache/comet/serde/datetime.scala @@ -26,6 +26,7 @@ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.{DateType, DoubleType, FloatType, IntegerType, LongType, StringType, TimestampNTZType, TimestampType} import org.apache.spark.unsafe.types.UTF8String +import org.apache.comet.CometConf import org.apache.comet.CometSparkSessionExtensions.withInfo import org.apache.comet.expressions.{CometCast, CometEvalMode} import org.apache.comet.serde.CometGetDateField.CometGetDateField @@ -178,6 +179,36 @@ object CometQuarter extends CometExpressionSerde[Quarter] with CometExprGetDateF } } +private object DateTimeFieldUdfHelper { + def buildProto( + child: org.apache.spark.sql.catalyst.expressions.Expression, + timeZoneId: Option[String], + nullable: Boolean, + udfClassName: String, + inputs: Seq[Attribute], + binding: Boolean): Option[ExprOuterClass.Expr] = { + val childProto = exprToProtoInternal(child, inputs, binding).getOrElse(return None) + val timeZone = timeZoneId.getOrElse("UTC") + val tzProto = + exprToProtoInternal(Literal(UTF8String.fromString(timeZone), StringType), inputs, binding) + .getOrElse(return None) + val returnType = serializeDataType(IntegerType).getOrElse(return None) + Some( + ExprOuterClass.Expr + .newBuilder() + .setJvmScalarUdf( + ExprOuterClass.JvmScalarUdf + .newBuilder() + .setClassName(udfClassName) + .addArgs(childProto) + .addArgs(tzProto) + .setReturnType(returnType) + .setReturnNullable(nullable) + .build()) + .build()) + } +} + object CometHour extends CometExpressionSerde[Hour] { val incompatReason: String = "Incorrectly applies timezone conversion to TimestampNTZ inputs" + @@ -186,11 +217,13 @@ object CometHour extends CometExpressionSerde[Hour] { override def getIncompatibleReasons(): Seq[String] = Seq(incompatReason) override def getSupportLevel(expr: Hour): SupportLevel = { - if (expr.child.dataType == TimestampNTZType) { - Incompatible( - Some( - "Incorrectly applies timezone conversion to TimestampNTZ inputs" + - " (https://github.com/apache/datafusion-comet/issues/3180)")) + if (CometConf.COMET_DATETIME_ENGINE.get() == CometConf.DATETIME_ENGINE_JAVA) { + expr.child.dataType match { + case TimestampType | TimestampNTZType => Compatible() + case other => Unsupported(Some(s"engine=java does not support input type: $other")) + } + } else if (expr.child.dataType == TimestampNTZType) { + Incompatible(Some(incompatReason)) } else { Compatible() } @@ -200,6 +233,23 @@ object CometHour extends CometExpressionSerde[Hour] { expr: Hour, inputs: Seq[Attribute], binding: Boolean): Option[ExprOuterClass.Expr] = { + if (CometConf.COMET_DATETIME_ENGINE.get() == CometConf.DATETIME_ENGINE_JAVA) { + DateTimeFieldUdfHelper.buildProto( + expr.child, + expr.timeZoneId, + expr.nullable, + "org.apache.comet.udf.HourUDF", + inputs, + binding) + } else { + convertViaNative(expr, inputs, binding) + } + } + + private def convertViaNative( + expr: Hour, + inputs: Seq[Attribute], + binding: Boolean): Option[ExprOuterClass.Expr] = { val childExpr = exprToProtoInternal(expr.child, inputs, binding) if (childExpr.isDefined) { @@ -223,16 +273,19 @@ object CometHour extends CometExpressionSerde[Hour] { object CometMinute extends CometExpressionSerde[Minute] { - override def getIncompatibleReasons(): Seq[String] = Seq( - "Incorrectly applies timezone conversion to TimestampNTZ inputs" + - " (https://github.com/apache/datafusion-comet/issues/3180)") + val incompatReason: String = "Incorrectly applies timezone conversion to TimestampNTZ inputs" + + " (https://github.com/apache/datafusion-comet/issues/3180)" + + override def getIncompatibleReasons(): Seq[String] = Seq(incompatReason) override def getSupportLevel(expr: Minute): SupportLevel = { - if (expr.child.dataType == TimestampNTZType) { - Incompatible( - Some( - "Incorrectly applies timezone conversion to TimestampNTZ inputs" + - " (https://github.com/apache/datafusion-comet/issues/3180)")) + if (CometConf.COMET_DATETIME_ENGINE.get() == CometConf.DATETIME_ENGINE_JAVA) { + expr.child.dataType match { + case TimestampType | TimestampNTZType => Compatible() + case other => Unsupported(Some(s"engine=java does not support input type: $other")) + } + } else if (expr.child.dataType == TimestampNTZType) { + Incompatible(Some(incompatReason)) } else { Compatible() } @@ -242,6 +295,23 @@ object CometMinute extends CometExpressionSerde[Minute] { expr: Minute, inputs: Seq[Attribute], binding: Boolean): Option[ExprOuterClass.Expr] = { + if (CometConf.COMET_DATETIME_ENGINE.get() == CometConf.DATETIME_ENGINE_JAVA) { + DateTimeFieldUdfHelper.buildProto( + expr.child, + expr.timeZoneId, + expr.nullable, + "org.apache.comet.udf.MinuteUDF", + inputs, + binding) + } else { + convertViaNative(expr, inputs, binding) + } + } + + private def convertViaNative( + expr: Minute, + inputs: Seq[Attribute], + binding: Boolean): Option[ExprOuterClass.Expr] = { val childExpr = exprToProtoInternal(expr.child, inputs, binding) if (childExpr.isDefined) { @@ -265,16 +335,19 @@ object CometMinute extends CometExpressionSerde[Minute] { object CometSecond extends CometExpressionSerde[Second] { - override def getIncompatibleReasons(): Seq[String] = Seq( - "Incorrectly applies timezone conversion to TimestampNTZ inputs" + - " (https://github.com/apache/datafusion-comet/issues/3180)") + val incompatReason: String = "Incorrectly applies timezone conversion to TimestampNTZ inputs" + + " (https://github.com/apache/datafusion-comet/issues/3180)" + + override def getIncompatibleReasons(): Seq[String] = Seq(incompatReason) override def getSupportLevel(expr: Second): SupportLevel = { - if (expr.child.dataType == TimestampNTZType) { - Incompatible( - Some( - "Incorrectly applies timezone conversion to TimestampNTZ inputs" + - " (https://github.com/apache/datafusion-comet/issues/3180)")) + if (CometConf.COMET_DATETIME_ENGINE.get() == CometConf.DATETIME_ENGINE_JAVA) { + expr.child.dataType match { + case TimestampType | TimestampNTZType => Compatible() + case other => Unsupported(Some(s"engine=java does not support input type: $other")) + } + } else if (expr.child.dataType == TimestampNTZType) { + Incompatible(Some(incompatReason)) } else { Compatible() } @@ -284,6 +357,23 @@ object CometSecond extends CometExpressionSerde[Second] { expr: Second, inputs: Seq[Attribute], binding: Boolean): Option[ExprOuterClass.Expr] = { + if (CometConf.COMET_DATETIME_ENGINE.get() == CometConf.DATETIME_ENGINE_JAVA) { + DateTimeFieldUdfHelper.buildProto( + expr.child, + expr.timeZoneId, + expr.nullable, + "org.apache.comet.udf.SecondUDF", + inputs, + binding) + } else { + convertViaNative(expr, inputs, binding) + } + } + + private def convertViaNative( + expr: Second, + inputs: Seq[Attribute], + binding: Boolean): Option[ExprOuterClass.Expr] = { val childExpr = exprToProtoInternal(expr.child, inputs, binding) if (childExpr.isDefined) { diff --git a/spark/src/test/scala/org/apache/comet/CometDatetimeEngineDefaultSuite.scala b/spark/src/test/scala/org/apache/comet/CometDatetimeEngineDefaultSuite.scala new file mode 100644 index 0000000000..f5aa413039 --- /dev/null +++ b/spark/src/test/scala/org/apache/comet/CometDatetimeEngineDefaultSuite.scala @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.comet + +import org.apache.spark.sql.CometTestBase +import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper + +class CometDatetimeEngineDefaultSuite extends CometTestBase with AdaptiveSparkPlanHelper { + + test("default engine is rust: TimestampType hour still runs natively") { + assert(CometConf.COMET_DATETIME_ENGINE.get() == CometConf.DATETIME_ENGINE_RUST) + withTable("t") { + sql("CREATE TABLE t (ts TIMESTAMP) USING parquet") + sql("INSERT INTO t VALUES TIMESTAMP'2024-06-15 12:34:56 UTC'") + checkSparkAnswerAndOperator("SELECT hour(ts), minute(ts), second(ts) FROM t") + } + } +} diff --git a/spark/src/test/scala/org/apache/comet/CometDatetimeJvmSuite.scala b/spark/src/test/scala/org/apache/comet/CometDatetimeJvmSuite.scala new file mode 100644 index 0000000000..fdff2c5a52 --- /dev/null +++ b/spark/src/test/scala/org/apache/comet/CometDatetimeJvmSuite.scala @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.comet + +import org.apache.spark.SparkConf +import org.apache.spark.sql.CometTestBase +import org.apache.spark.sql.comet.CometProjectExec +import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper +import org.apache.spark.sql.internal.SQLConf + +class CometDatetimeJvmSuite extends CometTestBase with AdaptiveSparkPlanHelper { + + override protected def sparkConf: SparkConf = + super.sparkConf.set(CometConf.COMET_DATETIME_ENGINE.key, CometConf.DATETIME_ENGINE_JAVA) + + private val crossTimezones = Seq("UTC", "America/Los_Angeles", "Asia/Tokyo") + + test("hour: TimestampNTZ produces Spark-compatible results in all session timezones") { + withTable("t") { + sql("CREATE TABLE t (ts_ntz TIMESTAMP_NTZ) USING parquet") + sql("""INSERT INTO t VALUES + | TIMESTAMP_NTZ'2024-06-15 12:34:56', + | TIMESTAMP_NTZ'2024-01-01 00:00:00', + | TIMESTAMP_NTZ'2024-12-31 23:59:59', + | (NULL)""".stripMargin) + for (tz <- crossTimezones) { + withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> tz) { + checkSparkAnswerAndOperator("SELECT hour(ts_ntz) FROM t ORDER BY ts_ntz") + } + } + } + } + + test("hour: TimestampType produces Spark-compatible results in all session timezones") { + withTable("t") { + sql("CREATE TABLE t (ts TIMESTAMP) USING parquet") + sql("""INSERT INTO t VALUES + | TIMESTAMP'2024-06-15 12:34:56 UTC', + | TIMESTAMP'2024-01-01 00:00:00 UTC', + | TIMESTAMP'2024-12-31 23:59:59 UTC', + | (NULL)""".stripMargin) + for (tz <- crossTimezones) { + withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> tz) { + checkSparkAnswerAndOperator("SELECT hour(ts) FROM t ORDER BY ts") + } + } + } + } + + test("hour: engine=java causes the plan to use JvmScalarUdf path") { + withTable("t") { + sql("CREATE TABLE t (ts_ntz TIMESTAMP_NTZ) USING parquet") + sql("INSERT INTO t VALUES TIMESTAMP_NTZ'2024-06-15 12:34:56'") + val df = sql("SELECT hour(ts_ntz) FROM t") + checkSparkAnswerAndOperator(df) + val plan = df.queryExecution.executedPlan + assert( + collect(plan) { case p: CometProjectExec => p }.nonEmpty, + s"Expected CometProjectExec (native execution via JVM UDF) in:\n$plan") + } + } + + test("minute: TimestampNTZ produces Spark-compatible results in all session timezones") { + withTable("t") { + sql("CREATE TABLE t (ts_ntz TIMESTAMP_NTZ) USING parquet") + sql("""INSERT INTO t VALUES + | TIMESTAMP_NTZ'2024-06-15 12:34:56', + | TIMESTAMP_NTZ'2024-01-01 00:00:00', + | TIMESTAMP_NTZ'2024-12-31 23:59:59', + | (NULL)""".stripMargin) + for (tz <- crossTimezones) { + withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> tz) { + checkSparkAnswerAndOperator("SELECT minute(ts_ntz) FROM t ORDER BY ts_ntz") + } + } + } + } + + test("minute: TimestampType produces Spark-compatible results in all session timezones") { + withTable("t") { + sql("CREATE TABLE t (ts TIMESTAMP) USING parquet") + sql("""INSERT INTO t VALUES + | TIMESTAMP'2024-06-15 12:34:56 UTC', + | (NULL)""".stripMargin) + for (tz <- crossTimezones) { + withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> tz) { + checkSparkAnswerAndOperator("SELECT minute(ts) FROM t ORDER BY ts") + } + } + } + } + + test("second: TimestampNTZ produces Spark-compatible results in all session timezones") { + withTable("t") { + sql("CREATE TABLE t (ts_ntz TIMESTAMP_NTZ) USING parquet") + sql("""INSERT INTO t VALUES + | TIMESTAMP_NTZ'2024-06-15 12:34:56', + | TIMESTAMP_NTZ'2024-01-01 00:00:00', + | TIMESTAMP_NTZ'2024-12-31 23:59:59', + | (NULL)""".stripMargin) + for (tz <- crossTimezones) { + withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> tz) { + checkSparkAnswerAndOperator("SELECT second(ts_ntz) FROM t ORDER BY ts_ntz") + } + } + } + } + + test("second: TimestampType produces Spark-compatible results in all session timezones") { + withTable("t") { + sql("CREATE TABLE t (ts TIMESTAMP) USING parquet") + sql("""INSERT INTO t VALUES + | TIMESTAMP'2024-06-15 12:34:56 UTC', + | (NULL)""".stripMargin) + for (tz <- crossTimezones) { + withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> tz) { + checkSparkAnswerAndOperator("SELECT second(ts) FROM t ORDER BY ts") + } + } + } + } + + test("hour/minute/second: DST fall-back boundary in America/Los_Angeles") { + withTable("t") { + sql("CREATE TABLE t (ts TIMESTAMP) USING parquet") + // 2024-11-03 09:30:00 UTC is in the middle of the LA fall-back + // (clocks moved 02:00 -> 01:00 local at 09:00 UTC). + sql("""INSERT INTO t VALUES + | TIMESTAMP'2024-11-03 08:30:00 UTC', + | TIMESTAMP'2024-11-03 09:30:00 UTC', + | TIMESTAMP'2024-11-03 10:30:00 UTC'""".stripMargin) + withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> "America/Los_Angeles") { + checkSparkAnswerAndOperator("SELECT hour(ts), minute(ts), second(ts) FROM t ORDER BY ts") + } + } + } +} diff --git a/spark/src/test/scala/org/apache/comet/udf/DateTimeFieldUDFSuite.scala b/spark/src/test/scala/org/apache/comet/udf/DateTimeFieldUDFSuite.scala new file mode 100644 index 0000000000..104bdaa865 --- /dev/null +++ b/spark/src/test/scala/org/apache/comet/udf/DateTimeFieldUDFSuite.scala @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.comet.udf + +import java.nio.charset.StandardCharsets + +import org.scalatest.funsuite.AnyFunSuite + +import org.apache.arrow.vector.{IntVector, TimeStampMicroTZVector, TimeStampMicroVector, VarCharVector} +import org.apache.arrow.vector.types.TimeUnit +import org.apache.arrow.vector.types.pojo.{ArrowType, FieldType} + +import org.apache.comet.CometArrowAllocator + +class DateTimeFieldUDFSuite extends AnyFunSuite { + + private def tzVector(tz: String): VarCharVector = { + val v = new VarCharVector("tz", CometArrowAllocator) + v.allocateNew(1) + v.setSafe(0, tz.getBytes(StandardCharsets.UTF_8)) + v.setValueCount(1) + v + } + + private def tsTzVector(tz: String, micros: Array[java.lang.Long]): TimeStampMicroTZVector = { + val fieldType = new FieldType(true, new ArrowType.Timestamp(TimeUnit.MICROSECOND, tz), null) + val v = new TimeStampMicroTZVector("ts", fieldType, CometArrowAllocator) + v.allocateNew(micros.length) + for (i <- micros.indices) { + if (micros(i) == null) v.setNull(i) else v.set(i, micros(i)) + } + v.setValueCount(micros.length) + v + } + + private def tsNtzVector(micros: Array[java.lang.Long]): TimeStampMicroVector = { + val v = new TimeStampMicroVector("ts_ntz", CometArrowAllocator) + v.allocateNew(micros.length) + for (i <- micros.indices) { + if (micros(i) == null) v.setNull(i) else v.set(i, micros(i)) + } + v.setValueCount(micros.length) + v + } + + test("HourUDF on TimestampType in UTC returns the UTC hour") { + // 2024-06-15 12:34:56 UTC = 1718454896 seconds = 1718454896000000 micros + val micros = 1718454896000000L + val ts = tsTzVector("UTC", Array[java.lang.Long](micros)) + val tz = tzVector("UTC") + val udf = new HourUDF + val out = udf.evaluate(Array(ts, tz), 1).asInstanceOf[IntVector] + assert(out.getValueCount == 1) + assert(out.get(0) == 12) + } + + test("HourUDF on TimestampType in America/Los_Angeles applies zone shift") { + val micros = 1718454896000000L // 2024-06-15 12:34:56 UTC = 05:34:56 PDT + val ts = tsTzVector("UTC", Array[java.lang.Long](micros)) + val tz = tzVector("America/Los_Angeles") + val udf = new HourUDF + val out = udf.evaluate(Array(ts, tz), 1).asInstanceOf[IntVector] + assert(out.get(0) == 5) + } + + test("HourUDF on TimestampNTZType ignores the timezone arg") { + // 2024-06-15 12:34:56 (NTZ wall-clock) = 1718454896000000 micros + val micros = 1718454896000000L + val ts = tsNtzVector(Array[java.lang.Long](micros)) + val tz = tzVector("Asia/Tokyo") // should NOT shift + val udf = new HourUDF + val out = udf.evaluate(Array(ts, tz), 1).asInstanceOf[IntVector] + assert(out.get(0) == 12) + } + + test("HourUDF preserves nulls") { + val ts = tsNtzVector(Array[java.lang.Long](null, 1718454896000000L, null)) + val tz = tzVector("UTC") + val udf = new HourUDF + val out = udf.evaluate(Array(ts, tz), 3).asInstanceOf[IntVector] + assert(out.isNull(0)) + assert(out.get(1) == 12) + assert(out.isNull(2)) + } + + test("MinuteUDF on TimestampType in UTC") { + val micros = 1718454896000000L // 2024-06-15 12:34:56 UTC + val ts = tsTzVector("UTC", Array[java.lang.Long](micros)) + val tz = tzVector("UTC") + val udf = new MinuteUDF + val out = udf.evaluate(Array(ts, tz), 1).asInstanceOf[IntVector] + assert(out.get(0) == 34) + } + + test("MinuteUDF on TimestampNTZType") { + val micros = 1718454896000000L // 2024-06-15 12:34:56 NTZ + val ts = tsNtzVector(Array[java.lang.Long](micros)) + val tz = tzVector("UTC") + val udf = new MinuteUDF + val out = udf.evaluate(Array(ts, tz), 1).asInstanceOf[IntVector] + assert(out.get(0) == 34) + } + + test("SecondUDF on TimestampType in UTC") { + val micros = 1718454896000000L // 2024-06-15 12:34:56 UTC + val ts = tsTzVector("UTC", Array[java.lang.Long](micros)) + val tz = tzVector("UTC") + val udf = new SecondUDF + val out = udf.evaluate(Array(ts, tz), 1).asInstanceOf[IntVector] + assert(out.get(0) == 56) + } + + test("SecondUDF on TimestampNTZType") { + val micros = 1718454896000000L // 2024-06-15 12:34:56 NTZ + val ts = tsNtzVector(Array[java.lang.Long](micros)) + val tz = tzVector("UTC") + val udf = new SecondUDF + val out = udf.evaluate(Array(ts, tz), 1).asInstanceOf[IntVector] + assert(out.get(0) == 56) + } +}