diff --git a/Cargo.lock b/Cargo.lock index be64bc53..4500b8ba 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3887,7 +3887,6 @@ dependencies = [ [[package]] name = "zenoh" version = "1.9.0" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#90c06a28c7e1d396ea1a97b3a63a5e6a38afc6ee" dependencies = [ "ahash", "arc-swap", @@ -3938,7 +3937,6 @@ dependencies = [ [[package]] name = "zenoh-buffers" version = "1.9.0" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#90c06a28c7e1d396ea1a97b3a63a5e6a38afc6ee" dependencies = [ "zenoh-collections", ] @@ -3946,7 +3944,6 @@ dependencies = [ [[package]] name = "zenoh-codec" version = "1.9.0" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#90c06a28c7e1d396ea1a97b3a63a5e6a38afc6ee" dependencies = [ "tracing", "uhlc", @@ -3958,7 +3955,6 @@ dependencies = [ [[package]] name = "zenoh-collections" version = "1.9.0" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#90c06a28c7e1d396ea1a97b3a63a5e6a38afc6ee" dependencies = [ "ahash", ] @@ -3966,7 +3962,6 @@ dependencies = [ [[package]] name = "zenoh-config" version = "1.9.0" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#90c06a28c7e1d396ea1a97b3a63a5e6a38afc6ee" dependencies = [ "json5", "nonempty-collections", @@ -3991,7 +3986,6 @@ dependencies = [ [[package]] name = "zenoh-core" version = "1.9.0" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#90c06a28c7e1d396ea1a97b3a63a5e6a38afc6ee" dependencies = [ "lazy_static", "tokio", @@ -4002,7 +3996,6 @@ dependencies = [ [[package]] name = "zenoh-crypto" version = "1.9.0" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#90c06a28c7e1d396ea1a97b3a63a5e6a38afc6ee" dependencies = [ "aes", "hmac", @@ -4015,7 +4008,6 @@ dependencies = [ [[package]] name = "zenoh-ext" version = "1.9.0" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#90c06a28c7e1d396ea1a97b3a63a5e6a38afc6ee" dependencies = [ "async-trait", "bincode", @@ -4034,7 +4026,6 @@ dependencies = [ [[package]] name = "zenoh-keyexpr" version = "1.9.0" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#90c06a28c7e1d396ea1a97b3a63a5e6a38afc6ee" dependencies = [ "getrandom 0.2.17", "hashbrown 0.16.1", @@ -4049,7 +4040,6 @@ dependencies = [ [[package]] name = "zenoh-link" version = "1.9.0" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#90c06a28c7e1d396ea1a97b3a63a5e6a38afc6ee" dependencies = [ "zenoh-config", "zenoh-link-commons", @@ -4067,7 +4057,6 @@ dependencies = [ [[package]] name = "zenoh-link-commons" version = "1.9.0" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#90c06a28c7e1d396ea1a97b3a63a5e6a38afc6ee" dependencies = [ "async-trait", "base64", @@ -4103,7 +4092,6 @@ dependencies = [ [[package]] name = "zenoh-link-quic" version = "1.9.0" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#90c06a28c7e1d396ea1a97b3a63a5e6a38afc6ee" dependencies = [ "async-trait", "rustls-webpki", @@ -4119,7 +4107,6 @@ dependencies = [ [[package]] name = "zenoh-link-quic_datagram" version = "1.9.0" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#90c06a28c7e1d396ea1a97b3a63a5e6a38afc6ee" dependencies = [ "async-trait", "rustls-webpki", @@ -4135,7 +4122,6 @@ dependencies = [ [[package]] name = "zenoh-link-tcp" version = "1.9.0" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#90c06a28c7e1d396ea1a97b3a63a5e6a38afc6ee" dependencies = [ "async-trait", "socket2 0.5.10", @@ -4152,7 +4138,6 @@ dependencies = [ [[package]] name = "zenoh-link-tls" version = "1.9.0" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#90c06a28c7e1d396ea1a97b3a63a5e6a38afc6ee" dependencies = [ "async-trait", "base64", @@ -4181,7 +4166,6 @@ dependencies = [ [[package]] name = "zenoh-link-udp" version = "1.9.0" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#90c06a28c7e1d396ea1a97b3a63a5e6a38afc6ee" dependencies = [ "async-trait", "libc", @@ -4203,7 +4187,6 @@ dependencies = [ [[package]] name = "zenoh-link-unixsock_stream" version = "1.9.0" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#90c06a28c7e1d396ea1a97b3a63a5e6a38afc6ee" dependencies = [ "async-trait", "nix", @@ -4221,7 +4204,6 @@ dependencies = [ [[package]] name = "zenoh-link-ws" version = "1.9.0" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#90c06a28c7e1d396ea1a97b3a63a5e6a38afc6ee" dependencies = [ "async-trait", "futures-util", @@ -4241,7 +4223,6 @@ dependencies = [ [[package]] name = "zenoh-macros" version = "1.9.0" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#90c06a28c7e1d396ea1a97b3a63a5e6a38afc6ee" dependencies = [ "proc-macro2", "quote", @@ -4252,7 +4233,6 @@ dependencies = [ [[package]] name = "zenoh-plugin-trait" version = "1.9.0" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#90c06a28c7e1d396ea1a97b3a63a5e6a38afc6ee" dependencies = [ "git-version", "libloading", @@ -4269,7 +4249,6 @@ dependencies = [ [[package]] name = "zenoh-protocol" version = "1.9.0" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#90c06a28c7e1d396ea1a97b3a63a5e6a38afc6ee" dependencies = [ "const_format", "rand 0.8.5", @@ -4294,7 +4273,6 @@ dependencies = [ [[package]] name = "zenoh-result" version = "1.9.0" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#90c06a28c7e1d396ea1a97b3a63a5e6a38afc6ee" dependencies = [ "anyhow", ] @@ -4302,7 +4280,6 @@ dependencies = [ [[package]] name = "zenoh-runtime" version = "1.9.0" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#90c06a28c7e1d396ea1a97b3a63a5e6a38afc6ee" dependencies = [ "lazy_static", "ron", @@ -4316,7 +4293,6 @@ dependencies = [ [[package]] name = "zenoh-shm" version = "1.9.0" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#90c06a28c7e1d396ea1a97b3a63a5e6a38afc6ee" dependencies = [ "advisory-lock", "async-trait", @@ -4345,7 +4321,6 @@ dependencies = [ [[package]] name = "zenoh-stats" version = "1.9.0" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#90c06a28c7e1d396ea1a97b3a63a5e6a38afc6ee" dependencies = [ "ahash", "prometheus-client", @@ -4358,7 +4333,6 @@ dependencies = [ [[package]] name = "zenoh-sync" version = "1.9.0" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#90c06a28c7e1d396ea1a97b3a63a5e6a38afc6ee" dependencies = [ "arc-swap", "event-listener", @@ -4372,7 +4346,6 @@ dependencies = [ [[package]] name = "zenoh-task" version = "1.9.0" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#90c06a28c7e1d396ea1a97b3a63a5e6a38afc6ee" dependencies = [ "futures", "tokio", @@ -4385,7 +4358,6 @@ dependencies = [ [[package]] name = "zenoh-transport" version = "1.9.0" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#90c06a28c7e1d396ea1a97b3a63a5e6a38afc6ee" dependencies = [ "async-trait", "crossbeam-utils", @@ -4421,7 +4393,6 @@ dependencies = [ [[package]] name = "zenoh-util" version = "1.9.0" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#90c06a28c7e1d396ea1a97b3a63a5e6a38afc6ee" dependencies = [ "async-trait", "const_format", diff --git a/Cargo.toml b/Cargo.toml index ff161af4..ec312428 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -44,10 +44,10 @@ maintenance = { status = "actively-developed" } [dependencies] paste = "1.0.14" pyo3 = { version = "0.25.1", features = ["abi3-py39", "extension-module"] } -zenoh = { version = "1.9.0", git = "https://github.com/eclipse-zenoh/zenoh.git", branch = "main", features = [ +zenoh = { version = "1.9.0", git = "https://github.com/YuanYuYuan/zenoh.git", branch = "feat/routing-timestamps", features = [ "internal", "unstable", ], default-features = false } -zenoh-ext = { version = "1.9.0", git = "https://github.com/eclipse-zenoh/zenoh.git", branch = "main", features = [ +zenoh-ext = { version = "1.9.0", git = "https://github.com/YuanYuYuan/zenoh.git", branch = "feat/routing-timestamps", features = [ "internal", ], optional = true } diff --git a/examples/z_timestamp_instrumentation.py b/examples/z_timestamp_instrumentation.py new file mode 100644 index 00000000..d4c0ab65 --- /dev/null +++ b/examples/z_timestamp_instrumentation.py @@ -0,0 +1,114 @@ +# +# Copyright (c) 2026 ZettaScale Technology +# +# This program and the accompanying materials are made available under the +# terms of the Eclipse Public License 2.0 which is available at +# http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +# which is available at https://www.apache.org/licenses/LICENSE-2.0. +# +# SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +# +# Contributors: +# ZettaScale Zenoh Team, +# +# Demonstrates opt-in end-to-end latency instrumentation. +# +# Run this example to see Send/Route/Receive timestamps on each message. +# The custom_callback variant shows how to inject your own clock bytes. +# +import time + +import zenoh +from zenoh import InterceptionPoint, TimestampInstrumentation + + +def print_stack(stack): + if stack is None: + print(" (no timestamp stack)") + return + for rec in stack.records: + ts = rec.as_timestamp() + if ts is not None: + print(f" {rec.point.name:8s} hlc={ts} custom={rec.is_custom}") + else: + print( + f" {rec.point.name:8s} raw={rec.timestamp().hex()} custom={rec.is_custom}" + ) + + +def example_put_subscribe(session): + print("\n── put/subscribe with send+receive instrumentation ─────────────────") + instr = TimestampInstrumentation(send=True, receive=True) + received = [] + with session.declare_subscriber("demo/ts/**", lambda s: received.append(s)): + time.sleep(0.05) + session.put("demo/ts/hello", b"world", timestamp_instrumentation=instr) + time.sleep(0.2) + if received: + print(f"Received sample on '{received[0].key_expr}':") + print_stack(received[0].timestamp_stack) + + +def example_publisher_default(session): + print("\n── publisher with default instrumentation ───────────────────────────") + instr = TimestampInstrumentation(send=True, receive=True) + received = [] + with session.declare_publisher( + "demo/ts/pub", timestamp_instrumentation=instr + ) as pub: + with session.declare_subscriber("demo/ts/pub", lambda s: received.append(s)): + time.sleep(0.05) + pub.put(b"message-1") + pub.put( + b"message-2", + timestamp_instrumentation=TimestampInstrumentation(send=True), + ) + time.sleep(0.2) + for s in received: + print(f"Received '{s.payload.to_string()}':") + print_stack(s.timestamp_stack) + + +def example_custom_callback(): + print("\n── session with custom timestamp callback ───────────────────────────") + import struct + import time as _t + + def my_clock(ctx): + # Return a simple 8-byte little-endian nanosecond timestamp. + ns = int(_t.time_ns()) + return struct.pack(", timestamp: Option, source_info: Option, + timestamp_instrumentation: Option, ) -> PyResult<()> { let this = self.get_ref()?; - let builder = build!( + let mut builder = build!( this.put(payload), encoding, attachment, timestamp, source_info ); + if let Some(instr) = timestamp_instrumentation { + builder = builder.timestamp_instrumentation(Some(instr.0)); + } wait(py, builder) } diff --git a/src/query.rs b/src/query.rs index 17d5e674..ccd785c7 100644 --- a/src/query.rs +++ b/src/query.rs @@ -30,6 +30,7 @@ use crate::{ sample::SourceInfo, session::EntityGlobalId, time::Timestamp, + timestamp_stack::TimestampStack, utils::{generic, wait, IntoPyResult, IntoPython, IntoRust, MapInto}, }; @@ -276,6 +277,14 @@ impl Reply { self.0.replier_id().map_into() } + #[getter] + fn timestamp_stack(&self) -> Option { + match self.0.result() { + Ok(sample) => sample.timestamp_stack().cloned().map(TimestampStack), + Err(err) => err.timestamp_stack().cloned().map(TimestampStack), + } + } + fn __repr__(&self) -> String { format!("{:?}", self.0) } @@ -295,6 +304,11 @@ impl ReplyError { self.0.encoding().clone().into() } + #[getter] + fn timestamp_stack(&self) -> Option { + self.0.timestamp_stack().cloned().map(TimestampStack) + } + fn __repr__(&self) -> String { format!("{:?}", self.0) } diff --git a/src/sample.rs b/src/sample.rs index dce32cf6..07aaea53 100644 --- a/src/sample.rs +++ b/src/sample.rs @@ -21,6 +21,7 @@ use crate::{ qos::{CongestionControl, Priority}, session::EntityGlobalId, time::Timestamp, + timestamp_stack::TimestampStack, utils::MapInto, }; @@ -95,6 +96,11 @@ impl Sample { self.0.source_info().cloned().map_into() } + #[getter] + fn timestamp_stack(&self) -> Option { + self.0.timestamp_stack().cloned().map(TimestampStack) + } + fn __repr__(&self) -> String { format!("{:?}", self.0) } diff --git a/src/session.rs b/src/session.rs index 92ae591e..686fc964 100644 --- a/src/session.rs +++ b/src/session.rs @@ -33,6 +33,7 @@ use crate::{ query::{Querier, QueryConsolidation, QueryTarget, Queryable, Reply, ReplyKeyExpr, Selector}, sample::{Locality, SampleKind, SourceInfo}, time::Timestamp, + timestamp_stack::{py_to_session_ts_callback, TimestampInstrumentation}, utils::{duration, wait, IntoPython, MapInto}, }; @@ -94,7 +95,7 @@ impl Session { } #[allow(clippy::too_many_arguments)] - #[pyo3(signature = (key_expr, payload, *, encoding = None, congestion_control = None, priority = None, express = None, attachment = None, timestamp = None, allowed_destination = None, source_info = None))] + #[pyo3(signature = (key_expr, payload, *, encoding = None, congestion_control = None, priority = None, express = None, attachment = None, timestamp = None, allowed_destination = None, source_info = None, timestamp_instrumentation = None))] fn put( &self, py: Python, @@ -108,8 +109,9 @@ impl Session { timestamp: Option, allowed_destination: Option, source_info: Option, + timestamp_instrumentation: Option, ) -> PyResult<()> { - let build = build!( + let mut build = build!( self.0.put(key_expr, payload), encoding, congestion_control, @@ -120,6 +122,9 @@ impl Session { allowed_destination, source_info, ); + if let Some(instr) = timestamp_instrumentation { + build = build.timestamp_instrumentation(Some(instr.0)); + } wait(py, build) } @@ -151,7 +156,7 @@ impl Session { } #[allow(clippy::too_many_arguments)] - #[pyo3(signature = (selector, handler = None, *, target = None, consolidation = None, accept_replies = None, timeout = None, congestion_control = None, priority = None, express = None, payload = None, encoding = None, attachment = None, allowed_destination = None, source_info = None, cancellation_token = None))] + #[pyo3(signature = (selector, handler = None, *, target = None, consolidation = None, accept_replies = None, timeout = None, congestion_control = None, priority = None, express = None, payload = None, encoding = None, attachment = None, allowed_destination = None, source_info = None, cancellation_token = None, timestamp_instrumentation = None))] fn get( &self, py: Python, @@ -172,9 +177,10 @@ impl Session { allowed_destination: Option, source_info: Option, cancellation_token: Option, + timestamp_instrumentation: Option, ) -> PyResult> { let (handler, _) = into_handler(py, handler, cancellation_token.as_ref())?; - let builder = build!( + let mut builder = build!( self.0.get(selector), target, consolidation, @@ -190,7 +196,9 @@ impl Session { source_info, cancellation_token ); - + if let Some(instr) = timestamp_instrumentation { + builder = builder.timestamp_instrumentation(Some(instr.0)); + } wait(py, builder.with(handler)).map_into() } @@ -235,7 +243,7 @@ impl Session { } #[allow(clippy::too_many_arguments)] - #[pyo3(signature = (key_expr, *, encoding = None, congestion_control = None, priority = None, express = None, reliability = None, allowed_destination = None))] + #[pyo3(signature = (key_expr, *, encoding = None, congestion_control = None, priority = None, express = None, reliability = None, allowed_destination = None, timestamp_instrumentation = None))] fn declare_publisher( &self, py: Python, @@ -246,8 +254,9 @@ impl Session { express: Option, reliability: Option, allowed_destination: Option, + timestamp_instrumentation: Option, ) -> PyResult { - let builder = build!( + let mut builder = build!( self.0.declare_publisher(key_expr), encoding, congestion_control, @@ -256,6 +265,9 @@ impl Session { reliability, allowed_destination, ); + if let Some(instr) = timestamp_instrumentation { + builder = builder.timestamp_instrumentation(Some(instr.0)); + } wait(py, builder).map_into() } @@ -306,8 +318,17 @@ impl Drop for Session { } #[pyfunction] -pub(crate) fn open(py: Python, config: Config) -> PyResult { - wait(py, zenoh::open(config)).map(Session) +#[pyo3(signature = (config, *, timestamp_callback = None))] +pub(crate) fn open( + py: Python, + config: Config, + timestamp_callback: Option, +) -> PyResult { + let mut builder = zenoh::open(config); + if let Some(cb) = timestamp_callback { + builder = builder.with_timestamp_callback(py_to_session_ts_callback(cb)); + } + wait(py, builder).map(Session) } wrapper!(zenoh::session::SessionInfo); diff --git a/src/timestamp_stack.rs b/src/timestamp_stack.rs new file mode 100644 index 00000000..e0c13300 --- /dev/null +++ b/src/timestamp_stack.rs @@ -0,0 +1,182 @@ +// +// Copyright (c) 2026 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// +use std::sync::Arc; + +use pyo3::{prelude::*, types::PyBytes}; +use zenoh::timestamp_stack::{ + InterceptionPoint as RustInterceptionPoint, SessionTimestampCallback, + TimestampInstrumentation as RustTimestampInstrumentation, TsStackContext as RustTsStackContext, +}; + +use crate::{ + config::{WhatAmI, ZenohId}, + macros::wrapper, + time::Timestamp, + utils::IntoPyResult, +}; + +// InterceptionPoint is #[non_exhaustive] so we can't use enum_mapper! (it generates exhaustive +// From impls). Define it manually with a repr u8 for Python comparison, and a fallback variant. +#[pyo3::pyclass(eq)] +#[repr(u8)] +#[derive(Copy, Clone, PartialEq, Eq)] +pub enum InterceptionPoint { + #[pyo3(name = "SEND")] + Send = 0, + #[pyo3(name = "ROUTE")] + Route = 1, + #[pyo3(name = "RECEIVE")] + Receive = 2, + /// Catch-all for future variants added by the Rust core. + #[pyo3(name = "UNKNOWN")] + Unknown = 255, +} + +impl From for InterceptionPoint { + fn from(v: RustInterceptionPoint) -> Self { + match v { + RustInterceptionPoint::Send => Self::Send, + RustInterceptionPoint::Route => Self::Route, + RustInterceptionPoint::Receive => Self::Receive, + _ => Self::Unknown, + } + } +} + +impl From for RustInterceptionPoint { + fn from(v: InterceptionPoint) -> Self { + match v { + InterceptionPoint::Send => RustInterceptionPoint::Send, + InterceptionPoint::Route => RustInterceptionPoint::Route, + InterceptionPoint::Receive | InterceptionPoint::Unknown => { + RustInterceptionPoint::Receive + } + } + } +} + +wrapper!(zenoh::timestamp_stack::TsStackContext: Clone); + +#[pymethods] +impl TsStackContext { + #[getter] + fn zid(&self) -> ZenohId { + self.0.zid.into() + } + + #[getter] + fn whatami(&self) -> WhatAmI { + self.0.whatami.into() + } + + #[getter] + fn interception_point(&self) -> InterceptionPoint { + self.0.interception_point.into() + } + + fn __repr__(&self) -> String { + format!("{:?}", self.0) + } +} + +wrapper!(zenoh::timestamp_stack::TimestampInstrumentation: Clone, Copy); + +#[pymethods] +impl TimestampInstrumentation { + #[new] + #[pyo3(signature = (*, send = false, route = false, receive = false))] + fn new(send: bool, route: bool, receive: bool) -> PyResult { + RustTimestampInstrumentation::new(send, route, receive) + .map(Self) + .into_pyres() + } + + fn is_instrumented(&self, point: InterceptionPoint) -> bool { + self.0.is_instrumented(point.into()) + } + + fn __repr__(&self) -> String { + format!("{:?}", self.0) + } +} + +wrapper!(zenoh::timestamp_stack::TimestampStackRecord: Clone); + +#[pymethods] +impl TimestampStackRecord { + #[getter] + fn point(&self) -> InterceptionPoint { + self.0.point().into() + } + + #[getter] + fn is_custom(&self) -> bool { + self.0.is_custom() + } + + fn timestamp<'py>(&self, py: Python<'py>) -> Bound<'py, PyBytes> { + PyBytes::new(py, self.0.timestamp()) + } + + fn as_timestamp(&self) -> Option { + self.0.as_timestamp().map(Timestamp) + } + + fn __repr__(&self) -> String { + format!("{:?}", self.0) + } +} + +wrapper!(zenoh::timestamp_stack::TimestampStack: Clone); + +#[pymethods] +impl TimestampStack { + #[getter] + fn instrumentation(&self) -> TimestampInstrumentation { + TimestampInstrumentation(self.0.instrumentation()) + } + + #[getter] + fn records(&self) -> Vec { + self.0 + .records() + .iter() + .cloned() + .map(TimestampStackRecord) + .collect() + } + + fn __repr__(&self) -> String { + format!("{:?}", self.0) + } +} + +/// Build a `SessionTimestampCallback` Arc from a Python callable. +pub(crate) fn py_to_session_ts_callback(py_cb: PyObject) -> SessionTimestampCallback { + Arc::new(move |ctx: RustTsStackContext| { + Python::with_gil(|py| { + let py_ctx = match Py::new(py, TsStackContext(ctx)) { + Ok(obj) => obj, + Err(_) => return Vec::new(), + }; + match py_cb.call1(py, (py_ctx,)) { + Ok(result) => result.extract::>(py).unwrap_or_default(), + Err(e) => { + e.print(py); + Vec::new() + } + } + }) + }) +} diff --git a/tests/test_timestamp_stack.py b/tests/test_timestamp_stack.py new file mode 100644 index 00000000..d5814f72 --- /dev/null +++ b/tests/test_timestamp_stack.py @@ -0,0 +1,239 @@ +# +# Copyright (c) 2026 ZettaScale Technology +# +# This program and the accompanying materials are made available under the +# terms of the Eclipse Public License 2.0 which is available at +# http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +# which is available at https://www.apache.org/licenses/LICENSE-2.0. +# +# SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +# +# Contributors: +# ZettaScale Zenoh Team, +# +import time +from typing import List, Optional + +import pytest + +import zenoh +from zenoh import InterceptionPoint, TimestampInstrumentation, TimestampStack + +SLEEP = 0.2 + + +def peer_config() -> zenoh.Config: + cfg = zenoh.Config() + cfg.insert_json5("scouting/multicast/enabled", "false") + return cfg + + +# ── helpers ─────────────────────────────────────────────────────────────────── + + +def collect_one(key: str, action, timeout: float = SLEEP) -> Optional[zenoh.Sample]: + received: List[zenoh.Sample] = [] + with zenoh.open(peer_config()) as session: + with session.declare_subscriber(key, lambda s: received.append(s)): + time.sleep(0.05) + action(session) + time.sleep(timeout) + return received[0] if received else None + + +# ── test_no_instrumentation ─────────────────────────────────────────────────── + + +def test_no_instrumentation(): + """Without instrumentation the stack should be None.""" + + def put(session): + session.put("test/ts/none", b"hello") + + sample = collect_one("test/ts/none", put) + assert sample is not None + assert sample.timestamp_stack is None + + +# ── test_put_subscribe_send_receive ────────────────────────────────────────── + + +def test_put_subscribe_send_receive(): + """A put with send+receive instrumentation produces SEND and RECEIVE records.""" + instr = TimestampInstrumentation(send=True, receive=True) + + def put(session): + session.put("test/ts/put", b"hello", timestamp_instrumentation=instr) + + sample = collect_one("test/ts/put", put) + assert sample is not None + stack = sample.timestamp_stack + assert stack is not None + points = [r.point for r in stack.records] + assert InterceptionPoint.SEND in points + assert InterceptionPoint.RECEIVE in points + + +# ── test_send_only ──────────────────────────────────────────────────────────── + + +def test_send_only(): + """send=True, receive=False → only SEND record.""" + instr = TimestampInstrumentation(send=True, receive=False) + + def put(session): + session.put("test/ts/send_only", b"x", timestamp_instrumentation=instr) + + sample = collect_one("test/ts/send_only", put) + assert sample is not None + stack = sample.timestamp_stack + assert stack is not None + points = [r.point for r in stack.records] + assert InterceptionPoint.SEND in points + assert InterceptionPoint.RECEIVE not in points + + +# ── test_receive_only ───────────────────────────────────────────────────────── + + +def test_receive_only(): + """receive=True, send=False → only RECEIVE record.""" + instr = TimestampInstrumentation(send=False, receive=True) + + def put(session): + session.put("test/ts/recv_only", b"x", timestamp_instrumentation=instr) + + sample = collect_one("test/ts/recv_only", put) + assert sample is not None + stack = sample.timestamp_stack + assert stack is not None + points = [r.point for r in stack.records] + assert InterceptionPoint.RECEIVE in points + assert InterceptionPoint.SEND not in points + + +# ── test_publisher_default ──────────────────────────────────────────────────── + + +def test_publisher_default(): + """Publisher-level default instrumentation applies to all puts.""" + instr = TimestampInstrumentation(send=True, receive=True) + received: List[zenoh.Sample] = [] + + with zenoh.open(peer_config()) as session: + with session.declare_subscriber( + "test/ts/pub_default", lambda s: received.append(s) + ): + with session.declare_publisher( + "test/ts/pub_default", timestamp_instrumentation=instr + ) as pub: + time.sleep(0.05) + pub.put(b"data") + time.sleep(SLEEP) + + assert len(received) == 1 + stack = received[0].timestamp_stack + assert stack is not None + points = [r.point for r in stack.records] + assert InterceptionPoint.SEND in points + assert InterceptionPoint.RECEIVE in points + + +# ── test_publisher_per_put_override ────────────────────────────────────────── + + +def test_publisher_per_put_override(): + """Per-put override takes precedence over publisher default.""" + default_instr = TimestampInstrumentation(send=True, receive=True) + override_instr = TimestampInstrumentation(send=True, receive=False) + received: List[zenoh.Sample] = [] + + with zenoh.open(peer_config()) as session: + with session.declare_subscriber( + "test/ts/pub_override", lambda s: received.append(s) + ): + with session.declare_publisher( + "test/ts/pub_override", timestamp_instrumentation=default_instr + ) as pub: + time.sleep(0.05) + pub.put(b"data", timestamp_instrumentation=override_instr) + time.sleep(SLEEP) + + assert len(received) == 1 + points = [r.point for r in received[0].timestamp_stack.records] + assert InterceptionPoint.SEND in points + assert InterceptionPoint.RECEIVE not in points + + +# ── test_as_timestamp ───────────────────────────────────────────────────────── + + +def test_as_timestamp(): + """Standard HLC records decode via as_timestamp(); returns a Timestamp object.""" + instr = TimestampInstrumentation(send=True, receive=True) + + def put(session): + session.put("test/ts/as_ts", b"t", timestamp_instrumentation=instr) + + sample = collect_one("test/ts/as_ts", put) + assert sample is not None + for r in sample.timestamp_stack.records: + if not r.is_custom: + ts = r.as_timestamp() + assert ts is not None + + +# ── test_is_custom_false ────────────────────────────────────────────────────── + + +def test_is_custom_false(): + """Standard (non-callback) records have is_custom == False.""" + instr = TimestampInstrumentation(send=True, receive=True) + + def put(session): + session.put("test/ts/not_custom", b"x", timestamp_instrumentation=instr) + + sample = collect_one("test/ts/not_custom", put) + assert sample is not None + for r in sample.timestamp_stack.records: + assert not r.is_custom + + +# ── test_custom_callback ────────────────────────────────────────────────────── + + +def test_custom_callback(): + """A session-level timestamp callback produces custom records with the returned bytes.""" + MARKER = b"custom-ts-bytes" + + def my_callback(ctx): + return MARKER + + instr = TimestampInstrumentation(send=True, receive=True) + received: List[zenoh.Sample] = [] + + with zenoh.open(peer_config(), timestamp_callback=my_callback) as session: + with session.declare_subscriber( + "test/ts/custom_cb", lambda s: received.append(s) + ): + time.sleep(0.05) + session.put("test/ts/custom_cb", b"x", timestamp_instrumentation=instr) + time.sleep(SLEEP) + + assert len(received) == 1 + stack = received[0].timestamp_stack + assert stack is not None + custom_records = [r for r in stack.records if r.is_custom] + assert len(custom_records) > 0 + for r in custom_records: + assert r.timestamp() == MARKER + assert r.as_timestamp() is None # custom bytes don't decode as UHLC + + +# ── test_invalid_instrumentation ───────────────────────────────────────────── + + +def test_invalid_instrumentation(): + """All-false instrumentation should raise (at least one point required).""" + with pytest.raises(Exception): + TimestampInstrumentation(send=False, route=False, receive=False) diff --git a/zenoh/__init__.pyi b/zenoh/__init__.pyi index 5fe200e5..2f723603 100644 --- a/zenoh/__init__.pyi +++ b/zenoh/__init__.pyi @@ -742,6 +742,7 @@ class Publisher: attachment: _IntoZBytes | None = None, timestamp: Timestamp | None = None, source_info: SourceInfo | None = None, + timestamp_instrumentation: TimestampInstrumentation | None = None, ): """Publish data to :class:`Subscriber` instances matching this publisher's key expression. @@ -1440,6 +1441,7 @@ class Session: timestamp: Timestamp | None = None, allowed_destination: Locality | None = None, source_info: SourceInfo | None = None, + timestamp_instrumentation: TimestampInstrumentation | None = None, ): """Publish data directly from the session. @@ -1482,6 +1484,7 @@ class Session: allowed_destination: Locality | None = None, source_info: SourceInfo | None = None, cancellation_token: CancellationToken | None = None, + timestamp_instrumentation: TimestampInstrumentation | None = None, ) -> Handler[Reply]: """Query data from the matching queryables in the system. @@ -1507,6 +1510,7 @@ class Session: allowed_destination: Locality | None = None, source_info: SourceInfo | None = None, cancellation_token: CancellationToken | None = None, + timestamp_instrumentation: TimestampInstrumentation | None = None, ) -> _H: """Query data from the matching queryables in the system. @@ -1532,6 +1536,7 @@ class Session: allowed_destination: Locality | None = None, source_info: SourceInfo | None = None, cancellation_token: CancellationToken | None = None, + timestamp_instrumentation: TimestampInstrumentation | None = None, ) -> None: """Query data from the matching queryables in the system. @@ -1611,6 +1616,7 @@ class Session: express: bool | None = None, reliability: Reliability | None = None, allowed_destination: Locality | None = None, + timestamp_instrumentation: TimestampInstrumentation | None = None, ) -> Publisher: """Create a :class:`Publisher` for the given key expression.""" @@ -2260,7 +2266,9 @@ def init_log_from_env_or(level: str): For example, `RUST_LOG=debug` will set the log level to DEBUG. If `RUST_LOG` is not set, then logging is set to the provided level.""" -def open(config: Config) -> Session: +def open( + config: Config, *, timestamp_callback: SessionTimestampCallback | None = None +) -> Session: """Open a zenoh :class:`zenoh.Session`. For more information about sessions and configuration, see :ref:`session-and-config`.