Declare FTS result ordering and propagate input ordering through TakeExec.

fts.rs: the indexed FTS execs (MatchQueryExec, PhraseQueryExec, BoostQueryExec,
BooleanQueryExec) now clone one shared static PlanProperties that declares
`_score` descending; FlatMatchQueryExec clones a second static that declares no
ordering. Each static replaces a per-constructor PlanProperties::new call.
take.rs: TakeExec projects the input's equivalence properties onto its output
schema instead of replacing them with empty ones, falling back to empty
properties if the projection mapping cannot be built.

Review notes (text before the first "diff --git" line is not part of the patch):
  * Generic arguments on ADDED lines were restored from the surrounding code
    (LazyLock<Arc<PlanProperties>>, Arc<dyn PhysicalExpr>).
  * Generic arguments on unchanged CONTEXT lines (`dataset: Arc,`,
    `segments: Vec,`, `must: Option>,` ...) look stripped and are left exactly
    as found - TODO restore them from the target files before applying.
  * Indentation and blank context lines were reconstructed so that every hunk
    matches its @@ line counts - verify against the target files.
  * NOTE(review): `use arrow_schema::SortOptions;` sits inside the run of
    datafusion imports; rustfmt will presumably want it sorted elsewhere.
  * NOTE(review): `Column::new(SCORE_COL, 1)` assumes the score column is the
    second field of FTS_SCHEMA - confirm.

diff --git a/rust/lance/src/io/exec/fts.rs b/rust/lance/src/io/exec/fts.rs
index 240fc70bebc..0d4c602ffa8 100644
--- a/rust/lance/src/io/exec/fts.rs
+++ b/rust/lance/src/io/exec/fts.rs
@@ -2,7 +2,7 @@
 // SPDX-FileCopyrightText: Copyright The Lance Authors
 
 use std::collections::HashMap;
-use std::sync::Arc;
+use std::sync::{Arc, LazyLock};
 
 use arrow::array::{AsArray, BooleanBuilder};
 use arrow::datatypes::{Float32Type, UInt64Type};
@@ -19,7 +19,8 @@ use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
 use datafusion::physical_plan::union::UnionExec;
 use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties};
 use datafusion_physical_expr::expressions::Column;
-use datafusion_physical_expr::{Distribution, EquivalenceProperties, Partitioning};
+use datafusion_physical_expr::{Distribution, EquivalenceProperties, Partitioning, PhysicalSortExpr};
+use arrow_schema::SortOptions;
 use datafusion_physical_plan::joins::{HashJoinExec, PartitionMode};
 use datafusion_physical_plan::metrics::{BaselineMetrics, Count};
 use futures::future::try_join_all;
@@ -189,6 +190,34 @@ impl MetricsCollector for FtsIndexMetrics {
     }
 }
 
