diff --git a/datafusion/physical-plan/src/repartition/mod.rs b/datafusion/physical-plan/src/repartition/mod.rs index b4af6e2c09a5c..f0dfe34544982 100644 --- a/datafusion/physical-plan/src/repartition/mod.rs +++ b/datafusion/physical-plan/src/repartition/mod.rs @@ -52,8 +52,7 @@ use datafusion_common::stats::Precision; use datafusion_common::tree_node::TreeNodeRecursion; use datafusion_common::utils::transpose; use datafusion_common::{ - ColumnStatistics, DataFusionError, HashMap, assert_or_internal_err, - internal_datafusion_err, internal_err, + ColumnStatistics, DataFusionError, HashMap, assert_or_internal_err, internal_err, }; use datafusion_common::{Result, not_impl_err}; use datafusion_common_runtime::SpawnedTask; @@ -681,46 +680,8 @@ impl BatchPartitioner { // Finished building index-arrays for output partitions timer.done(); - // Borrowing partitioner timer to prevent moving `self` to closure - let partitioner_timer = &self.timer; - - let mut partitioned_batches = vec![]; - for (partition, p_indices) in indices.iter_mut().enumerate() { - if !p_indices.is_empty() { - let taken_indices = std::mem::take(p_indices); - let indices_array: PrimitiveArray = - taken_indices.into(); - - // Tracking time required for repartitioned batches construction - let _timer = partitioner_timer.timer(); - - // Produce batches based on indices - let columns = - take_arrays(batch.columns(), &indices_array, None)?; - - let mut options = RecordBatchOptions::new(); - options = options.with_row_count(Some(indices_array.len())); - let batch = RecordBatch::try_new_with_options( - batch.schema(), - columns, - &options, - ) - .unwrap(); - - partitioned_batches.push(Ok((partition, batch))); - - // Return the taken vec - let (_, buffer, _) = indices_array.into_parts(); - let mut vec = - buffer.into_inner().into_vec::().map_err(|e| { - internal_datafusion_err!( - "Could not convert buffer to vec: {e:?}" - ) - })?; - vec.clear(); - *p_indices = vec; - } - } + let partitioned_batches = + Self::partition_grouped_take(&batch, indices, &self.timer)?; Box::new(partitioned_batches.into_iter()) } @@ -736,6 +697,66 @@ impl BatchPartitioner { BatchPartitionerState::Hash { indices, .. } => indices.len(), } } + + /// Build repartitioned hash output batches using one `take` per input batch. + /// + /// The hash router first fills one index vector per output partition. This method + /// concatenates those index vectors, performs one grouped `take_arrays`, and + /// then returns each output partition as a slice of the reordered batch. + /// + /// For example, given partition indices: + /// + /// ```text + /// partition 0: [2, 5] + /// partition 1: [] + /// partition 2: [0, 3, 4] + /// ``` + /// + /// this method takes rows in `[2, 5, 0, 3, 4]` order once, then returns + /// `partition 0 = slice(0, 2)` and `partition 2 = slice(2, 3)`. + fn partition_grouped_take( + batch: &RecordBatch, + indices: &mut [Vec], + timer: &metrics::Time, + ) -> Result>> { + let mut partition_ranges = Vec::with_capacity(indices.len()); + let mut reordered_indices = Vec::with_capacity(batch.num_rows()); + + for (partition, p_indices) in indices.iter_mut().enumerate() { + if p_indices.is_empty() { + continue; + } + + let start = reordered_indices.len(); + reordered_indices.extend_from_slice(p_indices); + partition_ranges.push((partition, start, p_indices.len())); + p_indices.clear(); + } + + if reordered_indices.is_empty() { + return Ok(vec![]); + } + + let batches = { + let _timer = timer.timer(); + let indices_array: PrimitiveArray = reordered_indices.into(); + let columns = take_arrays(batch.columns(), &indices_array, None)?; + + let mut options = RecordBatchOptions::new(); + options = options.with_row_count(Some(indices_array.len())); + let reordered_batch = + RecordBatch::try_new_with_options(batch.schema(), columns, &options)?; + + partition_ranges + .into_iter() + .map(|(partition, start, len)| { + Ok((partition, reordered_batch.slice(start, len))) + }) + .collect() + }; + + Ok(batches) + } } /// Maps `N` input partitions to `M` output partitions based on a