
feat: enable pickling of most Expr except udaf and udwf #1544

Open

timsaucer wants to merge 2 commits into apache:main from timsaucer:pr1-scalar-expr-pickle

Conversation


@timsaucer timsaucer commented May 15, 2026

Which issue does this PR close?

Addresses part of #1517

This is PR 1 of 4. The other three PRs stack sequentially on top of this one; each subsequent PR targets this branch's tip until it merges.

Follow-up PRs:

Rationale for this change

Today a LogicalPlan or Expr referencing a Python-defined ScalarUDF cannot survive a serialization round-trip without the receiver pre-registering a matching UDF, because the upstream protobuf codecs only carry the UDF name. That blocks shipping expressions to worker processes via pickle.dumps / multiprocessing.Pool / Ray actors / datafusion-distributed. This PR closes the scalar-UDF case end-to-end so the natural pickle.dumps(expr) pattern works for built-ins and Python scalar UDFs with no receiver-side setup.
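As a concrete illustration of the pattern this unlocks (a minimal sketch; the `double` UDF here is hypothetical, and any picklable Python callable works the same way):

    import pickle
    import pyarrow as pa
    import pyarrow.compute as pc
    from datafusion import col, udf

    # Hypothetical scalar UDF; the callable is defined inline on the sender.
    double = udf(
        lambda arr: pc.multiply(arr, 2),
        [pa.int64()],
        pa.int64(),
        volatility="immutable",
        name="double",
    )

    expr = double(col("a"))
    blob = pickle.dumps(expr)      # the UDF body travels inside the blob
    restored = pickle.loads(blob)  # no pre-registration needed on the receiver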

What changes are included in this PR?

Adds Python-aware encoding to PythonLogicalCodec and PythonPhysicalCodec.

On the Python side, Expr gains __reduce__ plus a classmethod from_bytes(buf, ctx=None). A new datafusion.ipc module exposes set_worker_ctx / get_worker_ctx / clear_worker_ctx thread-locals; _resolve_ctx consults explicit-ctx > worker-ctx > global SessionContext.
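A sketch of the intended worker-side flow under that resolution order (names from this PR; the `worker` function itself is hypothetical):

    from datafusion import SessionContext
    from datafusion.expr import Expr
    import datafusion.ipc as ipc

    def worker(blob: bytes) -> Expr:
        # Install a thread-local context for deserialization to resolve against.
        ipc.set_worker_ctx(SessionContext())
        try:
            # ctx=None, so _resolve_ctx falls back to the worker ctx set above.
            return Expr.from_bytes(blob)
        finally:
            ipc.clear_worker_ctx()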

cloudpickle>=2.0 is added as a runtime dependency (lazy-imported on the encode/decode hot path). This is a tiny dependency, in the kilobyte range.

Aggregate and window inline encoding, the per-session with_python_udf_inlining toggle, sender-side context wiring, and the user-guide docs land in PRs 2-4 of this series.

Are there any user-facing changes?

Yes. These are almost entirely additive; the one caveat is noted below.

  • Expr is now picklable. Built-ins and Python scalar UDFs round-trip with no worker-side setup.
  • New Expr.to_bytes(ctx=None) / Expr.from_bytes(buf, ctx=None) shape (see the sketch after this list). from_bytes is now a classmethod with ctx as a keyword-only None-default. Breaking for any direct Expr.from_bytes(ctx, blob) callers; the in-tree call sites are updated.
  • New public module datafusion.ipc with set_worker_ctx / get_worker_ctx / clear_worker_ctx.
  • New ScalarUDF.name property.
  • New runtime dependency on cloudpickle>=2.0.
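A minimal sketch of the new shape described in the list above (`some_expr` and `my_ctx` are placeholders):

    from datafusion import SessionContext
    from datafusion.expr import Expr

    my_ctx = SessionContext()
    blob = some_expr.to_bytes()               # instance method; ctx optional
    expr = Expr.from_bytes(blob)              # classmethod; ctx resolved via fallback chain
    expr = Expr.from_bytes(blob, ctx=my_ctx)  # explicit ctx takes highest priority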

`Expr.from_bytes` has a signature flip, but the original signature is unreleased (it merged only yesterday), so this is not a change any user will experience.

Adds Python-aware encoding to PythonLogicalCodec/PythonPhysicalCodec
so a ScalarUDF defined in Python travels inside the serialized
expression (cloudpickled into fun_definition) instead of needing a
matching registration on the receiver. With that in place, Expr gains
__reduce__ + classmethod from_bytes(buf, ctx=None) so pickle.dumps /
pickle.loads work end-to-end on expressions built from col, lit,
built-in functions, and Python scalar UDFs.

Wire format is framed as <DFPYUDF magic, version byte, cloudpickle
tuple>; the version byte lets a too-new/too-old payload surface a
clean Execution error instead of an opaque cloudpickle unpack
failure. Schema serde is via arrow-rs's native IPC (no pyarrow
round-trip). Cloudpickle module handle is cached per-interpreter
through PyOnceLock.
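The real framing lives in the Rust codec; a Python sketch of the same idea, with illustrative (not actual) constant values:

    import cloudpickle

    MAGIC = b"DFPYUDF"  # magic prefix per the description above
    VERSION = 1         # illustrative version byte

    def encode(payload: tuple) -> bytes:
        return MAGIC + bytes([VERSION]) + cloudpickle.dumps(payload)

    def decode(buf: bytes) -> tuple:
        if not buf.startswith(MAGIC):
            raise ValueError("not a DFPYUDF payload")
        if buf[len(MAGIC)] != VERSION:
            # Clean, versioned error instead of an opaque unpickle failure.
            raise ValueError(f"unsupported DFPYUDF payload version {buf[len(MAGIC)]}")
        return cloudpickle.loads(buf[len(MAGIC) + 1:])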

Worker-side context resolution lives in a new datafusion.ipc module:
set_worker_ctx / get_worker_ctx / clear_worker_ctx plus a private
_resolve_ctx helper consulted by Expr.from_bytes. Priority is
explicit ctx > worker ctx > global SessionContext. FFI UDFs still
travel by name and require the matching registration on the
receiver's context.
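A sketch of that resolution order (the logic as described above, not the literal implementation; the global-context fallback is simplified here):

    import threading

    from datafusion import SessionContext

    _worker = threading.local()

    def _resolve_ctx(ctx: SessionContext | None = None) -> SessionContext:
        if ctx is not None:                     # 1. explicit ctx wins
            return ctx
        worker_ctx = getattr(_worker, "ctx", None)
        if worker_ctx is not None:              # 2. thread-local worker ctx
            return worker_ctx
        return SessionContext()                 # 3. simplified stand-in for the global SessionContext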

Aggregate and window UDF inline encoding, the per-session
with_python_udf_inlining toggle, sender-side context, and the
user-guide docs land in follow-on PRs.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@timsaucer timsaucer changed the title feat: pickle support for Expr via inline scalar UDF encoding (1/4) feat: enable pickling of most Expr except udaf and udwf May 15, 2026
@timsaucer timsaucer requested a review from Copilot May 15, 2026 19:12

Copilot AI left a comment


Copilot encountered an error and was unable to review this pull request. You can try again by re-requesting a review.

…edge-case tests

Inline `.. warning::` blocks on `Expr.to_bytes`, `Expr.from_bytes`, and
`Expr.__reduce__` so the cloudpickle / arbitrary-code-execution caveat is
visible at the public API surface in advance of the user-guide page that
lands in PR 4.

Add doctest-style `Examples:` blocks to `datafusion.ipc` functions
(`set_worker_ctx`, `clear_worker_ctx`, `get_worker_ctx`, `_resolve_ctx`),
`ScalarUDF.name`, and the new `Expr` pickle methods, per CLAUDE.md.

Tighten `Expr.__reduce__` return annotation to
`tuple[Callable[[bytes], Expr], tuple[bytes]]`.

Tests: multi-arg UDF round-trip (covers synthetic `arg_{i}` schema-field
loop in the codec) plus malformed-bytes paths through `Expr.from_bytes`.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
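For context, the `__reduce__` contract that annotation pins down is essentially this shape (a sketch, not the in-tree code):

    from typing import Callable

    class Expr:
        def __reduce__(self) -> tuple[Callable[[bytes], "Expr"], tuple[bytes]]:
            # pickle reconstructs via Expr.from_bytes(self.to_bytes());
            # ctx is resolved on the receiving side (explicit > worker > global).
            return (type(self).from_bytes, (self.to_bytes(),))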
@timsaucer timsaucer marked this pull request as ready for review May 15, 2026 19:36

@ntjohnson1 ntjohnson1 left a comment


The general shape of the changes here seems reasonable. I guess my only lingering thought/question is mostly around cloudpickle's capabilities and how it's being used.

Comment thread on crates/core/src/codec.rs
})
}

/// Build the cloudpickle payload for a `PythonFunctionScalarUDF`.

Maybe it is captured more clearly somewhere else, but it feels like there is some nuance of the cloudpickle dependency that's not fully communicated here. I didn't do too much of a deep dive on it.

  1. cloudpickle only works across the same version of Python (I'm not sure whether it detects a mismatch with a nice error). So your header might want to capture the source Python version to give a nicer error, and to advertise the limitation that remote workers must run the same Python version.

  2. cloudpickle seems to support both serialize-by-reference (more like dill) and serialize-by-value (super cool). The former needs the function installed in the receiving environment so it can be referenced at deserialization time, whereas the latter tries to capture all the necessary bits (here is where I didn't dive in deeply). Those are fairly different mental models to support.
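For what it's worth, cloudpickle 2.x does expose that choice explicitly; a sketch (the `foo` module is the hypothetical from the snippet below):

    import cloudpickle
    import foo  # hypothetical module, per the snippet below

    # Default for installed modules: pickle by reference; the receiver
    # must also be able to `import foo`.
    blob_by_ref = cloudpickle.dumps(foo.double)

    # Opt in to by-value pickling for the module: the function body is
    # embedded in the payload and `foo` need not exist on the receiver.
    cloudpickle.register_pickle_by_value(foo)
    blob_by_value = cloudpickle.dumps(foo.double)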

def test_udf_self_contained_blob(self):
    e = _double_udf()(col("a"))
    blob = pickle.dumps(e)
    # The codec inlines the callable, so the blob is much bigger than a

I think this is testing the thing I was asking about, but I haven't thought deeply enough about whether it actually does. I know cloudpickle says it can serialize lambdas, but what if I instead had

import pyarrow as pa
from datafusion import udf
from foo import double

def _double_udf():
    return udf(
        double,
        [pa.int64()],
        pa.int64(),
        volatility="immutable",
        name="double",
    )

Would I still be able to deserialize this on a remote worker whose Python environment doesn't have foo installed?
