Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions datafusion/ffi/src/physical_expr/partitioning.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ impl From<&Partitioning> for FFI_Partitioning {
.collect();
Self::Hash(exprs, *size)
}
// FFI does not yet expose expression partition metadata.
Partitioning::Expr(expr) => Self::UnknownPartitioning(expr.partition_count()),
Partitioning::UnknownPartitioning(size) => Self::UnknownPartitioning(*size),
}
}
Expand Down
2 changes: 1 addition & 1 deletion datafusion/physical-expr/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ pub use analysis::{AnalysisContext, ExprBoundaries, analyze};
pub use equivalence::{
AcrossPartitions, ConstExpr, EquivalenceProperties, calculate_union,
};
pub use partitioning::{Distribution, Partitioning};
pub use partitioning::{Distribution, ExprPartitioning, Partitioning};
pub use physical_expr::{
add_offset_to_expr, add_offset_to_physical_sort_exprs, create_lex_ordering,
create_ordering, create_physical_sort_expr, create_physical_sort_exprs,
Expand Down
180 changes: 168 additions & 12 deletions datafusion/physical-expr/src/partitioning.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,8 @@ pub enum Partitioning {
/// Allocate rows based on a hash of one or more expressions and the specified number of
/// partitions
Hash(Vec<Arc<dyn PhysicalExpr>>, usize),
/// Partition rows by source-declared expression domains
Expr(ExprPartitioning),
/// Unknown partitioning scheme with a known number of partitions
UnknownPartitioning(usize),
}
Expand All @@ -133,13 +135,94 @@ impl Display for Partitioning {
.join(", ");
write!(f, "Hash([{phy_exprs_str}], {size})")
}
Partitioning::Expr(expr) => write!(f, "{expr}"),
Partitioning::UnknownPartitioning(size) => {
write!(f, "UnknownPartitioning({size})")
}
}
}
}

/// Physical expression partitioning.
///
/// Partition `i` contains rows where `partition_exprs[i]` evaluates to true.
/// The source declaring partitioning is responsible for ensuring that, for every
/// row emitted, exactly one partition expression evaluates to true and that row
/// is emitted by the corresponding partition. The expressions do not need to
/// cover values that the plan cannot emit.
///
/// For example, a scan that can only emit rows for `2022` can declare two date
/// partitions as:
///
/// ```text
/// partition_exprs[0] = date >= 2022-01-01 AND date < 2022-07-01
/// partition_exprs[1] = date >= 2022-07-01 AND date < 2023-01-01
/// ```
///
/// This is valid even though values outside `2022` are not covered, as long as
/// the source does not emit rows outside those ranges. It would not be valid
/// for this plan to emit a row from `partition[i]` whose date is not within
/// `partition_exprs[i]`, or to emit a row whose date matches multiple
/// partition expressions.
///
/// More complex partitioning can be represented using normal expression
/// composition. For example, one partition in a date and city range can be
/// represented as `date >= 2021-01-01 AND date < 2022-07-01 AND city < 'Boston'`.
///
/// NOTE: Optimizer and execution behavior for this partitioning is intentionally
/// not implemented and will be introduced incrementally.
#[derive(Debug, Clone)]
pub struct ExprPartitioning {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't this the same as Range Partitioning https://www.waitingforcode.com/apache-spark-sql/range-partitioning-apache-spark-sql/read#range_partitioning

Wouldn't it be better to use that naming?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh I see the issue already refers to it as range partitioning. Any reason for not using that terminology here?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The reason is that we aim to be more flexible here. This can support Range partitioning but also extends beyond that to any physical expr the source wants to provide. I just gave range in the description as one concrete example of how this could be used.

Someone could partition using this scheme on something like city column where:

partition 1 -> city = "New York"
partition 2 -> city = "London"

and so on.

partition_exprs: Vec<Arc<dyn PhysicalExpr>>,
}

impl ExprPartitioning {
    /// Builds expression partitioning metadata from one predicate expression
    /// per partition.
    pub fn new(partition_exprs: Vec<Arc<dyn PhysicalExpr>>) -> Self {
        Self { partition_exprs }
    }

    /// Returns the per-partition predicate expressions.
    pub fn partition_exprs(&self) -> &[Arc<dyn PhysicalExpr>] {
        self.partition_exprs.as_slice()
    }

    /// Returns the number of declared partitions (one per predicate).
    pub fn partition_count(&self) -> usize {
        self.partition_exprs.len()
    }

