From 6b4db5e41573196ccf632bf7c7f4ef603b3c4214 Mon Sep 17 00:00:00 2001 From: Amit Singh Date: Tue, 16 Jun 2026 17:47:51 +0530 Subject: [PATCH] feat(forge_tracker): add posthog ai observability trace ids --- Cargo.lock | 87 +--- Cargo.toml | 2 +- crates/forge_main/src/tracker.rs | 14 + crates/forge_main/src/ui.rs | 5 +- crates/forge_tracker/Cargo.toml | 6 +- crates/forge_tracker/src/collect/posthog.rs | 440 ++++++++++++++++---- crates/forge_tracker/src/dispatch.rs | 143 ++++++- crates/forge_tracker/src/error.rs | 14 +- crates/forge_tracker/src/event.rs | 23 + 9 files changed, 566 insertions(+), 168 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 88e47dd3a6..ed9f55a540 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2,15 +2,6 @@ # It is not intended for manual editing. version = 4 -[[package]] -name = "addr2line" -version = "0.25.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1b5d307320b3181d6d7954e663bd7c774a838b8220fe0593c86d9fb09f498b4b" -dependencies = [ - "gimli", -] - [[package]] name = "adler2" version = "2.0.1" @@ -156,9 +147,9 @@ dependencies = [ [[package]] name = "async-compression" -version = "0.4.41" +version = "0.4.42" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d0f9ee0f6e02ffd7ad5816e9464499fba7b3effd01123b515c41d1697c43dad1" +checksum = "e79b3f8a79cccc2898f31920fc69f304859b3bd567490f75ebf51ae1c792a9ac" dependencies = [ "compression-codecs", "compression-core", @@ -731,21 +722,6 @@ dependencies = [ "tokio", ] -[[package]] -name = "backtrace" -version = "0.3.76" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bb531853791a215d7c62a30daf0dde835f381ab5de4589cfe7c649d2cbe92bd6" -dependencies = [ - "addr2line", - "cfg-if", - "libc", - "miniz_oxide", - "object", - "rustc-demangle", - "windows-link", -] - [[package]] name = "base64" version = "0.21.7" @@ -1076,7 +1052,7 @@ version = "3.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" 
checksum = "faf9468729b8cbcea668e36183cb69d317348c2e08e994829fb56ebfdfbaac34" dependencies = [ - "windows-sys 0.48.0", + "windows-sys 0.61.2", ] [[package]] @@ -1091,9 +1067,9 @@ dependencies = [ [[package]] name = "compression-codecs" -version = "0.4.37" +version = "0.4.38" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eb7b51a7d9c967fc26773061ba86150f19c50c0d65c887cb1fbe295fd16619b7" +checksum = "ce2548391e9c1929c21bf6aa2680af86fe4c1b33e6cea9ac1cfeec0bd11218cf" dependencies = [ "compression-core", "flate2", @@ -1102,9 +1078,9 @@ dependencies = [ [[package]] name = "compression-core" -version = "0.4.31" +version = "0.4.32" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "75984efb6ed102a0d42db99afb6c1948f0380d1d91808d5529916e6c08b49d8d" +checksum = "cc14f565cf027a105f7a44ccf9e5b424348421a1d8952a8fc9d499d313107789" [[package]] name = "config" @@ -1828,7 +1804,7 @@ dependencies = [ "libc", "option-ext", "redox_users 0.5.2", - "windows-sys 0.59.0", + "windows-sys 0.61.2", ] [[package]] @@ -2001,7 +1977,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "39cab71617ae0d63f51a36d69f866391735b51691dbda63cf6f96d042b63efeb" dependencies = [ "libc", - "windows-sys 0.59.0", + "windows-sys 0.61.2", ] [[package]] @@ -2793,13 +2769,11 @@ dependencies = [ "derive_more", "dirs", "forge_domain", - "http 1.4.2", "lazy_static", "machineid-rs", "posthog-rs", "pretty_assertions", "regex", - "reqwest 0.12.28", "serde", "serde_json", "sysinfo 0.38.4", @@ -2807,7 +2781,6 @@ dependencies = [ "tracing", "tracing-appender", "tracing-subscriber", - "url", "uuid", "whoami 2.1.2", ] @@ -3057,12 +3030,6 @@ dependencies = [ "syn 2.0.117", ] -[[package]] -name = "gimli" -version = "0.32.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e629b9b98ef3dd8afe6ca2bd0f89306cec16d43d907889945bc5d6687f2f13c7" - [[package]] name = "gix" version = "0.84.0" @@ -3472,7 +3439,7 @@ source = 
"registry+https://github.com/rust-lang/crates.io-index" checksum = "19753d40da53d0ec41604750eeb969097a90fb2d7f7992730d904541c04e2c19" dependencies = [ "bstr", - "hashbrown 0.16.1", + "hashbrown 0.17.0", ] [[package]] @@ -4907,7 +4874,7 @@ checksum = "3640c1c38b8e4e43584d8df18be5fc6b0aa314ce6ebf51b53313d4306cca8e46" dependencies = [ "hermit-abi", "libc", - "windows-sys 0.59.0", + "windows-sys 0.61.2", ] [[package]] @@ -5634,7 +5601,7 @@ version = "0.50.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7957b9740744892f114936ab4a57b3f487491bbeafaf8083688b16841a4240e5" dependencies = [ - "windows-sys 0.59.0", + "windows-sys 0.61.2", ] [[package]] @@ -5941,15 +5908,6 @@ dependencies = [ "objc2-foundation", ] -[[package]] -name = "object" -version = "0.37.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ff76201f031d8863c38aa7f905eca4f53abbfa15f609db4277d44cd8938f33fe" -dependencies = [ - "memchr", -] - [[package]] name = "once_cell" version = "1.21.4" @@ -6329,11 +6287,10 @@ dependencies = [ [[package]] name = "posthog-rs" -version = "0.12.0" +version = "0.13.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "24e1beb349e47c45a4ffdee65e8c096ff0022c0a4d6c566cacdf50f553427fb3" +checksum = "6289682e81f10dfdcb4863c240ef313a28793258828d3b7f71b0cc51dc6f3e9a" dependencies = [ - "backtrace", "chrono", "derive_builder", "flate2", @@ -7169,12 +7126,6 @@ dependencies = [ "ordered-multimap", ] -[[package]] -name = "rustc-demangle" -version = "0.1.27" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b50b8869d9fc858ce7266cce0194bd74df58b9d0e3f6df3a9fc8eb470d95c09d" - [[package]] name = "rustc-hash" version = "2.1.2" @@ -7213,7 +7164,7 @@ dependencies = [ "errno", "libc", "linux-raw-sys 0.12.1", - "windows-sys 0.59.0", + "windows-sys 0.61.2", ] [[package]] @@ -7293,7 +7244,7 @@ dependencies = [ "security-framework", "security-framework-sys", "webpki-root-certs", - 
"windows-sys 0.59.0", + "windows-sys 0.61.2", ] [[package]] @@ -8231,7 +8182,7 @@ dependencies = [ "getrandom 0.4.2", "once_cell", "rustix 1.1.4", - "windows-sys 0.59.0", + "windows-sys 0.61.2", ] [[package]] @@ -8294,7 +8245,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "230a1b821ccbd75b185820a1f1ff7b14d21da1e442e22c0863ea5f08771a8874" dependencies = [ "rustix 1.1.4", - "windows-sys 0.59.0", + "windows-sys 0.61.2", ] [[package]] @@ -9396,7 +9347,7 @@ version = "0.1.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c2a7b1c03c876122aa43f3020e6c3c3ee5c05081c9a00739faf7503aeba10d22" dependencies = [ - "windows-sys 0.48.0", + "windows-sys 0.61.2", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index db98f1bdb2..8ab4d3a038 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -59,8 +59,8 @@ machineid-rs = "1.2.4" mockito = "1.7.2" nom = "8.0.0" nu-ansi-term = "0.50.1" -posthog-rs = "0.12.0" pretty_assertions = "1.4.1" +posthog-rs = { version = "0.13.1", default-features = false, features = ["async-client"] } proc-macro2 = "1.0" quote = "1.0" rustyline = "18.0.0" diff --git a/crates/forge_main/src/tracker.rs b/crates/forge_main/src/tracker.rs index c033c4541d..f4be3db630 100644 --- a/crates/forge_main/src/tracker.rs +++ b/crates/forge_main/src/tracker.rs @@ -3,6 +3,20 @@ use forge_tracker::{EventKind, ToolCallPayload}; use crate::TRACKER; /// Helper functions to eliminate duplication of tokio::spawn + TRACKER patterns + +/// Begin a new trace. All subsequent [`dispatch`] calls within this +/// conversation turn will share the same `$trace_id` in PostHog, grouping +/// prompt, tool-call, and error events into a single AI observability trace. +pub async fn begin_trace(session_id: Option) { + TRACKER.begin_trace(session_id).await; +} + +/// End the current trace so that the next conversation turn starts a fresh +/// trace and session. 
+pub async fn end_trace() { + TRACKER.end_trace().await; +} + /// Generic dispatcher for any event fn dispatch(event: EventKind) { tokio::spawn(TRACKER.dispatch(event)); diff --git a/crates/forge_main/src/ui.rs b/crates/forge_main/src/ui.rs index 59a1057328..1485162638 100644 --- a/crates/forge_main/src/ui.rs +++ b/crates/forge_main/src/ui.rs @@ -3860,6 +3860,7 @@ impl A + Send + Sync> UI async fn on_message(&mut self, content: Option) -> Result<()> { let conversation_id = self.init_conversation().await?; + tracker::begin_trace(Some(conversation_id.into_string())).await; if self.config.auto_install_vscode_extension { self.install_vscode_extension(); @@ -3898,7 +3899,9 @@ impl A + Send + Sync> UI // Create the chat request with the event let chat = ChatRequest::new(event, conversation_id); - self.on_chat(chat).await + let result = self.on_chat(chat).await; + tracker::end_trace().await; + result } async fn on_chat(&mut self, chat: ChatRequest) -> Result<()> { diff --git a/crates/forge_tracker/Cargo.toml b/crates/forge_tracker/Cargo.toml index 9fa0d04045..6a2f69c734 100644 --- a/crates/forge_tracker/Cargo.toml +++ b/crates/forge_tracker/Cargo.toml @@ -5,20 +5,18 @@ edition.workspace = true rust-version.workspace = true [dependencies] -reqwest.workspace = true derive_more.workspace = true -url.workspace = true +posthog-rs.workspace = true serde.workspace = true serde_json.workspace = true tokio.workspace = true tracing.workspace = true +uuid.workspace = true sysinfo.workspace = true -posthog-rs = "0.12.0" async-trait.workspace = true chrono.workspace = true whoami.workspace = true convert_case.workspace = true -http.workspace = true regex.workspace = true tracing-appender.workspace = true tracing-subscriber.workspace = true diff --git a/crates/forge_tracker/src/collect/posthog.rs b/crates/forge_tracker/src/collect/posthog.rs index d7c3213a29..5648380904 100644 --- a/crates/forge_tracker/src/collect/posthog.rs +++ b/crates/forge_tracker/src/collect/posthog.rs @@ -1,103 
+1,397 @@ -use std::collections::HashMap; -use std::time::Duration; - -use chrono::NaiveDateTime; -use http::header::{HeaderName, HeaderValue}; -use reqwest::Client; -use serde::Serialize; -use serde_json::Value; +use tokio::sync::OnceCell; use super::super::Result; use super::Collect; use crate::Event; +/// PostHog event collector backed by the `posthog-rs` SDK. +/// +/// The underlying [`posthog_rs::Client`] is lazily initialized on the first +/// [`collect`](Collect::collect) call, so `Tracker::new` can be called outside +/// an async context. pub struct Tracker { - api_secret: &'static str, - client: Client, + api_key: String, + client: OnceCell, } -impl Tracker { - pub fn new(api_secret: &'static str) -> Self { - // Configure HTTP client with connection pooling similar to forge_provider - let client = Client::builder() - .connect_timeout(Duration::from_secs(10)) - .read_timeout(Duration::from_secs(30)) - .pool_idle_timeout(Duration::from_secs(90)) - .pool_max_idle_per_host(5) - .build() - .expect("Failed to build HTTP client for PostHog tracker"); - - Self { api_secret, client } +/// Maps our domain event names to PostHog AI Observability event types. +fn to_ai_event_name(name: &str) -> &str { + match name { + "prompt" => "$ai_generation", + "tool_call" => "$ai_span", + "ai_trace" => "$ai_trace", + _ => name, } } -#[derive(Debug, Serialize)] -struct Payload { - api_key: String, - event: String, - distinct_id: String, - #[serde(skip_serializing_if = "Option::is_none")] - properties: Option>, - #[serde(rename = "$set", skip_serializing_if = "Option::is_none")] - set: Option, - timestamp: Option, -} +impl Tracker { + pub fn new(api_key: String) -> Self { + Self { api_key, client: OnceCell::new() } + } + + /// Returns a reference to the initialized SDK client, creating it on the + /// first call. 
+ async fn client(&self) -> &posthog_rs::Client { + self.client + .get_or_init(|| async { posthog_rs::client(self.api_key.as_str()).await }) + .await + } + + /// Library metadata injected into every captured event so PostHog can + /// identify the SDK / agent that produced the telemetry. + const AI_LIB: &str = "forge_tracker"; + const AI_FRAMEWORK: &str = "code-forge"; -impl Payload { - fn new(api_key: String, mut input: Event) -> Self { - let mut properties = HashMap::new(); - let distinct_id = input.client_id.to_string(); - let event = input.event_name.to_string(); - let mut set = None; - if let Some(identity) = input.identity.take() - && let Ok(value) = serde_json::to_value(identity) - { - set = Some(value); + /// Derives the project name from the current working directory (recomputed on every call). + fn project_name() -> Option { + std::env::current_dir() + .ok() + .and_then(|p| p.file_name().map(|n| n.to_string_lossy().into_owned())) + } + + /// Converts our domain [`Event`] into a [`posthog_rs::Event`], injecting + /// PostHog AI Observability trace properties and user profile updates. + /// + /// Property naming follows the conventions used by the `@posthog/pi` + /// extension so generic "prompt" and "tool_call" events appear in the + /// PostHog LLM Analytics dashboard. + fn build_event(input: &Event) -> posthog_rs::Event { + let raw_name: String = input.event_name.clone().into(); + let event_name = to_ai_event_name(&raw_name).to_string(); + let distinct_id = input.client_id.clone(); + let mut event = posthog_rs::Event::new(event_name, distinct_id); + + // Required AI-observability metadata — always injected.
+ let _ = event.insert_prop("$ai_lib", Self::AI_LIB); + let _ = event.insert_prop("$ai_lib_version", crate::dispatch::version()); + let _ = event.insert_prop("$ai_framework", Self::AI_FRAMEWORK); + if let Some(proj) = Self::project_name() { + let _ = event.insert_prop("$ai_project_name", &proj); + let _ = event.insert_prop("$ai_agent_name", &proj); } - if let Ok(Value::Object(map)) = serde_json::to_value(input) { + // Serialize all domain fields and inject them as PostHog properties. + // Keys listed below are mapped to their `$ai_*` PostHog equivalents + // and excluded from the flat property passthrough. + if let Ok(serde_json::Value::Object(map)) = serde_json::to_value(input) { for (key, value) in map { - properties.insert(key, value); + match key.as_str() { + // Used as the PostHog event name and distinct_id already. + "event_name" | "client_id" => {} + + // PostHog AI Observability trace properties. + // Mirrors the `@posthog/pi` convention of `$ai_*` prefixes. + "trace_id" => { + if let Some(s) = value.as_str() { + let _ = event.insert_prop("$ai_trace_id", s); + } + } + "session_id" => { + if let Some(s) = value.as_str() { + let _ = event.insert_prop("$ai_session_id", s); + } + } + "ai_span_id" => { + if let Some(s) = value.as_str() { + let _ = event.insert_prop("$ai_span_id", s); + } + } + "ai_parent_id" => { + if let Some(s) = value.as_str() { + let _ = event.insert_prop("$ai_parent_id", s); + } + } + + // LLM generation metadata. + "model" => { + if let Some(s) = value.as_str() { + let _ = event.insert_prop("$ai_model", s); + } + } + "provider" => { + if let Some(s) = value.as_str() { + let _ = event.insert_prop("$ai_provider", s); + } + } + + // Token / latency fields (when present). 
+ "ai_input_tokens" => { + if let Some(n) = value.as_u64() { + let _ = event.insert_prop("$ai_input_tokens", n); + } + } + "ai_output_tokens" => { + if let Some(n) = value.as_u64() { + let _ = event.insert_prop("$ai_output_tokens", n); + } + } + "ai_total_tokens" => { + if let Some(n) = value.as_u64() { + let _ = event.insert_prop("$ai_total_tokens", n); + } + } + "ai_latency" => { + if let Some(n) = value.as_f64() { + let _ = event.insert_prop("$ai_latency", n); + } + } + + // User identity: PostHog applies `$set` on the person + // profile linked to the distinct_id. + "identity" if !value.is_null() => { + let _ = event.insert_prop("$set", &value); + } + + _ => { + let _ = event.insert_prop(&key, &value); + } + } } } - Self { - api_key, - event, - distinct_id, - properties: Some(properties), - set, - timestamp: Some(chrono::Utc::now().naive_utc()), - } + event } } -impl Tracker { - fn create_request(&self, event: Event) -> Result { - let url = reqwest::Url::parse("https://us.i.posthog.com/capture/")?; - let mut request = reqwest::Request::new(reqwest::Method::POST, url); - request.headers_mut().insert( - HeaderName::from_static("content-type"), - HeaderValue::from_static("application/json"), +#[async_trait::async_trait] +impl Collect for Tracker { + async fn collect(&self, event: Event) -> Result<()> { + let sdk_event = Self::build_event(&event); + self.client().await.capture(sdk_event).await?; + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use chrono::Utc; + use pretty_assertions::assert_eq; + use serde_json::json; + + use super::*; + use crate::event::{Identity, Name}; + + /// Fixture: a fully populated [`Event`] with trace, session, model, and + /// identity fields set so we can verify every special-case mapping. 
+ fn fixture() -> Event { + Event { + event_name: Name::from("test_event".to_string()), + event_value: "test_value".to_string(), + start_time: Utc::now(), + cores: 4, + client_id: "client-123".to_string(), + os_name: "macOS".to_string(), + up_time: 0, + path: None, + cwd: None, + user: "test_user".to_string(), + args: vec![], + version: "1.0.0".to_string(), + email: vec![], + model: Some("claude-sonnet".to_string()), + conversation: None, + identity: Some(Identity { login: "test@example.com".to_string() }), + trace_id: Some("trace-uuid-123".to_string()), + session_id: Some("session-uuid-456".to_string()), + ai_span_id: Some("span-uuid-789".to_string()), + ai_parent_id: Some("parent-uuid-012".to_string()), + provider: Some("anthropic".to_string()), + ai_input_tokens: Some(150), + ai_output_tokens: Some(80), + ai_total_tokens: Some(230), + ai_latency: Some(1.42), + } + } + + #[test] + fn build_event_maps_trace_id_to_dollar_ai_trace_id() { + let input = fixture(); + let actual = Tracker::build_event(&input); + let expected = Some(&json!("trace-uuid-123")); + assert_eq!(actual.properties().get("$ai_trace_id"), expected); + } + + #[test] + fn build_event_maps_session_id_to_dollar_ai_session_id() { + let input = fixture(); + let actual = Tracker::build_event(&input); + let expected = Some(&json!("session-uuid-456")); + assert_eq!(actual.properties().get("$ai_session_id"), expected); + } + + #[test] + fn build_event_maps_model_to_dollar_ai_model() { + let input = fixture(); + let actual = Tracker::build_event(&input); + let expected = Some(&json!("claude-sonnet")); + assert_eq!(actual.properties().get("$ai_model"), expected); + } + + #[test] + fn build_event_maps_identity_to_dollar_set() { + let input = fixture(); + let actual = Tracker::build_event(&input); + let expected = Some(&json!({"login": "test@example.com"})); + assert_eq!(actual.properties().get("$set"), expected); + } + + #[test] + fn build_event_excludes_event_name_from_properties() { + let input = fixture(); 
+ let actual = Tracker::build_event(&input); + assert_eq!(actual.properties().get("event_name"), None); + } + + #[test] + fn build_event_excludes_client_id_from_properties() { + let input = fixture(); + let actual = Tracker::build_event(&input); + assert_eq!(actual.properties().get("client_id"), None); + } + + #[test] + fn build_event_passes_through_regular_fields() { + let input = fixture(); + let actual = Tracker::build_event(&input); + let expected = Some(&json!("test_value")); + assert_eq!(actual.properties().get("event_value"), expected); + let expected = Some(&json!("test_user")); + assert_eq!(actual.properties().get("user"), expected); + let expected = Some(&json!("macOS")); + assert_eq!(actual.properties().get("os_name"), expected); + } + + #[test] + fn build_event_omits_trace_id_when_none() { + let mut input = fixture(); + input.trace_id = None; + let actual = Tracker::build_event(&input); + assert_eq!(actual.properties().get("$ai_trace_id"), None); + } + + #[test] + fn build_event_omits_session_id_when_none() { + let mut input = fixture(); + input.session_id = None; + let actual = Tracker::build_event(&input); + assert_eq!(actual.properties().get("$ai_session_id"), None); + } + + #[test] + fn build_event_omits_ai_model_when_none() { + let mut input = fixture(); + input.model = None; + let actual = Tracker::build_event(&input); + assert_eq!(actual.properties().get("$ai_model"), None); + } + + #[test] + fn build_event_omits_dollar_set_when_identity_none() { + let mut input = fixture(); + input.identity = None; + let actual = Tracker::build_event(&input); + assert_eq!(actual.properties().get("$set"), None); + } + + #[test] + fn build_event_maps_prompt_to_ai_generation() { + let mut input = fixture(); + input.event_name = Name::from("prompt".to_string()); + let actual = Tracker::build_event(&input); + assert_eq!(actual.event_name(), "$ai_generation"); + } + + #[test] + fn build_event_maps_tool_call_to_ai_span() { + let mut input = fixture(); + 
input.event_name = Name::from("tool_call".to_string()); + let actual = Tracker::build_event(&input); + assert_eq!(actual.event_name(), "$ai_span"); + } + + #[test] + fn build_event_maps_ai_trace_to_ai_trace() { + let mut input = fixture(); + input.event_name = Name::from("ai_trace".to_string()); + let actual = Tracker::build_event(&input); + assert_eq!(actual.event_name(), "$ai_trace"); + } + + #[test] + fn build_event_passes_through_unknown_event_names() { + let mut input = fixture(); + input.event_name = Name::from("start".to_string()); + let actual = Tracker::build_event(&input); + assert_eq!(actual.event_name(), "start"); + } + + #[test] + fn build_event_stamps_ai_lib_metadata() { + let input = fixture(); + let actual = Tracker::build_event(&input); + assert_eq!( + actual.properties().get("$ai_lib"), + Some(&json!("forge_tracker")) ); + assert_eq!( + actual.properties().get("$ai_framework"), + Some(&json!("code-forge")) + ); + } - let payload = Payload::new(self.api_secret.to_string(), event); + #[test] + fn build_event_stamps_ai_lib_version() { + let input = fixture(); + let actual = Tracker::build_event(&input); + // Just verify the key exists — the version changes per build + assert!(actual.properties().contains_key("$ai_lib_version")); + } - let _ = request - .body_mut() - .insert(reqwest::Body::from(serde_json::to_string(&payload)?)); + #[test] + fn build_event_maps_ai_span_id_to_dollar_ai_span_id() { + let input = fixture(); + let actual = Tracker::build_event(&input); + let expected = Some(&json!("span-uuid-789")); + assert_eq!(actual.properties().get("$ai_span_id"), expected); + } - Ok(request) + #[test] + fn build_event_maps_ai_parent_id_to_dollar_ai_parent_id() { + let input = fixture(); + let actual = Tracker::build_event(&input); + let expected = Some(&json!("parent-uuid-012")); + assert_eq!(actual.properties().get("$ai_parent_id"), expected); } -} -#[async_trait::async_trait] -impl Collect for Tracker { - // TODO: move http request to a dispatch - 
async fn collect(&self, event: Event) -> Result<()> { - let request = self.create_request(event)?; - self.client.execute(request).await?; + #[test] + fn build_event_maps_provider_to_dollar_ai_provider() { + let input = fixture(); + let actual = Tracker::build_event(&input); + let expected = Some(&json!("anthropic")); + assert_eq!(actual.properties().get("$ai_provider"), expected); + } - Ok(()) + #[test] + fn build_event_maps_token_counts_to_ai_properties() { + let input = fixture(); + let actual = Tracker::build_event(&input); + assert_eq!(actual.properties().get("$ai_input_tokens"), Some(&json!(150))); + assert_eq!(actual.properties().get("$ai_output_tokens"), Some(&json!(80))); + assert_eq!(actual.properties().get("$ai_total_tokens"), Some(&json!(230))); + } + + #[test] + fn build_event_maps_latency_to_dollar_ai_latency() { + let input = fixture(); + let actual = Tracker::build_event(&input); + let expected = Some(&json!(1.42)); + assert_eq!(actual.properties().get("$ai_latency"), expected); + } + + #[test] + fn build_event_distinct_id_uses_client_id_field() { + let input = fixture(); + let actual = Tracker::build_event(&input); + assert_eq!(actual.distinct_id(), "client-123"); } } diff --git a/crates/forge_tracker/src/dispatch.rs b/crates/forge_tracker/src/dispatch.rs index bbec64e4f3..abc8ed58d7 100644 --- a/crates/forge_tracker/src/dispatch.rs +++ b/crates/forge_tracker/src/dispatch.rs @@ -9,6 +9,7 @@ use forge_domain::Conversation; use sysinfo::System; use tokio::process::Command; use tokio::sync::Mutex; +use uuid::Uuid; use super::Result; use crate::can_track::can_track; @@ -66,13 +67,26 @@ pub struct Tracker { email: Arc>>>, model: Arc>>, conversation: Arc>>, + /// Session ID for the current conversation turn. Maps to `$ai_session_id` + /// in the PostHog payload. Set via [`begin_trace`](Tracker::begin_trace) + /// and cleared via [`end_trace`](Tracker::end_trace). 
+ session_id: Arc>>, + /// Trace ID shared across all events within a single conversation turn. + /// Set via [`begin_trace`](Tracker::begin_trace) and cleared via + /// [`end_trace`](Tracker::end_trace). When absent, each [`dispatch`] + /// call generates its own trace ID. + trace_id: Arc>>, + /// Span ID of the most recent AI generation, used as `$ai_parent_id` for + /// subsequent tool-call spans so they appear nested under the generation + /// in PostHog AI Observability. + generation_span_id: Arc>>, is_logged_in: Arc, rate_limiter: Arc>, } impl Default for Tracker { fn default() -> Self { - let posthog_tracker = Box::new(posthog::Tracker::new(POSTHOG_API_SECRET)); + let posthog_tracker = Box::new(posthog::Tracker::new(POSTHOG_API_SECRET.to_string())); let start_time = Utc::now(); let can_track = can_track(); Self { @@ -82,6 +96,9 @@ impl Default for Tracker { email: Arc::new(Mutex::new(None)), model: Arc::new(Mutex::new(None)), conversation: Arc::new(Mutex::new(None)), + session_id: Arc::new(Mutex::new(None)), + trace_id: Arc::new(Mutex::new(None)), + generation_span_id: Arc::new(Mutex::new(None)), is_logged_in: Arc::new(AtomicBool::new(false)), rate_limiter: Arc::new(Mutex::new(RateLimiter::new(MAX_EVENTS_PER_MINUTE))), } @@ -105,6 +122,84 @@ impl Tracker { self.dispatch(EventKind::Login(id)).await.ok(); } + /// Begin a new trace for the current conversation turn. + /// + /// All subsequent [`dispatch`] calls will share the same `$ai_trace_id` in + /// PostHog, grouping prompt, tool-call, and error events into a single AI + /// observability trace. + /// + /// When `session_id` is provided it is stored as the `$ai_session_id` for + /// every event in this trace.
+ pub async fn begin_trace>(&self, session_id: Option) { + *self.trace_id.lock().await = Some(Uuid::new_v4().to_string()); + *self.session_id.lock().await = session_id.map(|s| s.into()); + *self.generation_span_id.lock().await = None; + } + + /// End the current trace, clear all shared IDs, and send a final + /// `$ai_trace` summary event to PostHog. + /// Subsequent [`dispatch`] calls will generate new trace IDs until + /// [`begin_trace`](Tracker::begin_trace) is called again. + pub async fn end_trace(&self) { + // Snapshot the trace_id before we clear it so the `$ai_trace` event + // carries the correct identifier. + let trace_id = { self.trace_id.lock().await.clone() }; + let session_id = { self.session_id.lock().await.clone() }; + + *self.session_id.lock().await = None; + *self.trace_id.lock().await = None; + *self.generation_span_id.lock().await = None; + + // Send the $ai_trace summary event. + if let Some(tid) = trace_id { + self.dispatch_trace_summary(tid, session_id).await.ok(); + } + } + + /// Dispatches a `$ai_trace` event that summarises the completed agent run. 
+ async fn dispatch_trace_summary( + &self, + trace_id: String, + session_id: Option, + ) -> Result<()> { + if !self.can_track || !self.rate_limiter.lock().await.inc_and_check() { + return Ok(()); + } + + let event = Event { + event_name: "ai_trace".to_string().into(), + event_value: String::new(), + start_time: self.start_time, + cores: cores(), + client_id: client_id(), + os_name: os_name(), + up_time: up_time(self.start_time), + args: args(), + path: path(), + cwd: cwd(), + user: user(), + version: version(), + email: vec![], + model: None, + conversation: None, + identity: None, + session_id, + trace_id: Some(trace_id), + ai_span_id: None, + ai_parent_id: None, + provider: None, + ai_input_tokens: None, + ai_output_tokens: None, + ai_total_tokens: None, + ai_latency: None, + }; + + for collector in self.collectors.as_ref() { + collector.collect(event.clone()).await?; + } + Ok(()) + } + pub async fn dispatch(&self, event_kind: EventKind) -> Result<()> { if !self.can_track { return Ok(()); @@ -114,8 +209,27 @@ impl Tracker { return Ok(()); // Drop event if rate limit exceeded } + // Derive span parent-child wiring based on event kind. + let (ai_span_id, ai_parent_id) = match event_kind { + // A generation (prompt) gets a fresh span_id; it has no parent. + EventKind::Prompt(_) => (Some(Uuid::new_v4().to_string()), None), + // Tool calls are children of the most recent generation. + EventKind::ToolCall(_) => { + let span = Uuid::new_v4().to_string(); + let parent = self.generation_span_id.lock().await.clone(); + (Some(span), parent) + } + _ => (None, None), + }; + + // Persist the generation span for subsequent tool-call parenting. 
+ if matches!(event_kind, EventKind::Prompt(_)) { + *self.generation_span_id.lock().await = ai_span_id.clone(); + } + // Create a new event let email = self.system_info().await; + let conversation = self.conversation.lock().await.clone(); let event = Event { event_name: event_kind.name(), event_value: event_kind.value(), @@ -131,11 +245,26 @@ impl Tracker { version: version(), email: email.clone(), model: self.model.lock().await.clone(), - conversation: self.conversation().await, + conversation, identity: match event_kind { EventKind::Login(id) => Some(id), _ => None, }, + session_id: self.session_id.lock().await.clone(), + trace_id: Some( + self.trace_id + .lock() + .await + .clone() + .unwrap_or_else(|| Uuid::new_v4().to_string()), + ), + ai_span_id, + ai_parent_id, + provider: None, + ai_input_tokens: None, + ai_output_tokens: None, + ai_total_tokens: None, + ai_latency: None, }; // Dispatch the event to all collectors @@ -153,12 +282,6 @@ impl Tracker { guard.clone().unwrap_or_default() } - async fn conversation(&self) -> Option { - let mut guard = self.conversation.lock().await; - let conversation = guard.clone(); - *guard = None; - conversation - } pub async fn set_conversation(&self, conversation: Conversation) { *self.conversation.lock().await = Some(conversation); } @@ -239,7 +362,9 @@ fn up_time(start_time: DateTime) -> i64 { current_time.signed_duration_since(start_time).num_minutes() } -fn version() -> String { +/// Exposed so the PostHog collector can stamp `$ai_lib_version` on every +/// event. 
+pub fn version() -> String { VERSION.to_string() } diff --git a/crates/forge_tracker/src/error.rs b/crates/forge_tracker/src/error.rs index 8736bc9b21..c183a0fe8f 100644 --- a/crates/forge_tracker/src/error.rs +++ b/crates/forge_tracker/src/error.rs @@ -1,23 +1,13 @@ use derive_more::{Debug, From}; -use reqwest::header::InvalidHeaderValue; #[derive(From, Debug)] pub enum Error { - #[debug("Reqwest Error: {}", _0)] - Reqwest(reqwest::Error), - - #[debug("Invalid Header Value: {}", _0)] - InvalidHeaderValue(InvalidHeaderValue), + #[debug("PostHog Error: {}", _0)] + PostHog(posthog_rs::Error), #[debug("Serde JSON Error: {}", _0)] SerdeJson(serde_json::Error), - #[debug("Url Parser Error: {}", _0)] - UrlParser(url::ParseError), - - #[debug("PostHog Error: {}", _0)] - PostHog(posthog_rs::Error), - #[debug("Tokio Join Error: {}", _0)] TokioJoin(tokio::task::JoinError), diff --git a/crates/forge_tracker/src/event.rs b/crates/forge_tracker/src/event.rs index ed09c919d4..0332e76bcf 100644 --- a/crates/forge_tracker/src/event.rs +++ b/crates/forge_tracker/src/event.rs @@ -24,6 +24,29 @@ pub struct Event { pub model: Option, pub conversation: Option, pub identity: Option, + /// PostHog AI Observability: unique ID linking all events in one trace (e.g., one LLM conversation turn). + /// Maps to the `$ai_trace_id` property in the PostHog payload. + pub trace_id: Option, + /// PostHog AI Observability: groups multiple traces into a session (e.g., an entire chat session). + /// Maps to the `$ai_session_id` property in the PostHog payload. + pub session_id: Option, + /// PostHog AI Observability: unique span ID for this event within a trace. + /// Maps to the `$ai_span_id` property in the PostHog payload. + /// Freshly generated for each prompt and tool-call event; tool calls reference the generation via `ai_parent_id`. + pub ai_span_id: Option, + /// PostHog AI Observability: span ID of the parent generation for tool calls. + /// Maps to the `$ai_parent_id` property in the PostHog payload.
+ pub ai_parent_id: Option, + /// LLM provider (e.g. "anthropic", "openai"). Maps to `$ai_provider`. + pub provider: Option, + /// LLM input token count. Maps to `$ai_input_tokens`. + pub ai_input_tokens: Option, + /// LLM output token count. Maps to `$ai_output_tokens`. + pub ai_output_tokens: Option, + /// Total token count. Maps to `$ai_total_tokens`. + pub ai_total_tokens: Option, + /// LLM response latency in seconds. Maps to `$ai_latency`. + pub ai_latency: Option, } #[derive(Clone, Debug, Serialize, Deserialize)]