feat(datafusion): enable parallel file-level scanning via one partition per file #2298

Draft
toutane wants to merge 6 commits into apache:main from toutane:draft/partitioned-file-scanning-contribution

Conversation

@toutane (Contributor) commented Mar 31, 2026

Which issue does this PR close?

What changes are included in this PR?

Approach

The issue proposed modifying IcebergTableScan directly to accept Vec<Vec<FileScanTask>> and return UnknownPartitioning(n). This PR takes a different approach: rather than changing the existing scan path, it introduces two new types. This preserves full backward compatibility with IcebergTableProvider / IcebergTableScan and lets users explicitly choose parallel file scanning when they need it.

Adds two new public types to iceberg-datafusion:

  • IcebergPartitionedScan: a DataFusion ExecutionPlan where each FileScanTask maps to exactly one partition, enabling DataFusion to dispatch file reads in parallel

  • IcebergPartitionedTableProvider: a catalog-backed TableProvider that builds an IcebergPartitionedScan on every query, always fetching the latest snapshot

Design choices

  • One file = one partition
    IcebergTableScan uses UnknownPartitioning(1) and streams all files sequentially through a single partition. IcebergPartitionedScan uses UnknownPartitioning(n_files), giving DataFusion the information it needs to schedule execute(i) calls concurrently, one per file.

  • Table reloaded on every scan
    IcebergPartitionedTableProvider loads the table twice: once at construction to snapshot the Arrow schema for DataFusion planning, and once at scan time to guarantee the freshest snapshot. This mirrors the behavior of IcebergTableProvider.

  • No stored projection/predicate fields
    The struct is intentionally self-contained: its full state is (tasks, file_io, schema).
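The design choices above can be sketched in plain Rust. This is a std-only illustration, not the PR's actual code: FileScanTask, FileIO, and SchemaRef here are simplified stand-ins for the real iceberg/arrow types, and the DataFusion ExecutionPlan trait is elided.

```rust
// Std-only stand-ins for the real iceberg/arrow types (not the actual definitions).
struct FileScanTask {
    data_file_path: String,
}
struct FileIO;
struct SchemaRef;

/// Sketch of the self-contained state: (tasks, file_io, schema).
/// Each FileScanTask maps to exactly one DataFusion partition.
struct IcebergPartitionedScan {
    tasks: Vec<FileScanTask>,
    file_io: FileIO,
    schema: SchemaRef,
}

impl IcebergPartitionedScan {
    /// Mirrors UnknownPartitioning(n_files): one partition per file,
    /// including zero partitions for an empty table.
    fn partition_count(&self) -> usize {
        self.tasks.len()
    }
}

fn main() {
    let scan = IcebergPartitionedScan {
        tasks: vec![
            FileScanTask { data_file_path: "a.parquet".into() },
            FileScanTask { data_file_path: "b.parquet".into() },
        ],
        file_io: FileIO,
        schema: SchemaRef,
    };
    // Two data files -> two partitions DataFusion can execute concurrently.
    println!("{}", scan.partition_count());
}
```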

Known limitations

  • No limit pushdown: _limit is not forwarded to IcebergPartitionedScan. DataFusion inserts a GlobalLimitExec above any leaf that does not implement pushdown, so correctness is maintained

  • No writes: insert_into returns FeatureUnsupported. Use IcebergTableProvider for write operations

  • Schema staleness on projection: projection indices are resolved against the schema captured at construction time. This is inherited behavior from IcebergTableProvider

Are these changes tested?

Two unit tests are added in table/partitioned.rs:

  • test_empty_table_zero_partitions: verifies that an empty table produces a zero-partition scan, guarding against an out-of-bounds panic on execute(0)

  • test_one_partition_per_file: verifies that N data files produce exactly N DataFusion partitions in IcebergPartitionedScan

@timsaucer (Member) left a comment

I'm no expert on Iceberg, but I've worked a lot on DataFusion, particularly table providers. I recently wrote a blog post on the DataFusion site, though after you first put this PR up. In case it's in any way useful: https://datafusion.apache.org/blog/2026/03/31/writing-table-providers/

Overall I think the approach here is definitely reasonable. My comments are mostly around opportunities to squeeze out a little more performance based on having done something similar at my work.

self: Arc<Self>,
_children: Vec<Arc<dyn ExecutionPlan>>,
) -> DFResult<Arc<dyn ExecutionPlan>> {
Ok(self)
Member

Since this doesn't support children, I'd recommend an error if _children is not empty. Not a blocker for merge.
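The suggested guard could look roughly like this. A std-only sketch: DFResult here is a stand-in for DataFusion's result type, and the real method operates on Arc<dyn ExecutionPlan> children rather than the simplified Child alias used below.

```rust
// Stand-ins so the sketch is self-contained; the real signature uses
// Arc<dyn ExecutionPlan> and DataFusionError::Internal.
type Child = ();
type DFResult<T> = Result<T, String>;

struct IcebergPartitionedScan;

impl IcebergPartitionedScan {
    /// Leaf node: reject any attempt to attach children instead of
    /// silently ignoring them, per the review suggestion.
    fn with_new_children(self, children: Vec<Child>) -> DFResult<Self> {
        if !children.is_empty() {
            return Err(format!(
                "IcebergPartitionedScan does not accept children, got {}",
                children.len()
            ));
        }
        Ok(self)
    }
}

fn main() {
    // Attaching a child is an error; an empty child list is fine.
    assert!(IcebergPartitionedScan.with_new_children(vec![()]).is_err());
    assert!(IcebergPartitionedScan.with_new_children(Vec::new()).is_ok());
    println!("ok");
}
```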

Contributor Author

Yes, you're right, thanks! I pushed a fix that returns a DataFusionError::Internal, matching the pattern used in IcebergCommitExec::with_new_children.

Side note: IcebergTableScan::with_new_children has the same issue. This could be the subject of another PR.

Comment thread on crates/integrations/datafusion/src/table/mod.rs (Outdated)
&self,
filters: &[&Expr],
) -> DFResult<Vec<TableProviderFilterPushDown>> {
Ok(vec![TableProviderFilterPushDown::Inexact; filters.len()])
Member

Can we do better than this? If we have a partitioned scan and the filter is on the partition columns, I would expect to be able to get an exact pushdown. That would entirely remove a filter operation for cases where it matches; I think that's a big win and a common use case I've seen in other work.

@toutane (Contributor Author) commented Apr 20, 2026

Yes, you're right, there's something to do here.

I'd prefer to tackle this in a follow-up PR: doing it correctly requires a per-filter conversion API (currently convert_filters_to_predicate collapses everything into a single combined predicate and silently drops non-convertible filters). It also requires a partition-spec-aware check: only Identity-transformed partition columns can safely be marked Exact; bucket, truncate, year/month, etc. are lossy and must stay Inexact to avoid incorrect results.

Happy to open a tracking issue. However, if you think it's simple enough, I can go ahead and make the changes directly in the PR.
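The Exact-vs-Inexact rule described above can be sketched as a small classifier. The enums below are simplified stand-ins for Iceberg's partition transforms and DataFusion's TableProviderFilterPushDown; in the real check, Exact would additionally require the filter to reference only that partition column.

```rust
// Simplified stand-in for Iceberg partition transforms.
#[derive(Debug, PartialEq)]
enum Transform {
    Identity,
    Bucket(u32),
    Truncate(u32),
    Year,
    Month,
    Day,
}

// Stand-in for DataFusion's TableProviderFilterPushDown.
#[derive(Debug, PartialEq)]
enum PushDown {
    Exact,   // filter fully answered by partition pruning
    Inexact, // DataFusion must keep a filter node above the scan
}

/// Only Identity-transformed partition columns preserve enough information
/// to mark a filter Exact; every other transform is lossy.
fn filter_pushdown(transform: &Transform) -> PushDown {
    match transform {
        Transform::Identity => PushDown::Exact,
        _ => PushDown::Inexact,
    }
}

fn main() {
    assert_eq!(filter_pushdown(&Transform::Identity), PushDown::Exact);
    assert_eq!(filter_pushdown(&Transform::Bucket(16)), PushDown::Inexact);
    assert_eq!(filter_pushdown(&Transform::Year), PushDown::Inexact);
    println!("ok");
}
```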

.map_err(to_datafusion_error)?
.try_collect::<Vec<_>>()
.await
.map_err(to_datafusion_error)?;
Member

It looks like the number of output partitions will be the number of files, right? I'm wondering if there's an opportunity to do better than that. We're specifying that the output partitioning in the exec is unknown, but don't we have information about the partitioning we could utilize?

Member

By better I mean: could we be more performant if we fetched the target partitions from the session and output that number of partitions directly, already hashed?

Contributor Author

Thanks for raising this, please push back if any of the below is off.

For context, the long-term direction for this is tracked in the EPIC #1604 (row-group-based parallel scan with a GroupPruner that can split/merge FileScanTask below the file grain). What I was hoping to land with this PR is a more immediate, scoped optimization that stays within the current file-grain contract, so we don't preempt the design choices in #1604. The file-grouping step you're pointing at is essentially what #2220 describes as the intermediate improvement on the path toward #1604.

If you think it's appropriate, I'd be happy to pick up a short-term follow-up along these lines:

  1. Switch IcebergPartitionedScan from tasks: Vec<FileScanTask> to file_groups: Vec<Vec<FileScanTask>>, following the convention used by DataFusion's own FileScanConfig: each group = one DataFusion partition that streams its files sequentially through ArrowReaderBuilder::read.
  2. In IcebergPartitionedTableProvider::scan, read state.config().target_partitions() and group tasks into min(n_files, target_partitions) buckets.
  3. When n_files < target_partitions, parallelism is still capped at n_files. I think that's inherent to the file grain, but let me know if I'm missing something.

I'm happy to open the follow-up issue/PR myself, or defer to you if you'd rather frame it, whatever works best.
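The grouping in step 2 above could be sketched as round-robin bucketing. This is a hypothetical std-only helper, with a generic element type standing in for FileScanTask; the real implementation might instead balance by file size.

```rust
/// Distribute tasks into at most `target_partitions` groups, round-robin,
/// so each group becomes one DataFusion partition. Parallelism is capped
/// at n_files when there are fewer files than target partitions.
fn group_tasks<T>(tasks: Vec<T>, target_partitions: usize) -> Vec<Vec<T>> {
    let n_groups = tasks.len().min(target_partitions.max(1));
    if n_groups == 0 {
        return Vec::new();
    }
    let mut groups: Vec<Vec<T>> = (0..n_groups).map(|_| Vec::new()).collect();
    for (i, task) in tasks.into_iter().enumerate() {
        groups[i % n_groups].push(task);
    }
    groups
}

fn main() {
    // 5 files, 4 target partitions -> 4 groups; the first gets two files.
    let groups = group_tasks(vec!["a", "b", "c", "d", "e"], 4);
    assert_eq!(groups.len(), 4);
    assert_eq!(groups[0], vec!["a", "e"]);

    // n_files < target_partitions -> parallelism capped at n_files.
    let groups = group_tasks(vec!["a", "b"], 8);
    assert_eq!(groups.len(), 2);
    println!("ok");
}
```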

Member

I suppose I'd need to understand those conversations. I think I mentioned in one of the other comments on this PR that I found the whole discussion difficult to track. Maybe I can find some time this weekend to look through that size-based partitioning they mention.

@mbutrovich (Collaborator)

Thanks for the PR, @toutane! One thing I noticed: IcebergPartitionedScan::execute() creates a bare ArrowReaderBuilder::new(file_io).build() with no configuration. The existing path through IcebergTableScan wires through row group filtering, row selection, concurrency limits, and batch size. Might be worth plumbing those through here too so users don't silently lose those optimizations when switching to the partitioned scan.

@toutane force-pushed the draft/partitioned-file-scanning-contribution branch from 0a7af45 to fde61f6 on April 21, 2026 09:56
@timsaucer (Member)

More broadly, is adding a second path really the best answer? It seems like it will increase your maintenance load. Is there any reason not to have a single path, with the fallback being a partitioned scan of N=1?

I am going to spend a little more time trying to understand the issues. It's difficult because some of them are marked as unplanned or stale and some of the links do not have good descriptions. I suppose I'll need to look at the java source to get a better idea of what the long term goal is.

@toutane (Contributor Author) commented Apr 22, 2026

Hey Tim, I think you're absolutely right about consolidating everything into a single TableProvider long term.

The only reason I kept separate paths was to avoid introducing breaking changes. I am going to explore a design where partitioned file scan becomes the default behavior, with the current provider's logic as a fallback as you suggested.

On a related note, it could be worth thinking about the next step: exposing Partitioning::Hash as output-partitioned when the Iceberg data uses bucket partitioning. Do you think that fits naturally in the same path, or would a separate provider be a better fit?

@timsaucer (Member)

> Hey Tim, I think you're absolutely right about consolidating everything into a single TableProvider long term.
>
> The only reason I kept separate paths was to avoid introducing breaking changes. I am going to explore a design where partitioned file scan becomes the default behavior, with the current provider's logic as a fallback as you suggested.
>
> On a related note, it could be worth thinking about the next step: exposing Partitioning::Hash as output-partitioned when the Iceberg data uses bucket partitioning. Do you think that fits naturally in the same path, or would a separate provider be a better fit?

I understand the desire not to introduce breaking changes. Is the concern that the API is changing, or do you have implementation concerns? If it's just the API change, then I think good upgrade documentation is often sufficient, especially since the change looks fairly straightforward for a downstream consumer. Please correct me if that's wrong.

If it's concern about the implementation, then I think the real solution is to make sure there's robust testing both in the repo and against some real life workloads to verify performance at different scales and partitioning structures.

With respect to the question about output partitioning, I think any time you can do that you should. Any time we can give more information about these kinds of things we're going to see performance gains, and sometimes significant gains.


Development

Successfully merging this pull request may close these issues.

Enable parallel file-level scanning for IcebergTableScan Datafusion Integration

3 participants