    /// Attempts to rewrite every partition predicate through the given
    /// projection. Yields `None` as soon as any predicate cannot be expressed
    /// in terms of the projected columns, so callers can degrade to an
    /// unknown partitioning.
    fn project(
        &self,
        mapping: &ProjectionMapping,
        input_eq_properties: &EquivalenceProperties,
    ) -> Option<Self> {
        let projected: Option<Vec<_>> = input_eq_properties
            .project_expressions(&self.partition_exprs, mapping)
            .collect();
        projected.map(Self::new)
    }
}

impl Display for ExprPartitioning {
    /// Renders as `Expr([expr, ...], partition_count)`, mirroring the other
    /// `Partitioning` display forms.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let exprs = format_physical_expr_list(&self.partition_exprs);
        let count = self.partition_count();
        write!(f, "Expr({exprs}, {count})")
    }
}

impl PartialEq for ExprPartitioning {
    /// Two expression partitionings are equal when their predicate lists are
    /// pairwise equal (same expressions in the same order).
    fn eq(&self, other: &Self) -> bool {
        physical_exprs_equal(&self.partition_exprs, &other.partition_exprs)
    }
}

/// Represents how a [`Partitioning`] satisfies a [`Distribution`] requirement.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PartitioningSatisfaction {
Expand Down Expand Up @@ -167,6 +250,7 @@ impl Partitioning {
use Partitioning::*;
match self {
RoundRobinBatch(n) | Hash(_, n) | UnknownPartitioning(n) => *n,
Expr(expr) => expr.partition_count(),
}
}

Expand Down Expand Up @@ -277,19 +361,27 @@ impl Partitioning {
mapping: &ProjectionMapping,
input_eq_properties: &EquivalenceProperties,
) -> Self {
if let Partitioning::Hash(exprs, part) = self {
let normalized_exprs = input_eq_properties
.project_expressions(exprs, mapping)
.zip(exprs)
.map(|(proj_expr, expr)| {
proj_expr.unwrap_or_else(|| {
Arc::new(UnKnownColumn::new(&expr.to_string()))
match self {
Partitioning::Hash(exprs, part) => {
let normalized_exprs = input_eq_properties
.project_expressions(exprs, mapping)
.zip(exprs)
.map(|(proj_expr, expr)| {
proj_expr.unwrap_or_else(|| {
Arc::new(UnKnownColumn::new(&expr.to_string()))
})
})
})
.collect();
Partitioning::Hash(normalized_exprs, *part)
} else {
self.clone()
.collect();
Partitioning::Hash(normalized_exprs, *part)
}
Partitioning::Expr(expr) => {
if let Some(projected) = expr.project(mapping, input_eq_properties) {
Partitioning::Expr(projected)
} else {
Partitioning::UnknownPartitioning(expr.partition_count())
}
}
_ => self.clone(),
}
}
}
Expand All @@ -306,6 +398,7 @@ impl PartialEq for Partitioning {
{
true
}
(Partitioning::Expr(left), Partitioning::Expr(right)) => left == right,
_ => false,
}
}
Expand Down Expand Up @@ -845,4 +938,67 @@ mod tests {

Ok(())
}

/// `Partitioning::Expr` reports one partition per predicate and renders
/// through the `Expr([...], n)` display form.
#[test]
fn test_expr_partitioning_metadata() -> Result<()> {
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, false)]));
    let predicate: Arc<dyn PhysicalExpr> =
        Arc::new(Column::new_with_schema("a", &schema)?);

    // Two partitions, each described by the same column predicate.
    let metadata =
        ExprPartitioning::new(vec![Arc::clone(&predicate), Arc::clone(&predicate)]);
    let partitioning = Partitioning::Expr(metadata);

    assert_eq!(partitioning.partition_count(), 2);
    assert_eq!(partitioning.to_string(), "Expr([a@0, a@0], 2)");

    Ok(())
}

/// Projecting away a column that a partition predicate references must
/// degrade `Partitioning::Expr` to `UnknownPartitioning` while keeping the
/// original partition count.
#[test]
fn test_expr_partitioning_project_degrades_when_expr_dropped() -> Result<()> {
    let schema = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Int64, false),
        Field::new("b", DataType::Int64, false),
    ]));
    let predicate: Arc<dyn PhysicalExpr> =
        Arc::new(Column::new_with_schema("b", &schema)?);
    let eq_properties = EquivalenceProperties::new(Arc::clone(&schema));
    // The projection keeps only column `a`, dropping `b` used by the predicate.
    let mapping = ProjectionMapping::from_indices(&[0], &schema)?;
    let partitioning = Partitioning::Expr(ExprPartitioning::new(vec![predicate]));

    let projected = partitioning.project(&mapping, &eq_properties);
    match &projected {
        Partitioning::UnknownPartitioning(count) => assert_eq!(*count, 1),
        _ => panic!("expected UnknownPartitioning, got {projected:?}"),
    }

    Ok(())
}

