diff --git a/Cargo.lock b/Cargo.lock index be64bc53..2ae943d8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4288,6 +4288,7 @@ dependencies = [ "paste", "pyo3", "zenoh", + "zenoh-buffers", "zenoh-ext", ] diff --git a/Cargo.toml b/Cargo.toml index ff161af4..c8d431ab 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -43,11 +43,12 @@ maintenance = { status = "actively-developed" } [dependencies] paste = "1.0.14" -pyo3 = { version = "0.25.1", features = ["abi3-py39", "extension-module"] } +pyo3 = { version = "0.25.1", features = ["extension-module"] } zenoh = { version = "1.9.0", git = "https://github.com/eclipse-zenoh/zenoh.git", branch = "main", features = [ "internal", "unstable", ], default-features = false } +zenoh-buffers = { version = "1.9.0", git = "https://github.com/eclipse-zenoh/zenoh.git", branch = "main" } zenoh-ext = { version = "1.9.0", git = "https://github.com/eclipse-zenoh/zenoh.git", branch = "main", features = [ "internal", ], optional = true } diff --git a/docs/concepts.rst b/docs/concepts.rst index 1dcf474d..65f81a31 100644 --- a/docs/concepts.rst +++ b/docs/concepts.rst @@ -267,6 +267,96 @@ Example: Using :class:`zenoh.ZBytes` :start-after: [raw_data] :end-before: # [raw_data] +Scatter-gather payloads +~~~~~~~~~~~~~~~~~~~~~~~ + +Use :meth:`zenoh.ZBytes.from_segments` to construct a payload from multiple +Python buffer protocol objects without first joining them into one large +``bytes`` value: + +.. code-block:: python + + payload = zenoh.ZBytes.from_segments( + [header, segment_0, segment_1], + copy=True, + ) + publisher.put(payload) + +``copy=True`` copies each input buffer into Zenoh-owned memory while preserving +separate physical slices where possible. C-contiguous, byte-compatible buffers +are copied directly into Zenoh-owned slices. Set ``require_contiguous=False`` to +explicitly allow a non-contiguous input buffer to be flattened and copied. + +``copy=False`` performs strict zero-copy construction for read-only, +C-contiguous, single-byte Python buffer exporters. This includes ``bytes``, +eligible ``memoryview`` objects, and custom exporters such as serialization +library segment views. Cropped memoryviews are supported if they still describe +one contiguous slice. With shared memory enabled, ``shm.ZShm`` segments are +preserved without copying, and ``shm.ZShmMut`` segments are consumed just like +passing them directly to ``ZBytes``. Generic ``memoryview`` objects are treated +as raw borrowed buffers and do not carry shared-memory identity. ``ZBytes`` +retains each exported buffer view until Zenoh no longer references the payload. + +External buffer pools can attach a lease token to a raw borrowed zero-copy +payload. The lease object must provide ``sink`` and ``lease_id`` attributes. +When Zenoh releases its last borrowed-buffer reference, zenoh-python only +notifies the sink by calling ``lease.sink.release(lease.lease_id)``; the +provider decides how to enqueue, deduplicate, or process that release event: + +.. code-block:: python + + class LeaseState: + def __init__(self, sink, lease_id): + self.sink = sink + self.lease_id = lease_id + + payload = zenoh.ZBytes.from_segments( + [header_view, body_view], + copy=False, + lease=LeaseState(pool_release_sink, slot_id), + ) + +The sink's ``release`` method should be non-blocking or return quickly. Shared +memory buffers have their own lifecycle management, so ``lease`` cannot be used +with ``shm.ZShm`` or ``shm.ZShmMut`` segments. + +The read-only flag prevents writes through the exported view but cannot prevent +writes through every alias to the same backing memory. After passing segments +to ``copy=False``, the application must treat their backing memory as immutable +until Zenoh no longer references the payload. Writable buffers, non-contiguous +buffers, and buffers whose items are not one byte wide raise ``RuntimeError`` +instead of silently falling back to copying. Use ``copy=True`` for those +buffers. + +On the receiving side, :meth:`zenoh.ZBytes.segments` returns a tuple of +zero-copy :class:`zenoh.ZBytesSegment` views over the payload's physical slices: + +.. code-block:: python + + physical_slices = sample.payload.segments() + payload_bytes = b"".join(map(bytes, physical_slices)) + +The returned segments remain valid after a subscriber callback returns. Each +segment implements the Python buffer protocol, so consumers that accept buffer +objects can use the segments directly or explicitly create ``memoryview`` +objects: + +.. code-block:: python + + views = sample.payload.memoryviews() + first = memoryview(sample.payload.segments()[0]) + +``bytes(segment)`` copies one segment. ``bytes(payload)`` or +``payload.to_bytes()`` copies the whole payload. If you need the previous +copy-out behavior where each returned memoryview is backed by a new Python +``bytes`` object, use :meth:`zenoh.ZBytes.copied_memoryviews`. + +Physical slice boundaries are an internal memory layout optimization. They are +not application-level frames and may differ from sender-side input boundaries +after routing, fragmentation, or shared-memory conversion. Applications that +need stable frames, such as Cap'n Proto segments, must encode segment lengths or +offsets in a payload header and reconstruct logical segments on receipt. + Serialization and deserialization of basic types and structures is provided in the :mod:`zenoh.ext` module via :func:`zenoh.ext.z_serialize` and :func:`zenoh.ext.z_deserialize`. diff --git a/src/buffer.rs b/src/buffer.rs new file mode 100644 index 00000000..f4365ac8 --- /dev/null +++ b/src/buffer.rs @@ -0,0 +1,81 @@ +// +// Copyright (c) 2024 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// +use std::{ + ffi::CString, + os::raw::{c_int, c_void}, + ptr, +}; + +use pyo3::{exceptions::PyBufferError, ffi, prelude::*}; + +/// Populate `view` as a read-only, single-byte, C-contiguous buffer over +/// `data`, transferring ownership of `owner` into the exported view so the +/// backing storage stays alive while the consumer holds the buffer. +/// +/// # Safety +/// `data` must remain valid for as long as `owner` keeps the buffer alive, and +/// `view` must point to a valid `Py_buffer` provided by the buffer protocol. +pub(crate) unsafe fn fill_readonly_u8_buffer( + owner: Bound<'_, PyAny>, + data: &[u8], + view: *mut ffi::Py_buffer, + flags: c_int, +) -> PyResult<()> { + if view.is_null() { + return Err(PyBufferError::new_err("view is null")); + } + if flags & ffi::PyBUF_WRITABLE == ffi::PyBUF_WRITABLE { + return Err(PyBufferError::new_err("object is not writable")); + } + + unsafe { + (*view).obj = owner.into_ptr(); + (*view).buf = data.as_ptr() as *mut c_void; + (*view).len = data.len() as ffi::Py_ssize_t; + (*view).readonly = 1; + (*view).itemsize = 1; + (*view).format = if flags & ffi::PyBUF_FORMAT == ffi::PyBUF_FORMAT { + CString::new("B").unwrap().into_raw() + } else { + ptr::null_mut() + }; + (*view).ndim = 1; + (*view).shape = if flags & ffi::PyBUF_ND == ffi::PyBUF_ND { + &mut (*view).len + } else { + ptr::null_mut() + }; + (*view).strides = if flags & ffi::PyBUF_STRIDES == ffi::PyBUF_STRIDES { + &mut (*view).itemsize + } else { + ptr::null_mut() + }; + (*view).suboffsets = ptr::null_mut(); + (*view).internal = ptr::null_mut(); + } + Ok(()) +} + +/// Release the format string allocated by [`fill_readonly_u8_buffer`]. +/// +/// # Safety +/// `view` must be a `Py_buffer` previously populated by +/// [`fill_readonly_u8_buffer`]. +pub(crate) unsafe fn release_u8_buffer(view: *mut ffi::Py_buffer) { + unsafe { + if !view.is_null() && !(*view).format.is_null() { + drop(CString::from_raw((*view).format)); + } + } +} diff --git a/src/bytes.rs b/src/bytes.rs index c683ce5d..226e0c3c 100644 --- a/src/bytes.rs +++ b/src/bytes.rs @@ -11,22 +11,284 @@ // Contributors: // ZettaScale Zenoh Team, // -use std::{borrow::Cow, io::Read}; +#[cfg(feature = "shared-memory")] +use std::collections::HashSet; +use std::{ + any::Any, + borrow::Cow, + fmt, + io::Read, + os::raw::{c_int, c_void}, + ptr, slice, + sync::Arc, +}; use pyo3::{ - exceptions::{PyTypeError, PyValueError}, + exceptions::{PyRuntimeError, PyTypeError, PyValueError}, + ffi, prelude::*, - types::{PyByteArray, PyBytes, PyString}, + types::{PyByteArray, PyBytes, PyMemoryView, PyString, PyTuple}, }; +use zenoh_buffers::{ZBuf, ZSliceBuffer}; use crate::{ + buffer::{fill_readonly_u8_buffer, release_u8_buffer}, macros::{downcast_or_new, wrapper}, utils::{IntoPyResult, MapInto}, }; +unsafe extern "C" { + // `PyObject_AsReadBuffer` is part of the stable ABI. Holding a Python + // `memoryview` separately gives the acquired exporter resources a clear + // lifetime even though this legacy API returns only a pointer and length. + #[link_name = "PyObject_AsReadBuffer"] + fn py_object_as_read_buffer( + obj: *mut ffi::PyObject, + buffer: *mut *const c_void, + buffer_len: *mut ffi::Py_ssize_t, + ) -> c_int; +} + +struct BorrowedPyBufferSlice { + _owner: Py, + _lease: Option>, + ptr: *const u8, + len: usize, +} + +impl BorrowedPyBufferSlice { + fn new(buffer: &Bound) -> PyResult { + let mut ptr = ptr::null(); + let mut len = 0; + if unsafe { py_object_as_read_buffer(buffer.as_ptr(), &mut ptr, &mut len) } == -1 { + return Err(PyErr::fetch(buffer.py())); + } + if len < 0 { + Err(PyRuntimeError::new_err( + "buffer exporter returned a negative length", + )) + } else if len > 0 && ptr.is_null() { + Err(PyRuntimeError::new_err( + "buffer exporter returned a null pointer for a non-empty segment", + )) + } else { + Ok(Self { + _owner: buffer.clone().unbind(), + _lease: None, + ptr: ptr.cast(), + len: len as usize, + }) + } + } + + fn with_lease(mut self, lease: Option>) -> Self { + self._lease = lease; + self + } + + fn as_bytes(&self) -> &[u8] { + if self.len == 0 { + &[] + } else { + // SAFETY: `_owner` retains the validated `memoryview`, which owns + // its exporter resources and keeps this contiguous slice valid. + unsafe { slice::from_raw_parts(self.ptr, self.len) } + } + } +} + +struct LeaseState { + sink: Py, + lease_id: Py, +} + +impl LeaseState { + fn new(lease: &Bound) -> PyResult { + let sink = lease + .getattr("sink") + .map_err(|_| PyTypeError::new_err("lease must provide a 'sink' attribute"))?; + let release = sink + .getattr("release") + .map_err(|_| PyTypeError::new_err("lease.sink must provide a 'release' method"))?; + if !release.is_callable() { + return Err(PyTypeError::new_err("lease.sink.release must be callable")); + } + let lease_id = lease + .getattr("lease_id") + .map_err(|_| PyTypeError::new_err("lease must provide a 'lease_id' attribute"))?; + Ok(Self { + sink: sink.unbind(), + lease_id: lease_id.unbind(), + }) + } + + fn into_guard(self) -> Arc { + Arc::new(LeaseGuard { + sink: self.sink, + lease_id: self.lease_id, + }) + } +} + +struct LeaseGuard { + sink: Py, + lease_id: Py, +} + +impl Drop for LeaseGuard { + fn drop(&mut self) { + Python::with_gil(|py| { + let sink = self.sink.bind(py); + if let Err(err) = sink.call_method1("release", (self.lease_id.bind(py),)) { + err.write_unraisable(py, Some(sink)); + } + }); + } +} + +// SAFETY: `_owner` retains the validated `memoryview` while this pointer may be +// read from another thread. +unsafe impl Send for BorrowedPyBufferSlice {} +// SAFETY: The `copy=False` contract requires callers not to mutate the +// exported memory through another alias while Zenoh may reference it. +unsafe impl Sync for BorrowedPyBufferSlice {} + +impl fmt::Debug for BorrowedPyBufferSlice { + fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result { + formatter + .debug_struct("BorrowedPyBufferSlice") + .field("len", &self.len) + .finish_non_exhaustive() + } +} + +impl ZSliceBuffer for BorrowedPyBufferSlice { + fn as_slice(&self) -> &[u8] { + self.as_bytes() + } + + fn as_any(&self) -> &dyn Any { + self + } + + fn as_any_mut(&mut self) -> &mut dyn Any { + self + } +} + +fn py_buffer_zbytes(buffer: BorrowedPyBufferSlice) -> zenoh::bytes::ZBytes { + ZBuf::from(Arc::new(buffer)).into() +} + +fn single_slice_zbytes(slice: zenoh_buffers::ZSlice) -> zenoh::bytes::ZBytes { + let mut zbuf = ZBuf::empty(); + zbuf.push_zslice(slice); + zbuf.into() +} + +fn physical_segment_zbytes( + zbytes: &zenoh::bytes::ZBytes, +) -> impl Iterator { + let zbuf: ZBuf = zbytes.clone().into(); + zbuf.into_zslices().map(single_slice_zbytes) +} + +fn copied_memoryviews<'py>( + zbytes: &zenoh::bytes::ZBytes, + py: Python<'py>, +) -> PyResult> { + let memoryview = py.import("builtins")?.getattr("memoryview")?; + let views = zbytes + .slices() + .map(|slice| memoryview.call1((PyBytes::new(py, slice),))) + .collect::>>()?; + PyTuple::new(py, views) +} + +#[pyclass] +#[derive(Clone)] +pub(crate) struct ZBytesSegment { + inner: zenoh::bytes::ZBytes, +} + +impl ZBytesSegment { + fn new(inner: zenoh::bytes::ZBytes) -> Self { + Self { inner } + } + + fn as_slice(&self) -> &[u8] { + self.inner.slices().next().unwrap_or(&[]) + } + + fn clone_inner(&self) -> zenoh::bytes::ZBytes { + self.inner.clone() + } +} + +#[pymethods] +impl ZBytesSegment { + fn to_bytes<'py>(&self, py: Python<'py>) -> PyResult> { + PyBytes::new_with(py, self.inner.len(), |bytes| { + self.inner.reader().read_exact(bytes).into_pyres() + }) + } + + #[cfg(feature = "shared-memory")] + fn as_shm(&self) -> Option { + self.inner.as_shm().map(ToOwned::to_owned).map_into() + } + + fn __len__(&self) -> usize { + self.inner.len() + } + + fn __bool__(&self) -> bool { + !self.inner.is_empty() + } + + fn __bytes__<'py>(&self, py: Python<'py>) -> PyResult> { + self.to_bytes(py) + } + + fn __repr__(&self) -> String { + format!("ZBytesSegment({:?})", self.inner) + } + + unsafe fn __getbuffer__( + slf: Bound<'_, Self>, + view: *mut ffi::Py_buffer, + flags: c_int, + ) -> PyResult<()> { + let (ptr, len) = { + let segment = slf.borrow(); + let bytes = segment.as_slice(); + (bytes.as_ptr(), bytes.len()) + }; + let bytes = if len == 0 { + &[] + } else { + // SAFETY: `slf` owns the single-slice ZBytes that keeps the backing + // storage alive for at least as long as the exported buffer. + unsafe { slice::from_raw_parts(ptr, len) } + }; + unsafe { fill_readonly_u8_buffer(slf.into_any(), bytes, view, flags) } + } + + unsafe fn __releasebuffer__(&self, view: *mut ffi::Py_buffer) { + unsafe { release_u8_buffer(view) } + } +} + wrapper!(zenoh::bytes::ZBytes: Clone, Default); downcast_or_new!(ZBytes); +enum SegmentAction { + Append(zenoh::bytes::ZBytes), + Borrow(BorrowedPyBufferSlice), + #[cfg(feature = "shared-memory")] + MoveShmMut(Py), +} + #[pymethods] impl ZBytes { #[new] @@ -63,6 +325,194 @@ impl ZBytes { }) } + #[staticmethod] + #[pyo3(signature = (segments, *, copy = false, require_contiguous = true, lease = None))] + fn from_segments( + segments: &Bound, + copy: bool, + require_contiguous: bool, + lease: Option<&Bound>, + ) -> PyResult { + let py = segments.py(); + let lease = lease.map(LeaseState::new).transpose()?; + if lease.is_some() && copy { + return Err(PyRuntimeError::new_err( + "lease can only be used with copy=False", + )); + } + let memoryview = py.import("builtins")?.getattr("memoryview")?; + let mut actions = Vec::new(); + let mut has_nonempty_raw_borrow = false; + #[cfg(feature = "shared-memory")] + let mut mutable_shm_segments = HashSet::new(); + + for (index, segment) in segments.try_iter()?.enumerate() { + let segment = segment?; + if let Ok(segment) = segment.downcast_exact::() { + if copy { + actions.push(SegmentAction::Append( + segment.borrow().as_slice().to_vec().into(), + )); + } else { + actions.push(SegmentAction::Append(segment.borrow().clone_inner())); + } + continue; + } + + #[cfg(feature = "shared-memory")] + if let Ok(buf) = segment.downcast_exact::() { + if lease.is_some() { + return Err(PyRuntimeError::new_err( + "lease cannot be used with shared-memory segments", + )); + } + if copy { + actions.push(SegmentAction::Append( + buf.borrow().0.as_ref().to_vec().into(), + )); + } else { + actions.push(SegmentAction::Append(buf.borrow().0.clone().into())); + } + continue; + } + + #[cfg(feature = "shared-memory")] + if let Ok(buf) = segment.downcast_exact::() { + if lease.is_some() { + return Err(PyRuntimeError::new_err( + "lease cannot be used with shared-memory segments", + )); + } + if copy { + actions.push(SegmentAction::Append(buf.borrow().get()?.to_vec().into())); + } else { + buf.borrow().get()?; + let ptr = segment.as_ptr() as usize; + if !mutable_shm_segments.insert(ptr) { + return Err(PyRuntimeError::new_err(format!( + "segment {index} repeats the same mutable SHM buffer; \ + zero-copy would need to consume it more than once" + ))); + } + actions.push(SegmentAction::MoveShmMut(buf.clone().unbind())); + } + continue; + } + + if !copy { + let view = memoryview.call1((&segment,)).map_err(|_| { + let type_name = segment + .get_type() + .name() + .map(|name| name.to_string()) + .unwrap_or_else(|_| "".to_string()); + PyRuntimeError::new_err(format!( + "zero-copy requires a read-only, C-contiguous, byte-compatible Python \ + buffer; segment {index} has type '{type_name}'; use copy=True" + )) + })?; + if !view.getattr("readonly")?.extract::()? { + return Err(PyRuntimeError::new_err(format!( + "segment {index} is writable; zero-copy requires a read-only buffer; \ + use copy=True" + ))); + } + if !view.getattr("c_contiguous")?.extract::()? { + return Err(PyRuntimeError::new_err(format!( + "segment {index} is not C-contiguous; zero-copy requires one contiguous \ + byte slice; use copy=True" + ))); + } + if view.getattr("itemsize")?.extract::()? != 1 { + return Err(PyRuntimeError::new_err(format!( + "segment {index} has unsupported item format; zero-copy requires a \ + single-byte buffer; use copy=True" + ))); + } + let buffer = BorrowedPyBufferSlice::new(&view)?; + has_nonempty_raw_borrow |= buffer.len > 0; + actions.push(SegmentAction::Borrow(buffer)); + continue; + } + + let view = memoryview.call1((&segment,)).map_err(|_| { + PyTypeError::new_err(format!( + "segment {index} does not support the Python buffer protocol" + )) + })?; + if view.getattr("itemsize")?.extract::()? != 1 { + return Err(PyTypeError::new_err(format!( + "segment {index} has unsupported item format; \ + expected a byte-compatible buffer" + ))); + } + let c_contiguous = view.getattr("c_contiguous")?.extract::()?; + if require_contiguous && !c_contiguous { + return Err(PyTypeError::new_err(format!( + "segment {index} is not C-contiguous; use require_contiguous=False" + ))); + } + if c_contiguous { + actions.push(SegmentAction::Append( + BorrowedPyBufferSlice::new(&view)? + .as_bytes() + .to_vec() + .into(), + )); + } else { + let bytes = view.call_method0("tobytes")?; + actions.push(SegmentAction::Append( + bytes.downcast::()?.as_bytes().to_vec().into(), + )); + } + } + let lease = if let Some(lease) = lease { + if !has_nonempty_raw_borrow { + return Err(PyRuntimeError::new_err( + "lease requires at least one non-empty ordinary Python zero-copy buffer segment", + )); + } + Some(lease.into_guard()) + } else { + None + }; + let mut writer = zenoh::bytes::ZBytes::writer(); + for action in actions { + match action { + SegmentAction::Append(zbytes) => writer.append(zbytes), + SegmentAction::Borrow(buffer) => { + writer.append(py_buffer_zbytes(buffer.with_lease(lease.clone()))) + } + #[cfg(feature = "shared-memory")] + SegmentAction::MoveShmMut(buf) => { + writer.append(buf.bind(py).borrow_mut().take()?.into()); + } + } + } + Ok(Self(writer.finish())) + } + + fn segments<'py>(&self, py: Python<'py>) -> PyResult> { + let segments = physical_segment_zbytes(&self.0) + .map(|inner| Py::new(py, ZBytesSegment::new(inner))) + .collect::>>()?; + PyTuple::new(py, segments) + } + + fn memoryviews<'py>(&self, py: Python<'py>) -> PyResult> { + let views = physical_segment_zbytes(&self.0) + .map(|inner| { + let segment = Py::new(py, ZBytesSegment::new(inner))?; + PyMemoryView::from(segment.bind(py).as_any()).map(|view| view.unbind()) + }) + .collect::>>()?; + PyTuple::new(py, views) + } + + fn copied_memoryviews<'py>(&self, py: Python<'py>) -> PyResult> { + copied_memoryviews(&self.0, py) + } + fn to_string(&self) -> PyResult> { self.0 .try_to_string() diff --git a/src/lib.rs b/src/lib.rs index 09235cce..32c3c42f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -13,6 +13,7 @@ // // TODO https://github.com/eclipse-zenoh/zenoh-python/pull/235#discussion_r1644498390 // mod logging; +mod buffer; mod bytes; mod cancellation; mod config; @@ -57,7 +58,7 @@ pub(crate) mod zenoh { #[pymodule_export] use crate::{ - bytes::{Encoding, ZBytes}, + bytes::{Encoding, ZBytes, ZBytesSegment}, cancellation::CancellationToken, config::{Config, WhatAmI, WhatAmIMatcher, ZenohId}, handlers::Handler, diff --git a/src/shm.rs b/src/shm.rs index 0f067033..533df1ac 100644 --- a/src/shm.rs +++ b/src/shm.rs @@ -1,13 +1,20 @@ -use std::{num::NonZeroUsize, str, sync::Arc}; +use std::{ + num::NonZeroUsize, + os::raw::c_int, + slice, str, + sync::Arc, +}; use pyo3::{ exceptions::{PyTypeError, PyValueError}, + ffi, prelude::*, types::{PyByteArray, PyBytes, PySlice, PyString, PyType}, }; use zenoh::shm::{ChunkAllocResult, PosixShmProviderBackend, ShmBuf}; use crate::{ + buffer::{fill_readonly_u8_buffer, release_u8_buffer}, macros::{downcast_or_new, wrapper, zerror}, utils::{wait, IntoPyResult, MapInto}, }; @@ -225,6 +232,30 @@ impl ZShm { fn __bytes__<'py>(&self, py: Python<'py>) -> Bound<'py, PyBytes> { PyBytes::new(py, &self.0) } + + unsafe fn __getbuffer__( + slf: Bound<'_, Self>, + view: *mut ffi::Py_buffer, + flags: c_int, + ) -> PyResult<()> { + let (ptr, len) = { + let shm = slf.borrow(); + let bytes: &[u8] = shm.0.as_ref(); + (bytes.as_ptr(), bytes.len()) + }; + let bytes = if len == 0 { + &[] + } else { + // SAFETY: `slf` owns the ZShm handle and keeps the mapped SHM + // buffer alive for at least as long as the exported buffer. + unsafe { slice::from_raw_parts(ptr, len) } + }; + unsafe { fill_readonly_u8_buffer(slf.into_any(), bytes, view, flags) } + } + + unsafe fn __releasebuffer__(&self, view: *mut ffi::Py_buffer) { + unsafe { release_u8_buffer(view) } + } } #[pyclass] @@ -233,7 +264,7 @@ pub(crate) struct ZShmMut { } impl ZShmMut { - fn get(&self) -> PyResult<&zenoh::shm::ZShmMut> { + pub(crate) fn get(&self) -> PyResult<&zenoh::shm::ZShmMut> { self.buf .as_ref() .ok_or_else(|| zerror!("ZShmMut has been consumed by ZBytes conversion")) diff --git a/tests/test_session.py b/tests/test_session.py index 14f91880..412a3a0d 100644 --- a/tests/test_session.py +++ b/tests/test_session.py @@ -11,6 +11,7 @@ # Contributors: # ZettaScale Zenoh Team, # +import gc import json import time from typing import List, Tuple @@ -147,17 +148,32 @@ def run_session_pubsub(peer01: Session, peer02: Session): keyexpr = "test_pub/session" msg = "Pub Message".encode() + def make_payload(): + return zenoh.ZBytes.from_segments( + [ + memoryview(bytearray(b"Pub ")).toreadonly(), + memoryview(bytearray(b"Message")).toreadonly(), + ] + ) + + payload = make_payload() + gc.collect() + num_received = 0 num_errors = 0 + retained_segments = None def sub_callback(sample: Sample): nonlocal num_received nonlocal num_errors + nonlocal retained_segments + if retained_segments is None: + retained_segments = sample.payload.segments() if ( sample.key_expr != keyexpr or sample.priority != Priority.DATA_HIGH or sample.congestion_control != CongestionControl.BLOCK - or bytes(sample.payload) != msg + or b"".join(map(bytes, sample.payload.segments())) != msg ): num_errors += 1 num_received += 1 @@ -173,12 +189,15 @@ def sub_callback(sample: Sample): time.sleep(SLEEP) for _ in range(0, MSG_COUNT): - publisher.put("Pub Message") + publisher.put(payload) time.sleep(SLEEP) print(f"[PS][02d] Received on peer02 session. {num_received}/{MSG_COUNT} msgs.") assert num_received == MSG_COUNT assert num_errors == 0 + gc.collect() + assert retained_segments is not None + assert b"".join(map(bytes, retained_segments)) == msg print("[PS][03d] Undeclare publisher on peer01 session") publisher.undeclare() diff --git a/tests/test_zbytes_segments.py b/tests/test_zbytes_segments.py new file mode 100644 index 00000000..5c0f7bd5 --- /dev/null +++ b/tests/test_zbytes_segments.py @@ -0,0 +1,468 @@ +# +# Copyright (c) 2026 ZettaScale Technology +# +# This program and the accompanying materials are made available under the +# terms of the Eclipse Public License 2.0 which is available at +# http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +# which is available at https://www.apache.org/licenses/LICENSE-2.0. +# +# SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +# +import gc +import sys +from array import array + +import pytest + +from zenoh import ZBytes, ZBytesSegment + + +class RecordingLeaseSink: + def __init__(self, *, raise_on_release=False): + self.raise_on_release = raise_on_release + self.released = [] + + def release(self, lease_id): + self.released.append(lease_id) + if self.raise_on_release: + raise RuntimeError("release failed") + + +class LeaseState: + def __init__(self, sink, lease_id): + self.sink = sink + self.lease_id = lease_id + + +def make_lease(lease_id="lease-1", *, raise_on_release=False): + sink = RecordingLeaseSink(raise_on_release=raise_on_release) + return sink, LeaseState(sink, lease_id) + + +@pytest.mark.parametrize( + "segments", + [ + [b"hello", b"world"], + [bytearray(b"hello"), bytearray(b"world")], + [memoryview(b"hello"), memoryview(bytearray(b"world"))], + [array("B", b"hello"), array("b", b"world")], + [b"", b"hello", b"", b"world"], + [bytes([i]) for i in range(256)], + ], +) +def test_from_segments_copies_byte_compatible_buffers(segments): + payload = ZBytes.from_segments(segments, copy=True) + expected = b"helloworld" if len(segments) < 10 else bytes(range(256)) + + assert bytes(payload) == expected + + +def test_from_segments_preserves_owned_segment_layout(): + payload = ZBytes.from_segments([b"hello", b"world"], copy=True) + + assert tuple(map(bytes, payload.segments())) == (b"hello", b"world") + + +def test_from_segments_constructs_zero_copy_payload_from_immutable_bytes(): + def make_payload(): + return ZBytes.from_segments([b"hello", b"world"]) + + payload = make_payload() + gc.collect() + + assert bytes(payload) == b"helloworld" + assert tuple(map(bytes, payload.segments())) == (b"hello", b"world") + + +@pytest.mark.parametrize( + ("segment", "expected"), + [ + (memoryview(b"hello"), b"hello"), + (memoryview(b"hello")[1:], b"ello"), + ], +) +def test_from_segments_constructs_zero_copy_payload_from_bytes_memoryview( + segment, expected +): + payload = ZBytes.from_segments([segment]) + + assert bytes(payload) == expected + + +def test_zero_copy_payload_keeps_python_bytes_owner_alive(): + owner = bytes(bytearray(b"hello")) + initial_refcount = sys.getrefcount(owner) + + payload = ZBytes.from_segments([owner]) + + assert sys.getrefcount(owner) > initial_refcount + del payload + gc.collect() + assert sys.getrefcount(owner) == initial_refcount + + +def test_zero_copy_payload_keeps_readonly_buffer_export_alive(): + owner = bytearray(b"hello") + payload = ZBytes.from_segments([memoryview(owner).toreadonly()]) + + with pytest.raises(BufferError): + owner.extend(b"!") + + assert bytes(payload) == b"hello" + del payload + gc.collect() + owner.extend(b"!") + assert owner == b"hello!" + + +def test_zero_copy_payload_lease_releases_after_payload_drop(): + sink, lease = make_lease("slot-1") + payload = ZBytes.from_segments([b"hello"], lease=lease) + + assert sink.released == [] + + del payload + gc.collect() + + assert sink.released == ["slot-1"] + + +def test_zero_copy_payload_lease_is_shared_by_multiple_segments(): + sink, lease = make_lease("slot-2") + payload = ZBytes.from_segments([b"hello", memoryview(b"world")], lease=lease) + + assert bytes(payload) == b"helloworld" + del payload + gc.collect() + + assert sink.released == ["slot-2"] + + +@pytest.mark.parametrize( + "view_factory", + [ + lambda payload: payload.segments(), + lambda payload: payload.memoryviews(), + ], +) +def test_zero_copy_payload_lease_waits_for_derived_views(view_factory): + sink, lease = make_lease("slot-3") + payload = ZBytes.from_segments([b"hello", b"world"], lease=lease) + views = view_factory(payload) + + del payload + gc.collect() + assert sink.released == [] + + assert b"".join(map(bytes, views)) == b"helloworld" + del views + gc.collect() + + assert sink.released == ["slot-3"] + + +def test_zero_copy_payload_lease_release_error_is_unraisable(monkeypatch): + captured = [] + + def hook(args): + captured.append(args) + + monkeypatch.setattr(sys, "unraisablehook", hook) + sink, lease = make_lease("slot-error", raise_on_release=True) + payload = ZBytes.from_segments([b"hello"], lease=lease) + + del payload + gc.collect() + + assert sink.released == ["slot-error"] + assert len(captured) == 1 + assert isinstance(captured[0].exc_value, RuntimeError) + assert captured[0].object is sink + + +@pytest.mark.parametrize( + "segment", + [ + bytearray(b"hello"), + memoryview(bytearray(b"hello")), + memoryview(b"hello")[::2], + array("I", [1, 2, 3]), + ], +) +def test_from_segments_rejects_unsupported_zero_copy_buffers(segment): + with pytest.raises(RuntimeError, match="segment 0.*use copy=True"): + ZBytes.from_segments([segment]) + + +def test_from_segments_rejects_non_buffer_segment(): + with pytest.raises(TypeError, match="segment 1 does not support"): + ZBytes.from_segments([b"hello", object()], copy=True) + + +def test_from_segments_rejects_non_byte_compatible_segment(): + with pytest.raises(TypeError, match="segment 0 has unsupported item format"): + ZBytes.from_segments([array("I", [1, 2, 3])], copy=True) + + +def test_from_segments_rejects_non_contiguous_segment_by_default(): + segment = memoryview(bytearray(b"hello"))[::2] + + with pytest.raises(TypeError, match="segment 0 is not C-contiguous"): + ZBytes.from_segments([segment], copy=True) + + +def test_from_segments_rejects_lease_with_copy(): + sink, lease = make_lease("slot-copy") + + with pytest.raises(RuntimeError, match="lease can only be used with copy=False"): + ZBytes.from_segments([b"hello"], copy=True, lease=lease) + + assert sink.released == [] + + +@pytest.mark.parametrize("segments", [[], [b""]]) +def test_from_segments_rejects_lease_without_nonempty_raw_borrow(segments): + sink, lease = make_lease("slot-empty") + + with pytest.raises(RuntimeError, match="lease requires at least one non-empty"): + ZBytes.from_segments(segments, lease=lease) + + assert sink.released == [] + + +def test_from_segments_rejects_lease_with_zbytes_segment_only(): + source = ZBytes.from_segments([b"hello"], copy=True) + (segment,) = source.segments() + sink, lease = make_lease("slot-segment") + + with pytest.raises(RuntimeError, match="lease requires at least one non-empty"): + ZBytes.from_segments([segment], lease=lease) + + assert sink.released == [] + + +def test_from_segments_rejects_invalid_lease_interface(): + with pytest.raises(TypeError, match="lease must provide a 'sink' attribute"): + ZBytes.from_segments([b"hello"], lease=object()) + + +def test_from_segments_can_copy_non_contiguous_segment_explicitly(): + segment = memoryview(bytearray(b"hello"))[::2] + + payload = ZBytes.from_segments( + [segment], + copy=True, + require_contiguous=False, + ) + + assert bytes(payload) == b"hlo" + + +def test_segments_return_zero_copy_segment_views_with_independent_lifetimes(): + payload = ZBytes.from_segments([b"hello", b"world"], copy=True) + segments = payload.segments() + + del payload + gc.collect() + + assert isinstance(segments, tuple) + assert all(isinstance(segment, ZBytesSegment) for segment in segments) + assert all(memoryview(segment).readonly for segment in segments) + assert b"".join(map(bytes, segments)) == b"helloworld" + with pytest.raises(TypeError): + memoryview(segments[0])[0] = 0 + + +def test_memoryviews_are_zero_copy_views_over_segments(): + payload = ZBytes.from_segments([b"hello", b"world"], copy=True) + views = payload.memoryviews() + + assert all(isinstance(view, memoryview) for view in views) + assert all(view.readonly for view in views) + assert tuple(map(bytes, views)) == tuple(map(bytes, payload.segments())) + + +def test_memoryviews_keep_segment_owner_alive(): + payload = ZBytes.from_segments([b"hello", b"world"], copy=True) + views = payload.memoryviews() + + del payload + gc.collect() + + assert tuple(map(bytes, views)) == (b"hello", b"world") + + +def test_copied_memoryviews_preserve_old_copy_out_behavior(): + payload = ZBytes.from_segments([b"hello", b"world"], copy=True) + views = payload.copied_memoryviews() + + del payload + gc.collect() + + assert all(isinstance(view, memoryview) for view in views) + assert tuple(map(bytes, views)) == (b"hello", b"world") + + +def test_from_segments_copies_large_payload_without_joining_inputs(): + segment = bytes(1024 * 1024) + payload = ZBytes.from_segments([segment, segment, segment, segment], copy=True) + + assert len(payload) == 4 * 1024 * 1024 + assert sum(map(len, payload.segments())) == len(payload) + + +def test_from_segments_accepts_numpy_uint8_when_available(): + numpy = pytest.importorskip("numpy") + segments = [numpy.array([1, 2, 3], dtype=numpy.uint8)] + + assert bytes(ZBytes.from_segments(segments, copy=True)) == b"\x01\x02\x03" + + +def test_from_segments_accepts_readonly_numpy_uint8_zero_copy_when_available(): + numpy = pytest.importorskip("numpy") + segment = numpy.array([1, 2, 3], dtype=numpy.uint8) + segment.flags.writeable = False + + assert bytes(ZBytes.from_segments([segment])) == b"\x01\x02\x03" + + +def test_from_segments_accepts_zbytes_segment_without_copy(): + source = ZBytes.from_segments([b"hello", b"world"], copy=True) + hello, world = source.segments() + + payload = ZBytes.from_segments([world, hello], copy=False) + + assert bytes(payload) == b"worldhello" + + +def test_from_segments_copies_zbytes_segment(): + source = ZBytes.from_segments([bytearray(b"hello")], copy=True) + (segment,) = source.segments() + + payload = ZBytes.from_segments([segment], copy=True) + + assert bytes(payload) == b"hello" + + +def test_from_segments_accepts_shm_mut_zero_copy_when_available(): + shm = pytest.importorskip("zenoh.shm") + provider = shm.ShmProvider.default_backend(4096) + buf = provider.alloc(5) + buf[:] = b"hello" + + payload = ZBytes.from_segments([buf], copy=False) + + assert bytes(payload) == b"hello" + assert payload.as_shm() is not None + with pytest.raises(Exception, match="consumed"): + bytes(buf) + + +def test_from_segments_rejects_lease_with_shm_mut_when_available(): + shm = pytest.importorskip("zenoh.shm") + provider = shm.ShmProvider.default_backend(4096) + buf = provider.alloc(5) + buf[:] = b"hello" + sink, lease = make_lease("slot-shm-mut") + + with pytest.raises(RuntimeError, match="lease cannot be used with shared-memory"): + ZBytes.from_segments([buf], copy=False, lease=lease) + + assert bytes(buf) == b"hello" + assert sink.released == [] + + +def test_from_segments_accepts_shm_zero_copy_when_available(): + shm = pytest.importorskip("zenoh.shm") + provider = shm.ShmProvider.default_backend(4096) + buf = provider.alloc(5) + buf[:] = b"hello" + original = ZBytes(buf).as_shm() + + payload = ZBytes.from_segments([original], copy=False) + + assert bytes(payload) == b"hello" + assert payload.as_shm() is not None + + +def test_from_segments_rejects_lease_with_shm_when_available(): + shm = pytest.importorskip("zenoh.shm") + provider = shm.ShmProvider.default_backend(4096) + buf = provider.alloc(5) + buf[:] = b"hello" + original = ZBytes(buf).as_shm() + sink, lease = make_lease("slot-shm") + + with pytest.raises(RuntimeError, match="lease cannot be used with shared-memory"): + ZBytes.from_segments([original], copy=False, lease=lease) + + assert sink.released == [] + + +def test_from_segments_accepts_mixed_shm_segments_when_available(): + shm = pytest.importorskip("zenoh.shm") + provider = shm.ShmProvider.default_backend(4096) + buf = provider.alloc(5) + buf[:] = b"frame" + + payload = ZBytes.from_segments([b"h", buf, b"t"], copy=False) + + assert bytes(payload) == b"hframet" + assert payload.as_shm() is None + segments = payload.segments() + assert tuple(map(bytes, segments)) == (b"h", b"frame", b"t") + assert segments[1].as_shm() is not None + + +def test_from_segments_copies_shm_mut_without_consuming_when_available(): + shm = pytest.importorskip("zenoh.shm") + provider = shm.ShmProvider.default_backend(4096) + buf = provider.alloc(5) + buf[:] = b"hello" + + payload = ZBytes.from_segments([buf], copy=True) + + assert bytes(payload) == b"hello" + assert payload.as_shm() is None + assert bytes(buf) == b"hello" + + +def test_from_segments_does_not_partially_consume_shm_mut_on_validation_error(): + shm = pytest.importorskip("zenoh.shm") + provider = shm.ShmProvider.default_backend(4096) + buf = provider.alloc(5) + buf[:] = b"hello" + + with pytest.raises(RuntimeError, match="segment 1.*use copy=True"): + ZBytes.from_segments([buf, bytearray(b"mutable")], copy=False) + + assert bytes(buf) == b"hello" + + +def test_from_segments_rejects_repeated_shm_mut_when_available(): + shm = pytest.importorskip("zenoh.shm") + provider = shm.ShmProvider.default_backend(4096) + buf = provider.alloc(5) + buf[:] = b"hello" + + with pytest.raises(RuntimeError, match="repeats the same mutable SHM"): + ZBytes.from_segments([buf, buf], copy=False) + + assert bytes(buf) == b"hello" + + +def test_zshm_and_shm_segment_export_readonly_memoryview_when_available(): + shm = pytest.importorskip("zenoh.shm") + provider = shm.ShmProvider.default_backend(4096) + buf = provider.alloc(5) + buf[:] = b"hello" + payload = ZBytes.from_segments([buf], copy=False) + zshm = payload.as_shm() + + shm_view = memoryview(zshm) + segment_view = memoryview(payload.segments()[0]) + + assert shm_view.readonly + assert segment_view.readonly + assert bytes(shm_view) == b"hello" + assert bytes(segment_view) == b"hello" diff --git a/zenoh/__init__.pyi b/zenoh/__init__.pyi index 5fe200e5..4bad41c2 100644 --- a/zenoh/__init__.pyi +++ b/zenoh/__init__.pyi @@ -11,7 +11,7 @@ # Contributors: # ZettaScale Zenoh Team, # -from collections.abc import Callable +from collections.abc import Callable, Iterable from datetime import datetime, timedelta from enum import Enum, auto from pathlib import Path @@ -2193,6 +2193,19 @@ class WhatAmIMatcher: _IntoWhatAmIMatcher = WhatAmIMatcher | str +@final +class ZBytesSegment: + """A zero-copy view over one physical ZBytes slice.""" + + @_unstable + def as_shm(self) -> shm.ZShm | None: ... + def to_bytes(self) -> bytes: + """Copy this segment into a Python bytes object.""" + + def __bool__(self) -> bool: ... + def __len__(self) -> int: ... + def __bytes__(self) -> bytes: ... + @final class ZBytes: """ZBytes represents raw bytes data that can be interpreted as strings or byte arrays. @@ -2214,6 +2227,49 @@ class ZBytes: def __new__( cls, bytes: bytearray | bytes | str | shm.ZShm | shm.ZShmMut | None = None ) -> Self: ... + @staticmethod + def from_segments( + segments: Iterable[Any], + *, + copy: bool = False, + require_contiguous: bool = True, + lease: Any | None = None, + ) -> Self: + """Build a payload from Python buffer protocol objects. + + ``copy=True`` copies each input buffer into a separate Zenoh-owned segment. + ``copy=False`` performs strict zero-copy construction for read-only, + C-contiguous, single-byte buffer exporters and raises ``RuntimeError`` for + unsupported buffers. The caller must not mutate their backing memory through + another alias while Zenoh may still reference the payload. + + With shared-memory enabled, ``copy=False`` preserves ``shm.ZShm`` and + consumes ``shm.ZShmMut`` segments. Generic memoryviews are treated as raw + borrowed buffers, not as shared-memory descriptors. + + ``lease`` may be used with ``copy=False`` raw borrowed buffers to bind an + external pool lease to Zenoh's internal payload lifetime. The object must + provide ``lease.sink`` and ``lease.lease_id``; when Zenoh releases the last + borrowed buffer reference, it calls ``lease.sink.release(lease.lease_id)``. + The release method should be non-blocking or return quickly. Shared-memory + segments have their own lifetime management and cannot be combined with a + custom lease. + """ + + def segments(self) -> tuple[ZBytesSegment, ...]: + """Return zero-copy views for the payload's physical slices. + + The returned segment views keep their backing payload memory alive. + Physical slice boundaries are an internal memory layout detail and must not + be used as application-level framing. + """ + + def memoryviews(self) -> tuple[memoryview, ...]: + """Return zero-copy memoryviews for the payload's physical slices.""" + + def copied_memoryviews(self) -> tuple[memoryview, ...]: + """Return memoryviews backed by copied Python bytes for each physical slice.""" + def to_bytes(self) -> bytes: """Return the underlying data as bytes. diff --git a/zenoh/shm.pyi b/zenoh/shm.pyi index 9a3a6c49..39026e98 100644 --- a/zenoh/shm.pyi +++ b/zenoh/shm.pyi @@ -125,7 +125,10 @@ _IntoMemoryLayout = MemoryLayout | tuple[int, AllocAlignment] | int @_unstable @final class ZShm: - """A SHM buffer""" + """An immutable SHM buffer. + + Implements the Python buffer protocol for read-only memoryviews. + """ def is_valid(self) -> bool: ... def __bytes__(self) -> bytes: ...