+/// Plan properties shared by the indexed FTS execs ([`MatchQueryExec`],
+/// [`PhraseQueryExec`], [`BoostQueryExec`], [`BooleanQueryExec`]). Each runs a
+/// complete search and emits its results sorted by `_score` descending, so the
+/// ordering is declared here to let DataFusion elide a downstream sort.
+static FTS_SORTED_PROPERTIES: LazyLock<Arc<PlanProperties>> = LazyLock::new(|| {
+    let score_ordering = PhysicalSortExpr::new(
+        Arc::new(Column::new(SCORE_COL, 1)),
+        SortOptions { descending: true, nulls_first: true },
+    );
+    Arc::new(PlanProperties::new(
+        EquivalenceProperties::new_with_orderings(FTS_SCHEMA.clone(), [[score_ordering]]),
+        Partitioning::RoundRobinBatch(1),
+        EmissionType::Final,
+        Boundedness::Bounded,
+    ))
+});
+
+/// Plan properties for the brute-force [`FlatMatchQueryExec`], which streams
+/// matches incrementally and unsorted, so no score ordering is declared.
+static FTS_STREAMING_PROPERTIES: LazyLock<Arc<PlanProperties>> = LazyLock::new(|| {
+    Arc::new(PlanProperties::new(
+        EquivalenceProperties::new(FTS_SCHEMA.clone()),
+        Partitioning::RoundRobinBatch(1),
+        EmissionType::Incremental,
+        Boundedness::Bounded,
+    ))
+});
+
 #[derive(Debug)]
 pub struct MatchQueryExec {
     dataset: Arc,
@@ -245,12 +274,7 @@ impl MatchQueryExec {
         params: FtsSearchParams,
         prefilter_source: PreFilterSource,
     ) -> Self {
-        let properties = Arc::new(PlanProperties::new(
-            EquivalenceProperties::new(FTS_SCHEMA.clone()),
-            Partitioning::RoundRobinBatch(1),
-            EmissionType::Final,
-            Boundedness::Bounded,
-        ));
+        let properties = FTS_SORTED_PROPERTIES.clone();
         let params = Self::effective_params(&query, params);
         Self {
             dataset,
@@ -280,12 +304,7 @@ impl MatchQueryExec {
         prefilter_source: PreFilterSource,
         segments: Vec,
     ) -> Self {
-        let properties = Arc::new(PlanProperties::new(
-            EquivalenceProperties::new(FTS_SCHEMA.clone()),
-            Partitioning::RoundRobinBatch(1),
-            EmissionType::Final,
-            Boundedness::Bounded,
-        ));
+        let properties = FTS_SORTED_PROPERTIES.clone();
         let params = Self::effective_params(&query, params);
         Self {
             dataset,
@@ -873,12 +892,7 @@ impl FlatMatchQueryExec {
         params: FtsSearchParams,
         unindexed_input: Arc,
     ) -> Self {
-        let properties = Arc::new(PlanProperties::new(
-            EquivalenceProperties::new(FTS_SCHEMA.clone()),
-            Partitioning::RoundRobinBatch(1),
-            EmissionType::Incremental,
-            Boundedness::Bounded,
-        ));
+        let properties = FTS_STREAMING_PROPERTIES.clone();
         Self {
             dataset,
             query,
@@ -899,12 +913,7 @@ impl FlatMatchQueryExec {
         unindexed_input: Arc,
         segments: Vec,
     ) -> Self {
-        let properties = Arc::new(PlanProperties::new(
-            EquivalenceProperties::new(FTS_SCHEMA.clone()),
-            Partitioning::RoundRobinBatch(1),
-            EmissionType::Incremental,
-            Boundedness::Bounded,
-        ));
+        let properties = FTS_STREAMING_PROPERTIES.clone();
         Self {
             dataset,
             query,
@@ -1122,12 +1131,7 @@ impl PhraseQueryExec {
         mut params: FtsSearchParams,
         prefilter_source: PreFilterSource,
     ) -> Self {
-        let properties = Arc::new(PlanProperties::new(
-            EquivalenceProperties::new(FTS_SCHEMA.clone()),
-            Partitioning::RoundRobinBatch(1),
-            EmissionType::Final,
-            Boundedness::Bounded,
-        ));
+        let properties = FTS_SORTED_PROPERTIES.clone();
         params = params.with_phrase_slop(Some(query.slop));
 
         Self {
@@ -1150,12 +1154,7 @@ impl PhraseQueryExec {
         prefilter_source: PreFilterSource,
         segments: Vec,
     ) -> Self {
-        let properties = Arc::new(PlanProperties::new(
-            EquivalenceProperties::new(FTS_SCHEMA.clone()),
-            Partitioning::RoundRobinBatch(1),
-            EmissionType::Final,
-            Boundedness::Bounded,
-        ));
+        let properties = FTS_SORTED_PROPERTIES.clone();
         params = params.with_phrase_slop(Some(query.slop));
 
         Self {
@@ -1417,12 +1416,7 @@ impl BoostQueryExec {
         positive: Arc,
         negative: Arc,
     ) -> Self {
-        let properties = Arc::new(PlanProperties::new(
-            EquivalenceProperties::new(FTS_SCHEMA.clone()),
-            Partitioning::RoundRobinBatch(1),
-            EmissionType::Final,
-            Boundedness::Bounded,
-        ));
+        let properties = FTS_SORTED_PROPERTIES.clone();
         Self {
             query,
             params,
@@ -1680,12 +1674,7 @@ impl BooleanQueryExec {
         must: Option>,
         must_not: Arc,
     ) -> Self {
-        let properties = Arc::new(PlanProperties::new(
-            EquivalenceProperties::new(FTS_SCHEMA.clone()),
-            Partitioning::RoundRobinBatch(1),
-            EmissionType::Final,
-            Boundedness::Bounded,
-        ));
+        let properties = FTS_SORTED_PROPERTIES.clone();
         Self {
             query,
             params,
diff --git a/rust/lance/src/io/exec/take.rs b/rust/lance/src/io/exec/take.rs
index 977a9c88dce..2943b63c5cc 100644
--- a/rust/lance/src/io/exec/take.rs
+++ b/rust/lance/src/io/exec/take.rs
@@ -20,7 +20,9 @@ use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
 use datafusion::physical_plan::{
     DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties, SendableRecordBatchStream,
 };
-use datafusion_physical_expr::EquivalenceProperties;
+use datafusion_physical_expr::expressions::Column;
+use datafusion_physical_expr::projection::ProjectionMapping;
+use datafusion_physical_expr::{EquivalenceProperties, PhysicalExpr};
 use futures::FutureExt;
 use futures::stream::{FuturesOrdered, Stream, StreamExt, TryStreamExt};
 use lance_arrow::RecordBatchExt;
@@ -477,12 +479,31 @@ impl TakeExec {
             &projection,
         ));
         let output_arrow = Arc::new(ArrowSchema::from(output_schema.as_ref()));
+        // Propagate input ordering through the schema change. TakeExec always
+        // places input fields first in the same order, so input field at index i
+        // maps to output field at index i. New dataset fields appended at the end
+        // have no ordering and are simply not included in the mapping.
+        let input_schema_ref = input.schema();
+        let mapping_exprs = input_schema_ref.fields().iter().enumerate().map(|(i, f)| {
+            (
+                Arc::new(Column::new(f.name(), i)) as Arc<dyn PhysicalExpr>,
+                f.name().to_string(),
+            )
+        });
+        let eq_props = ProjectionMapping::try_new(mapping_exprs, &input_schema_ref)
+            .map(|m| {
+                input
+                    .properties()
+                    .equivalence_properties()
+                    .project(&m, output_arrow.clone())
+            })
+            .unwrap_or_else(|_| EquivalenceProperties::new(output_arrow.clone()));
         let properties = Arc::new(
             input
                 .properties()
                 .as_ref()
                 .clone()
-                .with_eq_properties(EquivalenceProperties::new(output_arrow.clone())),
+                .with_eq_properties(eq_props),
         );
 
         Ok(Some(Self {