/// A multi-partition expression partitioning does not satisfy a hash
/// distribution requirement, even over the same columns.
#[test]
fn test_multi_partition_expr_does_not_satisfy_hash_distribution() -> Result<()> {
    let schema = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Int64, false),
        Field::new("b", DataType::Int64, false),
    ]));
    let expr_a: Arc<dyn PhysicalExpr> =
        Arc::new(Column::new_with_schema("a", &schema)?);
    let expr_b: Arc<dyn PhysicalExpr> =
        Arc::new(Column::new_with_schema("b", &schema)?);

    let eq_properties = EquivalenceProperties::new(Arc::clone(&schema));
    // Two partitions keyed by different column predicates.
    let partitioning = Partitioning::Expr(ExprPartitioning::new(vec![
        Arc::clone(&expr_a),
        Arc::clone(&expr_b),
    ]));
    let required = Distribution::HashPartitioned(vec![expr_a, expr_b]);

    assert_eq!(
        partitioning.satisfaction(&required, &eq_properties, false),
        PartitioningSatisfaction::NotSatisfied
    );

    Ok(())
}
}
5 changes: 5 additions & 0 deletions datafusion/physical-plan/src/joins/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,11 @@ pub fn adjust_right_output_partitioning(
.collect::<Result<_>>()?;
Partitioning::Hash(new_exprs, *size)
}
Partitioning::Expr(_) => {
return not_impl_err!(
"Expression partitioning is not supported for join output partitioning"
);
}
result => result.clone(),
};
Ok(result)
Expand Down
2 changes: 1 addition & 1 deletion datafusion/physical-plan/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ pub use datafusion_expr::{Accumulator, ColumnarValue};
use datafusion_physical_expr::PhysicalSortExpr;
pub use datafusion_physical_expr::window::WindowExpr;
pub use datafusion_physical_expr::{
Distribution, Partitioning, PhysicalExpr, expressions,
Distribution, ExprPartitioning, Partitioning, PhysicalExpr, expressions,
};

pub use crate::display::{DefaultDisplay, DisplayAs, DisplayFormatType, VerboseDisplay};
Expand Down
54 changes: 54 additions & 0 deletions datafusion/physical-plan/src/repartition/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -600,6 +600,11 @@ impl BatchPartitioner {
num_input_partitions,
))
}
Partitioning::Expr(_) => {
not_impl_err!(
"Expression partitioning is not supported by RepartitionExec"
)
}
Comment on lines +603 to +607
Copy link
Copy Markdown
Contributor

@stuhood stuhood May 15, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So, it's worth discussing this in more detail I think.

Expr partitioning is much, much more general than Range partitioning.

In Range partitioning, deciding which partition a row maps to involves either a binary search or sorted map lookup. But in Expr partitioning, it will always be a linear scan through the expressions, unless the consumer has reverse-engineered the fact that it is actually Range partitioning under the hood.

So this operator will be much more expensive than it might be otherwise.

What is the reasoning around using expressions here, and not literally ranges?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My intent wasn't for ExprPartitioning to be efficient execution format for physically repartitioning rows. I was thinking of this as partitioning for sources/plans that already have known partitioning and declare it to preserve in the plan to unlock optimizations.

In follow-ups:

  • add explicit compatibility/satisfaction APIs around this metadata so we can ask structured questions without doing row-wise linear scans. This would eliminate unneeded repartitions in cases where different partitioning types satisfy one another.
  • keep hash repartitioning as the preferred general execution path when DataFusion needs to repartition arbitrary input, unless we later add a more specialized repartitioning strategy.

Let me know thoughts on that 👍

Copy link
Copy Markdown
Contributor

@stuhood stuhood May 15, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My intent wasn't for ExprPartitioning to be efficient execution format for physically repartitioning rows. I was thinking of this as partitioning for sources/plans that already have known partitioning and declare it to preserve in the plan to unlock optimizations.

That works for the first join, but not for followup joins. For example:

If you have a 3 table join, the first join will be able to use an equality match on range partitioning to say: no re-partitioning needed at all because the two tables are partitioned the same way! Great.

But its very likely that the second join does need to re-partition one of its inputs (assuming different join keys between the two joins): the output of join one needs to be re-partitioned to match the third table. Now, technically you can just repartition both sides (i.e. switch to hash or something). But if you instead re-partition to match the third table, then you might be able to significantly cut down on data movement.


