feat(datafusion): enable parallel file-level scanning via one partition per file#2298
toutane wants to merge 6 commits into apache:main
Conversation
timsaucer left a comment:
I'm no expert on Iceberg, but I've worked a lot on DataFusion, particularly table providers. I recently wrote a blog post on the DataFusion site, though only after you first put this PR up. In case it's in any way useful: https://datafusion.apache.org/blog/2026/03/31/writing-table-providers/
Overall I think the approach here is definitely reasonable. My comments are mostly around opportunities to squeeze out a little more performance based on having done something similar at my work.
```rust
    self: Arc<Self>,
    _children: Vec<Arc<dyn ExecutionPlan>>,
) -> DFResult<Arc<dyn ExecutionPlan>> {
    Ok(self)
```
Since this doesn't support children, I'd recommend an error if _children is not empty. Not a blocker for merge.
Yes, you're right, thanks! I pushed a fix that returns a `DataFusionError::Internal`, matching the pattern used in `IcebergCommitExec::with_new_children`.
Side note: IcebergTableScan::with_new_children has the same issue. This could be the subject of another PR.
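The guard pattern discussed here can be sketched with a std-only analogue (the method name mirrors DataFusion's `ExecutionPlan::with_new_children`, but `Plan` and the `String` error are hypothetical stand-ins, not the real DataFusion types):

```rust
// Minimal sketch of the "reject non-empty children" guard for a leaf node.
// `Plan` and the String error stand in for Arc<dyn ExecutionPlan> and
// DataFusionError::Internal in the real code.
#[derive(Debug, Clone)]
struct Plan {
    name: String,
}

impl Plan {
    // A leaf node has no children, so any attempt to attach children
    // indicates a planner bug; surface it as an internal error.
    fn with_new_children(self, children: Vec<Plan>) -> Result<Plan, String> {
        if !children.is_empty() {
            return Err(format!(
                "Internal: {} does not accept children, got {}",
                self.name,
                children.len()
            ));
        }
        Ok(self)
    }
}
```

The point of erroring rather than silently returning `self` is that a planner handing children to a leaf is always a bug worth surfacing.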
```rust
    &self,
    filters: &[&Expr],
) -> DFResult<Vec<TableProviderFilterPushDown>> {
    Ok(vec![TableProviderFilterPushDown::Inexact; filters.len()])
```
Can we do better than this? If we have a partitioned scan and the filter is on the partitions, I would expect to be able to get an exact pushdown. That would entirely remove a filter operation for cases where it matches, and I think that's a big win and a common use case I've seen in other work.
Yes, you're right, there's something to do here; I agree.
I'd prefer to tackle this in a follow-up PR: doing it correctly requires a per-filter conversion API (currently `convert_filters_to_predicate` collapses everything into a single combined predicate and silently drops non-convertible filters) and a partition-spec-aware check: only Identity-transformed partition columns can be safely marked Exact; bucket, truncate, year/month/etc. are lossy and must stay Inexact to avoid incorrect results.
Happy to open a tracking issue. However, if you think it's simple enough, I can go ahead and make the changes directly in the PR.
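The partition-spec-aware check could look roughly like this (a std-only sketch, assuming we classify by column name against the set of Identity-transformed partition columns; `classify_filter` and `PushDown` are hypothetical names, not the actual iceberg-rust or DataFusion API):

```rust
// Sketch: a filter can be marked Exact only when every column it touches is
// an Identity-transformed partition column. Bucket/truncate/year/month/...
// transforms are lossy, so filters on those columns must stay Inexact.
#[derive(Debug, PartialEq)]
enum PushDown {
    Exact,
    Inexact,
}

fn classify_filter(filter_columns: &[&str], identity_partition_columns: &[&str]) -> PushDown {
    let all_identity = !filter_columns.is_empty()
        && filter_columns
            .iter()
            .all(|c| identity_partition_columns.contains(c));
    if all_identity {
        PushDown::Exact
    } else {
        PushDown::Inexact
    }
}
```

The real version would additionally need the per-filter conversion API mentioned above, since a filter that cannot be converted to an Iceberg predicate at all must stay Inexact regardless of which columns it touches.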
```rust
    .map_err(to_datafusion_error)?
    .try_collect::<Vec<_>>()
    .await
    .map_err(to_datafusion_error)?;
```
It looks like the number of output partitions will be the number of files, right? I'm wondering if there's an opportunity to do better than that. We're specifying that the output partitioning in the exec is unknown, but don't we have information about the partitioning we could utilize?
By "better" I mean: could we be more performant if we were to go ahead and get the target partitions from the session and already output that number of partitions with hashing?
Thanks for raising this, please push back if any of the below is off.
For context, the long-term direction for this is tracked in the EPIC #1604 (row-group-based parallel scan with a GroupPruner that can split/merge FileScanTask below the file grain). What I was hoping to land with this PR is a more immediate, scoped optimization that stays within the current file-grain contract, so we don't preempt the design choices in #1604. The file-grouping step you're pointing at is essentially what #2220 describes as the intermediate improvement on the path toward #1604.
If you think it's appropriate, I'd be happy to pick up a short-term follow-up along these lines:
- Switch `IcebergPartitionedScan` from `tasks: Vec<FileScanTask>` to `file_groups: Vec<Vec<FileScanTask>>`, following the convention used by DataFusion's own `FileScanConfig`: each group = one DataFusion partition that streams its files sequentially through `ArrowReaderBuilder::read`.
- In `IcebergPartitionedTableProvider::scan`, read `state.config().target_partitions()` and group tasks into `min(n_files, target_partitions)` buckets.
- When `n_files < target_partitions`, parallelism is still capped at `n_files`. I think that's inherent to the file grain, but let me know if I'm missing something.
I'm happy to open the follow-up issue/PR myself, or defer to you if you'd rather frame it, whatever works best.
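The grouping step sketched in that list could look roughly like this (a std-only sketch under the assumptions above; `group_tasks` is a hypothetical helper, with `T` standing in for `FileScanTask`):

```rust
// Round-robin tasks into min(n_files, target_partitions) groups; each group
// would become one DataFusion partition that streams its files sequentially.
// This is a sketch, not the actual iceberg-datafusion implementation.
fn group_tasks<T>(tasks: Vec<T>, target_partitions: usize) -> Vec<Vec<T>> {
    if tasks.is_empty() || target_partitions == 0 {
        return Vec::new();
    }
    let n_groups = tasks.len().min(target_partitions);
    let mut groups: Vec<Vec<T>> = (0..n_groups).map(|_| Vec::new()).collect();
    for (i, task) in tasks.into_iter().enumerate() {
        groups[i % n_groups].push(task);
    }
    groups
}
```

Round-robin is the simplest policy; a size-aware variant (balancing groups by file byte counts, as the DataFusion discussion suggests) would slot into the same signature.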
I suppose I'd need to understand those conversations. I think I mentioned this in one of the other comments on this PR, but I found the whole discussion difficult to track. Maybe I can find some time this weekend to look through that size-based partitioning they mention.
Thanks for the PR, @toutane! One thing I noticed:
Commits:
- …itionedScan for parallel file scanning (Co-authored-by: Tim Saucer <timsaucer@gmail.com>)
- …:with_new_children

Force-pushed from 0a7af45 to fde61f6.
More broadly, is adding a second path really the best answer? It seems like now you're going to increase your maintenance load. Is there any reason not to have a single path, with the fallback being a partitioned scan of N=1?

I am going to spend a little more time trying to understand the issues. It's difficult because some of them are marked as unplanned or stale, and some of the links do not have good descriptions. I suppose I'll need to look at the Java source to get a better idea of what the long-term goal is.
Hey Tim, I think you're absolutely right about consolidating everything into a single path. The only reason I kept separate paths was to avoid introducing breaking changes. I am going to explore a design where the partitioned file scan becomes the default behavior, with the current provider's logic as the fallback, as you suggested.

On a related note, it could be worth thinking about the next step: exposing
I understand a desire to not introduce breaking changes. Is the concern that the API is changing, or do you have implementation concerns? If it's just the API change, then good upgrade documentation is often sufficient, especially since it looks like the change would be fairly straightforward for a downstream consumer. Please correct me if that's wrong. If it's a concern about the implementation, then I think the real solution is to make sure there's robust testing, both in the repo and against some real-life workloads, to verify performance at different scales and partitioning structures.

With respect to the question about output partitioning, I think any time you can do that, you should. Any time we can give more information about these kinds of things, we're going to see performance gains, and sometimes significant gains.
Which issue does this PR close?
What changes are included in this PR?
Approach
The issue proposed modifying `IcebergTableScan` directly to accept `Vec<Vec<FileScanTask>>` and return `UnknownPartitioning(n)`. This PR takes a different approach: rather than changing the existing scan path, it introduces two new types. This preserves full backward compatibility with `IcebergTableProvider`/`IcebergTableScan` and lets users explicitly choose parallel file scanning when they need it.

Adds two new public types to `iceberg-datafusion`:
- `IcebergPartitionedScan`: a DataFusion `ExecutionPlan` where each `FileScanTask` maps to exactly one partition, enabling DataFusion to dispatch file reads in parallel
- `IcebergPartitionedTableProvider`: a catalog-backed `TableProvider` that builds an `IcebergPartitionedScan` on every query, always fetching the latest snapshot

Design choices
One file = one partition
`IcebergTableScan` uses `UnknownPartitioning(1)` and streams all files sequentially through a single partition. `IcebergPartitionedScan` uses `UnknownPartitioning(n_files)`, giving DataFusion the information it needs to schedule `execute(i)` calls concurrently, one per file.

Table reloaded on every scan
`IcebergPartitionedTableProvider` loads the table twice: once at construction to snapshot the Arrow schema for DataFusion planning, and once at scan time to guarantee the freshest snapshot. This mirrors the behavior of `IcebergTableProvider`.

No stored projection/predicate fields

The struct is intentionally self-contained: its full state is `(tasks, file_io, schema)`.

Known limitations
No limit pushdown: `_limit` is not forwarded to `IcebergPartitionedScan`. DataFusion inserts a `GlobalLimitExec` above any leaf that does not implement pushdown, so correctness is maintained.

No writes: `insert_into` returns `FeatureUnsupported`. Use `IcebergTableProvider` for write operations.

Schema staleness on projection: projection indices are resolved against the schema captured at construction time. This is inherited behavior from `IcebergTableProvider`.

Are these changes tested?
Two unit tests are added in `table/partitioned.rs`:
- `test_empty_table_zero_partitions`: verifies that an empty table produces a zero-partition scan, guarding against an out-of-bounds panic on `execute(0)`
- `test_one_partition_per_file`: verifies that N data files produce exactly N DataFusion partitions in `IcebergPartitionedScan`
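The contract those tests pin down can be modeled with a small std-only sketch (`PartitionedScan` here is a hypothetical stand-in, not the real `IcebergPartitionedScan`): the partition count equals the file count, and an out-of-range `execute(i)` — including `execute(0)` on an empty table — errors instead of panicking.

```rust
// Std-only model of the one-file-one-partition contract: partition count
// equals the number of files, and execute(i) on an out-of-range index
// (including index 0 on an empty table) returns an error, never panics.
struct PartitionedScan {
    n_files: usize,
}

impl PartitionedScan {
    fn partition_count(&self) -> usize {
        self.n_files
    }

    fn execute(&self, partition: usize) -> Result<usize, String> {
        if partition >= self.n_files {
            return Err(format!("partition {partition} out of range"));
        }
        Ok(partition) // partition i streams exactly file i
    }
}
```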