Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,12 @@ maintenance = { status = "actively-developed" }

[dependencies]
paste = "1.0.14"
pyo3 = { version = "0.25.1", features = ["abi3-py39", "extension-module"] }
pyo3 = { version = "0.25.1", features = ["extension-module"] }
zenoh = { version = "1.9.0", git = "https://github.com/eclipse-zenoh/zenoh.git", branch = "main", features = [
"internal",
"unstable",
], default-features = false }
zenoh-buffers = { version = "1.9.0", git = "https://github.com/eclipse-zenoh/zenoh.git", branch = "main" }
zenoh-ext = { version = "1.9.0", git = "https://github.com/eclipse-zenoh/zenoh.git", branch = "main", features = [
"internal",
], optional = true }
90 changes: 90 additions & 0 deletions docs/concepts.rst
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,96 @@ Example: Using :class:`zenoh.ZBytes`
:start-after: [raw_data]
:end-before: # [raw_data]

Scatter-gather payloads
~~~~~~~~~~~~~~~~~~~~~~~

Use :meth:`zenoh.ZBytes.from_segments` to construct a payload from multiple
Python buffer protocol objects without first joining them into one large
``bytes`` value:

.. code-block:: python

payload = zenoh.ZBytes.from_segments(
[header, segment_0, segment_1],
copy=True,
)
publisher.put(payload)

``copy=True`` copies each input buffer into Zenoh-owned memory while preserving
separate physical slices where possible. C-contiguous, byte-compatible buffers
are copied directly into Zenoh-owned slices. Set ``require_contiguous=False`` to
explicitly allow a non-contiguous input buffer to be flattened and copied.

``copy=False`` performs strict zero-copy construction for read-only,
C-contiguous, single-byte Python buffer exporters. This includes ``bytes``,
eligible ``memoryview`` objects, and custom exporters such as serialization
library segment views. Cropped memoryviews are supported if they still describe
one contiguous slice. With shared memory enabled, ``shm.ZShm`` segments are
preserved without copying, and ``shm.ZShmMut`` segments are consumed just like
passing them directly to ``ZBytes``. Generic ``memoryview`` objects are treated
as raw borrowed buffers and do not carry shared-memory identity. ``ZBytes``
retains each exported buffer view until Zenoh no longer references the payload.

External buffer pools can attach a lease token to a raw borrowed zero-copy
payload. The lease object must provide ``sink`` and ``lease_id`` attributes.
When Zenoh releases its last borrowed-buffer reference, zenoh-python only
notifies the sink by calling ``lease.sink.release(lease.lease_id)``; the
provider decides how to enqueue, deduplicate, or process that release event:

.. code-block:: python

class LeaseState:
def __init__(self, sink, lease_id):
self.sink = sink
self.lease_id = lease_id

payload = zenoh.ZBytes.from_segments(
[header_view, body_view],
copy=False,
lease=LeaseState(pool_release_sink, slot_id),
)

The sink's ``release`` method should be non-blocking or return quickly. Shared
memory buffers have their own lifecycle management, so ``lease`` cannot be used
with ``shm.ZShm`` or ``shm.ZShmMut`` segments.

The read-only flag prevents writes through the exported view but cannot prevent
writes through every alias to the same backing memory. After passing segments
to ``copy=False``, the application must treat their backing memory as immutable
until Zenoh no longer references the payload. Writable buffers, non-contiguous
buffers, and buffers whose items are not one byte wide raise ``RuntimeError``
instead of silently falling back to copying. Use ``copy=True`` for those
buffers.

On the receiving side, :meth:`zenoh.ZBytes.segments` returns a tuple of
zero-copy :class:`zenoh.ZBytesSegment` views over the payload's physical slices:

.. code-block:: python

physical_slices = sample.payload.segments()
payload_bytes = b"".join(map(bytes, physical_slices))

The returned segments remain valid after a subscriber callback returns. Each
segment implements the Python buffer protocol, so consumers that accept buffer
objects can use the segments directly or explicitly create ``memoryview``
objects:

.. code-block:: python

views = sample.payload.memoryviews()
first = memoryview(sample.payload.segments()[0])

``bytes(segment)`` copies one segment. ``bytes(payload)`` or
``payload.to_bytes()`` copies the whole payload. If you need the previous
copy-out behavior where each returned memoryview is backed by a new Python
``bytes`` object, use :meth:`zenoh.ZBytes.copied_memoryviews`.

Physical slice boundaries are an internal memory layout optimization. They are
not application-level frames and may differ from sender-side input boundaries
after routing, fragmentation, or shared-memory conversion. Applications that
need stable frames, such as Cap'n Proto segments, must encode segment lengths or
offsets in a payload header and reconstruct logical segments on receipt.

Serialization and deserialization of basic types and structures is provided in the :mod:`zenoh.ext`
module via :func:`zenoh.ext.z_serialize` and :func:`zenoh.ext.z_deserialize`.

Expand Down
81 changes: 81 additions & 0 deletions src/buffer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
//
// Copyright (c) 2024 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
// ZettaScale Zenoh Team, <zenoh@zettascale.tech>
//
use std::{
ffi::CString,
os::raw::{c_int, c_void},
ptr,
};

use pyo3::{exceptions::PyBufferError, ffi, prelude::*};

/// Populate `view` as a read-only, single-byte, C-contiguous buffer over
/// `data`, transferring ownership of `owner` into the exported view so the
/// backing storage stays alive while the consumer holds the buffer.
///
/// # Safety
/// `data` must remain valid for as long as `owner` keeps the buffer alive, and
/// `view` must point to a valid `Py_buffer` provided by the buffer protocol.
pub(crate) unsafe fn fill_readonly_u8_buffer(
owner: Bound<'_, PyAny>,
data: &[u8],
view: *mut ffi::Py_buffer,
flags: c_int,
) -> PyResult<()> {
if view.is_null() {
return Err(PyBufferError::new_err("view is null"));
}
if flags & ffi::PyBUF_WRITABLE == ffi::PyBUF_WRITABLE {
return Err(PyBufferError::new_err("object is not writable"));
}

unsafe {
(*view).obj = owner.into_ptr();
(*view).buf = data.as_ptr() as *mut c_void;
(*view).len = data.len() as ffi::Py_ssize_t;
(*view).readonly = 1;
(*view).itemsize = 1;
(*view).format = if flags & ffi::PyBUF_FORMAT == ffi::PyBUF_FORMAT {
CString::new("B").unwrap().into_raw()
} else {
ptr::null_mut()
};
(*view).ndim = 1;
(*view).shape = if flags & ffi::PyBUF_ND == ffi::PyBUF_ND {
&mut (*view).len
} else {
ptr::null_mut()
};
(*view).strides = if flags & ffi::PyBUF_STRIDES == ffi::PyBUF_STRIDES {
&mut (*view).itemsize
} else {
ptr::null_mut()
};
(*view).suboffsets = ptr::null_mut();
(*view).internal = ptr::null_mut();
}
Ok(())
}

/// Release the format string allocated by [`fill_readonly_u8_buffer`].
///
/// # Safety
/// `view` must be a `Py_buffer` previously populated by
/// [`fill_readonly_u8_buffer`].
pub(crate) unsafe fn release_u8_buffer(view: *mut ffi::Py_buffer) {
unsafe {
if !view.is_null() && !(*view).format.is_null() {
drop(CString::from_raw((*view).format));
}
}
}
Loading