So, yes: I think that it is important to be able to efficiently re-partition by this strategy. If we don't have concrete use-cases for generic expression partitioning, then it would not be my first choice here.

other => {
not_impl_err!("Unsupported repartitioning scheme {other:?}")
}
Expand Down Expand Up @@ -1260,6 +1265,11 @@ impl ExecutionPlan for RepartitionExec {
}
Partitioning::Hash(new_partitions, *size)
}
Partitioning::Expr(_) => {
return not_impl_err!(
"Expression partitioning is not supported for projection pushdown through RepartitionExec"
);
}
others => others.clone(),
};

Expand Down Expand Up @@ -1296,6 +1306,11 @@ impl ExecutionPlan for RepartitionExec {
if !self.maintains_input_order()[0] {
return Ok(SortOrderPushdownResult::Unsupported);
}
if matches!(self.partitioning(), Partitioning::Expr(_)) {
return not_impl_err!(
"Expression partitioning is not supported for sort pushdown through RepartitionExec"
);
}

// Delegate to the child and wrap with a new RepartitionExec
self.input.try_pushdown_sort(order)?.try_map(|new_input| {
Expand All @@ -1319,6 +1334,11 @@ impl ExecutionPlan for RepartitionExec {
RoundRobinBatch(_) => RoundRobinBatch(target_partitions),
Hash(hash, _) => Hash(hash, target_partitions),
UnknownPartitioning(_) => UnknownPartitioning(target_partitions),
Expr(_) => {
return not_impl_err!(
"Expression partitioning is not supported for changing RepartitionExec partition counts"
);
}
};
Ok(Some(Arc::new(Self {
input: Arc::clone(&self.input),
Expand Down Expand Up @@ -1447,6 +1467,11 @@ impl RepartitionExec {
num_input_partitions,
)
}
Partitioning::Expr(_) => {
return not_impl_err!(
"Expression partitioning is not supported by RepartitionExec"
);
}
other => {
return not_impl_err!("Unsupported repartitioning scheme {other:?}");
}
Expand Down Expand Up @@ -1863,6 +1888,7 @@ mod tests {
use datafusion_common_runtime::JoinSet;
use datafusion_execution::config::SessionConfig;
use datafusion_execution::runtime_env::RuntimeEnvBuilder;
use datafusion_physical_expr::ExprPartitioning;
use insta::assert_snapshot;

#[test]
Expand Down Expand Up @@ -2155,6 +2181,34 @@ mod tests {
);
}

/// Executing a `RepartitionExec` configured with expression partitioning
/// must surface a not-implemented error from the output stream.
#[tokio::test]
async fn unsupported_expr_partitioning() -> Result<()> {
    let ctx = Arc::new(TaskContext::default());
    let batch = RecordBatch::try_from_iter(vec![(
        "my_awesome_field",
        Arc::new(StringArray::from(vec!["foo", "bar"])) as ArrayRef,
    )])?;

    let schema = batch.schema();
    let partition_expr = col("my_awesome_field", &schema)?;
    let source = MockExec::new(vec![Ok(batch)], Arc::clone(&schema));
    let scheme = Partitioning::Expr(ExprPartitioning::new(vec![partition_expr]));
    let repartition = RepartitionExec::try_new(Arc::new(source), scheme)?;
    let stream = repartition.execute(0, ctx)?;

    // Draining the stream is where the unsupported scheme is reported.
    let result_string = crate::common::collect(stream)
        .await
        .unwrap_err()
        .to_string();
    assert!(
        result_string
            .contains("Expression partitioning is not supported by RepartitionExec"),
        "actual: {result_string}"
    );

    Ok(())
}

#[tokio::test]
async fn error_for_input_exec() {
// This generates an error on a call to execute. The error
Expand Down
6 changes: 1 addition & 5 deletions datafusion/physical-plan/src/sorts/sort_preserving_merge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1505,11 +1505,7 @@ mod tests {
let task_ctx = Arc::new(TaskContext::default());
let schema = Schema::new(vec![Field::new("c1", DataType::UInt64, false)]);
let properties = CongestedExec::compute_properties(Arc::new(schema.clone()));
let &partition_count = match properties.output_partitioning() {
Partitioning::RoundRobinBatch(partitions) => partitions,
Partitioning::Hash(_, partitions) => partitions,
Partitioning::UnknownPartitioning(partitions) => partitions,
};
let partition_count = properties.output_partitioning().partition_count();
let source = CongestedExec {
schema: schema.clone(),
cache: Arc::new(properties),
Expand Down
Loading
Loading