diff --git a/datafusion/execution/src/disk_manager.rs b/datafusion/execution/src/disk_manager.rs
index 1a14bd239a61a..dcc4800712e3f 100644
--- a/datafusion/execution/src/disk_manager.rs
+++ b/datafusion/execution/src/disk_manager.rs
@@ -473,6 +473,115 @@ fn create_local_dirs(local_dirs: &[PathBuf]) -> Result<Vec<Arc<TempDir>>> {
         .collect()
 }
 
+pub struct FileSpillWriter {
+    file: std::fs::File,
+    disk_manager: Arc<DiskManager>,
+    current_file_disk_usage: Arc<AtomicU64>,
+}
+
+impl SpillWriter for FileSpillWriter {
+    fn write(&mut self, data: Bytes) -> Result<()> {
+        use std::io::Write;
+
+        let len = data.len() as u64;
+        if len == 0 {
+            return Ok(());
+        }
+
+        self.file
+            .write_all(&data)
+            .map_err(DataFusionError::IoError)?;
+        self.current_file_disk_usage
+            .fetch_add(len, Ordering::Relaxed);
+        let new_global = self
+            .disk_manager
+            .used_disk_space
+            .fetch_add(len, Ordering::Relaxed)
+            + len;
+
+        if new_global > self.disk_manager.max_temp_directory_size {
+            return resources_err!(
+                "The used disk space during the spilling process has exceeded the allowable limit of {}. \
+                 Please try increasing the config: `datafusion.runtime.max_temp_directory_size`.",
+                human_readable_size(self.disk_manager.max_temp_directory_size as usize)
+            );
+        }
+
+        Ok(())
+    }
+
+    fn flush(&mut self) -> Result<()> {
+        use std::io::Write;
+        self.file.flush().map_err(DataFusionError::IoError)
+    }
+
+    fn finish(&mut self) -> Result<()> {
+        // flush() is already called by SpillWriteAdapter before finish()
+        Ok(())
+    }
+}
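+
+// Worked example of the quota accounting above (illustrative numbers): with
+// `max_temp_directory_size = 100` and `used_disk_space` already at 90, a
+// 32-byte `write` lands on disk first, then `fetch_add` raises the global
+// count to 122 > 100 and the call returns a resources error; the bytes stay
+// accounted until the temp file is dropped.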
+
+impl SpillFile for RefCountedTempFile {
+    fn path(&self) -> Option<&Path> {
+        Some(self.tempfile.path())
+    }
+
+    fn size(&self) -> Option<u64> {
+        Some(self.current_disk_usage())
+    }
+
+    #[cfg(not(target_arch = "wasm32"))]
+    fn read_stream(
+        &self,
+    ) -> Result<Pin<Box<dyn Stream<Item = Result<Bytes>> + Send>>> {
+        let path = self.tempfile.path().to_path_buf();
+
+        let stream =
+            futures::stream::once(async move {
+                tokio::fs::File::open(&path)
+                    .await
+                    .map_err(DataFusionError::IoError)
+            })
+            .flat_map(
+                |open_result| -> std::pin::Pin<
+                    Box<dyn Stream<Item = Result<Bytes>> + Send>,
+                > {
+                    match open_result {
+                        Ok(file) => Box::pin(
+                            tokio_util::io::ReaderStream::new(file)
+                                .map(|r| r.map_err(DataFusionError::IoError)),
+                        ),
+                        Err(e) => Box::pin(futures::stream::once(async move { Err(e) })),
+                    }
+                },
+            );
+
+        Ok(Box::pin(stream))
+    }
+
+    #[cfg(target_arch = "wasm32")]
+    fn read_stream(
+        &self,
+    ) -> Result<Pin<Box<dyn Stream<Item = Result<Bytes>> + Send>>> {
+        datafusion_common::exec_err!(
+            "Default OS file spilling is not supported on WASM. Configure DiskManager with a custom TempFileFactory."
+        )
+    }
+
+    fn open_writer(&self) -> Result<Box<dyn SpillWriter>> {
+        let file = self
+            .tempfile
+            .as_file()
+            .try_clone()
+            .map_err(DataFusionError::IoError)?;
+        Ok(Box::new(FileSpillWriter {
+            file,
+            disk_manager: Arc::clone(&self.disk_manager),
+            current_file_disk_usage: Arc::clone(&self.current_file_disk_usage),
+        }))
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
diff --git a/datafusion/execution/src/spill_file.rs b/datafusion/execution/src/spill_file.rs
new file mode 100644
index 0000000000000..44f44841838e6
--- /dev/null
+++ b/datafusion/execution/src/spill_file.rs
@@ -0,0 +1,60 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use bytes::Bytes;
+use datafusion_common::Result;
+use futures::Stream;
+use std::path::Path;
+use std::pin::Pin;
+use std::sync::Arc;
+
+/// Abstraction over a spill file backend.
+/// Implementations handle their own quota enforcement and blocking concerns.
+pub trait SpillFile: Send + Sync {
+    /// Returns the OS path if this is a local file, `None` otherwise.
+    fn path(&self) -> Option<&Path> {
+        None
+    }
+
+    /// Returns the current size in bytes if it is cheaply available.
+    fn size(&self) -> Option<u64>;
+
+    /// Returns the file contents as an async stream of byte chunks.
+    fn read_stream(&self) -> Result<Pin<Box<dyn Stream<Item = Result<Bytes>> + Send>>>;
+
+    /// Opens a writer for appending data to this file.
+    fn open_writer(&self) -> Result<Box<dyn SpillWriter>>;
+}
+
+/// Writer for spill file backends.
+/// Receives zero-copy `Bytes` payloads from the IPCStreamWriter adapter.
+pub trait SpillWriter: Send {
+    fn write(&mut self, data: Bytes) -> Result<()>;
+    fn flush(&mut self) -> Result<()>;
+    /// Finalizes the write after all data has been flushed.
+    ///
+    /// Implementations must not call `flush` internally.
+    /// Intended for close/sync/commit operations.
+    fn finish(&mut self) -> Result<()>;
+}
+
+/// Factory for creating spill files.
+pub trait TempFileFactory:
+    Send + Sync + std::panic::UnwindSafe + std::panic::RefUnwindSafe
+{
+    fn create_temp_file(&self, description: &str) -> Result<Arc<dyn SpillFile>>;
+}
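+
+// Usage sketch (illustrative; `factory` stands for any `TempFileFactory`
+// implementation registered with the runtime):
+//
+//     let file = factory.create_temp_file("sort spill")?;
+//     let mut writer = file.open_writer()?;
+//     writer.write(Bytes::from_static(b"ipc bytes"))?; // may fail on disk quota
+//     writer.flush()?;
+//     writer.finish()?;
+//     let chunks = file.read_stream()?; // Stream<Item = Result<Bytes>>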
diff --git a/datafusion/physical-plan/src/joins/sort_merge_join/bitwise_stream.rs b/datafusion/physical-plan/src/joins/sort_merge_join/bitwise_stream.rs
index ad7312426bd18..a7b9da1706ff5 100644
--- a/datafusion/physical-plan/src/joins/sort_merge_join/bitwise_stream.rs
+++ b/datafusion/physical-plan/src/joins/sort_merge_join/bitwise_stream.rs
@@ -119,8 +119,6 @@
 //! factor than the pair-materialization approach.
 
 use std::cmp::Ordering;
-use std::fs::File;
-use std::io::BufReader;
 use std::pin::Pin;
 use std::sync::Arc;
 use std::task::{Context, Poll};
@@ -134,7 +132,6 @@ use crate::{EmptyRecordBatchStream, RecordBatchStream};
 use arrow::array::{Array, ArrayRef, BooleanArray, BooleanBufferBuilder, RecordBatch};
 use arrow::compute::{BatchCoalescer, SortOptions, filter_record_batch, not};
 use arrow::datatypes::SchemaRef;
-use arrow::ipc::reader::StreamReader;
 use arrow::util::bit_chunk_iterator::UnalignedBitChunk;
 use arrow::util::bit_util::apply_bitwise_binary_op;
 use datafusion_common::{
@@ -259,6 +256,11 @@ pub(crate) struct BitwiseSortMergeJoinStream {
     inner_key_buffer: Vec<RecordBatch>,
     inner_key_spill: Option<Arc<RefCountedTempFile>>,
+    // Tracks the active spill stream
+    spill_stream: Option<SendableRecordBatchStream>,
+    // Prevents wiping out the buffer if we yield while evaluating the filter
+    inner_group_buffered: bool,
+
     // True when buffer_inner_key_group returned Pending after partially
     // filling inner_key_buffer. On re-entry, buffer_inner_key_group
     // must skip clear() and resume from poll_next_inner_batch (the
@@ -371,6 +373,8 @@ impl BitwiseSortMergeJoinStream {
             matched: BooleanBufferBuilder::new(0),
             inner_key_buffer: vec![],
             inner_key_spill: None,
+            spill_stream: None,
+            inner_group_buffered: false,
             buffering_inner_pending: false,
             pending_boundary: None,
             on_outer,
@@ -468,6 +472,8 @@ impl BitwiseSortMergeJoinStream {
     fn clear_inner_key_group(&mut self) {
         self.inner_key_buffer.clear();
         self.inner_key_spill = None;
+        self.spill_stream = None;
+        self.inner_group_buffered = false;
         self.inner_buffer_size = 0;
     }
@@ -749,7 +755,10 @@
     /// Process a key match with a filter. For each inner row in the buffered
     /// key group, evaluates the filter against the outer key group and ORs
     /// the results into the matched bitset using u64-chunked bitwise ops.
-    fn process_key_match_with_filter(&mut self) -> Result<()> {
+    fn process_key_match_with_filter(
+        &mut self,
+        cx: &mut Context<'_>,
+    ) -> Poll<Result<()>> {
         self.get_outer_self_cmp()?;
         let filter = self.filter.as_ref().unwrap();
         let outer_batch = self.outer_batch.as_ref().unwrap();
@@ -785,24 +794,40 @@
         )
         .count_ones();
 
-        // Process spilled inner batches first (read back from disk).
-        if let Some(spill_file) = &self.inner_key_spill {
-            let file = BufReader::new(File::open(spill_file.path())?);
-            let reader = StreamReader::try_new(file, None)?;
-            for batch_result in reader {
-                let inner_slice = batch_result?;
-                matched_count = eval_filter_for_inner_slice(
-                    self.outer_is_left,
-                    filter,
-                    &outer_slice,
-                    &inner_slice,
-                    &mut self.matched,
-                    self.outer_offset,
-                    outer_group_len,
-                    matched_count,
-                )?;
-                if matched_count == outer_group_len {
-                    break;
+        // Process spilled inner batches first, asynchronously.
+        if self.inner_key_spill.is_some() || self.spill_stream.is_some() {
+            if self.spill_stream.is_none()
+                && let Some(spill_file) = &self.inner_key_spill
+            {
+                let stream = self
+                    .spill_manager
+                    .read_spill_as_stream(Arc::clone(spill_file), None)?;
+                self.spill_stream = Some(stream);
+            }
+
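+            // On re-entry after Poll::Pending, `self.matched` still holds the
+            // bits set by earlier polls and `self.spill_stream` resumes where
+            // the last poll stopped, so no spilled batch is evaluated twice.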
+            while matched_count < outer_group_len {
+                let stream = self.spill_stream.as_mut().unwrap();
+                match ready!(stream.poll_next_unpin(cx)) {
+                    Some(Ok(inner_slice)) => {
+                        matched_count = eval_filter_for_inner_slice(
+                            self.outer_is_left,
+                            filter,
+                            &outer_slice,
+                            &inner_slice,
+                            &mut self.matched,
+                            self.outer_offset,
+                            outer_group_len,
+                            matched_count,
+                        )?;
+                    }
+                    Some(Err(e)) => {
+                        self.spill_stream = None;
+                        return Poll::Ready(Err(e));
+                    }
+                    None => {
+                        self.spill_stream = None;
+                        break;
+                    }
                 }
             }
         }
@@ -830,13 +855,16 @@
         }
         self.outer_offset = outer_group_end;
-        Ok(())
+
+        self.spill_stream = None;
+
+        Poll::Ready(Ok(()))
     }
 
     /// Continue processing an outer key group that spans multiple outer
     /// batches. Returns `true` if this outer batch was fully consumed
     /// by the key group and the caller should load another.
-    fn resume_boundary(&mut self) -> Result<bool> {
+    fn resume_boundary(&mut self, cx: &mut Context<'_>) -> Poll<Result<bool>> {
         debug_assert!(
             self.outer_batch.is_some(),
             "caller must load outer_batch first"
         );
@@ -858,7 +886,7 @@
                 });
                 self.emit_outer_batch()?;
                 self.outer_batch = None;
-                return Ok(true);
+                return Poll::Ready(Ok(true));
             }
         }
     }
@@ -874,7 +902,15 @@
                     &self.sort_options,
                     self.null_equality,
                 )?;
                 if same_key {
-                    self.process_key_match_with_filter()?;
+                    match self.process_key_match_with_filter(cx) {
+                        Poll::Ready(Ok(())) => (),
+                        Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
+                        Poll::Pending => {
+                            self.pending_boundary =
+                                Some(PendingBoundary::Filtered { saved_keys });
+                            return Poll::Pending;
+                        }
+                    }
                     let num_outer = self.outer_batch.as_ref().unwrap().num_rows();
                     if self.outer_offset >= num_outer {
@@ -882,14 +918,63 @@
                         });
                         self.emit_outer_batch()?;
                         self.outer_batch = None;
-                        return Ok(true);
+                        return Poll::Ready(Ok(true));
                     }
                 }
                 self.clear_inner_key_group();
             }
             None => {}
         }
-        Ok(false)
+        Poll::Ready(Ok(false))
+    }
+
+    /// Helper to process an Equal match across potential outer batch boundaries.
+    fn process_filtered_match_loop(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
+        loop {
+            ready!(self.process_key_match_with_filter(cx))?;
+
+            let outer_batch = self.outer_batch.as_ref().unwrap();
+            if self.outer_offset >= outer_batch.num_rows() {
+                let saved_keys =
+                    slice_keys(&self.outer_key_arrays, outer_batch.num_rows() - 1);
+
+                self.emit_outer_batch()?;
+                self.pending_boundary = Some(PendingBoundary::Filtered { saved_keys });
+
+                // Clear the stale batch before polling
+                self.outer_batch = None;
+
+                match ready!(self.poll_next_outer_batch(cx)) {
+                    Err(e) => return Poll::Ready(Err(e)),
+                    Ok(false) => {
+                        self.pending_boundary = None;
+                        break;
+                    }
+                    Ok(true) => {
+                        let Some(PendingBoundary::Filtered { saved_keys }) =
+                            self.pending_boundary.take()
+                        else {
+                            unreachable!()
+                        };
+                        let same = keys_match(
+                            &saved_keys,
+                            &self.outer_key_arrays,
+                            &self.sort_options,
+                            self.null_equality,
+                        )?;
+                        if same {
+                            continue;
+                        }
+                        break;
+                    }
+                }
+            } else {
+                break;
+            }
+        }
+
+        self.clear_inner_key_group(); // This resets inner_group_buffered to false
+        Poll::Ready(Ok(()))
     }
 
     /// Main loop: drive the merge-scan to produce output batches.
@@ -911,14 +996,21 @@
                     }
                     return Poll::Ready(Ok(None));
                 }
-                Ok(true) => {
-                    if self.resume_boundary()? {
-                        continue;
-                    }
-                }
+                Ok(true) => {} // Loaded a batch, move on to checks
             }
         }
 
+        // Handles pausing while fetching a NEW outer batch.
+        if self.pending_boundary.is_some() && ready!(self.resume_boundary(cx))? {
+            continue;
+        }
+
+        // Handles pausing while reading the disk stream mid-batch.
+        if self.inner_group_buffered {
+            ready!(self.process_filtered_match_loop(cx))?;
+            continue;
+        }
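+        // Example: keys compare Equal, buffer_inner_key_group fills the inner
+        // buffer and sets inner_group_buffered = true, then the spill stream
+        // returns Pending mid-filter. The next poll takes the guard above and
+        // resumes process_filtered_match_loop without re-buffering (and
+        // thereby wiping) the inner key group.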
+
         // 2. Ensure we have an inner batch (unless inner is exhausted).
         // Skip this when resuming a pending boundary — inner was already
         // advanced past the key group before the boundary loop started.
@@ -1043,65 +1135,17 @@
                 }
                 Ordering::Equal => {
                     if self.filter.is_some() {
+                        debug_assert!(!self.inner_group_buffered);
                         // Buffer inner key group (may span batches)
                         match ready!(self.buffer_inner_key_group(cx)) {
                             Err(e) => return Poll::Ready(Err(e)),
-                            Ok(_inner_exhausted) => {}
+                            Ok(_inner_exhausted) => {
+                                self.inner_group_buffered = true;
+                            }
                         }
-
                         // Process outer rows against buffered inner group
                         // (may need to handle outer batch boundary)
-                        loop {
-                            self.process_key_match_with_filter()?;
-
-                            let outer_batch = self.outer_batch.as_ref().unwrap();
-                            if self.outer_offset >= outer_batch.num_rows() {
-                                let saved_keys = slice_keys(
-                                    &self.outer_key_arrays,
-                                    outer_batch.num_rows() - 1,
-                                );
-
-                                self.emit_outer_batch()?;
-                                debug_assert!(
-                                    !self.inner_key_buffer.is_empty()
-                                        || self.inner_key_spill.is_some(),
-                                    "Filtered pending boundary requires inner key data in buffer or spill"
-                                );
-                                self.pending_boundary =
-                                    Some(PendingBoundary::Filtered { saved_keys });
-
-                                match ready!(self.poll_next_outer_batch(cx)) {
-                                    Err(e) => return Poll::Ready(Err(e)),
-                                    Ok(false) => {
-                                        self.pending_boundary = None;
-                                        self.outer_batch = None;
-                                        break;
-                                    }
-                                    Ok(true) => {
-                                        let Some(PendingBoundary::Filtered {
-                                            saved_keys,
-                                        }) = self.pending_boundary.take()
-                                        else {
-                                            unreachable!()
-                                        };
-                                        let same = keys_match(
-                                            &saved_keys,
-                                            &self.outer_key_arrays,
-                                            &self.sort_options,
-                                            self.null_equality,
-                                        )?;
-                                        if same {
-                                            continue;
-                                        }
-                                        break;
-                                    }
-                                }
-                            } else {
-                                break;
-                            }
-                        }
-
-                        self.clear_inner_key_group();
+                        ready!(self.process_filtered_match_loop(cx))?;
                     } else {
                         // No filter: advance inner past key group, then
                         // mark all outer rows with this key as matched.
diff --git a/datafusion/physical-plan/src/joins/sort_merge_join/materializing_stream.rs b/datafusion/physical-plan/src/joins/sort_merge_join/materializing_stream.rs
index 5d23046ec7726..14add2e838086 100644
--- a/datafusion/physical-plan/src/joins/sort_merge_join/materializing_stream.rs
+++ b/datafusion/physical-plan/src/joins/sort_merge_join/materializing_stream.rs
@@ -23,8 +23,7 @@
 
 use std::cmp::Ordering;
 use std::collections::{HashMap, VecDeque};
-use std::fs::File;
-use std::io::BufReader;
+use std::fmt::Debug;
 use std::mem::size_of;
 use std::ops::Range;
 use std::pin::Pin;
@@ -47,10 +46,9 @@ use crate::{PhysicalExpr, RecordBatchStream, SendableRecordBatchStream};
 use arrow::array::{types::UInt64Type, *};
 use arrow::compute::{
     self, BatchCoalescer, SortOptions, concat_batches, filter_record_batch, interleave,
-    take, take_arrays,
+    take_arrays,
 };
 use arrow::datatypes::SchemaRef;
-use arrow::ipc::reader::StreamReader;
 use datafusion_common::cast::as_uint64_array;
 use datafusion_common::{JoinType, NullEquality, Result, exec_err, internal_err};
 use datafusion_execution::disk_manager::RefCountedTempFile;
@@ -58,7 +56,7 @@ use datafusion_execution::memory_pool::MemoryReservation;
 use datafusion_execution::runtime_env::RuntimeEnv;
 use datafusion_physical_expr_common::physical_expr::PhysicalExprRef;
 
-use futures::{Stream, StreamExt};
+use futures::{Stream, StreamExt, ready};
 
 /// State of SMJ stream
 #[derive(Debug, PartialEq, Eq)]
@@ -382,6 +380,9 @@ pub(super) struct MaterializingSortMergeJoinStream {
     /// Manages the process of spilling and reading back intermediate data
     pub spill_manager: SpillManager,
 
+    /// Tracks the active stream when loading spilled buffered batches back into memory
+    pub spill_stream: Option<SendableRecordBatchStream>,
+
     // ========================================================================
     // CACHED COMPARATORS:
     // Pre-built comparators to avoid per-row type dispatch in hot loops.
@@ -684,14 +685,13 @@ impl Stream for MaterializingSortMergeJoinStream {
                     self.state = SortMergeJoinState::Init;
                 }
                 SortMergeJoinState::JoinOutput => {
-                    self.join_partial()?;
+                    // If the batch size limit is reached, restore required spilled batches to memory and freeze.
+                    // Guarding at the top of the loop safely handles re-entry from Poll::Pending.
+                    if self.num_unfrozen_pairs() >= self.batch_size {
+                        let needed = self
+                            .get_required_batch_indices(self.buffered_data.batches.len());
+                        ready!(self.poll_spilled_batches(cx, &needed))?;
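+                        // Everything freeze_all() touches below must already
+                        // be InMemory: poll_spilled_batches either restored
+                        // those batches or returned Pending, and this guard
+                        // re-runs on the next poll.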
 
-                    if self.num_unfrozen_pairs() < self.batch_size {
-                        if self.buffered_data.scanning_finished() {
-                            self.buffered_data.scanning_reset();
-                            self.state = SortMergeJoinState::EmitReadyThenInit;
-                        }
-                    } else {
                         self.freeze_all()?;
 
                         // Verify metadata alignment before checking if we have batches to output
@@ -705,7 +705,6 @@
                         }
 
                         // For non-filtered joins, only output if we have a completed batch
-                        // (opportunistic output when target batch size is reached)
                         if self
                             .joined_record_batches
                             .joined_batches
@@ -720,10 +719,26 @@
                                 .record_output(&self.join_metrics.baseline_metrics());
                             return Poll::Ready(Some(Ok(record_batch)));
                         }
+                        // Otherwise keep buffering (don't output yet)
+                        continue;
                     }
+
+                    self.join_partial()?;
+
+                    if self.num_unfrozen_pairs() < self.batch_size
+                        && self.buffered_data.scanning_finished()
+                    {
+                        self.buffered_data.scanning_reset();
+                        self.state = SortMergeJoinState::EmitReadyThenInit;
+                    }
+                    // Note: if join_partial() reached the batch size, the loop repeats to freeze the data.
                 }
                 SortMergeJoinState::Exhausted => {
+                    let needed =
+                        self.get_required_batch_indices(self.buffered_data.batches.len());
+                    ready!(self.poll_spilled_batches(cx, &needed))?;
+
                     self.freeze_all()?;
 
                     // Verify metadata alignment before final output
@@ -843,6 +858,7 @@ impl MaterializingSortMergeJoinStream {
             reservation,
             runtime_env,
             spill_manager,
+            spill_stream: None,
             streamed_buffered_cmp: None,
             buffered_equality_cmp: None,
             streamed_batch_counter: AtomicUsize::new(0),
@@ -917,6 +933,70 @@
         Poll::Pending
     }
 
+    /// Identifies which buffered batches are needed for the upcoming freeze operation
+    fn get_required_batch_indices(&self, buffered_freeze_count: usize) -> Vec<usize> {
+        let mut needed = vec![];
+
+        // We need all batches that matched with streamed rows
+        for chunk in &self.streamed_batch.output_indices {
+            if let Some(idx) = chunk.buffered_batch_idx {
+                needed.push(idx);
+            }
+        }
+
+        // Full joins need to emit null-joined rows, so we need batches up to freeze_count
+        if self.join_type == JoinType::Full {
+            needed.extend(0..buffered_freeze_count);
+        }
+
+        needed.sort_unstable();
+        needed.dedup();
+        needed
+    }
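+
+    // Example: if output_indices references buffered batches {2, 0, 2} and
+    // this is a Full join over 4 buffered batches, the result is [0, 1, 2, 3];
+    // for any other join type it is [0, 2].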
+
+    /// Asynchronously reads spilled batches back into memory.
+    /// Only processes the required indices to avoid OOMs.
+    fn poll_spilled_batches(
+        &mut self,
+        cx: &mut Context<'_>,
+        required_indices: &[usize],
+    ) -> Poll<Result<()>> {
+        for &idx in required_indices {
+            // Guard against indices that might be out of bounds if the queue was cleared
+            if idx >= self.buffered_data.batches.len() {
+                continue;
+            }
+
+            let bb = &mut self.buffered_data.batches[idx];
+
+            if let BufferedBatchState::Spilled(spill_file) = &bb.batch {
+                if self.spill_stream.is_none() {
+                    let stream = self
+                        .spill_manager
+                        .read_spill_as_stream(Arc::clone(spill_file), None)?;
+                    self.spill_stream = Some(stream);
+                }
+
+                match ready!(self.spill_stream.as_mut().unwrap().poll_next_unpin(cx)) {
+                    Some(Ok(batch)) => {
+                        // Transition the batch back to InMemory
+                        bb.batch = BufferedBatchState::InMemory(batch);
+                        self.spill_stream = None;
+                    }
+                    Some(Err(e)) => {
+                        self.spill_stream = None;
+                        return Poll::Ready(Err(e));
+                    }
+                    None => {
+                        self.spill_stream = None;
+                        return Poll::Ready(internal_err!("Spill file was empty"));
+                    }
+                }
+            }
+        }
+        Poll::Ready(Ok(()))
+    }
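+
+    // Note: each spill file is assumed to hold exactly one batch; after the
+    // first successful read the entry flips back to InMemory and the stream
+    // is dropped, while an immediately exhausted stream is an internal error.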
+
     /// Poll next streamed row
     fn poll_streamed_row(&mut self, cx: &mut Context) -> Poll<Option<Result<()>>> {
         loop {
@@ -931,33 +1011,41 @@
                         self.streamed_state = StreamedState::Polling;
                     }
                 }
-                StreamedState::Polling => match self.streamed.poll_next_unpin(cx)? {
-                    Poll::Pending => {
-                        return Poll::Pending;
-                    }
-                    Poll::Ready(None) => {
-                        // Release the streamed input pipeline's resources.
-                        let streamed_schema = self.streamed.schema();
-                        self.streamed =
-                            Box::pin(EmptyRecordBatchStream::new(streamed_schema));
-                        self.streamed_state = StreamedState::Exhausted;
+                StreamedState::Polling => {
+                    let needed =
+                        self.get_required_batch_indices(self.buffered_data.batches.len());
+                    if let Err(e) = ready!(self.poll_spilled_batches(cx, &needed)) {
+                        return Poll::Ready(Some(Err(e)));
                     }
-                    Poll::Ready(Some(batch)) => {
-                        if batch.num_rows() > 0 {
-                            self.freeze_streamed()?;
-                            self.join_metrics.input_batches().add(1);
-                            self.join_metrics.input_rows().add(batch.num_rows());
-                            self.streamed_batch =
-                                StreamedBatch::new(batch, &self.on_streamed);
-                            self.rebuild_streamed_buffered_cmp()?;
-                            // Every incoming streaming batch should have its unique id
-                            // Check `JoinedRecordBatches.self.streamed_batch_counter` documentation
-                            self.streamed_batch_counter
-                                .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
-                            self.streamed_state = StreamedState::Ready;
+
+                    match self.streamed.poll_next_unpin(cx)? {
+                        Poll::Pending => {
+                            return Poll::Pending;
+                        }
+                        Poll::Ready(None) => {
+                            // Release the streamed input pipeline's resources.
+                            let streamed_schema = self.streamed.schema();
+                            self.streamed =
+                                Box::pin(EmptyRecordBatchStream::new(streamed_schema));
+                            self.streamed_state = StreamedState::Exhausted;
+                        }
+                        Poll::Ready(Some(batch)) => {
+                            if batch.num_rows() > 0 {
+                                self.freeze_streamed()?;
+                                self.join_metrics.input_batches().add(1);
+                                self.join_metrics.input_rows().add(batch.num_rows());
+                                self.streamed_batch =
+                                    StreamedBatch::new(batch, &self.on_streamed);
+                                self.rebuild_streamed_buffered_cmp()?;
+                                // Every incoming streaming batch should have its unique id
+                                // Check `JoinedRecordBatches.self.streamed_batch_counter` documentation
+                                self.streamed_batch_counter
+                                    .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
+                                self.streamed_state = StreamedState::Ready;
+                            }
                         }
                     }
-                },
+                }
                 StreamedState::Ready => {
                     return Poll::Ready(Some(Ok(())));
                 }
@@ -1036,6 +1124,13 @@
                     let head_batch = self.buffered_data.head_batch();
                     // If the head batch is fully processed, dequeue it and produce output of it.
                     if head_batch.range.end == head_batch.num_rows {
+                        // Load the spilled head batch before dequeuing
+                        let needed = self.get_required_batch_indices(1);
+                        if let Err(e) = ready!(self.poll_spilled_batches(cx, &needed))
+                        {
+                            return Poll::Ready(Some(Err(e)));
+                        }
+
                         self.freeze_dequeuing_buffered()?;
                         if let Some(mut buffered_batch) =
                             self.buffered_data.batches.pop_front()
@@ -1590,17 +1685,15 @@
         let num_right_cols = self.buffered_schema.fields().len();
         let mut right_columns = Vec::with_capacity(num_right_cols);
 
-        // Read each source batch once (spilled batches require disk I/O).
-        let source_data: Vec<Option<RecordBatch>> = source_batches
+        // Read each source batch once.
+        let source_data: Vec<RecordBatch> = source_batches
             .iter()
             .map(|&idx| {
                 let bb = &self.buffered_data.batches[idx];
                 match &bb.batch {
-                    BufferedBatchState::InMemory(batch) => Some(batch.clone()),
-                    BufferedBatchState::Spilled(spill_file) => {
-                        let file = BufReader::new(File::open(spill_file.path()).ok()?);
-                        let reader = StreamReader::try_new(file, None).ok()?;
-                        reader.into_iter().next()?.ok()
+                    BufferedBatchState::InMemory(batch) => batch.clone(),
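+                    // Callers restore these indices via poll_spilled_batches
+                    // before freezing, so a Spilled entry here is a logic error.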
+                    BufferedBatchState::Spilled(_) => {
+                        unreachable!("Batches were unspilled")
                     }
                 }
             })
@@ -1615,14 +1708,7 @@
             source_arrays.push(null_array.as_ref());
 
             for data in &source_data {
-                match data {
-                    Some(batch) => source_arrays.push(batch.column(col_idx).as_ref()),
-                    None => {
-                        return internal_err!(
-                            "Failed to read spilled buffered batch during interleave"
-                        );
-                    }
-                }
+                source_arrays.push(data.column(col_idx).as_ref());
             }
 
             right_columns.push(interleave(&source_arrays, &interleave_indices)?);
@@ -1817,32 +1903,17 @@ fn fetch_right_columns_from_batch_by_idxs(
     buffered_indices: &UInt64Array,
 ) -> Result<Vec<ArrayRef>> {
     match &buffered_batch.batch {
-        // In memory batch
-        // In memory batch
         BufferedBatchState::InMemory(batch) => {
-            // When indices form a contiguous range (common in SMJ since the
-            // buffered side is scanned sequentially), use zero-copy slice.
             if let Some(range) = is_contiguous_range(buffered_indices) {
                 Ok(batch.slice(range.start, range.len()).columns().to_vec())
             } else {
                 Ok(take_arrays(batch.columns(), buffered_indices, None)?)
             }
         }
-        // If the batch was spilled to disk, less likely
-        BufferedBatchState::Spilled(spill_file) => {
-            let mut buffered_cols: Vec<ArrayRef> =
-                Vec::with_capacity(buffered_indices.len());
-
-            let file = BufReader::new(File::open(spill_file.path())?);
-            let reader = StreamReader::try_new(file, None)?;
-
-            for batch in reader {
-                batch?.columns().iter().for_each(|column| {
-                    buffered_cols.extend(take(column, &buffered_indices, None))
-                });
-            }
-
-            Ok(buffered_cols)
+        BufferedBatchState::Spilled(_) => {
+            internal_err!(
+                "Buffered batch should have been unspilled before fetching columns"
+            )
         }
     }
 }