177 changes: 139 additions & 38 deletions docs/content/pypaimon/ray-data.md

# Ray Data

This requires `ray` to be installed.

`pypaimon.ray` exposes a top-level `read_paimon` / `write_paimon` facade that
takes a table identifier and catalog options directly, mirroring the shape of
Ray's built-in Iceberg integration. The lower-level `TableRead.to_ray()` and
`TableWrite.write_ray()` entry points remain available for callers that have
already resolved a `(read_builder, splits)` pair or constructed a
`table_write` via the regular pypaimon API.

## Read

### `read_paimon` (recommended)

```python
from pypaimon.ray import read_paimon

ray_dataset = read_paimon(
"database_name.table_name",
catalog_options={"warehouse": "/path/to/warehouse"},
)

print(ray_dataset)
# MaterializedDataset(num_blocks=1, num_rows=9, schema={f0: int32, f1: string})

print(ray_dataset.to_pandas())
# ...
```

`read_paimon` opens its own catalog and resolves the table, so it is the
single-call equivalent of the four-step `CatalogFactory.create → get_table →
new_read_builder → to_ray` boilerplate.
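
For comparison, the expanded lower-level form looks roughly like this (a sketch; it uses the regular pypaimon catalog and read-builder API described later on this page):

```python
from pypaimon.catalog.catalog_factory import CatalogFactory

# The four steps that read_paimon wraps into a single call.
catalog = CatalogFactory.create({"warehouse": "/path/to/warehouse"})
table = catalog.get_table("database_name.table_name")
read_builder = table.new_read_builder()
splits = read_builder.new_scan().plan().splits()
ray_dataset = read_builder.new_read().to_ray(splits)
```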

**Projection and limit:**

```python
ray_dataset = read_paimon(
"database_name.table_name",
catalog_options={"warehouse": "/path/to/warehouse"},
projection=["id", "score"],
limit=1000,
)
```

**Distribution / scheduling:**

```python
ray_dataset = read_paimon(
"database_name.table_name",
catalog_options={"warehouse": "/path/to/warehouse"},
override_num_blocks=4,
ray_remote_args={"num_cpus": 2, "max_retries": 3},
concurrency=8,
)
```

**Parameters:**
- `table_identifier`: full table name, e.g. `"db_name.table_name"`.
- `catalog_options`: kwargs forwarded to `CatalogFactory.create()`,
e.g. `{"warehouse": "/path/to/warehouse"}`.
- `filter`: optional `Predicate` to push down into the scan (see the example after this list).
- `projection`: optional list of column names to read.
- `limit`: optional row limit applied at scan planning time.
- `override_num_blocks`: optional override for the number of output blocks.
Must be `>= 1`.
- `ray_remote_args`: optional kwargs passed to `ray.remote()` in read tasks
(e.g. `{"num_cpus": 2, "max_retries": 3}`).
- `concurrency`: optional max number of Ray tasks to run concurrently.
- `**read_args`: additional kwargs forwarded to `ray.data.read_datasource`
(e.g. `per_task_row_limit` in Ray 2.52.0+).
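
For example, a predicate can be built from the table's regular read builder and passed through `filter` (a sketch; the `new_predicate_builder` / `greater_than` helpers are assumed from the standard pypaimon API):

```python
from pypaimon.catalog.catalog_factory import CatalogFactory
from pypaimon.ray import read_paimon

catalog = CatalogFactory.create({"warehouse": "/path/to/warehouse"})
table = catalog.get_table("database_name.table_name")

# Build a pushdown predicate such as score > 80 (assumed predicate-builder API).
predicate_builder = table.new_read_builder().new_predicate_builder()
predicate = predicate_builder.greater_than("score", 80)

ray_dataset = read_paimon(
    "database_name.table_name",
    catalog_options={"warehouse": "/path/to/warehouse"},
    filter=predicate,
)
```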

### `TableRead.to_ray()` (lower-level)

If you already have a `read_builder` and `splits`, you can convert them to a
Ray Dataset directly:

```python
table_read = read_builder.new_read()
splits = read_builder.new_scan().plan().splits()
ray_dataset = table_read.to_ray(
    splits,
    override_num_blocks=4,
    ray_remote_args={"num_cpus": 2, "max_retries": 3},
)

# Use Ray Data operations
mapped_dataset = ray_dataset.map(lambda row: {'value': row['value'] * 2})
filtered_dataset = ray_dataset.filter(lambda row: row['score'] > 80)
df = ray_dataset.to_pandas()
```

`to_ray()` accepts the same `override_num_blocks`, `ray_remote_args`,
`concurrency`, and `**read_args` parameters as `read_paimon`.
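
Projection, filter, and limit are not `to_ray()` parameters; on this path they are applied to the read builder before planning splits. A sketch, assuming the builder's `with_projection` and `with_limit` helpers from the standard pypaimon API:

```python
# Push projection and limit down at the builder level (assumed helpers).
read_builder = (
    table.new_read_builder()
    .with_projection(["id", "score"])
    .with_limit(1000)
)
splits = read_builder.new_scan().plan().splits()
ray_dataset = read_builder.new_read().to_ray(splits, override_num_blocks=4)
```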

### Ray Block Size Configuration

If you need to configure Ray's block size (e.g., when Paimon splits exceed
Ray's default 128MB block size), set it on the `DataContext` before calling
either `read_paimon` or `to_ray`:

```python
from ray.data import DataContext

ctx = DataContext.get_current()
ctx.target_max_block_size = 256 * 1024 * 1024 # 256MB (default is 128MB)
ray_dataset = table_read.to_ray(splits)
```

See the [Ray Data API documentation](https://docs.ray.io/en/latest/data/api/doc/ray.data.read_datasource.html)
for more details.

## Write

### `write_paimon` (recommended)

```python
import ray
from pypaimon.ray import write_paimon

ray_dataset = ray.data.read_json("/path/to/data.jsonl")

write_paimon(
    ray_dataset,
    "database_name.table_name",
    catalog_options={"warehouse": "/path/to/warehouse"},
)
```

`write_paimon` opens its own catalog, resolves the table, and commits the
write through Ray's Datasink API — there is no separate `prepare_commit` or
`close` step to run.
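
For a quick local test you can also write an in-memory dataset; the rows below are hypothetical and their column names must match the target table's schema:

```python
import ray
from pypaimon.ray import write_paimon

# Hypothetical rows; column names must match the target table's schema.
ds = ray.data.from_items([
    {"id": 1, "score": 95},
    {"id": 2, "score": 80},
])

write_paimon(
    ds,
    "database_name.table_name",
    catalog_options={"warehouse": "/path/to/warehouse"},
)
```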

**Overwrite mode:**

```python
write_paimon(
    ray_dataset,
    "database_name.table_name",
    catalog_options={"warehouse": "/path/to/warehouse"},
    overwrite=True,
)
```

**Distribution / scheduling:**

```python
write_paimon(
    ray_dataset,
    "database_name.table_name",
    catalog_options={"warehouse": "/path/to/warehouse"},
    concurrency=4,
    ray_remote_args={"num_cpus": 2},
)
```

**Parameters:**
- `dataset`: the Ray Dataset to write.
- `table_identifier`: full table name, e.g. `"db_name.table_name"`.
- `catalog_options`: kwargs forwarded to `CatalogFactory.create()`.
- `overwrite`: if `True`, overwrite existing data in the table.
- `concurrency`: optional max number of Ray write tasks to run concurrently.
- `ray_remote_args`: optional kwargs passed to `ray.remote()` in write tasks
(e.g. `{"num_cpus": 2}`).

### `TableWrite.write_ray()` (lower-level)

If you have already constructed a `table_write` from a write builder, you can
hand a Ray Dataset directly to it. `write_ray()` commits through the Ray
Datasink API, so there is no `prepare_commit` / `commit` step to run for the
Ray write itself — just close the writer when you are done with it:

```python
import ray

table = catalog.get_table('database_name.table_name')

# 1. Create table write and commit (commit is only needed for non-Ray writes
# on the same table_write instance — see below).
write_builder = table.new_batch_write_builder()
table_write = write_builder.new_write()
table_commit = write_builder.new_commit()

# 2. Write Ray Dataset
ray_dataset = ray.data.read_json("/path/to/data.jsonl")
table_write.write_ray(ray_dataset, overwrite=False, concurrency=2)
# Parameters:
# - dataset: Ray Dataset to write
# - overwrite: Whether to overwrite existing data (default: False)
# - concurrency: Optional max number of concurrent Ray tasks
# - ray_remote_args: Optional kwargs passed to ray.remote() (e.g., {"num_cpus": 2})
# Note: write_ray() handles commit internally through Ray Datasink API.
# Skip steps 3-4 if using write_ray() - just close the writer.

# 3. Commit data (required for write_pandas/write_arrow/write_arrow_batch only)
commit_messages = table_write.prepare_commit()
table_commit.commit(commit_messages)

# 4. Close resources
table_write.close()
table_commit.close()
```
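
When the `table_write` is used only for the Ray write, the sequence above reduces to the following sketch (no commit object is needed):

```python
# Ray-only write: write_ray() commits internally via the Ray Datasink API.
write_builder = table.new_batch_write_builder()
table_write = write_builder.new_write()
table_write.write_ray(ray_dataset, overwrite=False, concurrency=2)
table_write.close()
```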

### Overwrite at builder level

The recommended way to overwrite via `write_paimon` is the `overwrite=True`
flag above. When using the lower-level builder API, you can also configure
overwrite mode on the write builder itself:

```python
# overwrite whole table
write_builder = table.new_batch_write_builder().overwrite()
```
21 changes: 21 additions & 0 deletions paimon-python/pypaimon/ray/__init__.py
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

from pypaimon.ray.ray_paimon import read_paimon, write_paimon

__all__ = ["read_paimon", "write_paimon"]
124 changes: 124 additions & 0 deletions paimon-python/pypaimon/ray/ray_paimon.py
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
"""
Top-level API for reading and writing Paimon tables with Ray Datasets.

Usage::

    from pypaimon.ray import read_paimon, write_paimon

    ds = read_paimon("db.table", catalog_options={"warehouse": "/path"})
    write_paimon(ds, "db.table", catalog_options={"warehouse": "/path"})
"""

from typing import Any, Dict, List, Optional

import ray.data

from pypaimon.common.predicate import Predicate


def read_paimon(
    table_identifier: str,
    catalog_options: Dict[str, str],
    *,
    filter: Optional[Predicate] = None,
    projection: Optional[List[str]] = None,
    limit: Optional[int] = None,
    ray_remote_args: Optional[Dict[str, Any]] = None,
    concurrency: Optional[int] = None,
    override_num_blocks: Optional[int] = None,
    **read_args,
) -> ray.data.Dataset:
    """Read a Paimon table into a Ray Dataset.

    Args:
        table_identifier: Full table name, e.g. ``"db_name.table_name"``.
        catalog_options: Options passed to ``CatalogFactory.create()``,
            e.g. ``{"warehouse": "/path/to/warehouse"}``.
        filter: Optional predicate to push down into the scan.
        projection: Optional list of column names to read.
        limit: Optional row limit for the scan.
        ray_remote_args: Optional kwargs passed to ``ray.remote`` in read tasks.
        concurrency: Optional max number of Ray read tasks to run concurrently.
        override_num_blocks: Optional override for the number of output blocks.
        **read_args: Additional kwargs forwarded to ``ray.data.read_datasource``.

    Returns:
        A ``ray.data.Dataset`` containing the table data.
    """
    from pypaimon.read.datasource.ray_datasource import RayDatasource
    from pypaimon.read.datasource.split_provider import CatalogSplitProvider

    if override_num_blocks is not None and override_num_blocks < 1:
        raise ValueError(
            "override_num_blocks must be at least 1, got {}".format(override_num_blocks)
        )

    datasource = RayDatasource(
        CatalogSplitProvider(
            table_identifier=table_identifier,
            catalog_options=catalog_options,
            predicate=filter,
            projection=projection,
            limit=limit,
        )
    )
    return ray.data.read_datasource(
        datasource,
        ray_remote_args=ray_remote_args,
        concurrency=concurrency,
        override_num_blocks=override_num_blocks,
        **read_args,
    )


def write_paimon(
    dataset: ray.data.Dataset,
    table_identifier: str,
    catalog_options: Dict[str, str],
    *,
    overwrite: bool = False,
    concurrency: Optional[int] = None,
    ray_remote_args: Optional[Dict[str, Any]] = None,
) -> None:
    """Write a Ray Dataset to a Paimon table.

    Args:
        dataset: The Ray Dataset to write.
        table_identifier: Full table name, e.g. ``"db_name.table_name"``.
        catalog_options: Options passed to ``CatalogFactory.create()``.
        overwrite: If ``True``, overwrite existing data in the table.
        concurrency: Optional max number of Ray write tasks to run concurrently.
        ray_remote_args: Optional kwargs passed to ``ray.remote`` in write tasks.
    """
    from pypaimon.catalog.catalog_factory import CatalogFactory
    from pypaimon.write.ray_datasink import PaimonDatasink

    catalog = CatalogFactory.create(catalog_options)
    table = catalog.get_table(table_identifier)

    datasink = PaimonDatasink(table, overwrite=overwrite)

    write_kwargs = {}
    if ray_remote_args is not None:
        write_kwargs["ray_remote_args"] = ray_remote_args
    if concurrency is not None:
        write_kwargs["concurrency"] = concurrency

    dataset.write_datasink(datasink, **write_kwargs)