diff --git a/.cursor/rules/overview_dev.mdc b/.cursor/rules/overview_dev.mdc index 17ce98f07be..b837be34add 100644 --- a/.cursor/rules/overview_dev.mdc +++ b/.cursor/rules/overview_dev.mdc @@ -66,6 +66,15 @@ Use the `fetch_rules` tool to include these rules when working on specific areas - `SentryMetricsEvent`, `SentryMetricsEvents` - `SentryOptions.getMetrics()`, `beforeSend` callback +- **`queues`**: Use when working with: + - Sentry Queues product data or messaging span conventions + - Queue tracing spans/transactions (`queue.publish`, `queue.process`) + - `enableQueueTracing` option and `sentry.enable-queue-tracing` + - Kafka instrumentation (`sentry-kafka`, `SentryKafkaProducer`, `SentryKafkaConsumerTracing`) + - Spring Kafka queue auto-instrumentation and `SentryKafkaRecordInterceptor` + - Messaging span data (`messaging.system`, `messaging.destination.name`, receive latency, retry count) + - `sentry-task-enqueued-time` header and distributed trace propagation through queues + - **`continuous_profiling_jvm`**: Use when working with: - JVM continuous profiling (`sentry-async-profiler` module) - `IContinuousProfiler`, `JavaContinuousProfiler` @@ -118,6 +127,7 @@ Use the `fetch_rules` tool to include these rules when working on specific areas - System test/e2e/sample → `e2e_tests` - Feature flag/addFeatureFlag/flag evaluation → `feature_flags` - Metrics/count/distribution/gauge → `metrics` + - Queues/queue tracing/Kafka/Spring Kafka/queue.publish/queue.process/enableQueueTracing/messaging spans → `queues` - PR/pull request/stacked PR/stack → `pr` - JVM continuous profiling/async-profiler/JFR/ProfileChunk → `continuous_profiling_jvm` - Android continuous profiling/AndroidProfiler/frame metrics/method tracing → no dedicated rule yet; inspect the code directly diff --git a/.cursor/rules/queues.mdc b/.cursor/rules/queues.mdc new file mode 100644 index 00000000000..fe082c3b854 --- /dev/null +++ b/.cursor/rules/queues.mdc @@ -0,0 +1,82 @@ +--- +alwaysApply: false 
+description: Sentry Queues module and Java SDK queue tracing +--- +# Sentry Queues and Java SDK Queue Tracing + +## Product model + +Sentry Queues is built from tracing data. SDKs mark queue work with queue-specific span operations and messaging span data so Sentry can identify producers, consumers, destinations, latency, and failures. + +The important concepts are: +- `queue.publish`: a span for enqueueing/publishing a message to a queue or topic. +- `queue.process`: a transaction for processing a dequeued message. +- Messaging span data, especially: + - `messaging.system` (for example `kafka`) + - `messaging.destination.name` (queue/topic name) + - `messaging.message.id` + - `messaging.message.retry.count` + - `messaging.message.body.size` + - `messaging.message.envelope.size` + - `messaging.message.receive.latency` +- Distributed tracing headers (`sentry-trace` and `baggage`) link producer-side work to consumer-side processing. +- Queue receive latency is the time a message spent waiting between publish/enqueue and processing. For Java Kafka, this comes from the `sentry-task-enqueued-time` header that the producer writes and the consumer reads. + +The Queues UI is not backed by a separate Java event type. The Java SDK contributes data through spans/transactions with the expected operations, trace context, statuses, and messaging attributes. + +## Java SDK implementation + +Queue tracing is opt-in. `SentryOptions.isEnableQueueTracing()` defaults to `false` and can be enabled with `setEnableQueueTracing(true)` or external config key `enable-queue-tracing` (`sentry.enable-queue-tracing` in Spring Boot). Captured queue spans/transactions still depend on tracing being enabled and sampled. + +Kafka support lives in `sentry-kafka`: +- `SentryKafkaProducer.wrap(Producer)` wraps Kafka `Producer.send(...)` calls. + - Creates a `queue.publish` child span when there is an active span. + - Sets `messaging.system=kafka` and `messaging.destination.name` to the topic name.
+ - Injects `sentry-trace`, `baggage`, and `sentry-task-enqueued-time` headers. + - Still injects tracing/enqueued-time headers when queue tracing is enabled but there is no active span, so background producers can link to consumers. + - Finishes the span from the Kafka callback with `OK` or `INTERNAL_ERROR`. +- `SentryKafkaConsumerTracing.withTracing(record, callback)` is the manual raw-Kafka consumer helper. + - Forks root scopes for the processing lifecycle and makes them current. + - Continues the trace from Kafka headers. + - Starts a `queue.process` transaction bound to scope when tracing is enabled. + - Sets Kafka messaging data, body size, retry count, and receive latency when available. + - Finishes with `OK` or `INTERNAL_ERROR` and never lets instrumentation failures break customer processing. + +Spring Kafka support lives in `sentry-spring`, `sentry-spring-jakarta`, and `sentry-spring-7`: +- `SentryKafkaProducerBeanPostProcessor` installs a producer post-processor on `DefaultKafkaProducerFactory` and wraps created producers with `SentryKafkaProducer.wrap(...)`. +- `SentryKafkaConsumerBeanPostProcessor` installs `SentryKafkaRecordInterceptor` on listener container factories. +- `SentryKafkaRecordInterceptor` starts/finishes `queue.process` transactions around listener processing, continues traces from headers, forks scopes for the record lifecycle, and preserves any existing delegate interceptor. +- Spring Boot auto-configuration registers both post-processors only when Spring Kafka and `sentry-kafka` are present and `sentry.enable-queue-tracing=true`. +- Spring Boot queue auto-configuration is disabled when Sentry OpenTelemetry integration classes are present to avoid duplicate Kafka instrumentation. 
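+The receive-latency computation mentioned above can be sketched in isolation. This is a minimal illustration, not the SDK's code; it assumes the `sentry-task-enqueued-time` header carries the enqueue timestamp as epoch milliseconds — check `SentryKafkaConsumerTracing` for the actual encoding.
+
+```java
+import java.nio.charset.StandardCharsets;
+
+public class ReceiveLatencySketch {
+  // Hypothetical helper: derive messaging.message.receive.latency from the
+  // sentry-task-enqueued-time header value (assumed to be epoch millis text).
+  static long receiveLatencyMillis(byte[] headerValue, long nowMillis) {
+    long enqueuedAt = Long.parseLong(new String(headerValue, StandardCharsets.UTF_8));
+    // Clamp negatives caused by clock skew between producer and consumer hosts.
+    return Math.max(0L, nowMillis - enqueuedAt);
+  }
+
+  public static void main(String[] args) {
+    byte[] header = "1700000000000".getBytes(StandardCharsets.UTF_8);
+    System.out.println(receiveLatencyMillis(header, 1700000000250L)); // 250
+  }
+}
+```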
+ +## Trace origins and suppression + +Queue instrumentation sets span origins so it can be identified and suppressed with `ignoredSpanOrigins`: +- Raw Kafka producer: `auto.queue.kafka.producer` +- Raw Kafka consumer helper: `manual.queue.kafka.consumer` +- Spring Kafka producer: `auto.queue.spring.kafka.producer`, `auto.queue.spring_jakarta.kafka.producer`, `auto.queue.spring7.kafka.producer` +- Spring Kafka consumer: `auto.queue.spring.kafka.consumer`, `auto.queue.spring_jakarta.kafka.consumer`, `auto.queue.spring7.kafka.consumer` + +## Files to inspect when changing queue tracing + +- Core option and conventions: + - `sentry/src/main/java/io/sentry/SentryOptions.java` + - `sentry/src/main/java/io/sentry/ExternalOptions.java` + - `sentry/src/main/java/io/sentry/SpanDataConvention.java` +- Raw Kafka: + - `sentry-kafka/src/main/java/io/sentry/kafka/SentryKafkaProducer.java` + - `sentry-kafka/src/main/java/io/sentry/kafka/SentryKafkaConsumerTracing.java` + - `sentry-kafka/src/test/kotlin/io/sentry/kafka/*Test.kt` +- Spring Kafka: + - `sentry-spring*/src/main/java/io/sentry/**/kafka/*` + - `sentry-spring*/src/test/kotlin/io/sentry/**/kafka/*Test.kt` + - `sentry-spring-boot*/src/main/java/io/sentry/**/SentryAutoConfiguration.java` + - `sentry-spring-boot*/src/test/kotlin/io/sentry/**/SentryKafkaAutoConfigurationTest.kt` + +## Related rules + +Also fetch: +- `options` when changing `enableQueueTracing` or configuration surfaces. +- `scopes` when changing consumer scope forking/lifecycle. +- `opentelemetry` when changing coexistence with OTel auto-instrumentation. +- `api` when changing public Kafka APIs or option methods. 
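+As a concrete illustration of how the distributed-tracing headers in the product model above link producer and consumer work, here is a minimal sketch of extracting the trace id a consumer would continue from a `sentry-trace` header (format `<trace_id>-<span_id>[-<sampled>]`). The helper is hypothetical — in the SDK this parsing lives in `SentryTraceHeader`.
+
+```java
+public class SentryTraceHeaderSketch {
+  // Illustration only: returns the trace id portion of a sentry-trace header,
+  // e.g. "771a43a4192642f0b136d5159a501700-9f41d7cedeb74954-1".
+  static String traceIdOf(String sentryTrace) {
+    String[] parts = sentryTrace.split("-");
+    if (parts.length < 2) {
+      throw new IllegalArgumentException("Malformed sentry-trace header: " + sentryTrace);
+    }
+    return parts[0];
+  }
+
+  public static void main(String[] args) {
+    // A queue.process transaction sharing this trace id appears in the same
+    // trace as the producer's queue.publish span.
+    System.out.println(traceIdOf("771a43a4192642f0b136d5159a501700-9f41d7cedeb74954-1"));
+  }
+}
+```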
diff --git a/CHANGELOG.md b/CHANGELOG.md index dace9390975..87bfdc370a4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,7 @@ ### Features +- Add Kafka queue tracing for Spring Boot 2 ([#5352](https://github.com/getsentry/sentry-java/pull/5352)) - Add Kafka queue tracing for Spring Boot 4 ([#5348](https://github.com/getsentry/sentry-java/pull/5348)) - Add `sentry-kafka` module for Kafka queue instrumentation without Spring ([#5288](https://github.com/getsentry/sentry-java/pull/5288)) - Add Kafka queue tracing for Spring Boot 3 ([#5254](https://github.com/getsentry/sentry-java/pull/5254)), ([#5255](https://github.com/getsentry/sentry-java/pull/5255)), ([#5256](https://github.com/getsentry/sentry-java/pull/5256)) diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index bdbd5a0c9f0..6c04e2ae2e6 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -183,6 +183,7 @@ springboot3-starter-security = { module = "org.springframework.boot:spring-boot- springboot3-starter-jdbc = { module = "org.springframework.boot:spring-boot-starter-jdbc", version.ref = "springboot3" } springboot3-starter-actuator = { module = "org.springframework.boot:spring-boot-starter-actuator", version.ref = "springboot3" } springboot3-starter-cache = { module = "org.springframework.boot:spring-boot-starter-cache", version.ref = "springboot3" } +spring-kafka2 = { module = "org.springframework.kafka:spring-kafka", version = "2.8.11" } spring-kafka3 = { module = "org.springframework.kafka:spring-kafka", version = "3.3.5" } spring-kafka4 = { module = "org.springframework.kafka:spring-kafka" } kafka-clients = { module = "org.apache.kafka:kafka-clients", version = "3.8.1" } diff --git a/sentry-samples/sentry-samples-spring-boot-opentelemetry-noagent/build.gradle.kts b/sentry-samples/sentry-samples-spring-boot-opentelemetry-noagent/build.gradle.kts index 07e61c75af8..57742935d5b 100644 --- 
a/sentry-samples/sentry-samples-spring-boot-opentelemetry-noagent/build.gradle.kts +++ b/sentry-samples/sentry-samples-spring-boot-opentelemetry-noagent/build.gradle.kts @@ -55,6 +55,10 @@ dependencies { implementation(projects.sentryOpentelemetry.sentryOpentelemetryAgentlessSpring) implementation(projects.sentryAsyncProfiler) + // kafka + implementation(libs.spring.kafka2) + implementation(projects.sentryKafka) + // database query tracing implementation(projects.sentryJdbc) runtimeOnly(libs.hsqldb) diff --git a/sentry-samples/sentry-samples-spring-boot-opentelemetry-noagent/src/main/java/io/sentry/samples/spring/boot/queues/kafka/KafkaConsumer.java b/sentry-samples/sentry-samples-spring-boot-opentelemetry-noagent/src/main/java/io/sentry/samples/spring/boot/queues/kafka/KafkaConsumer.java new file mode 100644 index 00000000000..013b3590a71 --- /dev/null +++ b/sentry-samples/sentry-samples-spring-boot-opentelemetry-noagent/src/main/java/io/sentry/samples/spring/boot/queues/kafka/KafkaConsumer.java @@ -0,0 +1,19 @@ +package io.sentry.samples.spring.boot.queues.kafka; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.context.annotation.Profile; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.stereotype.Component; + +@Component +@Profile("kafka") +public class KafkaConsumer { + + private static final Logger logger = LoggerFactory.getLogger(KafkaConsumer.class); + + @KafkaListener(topics = "sentry-topic", groupId = "sentry-sample-group") + public void listen(String message) { + logger.info("Received message: {}", message); + } +} diff --git a/sentry-samples/sentry-samples-spring-boot-opentelemetry-noagent/src/main/java/io/sentry/samples/spring/boot/queues/kafka/KafkaController.java b/sentry-samples/sentry-samples-spring-boot-opentelemetry-noagent/src/main/java/io/sentry/samples/spring/boot/queues/kafka/KafkaController.java new file mode 100644 index 00000000000..779171942d5 --- /dev/null +++ 
b/sentry-samples/sentry-samples-spring-boot-opentelemetry-noagent/src/main/java/io/sentry/samples/spring/boot/queues/kafka/KafkaController.java @@ -0,0 +1,26 @@ +package io.sentry.samples.spring.boot.queues.kafka; + +import org.springframework.context.annotation.Profile; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestParam; +import org.springframework.web.bind.annotation.RestController; + +@RestController +@Profile("kafka") +@RequestMapping("/kafka") +public class KafkaController { + + private final KafkaTemplate kafkaTemplate; + + public KafkaController(KafkaTemplate kafkaTemplate) { + this.kafkaTemplate = kafkaTemplate; + } + + @GetMapping("/produce") + String produce(@RequestParam(defaultValue = "hello from sentry!") String message) { + kafkaTemplate.send("sentry-topic", message); + return "Message sent: " + message; + } +} diff --git a/sentry-samples/sentry-samples-spring-boot-opentelemetry-noagent/src/main/resources/application-kafka.properties b/sentry-samples/sentry-samples-spring-boot-opentelemetry-noagent/src/main/resources/application-kafka.properties new file mode 100644 index 00000000000..e0abadf5f9c --- /dev/null +++ b/sentry-samples/sentry-samples-spring-boot-opentelemetry-noagent/src/main/resources/application-kafka.properties @@ -0,0 +1,12 @@ +# Kafka — activate with: --spring.profiles.active=kafka +sentry.enable-queue-tracing=true + +spring.kafka.bootstrap-servers=localhost:9092 +spring.kafka.consumer.group-id=sentry-sample-group +spring.kafka.consumer.auto-offset-reset=earliest +spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer +spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer +spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer 
+spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer + +logging.level.org.apache.kafka=warn diff --git a/sentry-samples/sentry-samples-spring-boot-opentelemetry-noagent/src/test/kotlin/io/sentry/systemtest/KafkaOtelCoexistenceSystemTest.kt b/sentry-samples/sentry-samples-spring-boot-opentelemetry-noagent/src/test/kotlin/io/sentry/systemtest/KafkaOtelCoexistenceSystemTest.kt new file mode 100644 index 00000000000..0f85e81a0a6 --- /dev/null +++ b/sentry-samples/sentry-samples-spring-boot-opentelemetry-noagent/src/test/kotlin/io/sentry/systemtest/KafkaOtelCoexistenceSystemTest.kt @@ -0,0 +1,41 @@ +package io.sentry.systemtest + +import io.sentry.systemtest.util.TestHelper +import kotlin.test.Test +import kotlin.test.assertEquals +import org.junit.Before + +class KafkaOtelCoexistenceSystemTest { + lateinit var testHelper: TestHelper + + @Before + fun setup() { + testHelper = TestHelper("http://localhost:8080") + testHelper.reset() + } + + @Test + fun `Sentry Kafka integration is suppressed when OTel is active`() { + val restClient = testHelper.restClient + + restClient.produceKafkaMessage("otel-coexistence-test") + assertEquals(200, restClient.lastKnownStatusCode) + + testHelper.ensureTransactionReceived { transaction, _ -> + transaction.transaction == "GET /kafka/produce" && + transaction.sdk?.integrationSet?.contains("SpringKafka") != true && + transaction.spans.any { span -> + span.op == "queue.publish" && + span.origin == "auto.opentelemetry" && + span.data?.get("messaging.system") == "kafka" + } + } + + testHelper.ensureTransactionReceived { transaction, _ -> + transaction.contexts.trace?.operation == "queue.process" && + transaction.contexts.trace?.origin == "auto.opentelemetry" && + transaction.contexts.trace?.data?.get("messaging.system") == "kafka" && + transaction.sdk?.integrationSet?.contains("SpringKafka") != true + } + } +} diff --git a/sentry-samples/sentry-samples-spring-boot-opentelemetry/build.gradle.kts 
b/sentry-samples/sentry-samples-spring-boot-opentelemetry/build.gradle.kts index 21a3cf3f7d5..08fe1b86e50 100644 --- a/sentry-samples/sentry-samples-spring-boot-opentelemetry/build.gradle.kts +++ b/sentry-samples/sentry-samples-spring-boot-opentelemetry/build.gradle.kts @@ -53,6 +53,10 @@ dependencies { implementation(projects.sentryAsyncProfiler) implementation(libs.otel) + // kafka + implementation(libs.spring.kafka2) + implementation(projects.sentryKafka) + // database query tracing implementation(projects.sentryJdbc) runtimeOnly(libs.hsqldb) diff --git a/sentry-samples/sentry-samples-spring-boot-opentelemetry/src/main/java/io/sentry/samples/spring/boot/queues/kafka/KafkaConsumer.java b/sentry-samples/sentry-samples-spring-boot-opentelemetry/src/main/java/io/sentry/samples/spring/boot/queues/kafka/KafkaConsumer.java new file mode 100644 index 00000000000..013b3590a71 --- /dev/null +++ b/sentry-samples/sentry-samples-spring-boot-opentelemetry/src/main/java/io/sentry/samples/spring/boot/queues/kafka/KafkaConsumer.java @@ -0,0 +1,19 @@ +package io.sentry.samples.spring.boot.queues.kafka; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.context.annotation.Profile; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.stereotype.Component; + +@Component +@Profile("kafka") +public class KafkaConsumer { + + private static final Logger logger = LoggerFactory.getLogger(KafkaConsumer.class); + + @KafkaListener(topics = "sentry-topic", groupId = "sentry-sample-group") + public void listen(String message) { + logger.info("Received message: {}", message); + } +} diff --git a/sentry-samples/sentry-samples-spring-boot-opentelemetry/src/main/java/io/sentry/samples/spring/boot/queues/kafka/KafkaController.java b/sentry-samples/sentry-samples-spring-boot-opentelemetry/src/main/java/io/sentry/samples/spring/boot/queues/kafka/KafkaController.java new file mode 100644 index 00000000000..779171942d5 --- 
/dev/null +++ b/sentry-samples/sentry-samples-spring-boot-opentelemetry/src/main/java/io/sentry/samples/spring/boot/queues/kafka/KafkaController.java @@ -0,0 +1,26 @@ +package io.sentry.samples.spring.boot.queues.kafka; + +import org.springframework.context.annotation.Profile; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestParam; +import org.springframework.web.bind.annotation.RestController; + +@RestController +@Profile("kafka") +@RequestMapping("/kafka") +public class KafkaController { + + private final KafkaTemplate kafkaTemplate; + + public KafkaController(KafkaTemplate kafkaTemplate) { + this.kafkaTemplate = kafkaTemplate; + } + + @GetMapping("/produce") + String produce(@RequestParam(defaultValue = "hello from sentry!") String message) { + kafkaTemplate.send("sentry-topic", message); + return "Message sent: " + message; + } +} diff --git a/sentry-samples/sentry-samples-spring-boot-opentelemetry/src/main/resources/application-kafka.properties b/sentry-samples/sentry-samples-spring-boot-opentelemetry/src/main/resources/application-kafka.properties new file mode 100644 index 00000000000..e0abadf5f9c --- /dev/null +++ b/sentry-samples/sentry-samples-spring-boot-opentelemetry/src/main/resources/application-kafka.properties @@ -0,0 +1,12 @@ +# Kafka — activate with: --spring.profiles.active=kafka +sentry.enable-queue-tracing=true + +spring.kafka.bootstrap-servers=localhost:9092 +spring.kafka.consumer.group-id=sentry-sample-group +spring.kafka.consumer.auto-offset-reset=earliest +spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer +spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer +spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer 
+spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer + +logging.level.org.apache.kafka=warn diff --git a/sentry-samples/sentry-samples-spring-boot-opentelemetry/src/test/kotlin/io/sentry/systemtest/KafkaOtelCoexistenceSystemTest.kt b/sentry-samples/sentry-samples-spring-boot-opentelemetry/src/test/kotlin/io/sentry/systemtest/KafkaOtelCoexistenceSystemTest.kt new file mode 100644 index 00000000000..0f85e81a0a6 --- /dev/null +++ b/sentry-samples/sentry-samples-spring-boot-opentelemetry/src/test/kotlin/io/sentry/systemtest/KafkaOtelCoexistenceSystemTest.kt @@ -0,0 +1,41 @@ +package io.sentry.systemtest + +import io.sentry.systemtest.util.TestHelper +import kotlin.test.Test +import kotlin.test.assertEquals +import org.junit.Before + +class KafkaOtelCoexistenceSystemTest { + lateinit var testHelper: TestHelper + + @Before + fun setup() { + testHelper = TestHelper("http://localhost:8080") + testHelper.reset() + } + + @Test + fun `Sentry Kafka integration is suppressed when OTel is active`() { + val restClient = testHelper.restClient + + restClient.produceKafkaMessage("otel-coexistence-test") + assertEquals(200, restClient.lastKnownStatusCode) + + testHelper.ensureTransactionReceived { transaction, _ -> + transaction.transaction == "GET /kafka/produce" && + transaction.sdk?.integrationSet?.contains("SpringKafka") != true && + transaction.spans.any { span -> + span.op == "queue.publish" && + span.origin == "auto.opentelemetry" && + span.data?.get("messaging.system") == "kafka" + } + } + + testHelper.ensureTransactionReceived { transaction, _ -> + transaction.contexts.trace?.operation == "queue.process" && + transaction.contexts.trace?.origin == "auto.opentelemetry" && + transaction.contexts.trace?.data?.get("messaging.system") == "kafka" && + transaction.sdk?.integrationSet?.contains("SpringKafka") != true + } + } +} diff --git a/sentry-samples/sentry-samples-spring-boot/build.gradle.kts 
b/sentry-samples/sentry-samples-spring-boot/build.gradle.kts index b6fcd675cf3..4550d8981a5 100644 --- a/sentry-samples/sentry-samples-spring-boot/build.gradle.kts +++ b/sentry-samples/sentry-samples-spring-boot/build.gradle.kts @@ -43,6 +43,10 @@ dependencies { implementation(libs.springboot.starter.cache) implementation(libs.springboot.starter.websocket) implementation(libs.caffeine) + + // kafka + implementation(libs.spring.kafka2) + implementation(projects.sentryKafka) implementation(Config.Libs.aspectj) implementation(Config.Libs.kotlinReflect) implementation(kotlin(Config.kotlinStdLib, KotlinCompilerVersion.VERSION)) diff --git a/sentry-samples/sentry-samples-spring-boot/src/main/java/io/sentry/samples/spring/boot/queues/kafka/KafkaConsumer.java b/sentry-samples/sentry-samples-spring-boot/src/main/java/io/sentry/samples/spring/boot/queues/kafka/KafkaConsumer.java new file mode 100644 index 00000000000..013b3590a71 --- /dev/null +++ b/sentry-samples/sentry-samples-spring-boot/src/main/java/io/sentry/samples/spring/boot/queues/kafka/KafkaConsumer.java @@ -0,0 +1,19 @@ +package io.sentry.samples.spring.boot.queues.kafka; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.context.annotation.Profile; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.stereotype.Component; + +@Component +@Profile("kafka") +public class KafkaConsumer { + + private static final Logger logger = LoggerFactory.getLogger(KafkaConsumer.class); + + @KafkaListener(topics = "sentry-topic", groupId = "sentry-sample-group") + public void listen(String message) { + logger.info("Received message: {}", message); + } +} diff --git a/sentry-samples/sentry-samples-spring-boot/src/main/java/io/sentry/samples/spring/boot/queues/kafka/KafkaController.java b/sentry-samples/sentry-samples-spring-boot/src/main/java/io/sentry/samples/spring/boot/queues/kafka/KafkaController.java new file mode 100644 index 00000000000..779171942d5 
--- /dev/null +++ b/sentry-samples/sentry-samples-spring-boot/src/main/java/io/sentry/samples/spring/boot/queues/kafka/KafkaController.java @@ -0,0 +1,26 @@ +package io.sentry.samples.spring.boot.queues.kafka; + +import org.springframework.context.annotation.Profile; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestParam; +import org.springframework.web.bind.annotation.RestController; + +@RestController +@Profile("kafka") +@RequestMapping("/kafka") +public class KafkaController { + + private final KafkaTemplate kafkaTemplate; + + public KafkaController(KafkaTemplate kafkaTemplate) { + this.kafkaTemplate = kafkaTemplate; + } + + @GetMapping("/produce") + String produce(@RequestParam(defaultValue = "hello from sentry!") String message) { + kafkaTemplate.send("sentry-topic", message); + return "Message sent: " + message; + } +} diff --git a/sentry-samples/sentry-samples-spring-boot/src/main/resources/application-kafka.properties b/sentry-samples/sentry-samples-spring-boot/src/main/resources/application-kafka.properties new file mode 100644 index 00000000000..eaaa62af13b --- /dev/null +++ b/sentry-samples/sentry-samples-spring-boot/src/main/resources/application-kafka.properties @@ -0,0 +1,10 @@ +# Kafka — activate with: --spring.profiles.active=kafka +sentry.enable-queue-tracing=true + +spring.kafka.bootstrap-servers=localhost:9092 +spring.kafka.consumer.group-id=sentry-sample-group +spring.kafka.consumer.auto-offset-reset=earliest +spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer +spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer +spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer 
+spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer diff --git a/sentry-samples/sentry-samples-spring-boot/src/test/kotlin/io/sentry/systemtest/KafkaQueueSystemTest.kt b/sentry-samples/sentry-samples-spring-boot/src/test/kotlin/io/sentry/systemtest/KafkaQueueSystemTest.kt new file mode 100644 index 00000000000..43781cf2c56 --- /dev/null +++ b/sentry-samples/sentry-samples-spring-boot/src/test/kotlin/io/sentry/systemtest/KafkaQueueSystemTest.kt @@ -0,0 +1,117 @@ +package io.sentry.systemtest + +import io.sentry.systemtest.util.TestHelper +import kotlin.test.Test +import kotlin.test.assertEquals +import org.junit.Before + +/** + * System tests for Kafka queue instrumentation. + * + * Requires: + * - The sample app running with `--spring.profiles.active=kafka` + * - A Kafka broker at localhost:9092 + * - The mock Sentry server at localhost:8000 + */ +class KafkaQueueSystemTest { + lateinit var testHelper: TestHelper + + @Before + fun setup() { + testHelper = TestHelper("http://localhost:8080") + testHelper.reset() + } + + @Test + fun `producer endpoint creates queue publish span`() { + val restClient = testHelper.restClient + + restClient.produceKafkaMessage("test-message") + assertEquals(200, restClient.lastKnownStatusCode) + + testHelper.ensureTransactionReceived { transaction, _ -> + testHelper.doesTransactionContainSpanWithOp(transaction, "queue.publish") + } + } + + @Test + fun `consumer creates queue process transaction`() { + val restClient = testHelper.restClient + + restClient.produceKafkaMessage("test-consumer-message") + assertEquals(200, restClient.lastKnownStatusCode) + + // The consumer runs asynchronously, so wait for the queue.process transaction + testHelper.ensureTransactionReceived { transaction, _ -> + testHelper.doesTransactionHaveOp(transaction, "queue.process") + } + } + + @Test + fun `producer and consumer share same trace`() { + val restClient = testHelper.restClient + + 
restClient.produceKafkaMessage("trace-test-message") + assertEquals(200, restClient.lastKnownStatusCode) + + // Capture the trace ID from the producer transaction (has queue.publish span) + var producerTraceId: String? = null + testHelper.ensureTransactionReceived { transaction, _ -> + if (testHelper.doesTransactionContainSpanWithOp(transaction, "queue.publish")) { + producerTraceId = transaction.contexts.trace?.traceId?.toString() + true + } else { + false + } + } + + // Verify the consumer transaction has the same trace ID + // Use retryCount=3 since the consumer may take a moment to process + testHelper.ensureEnvelopeReceived(retryCount = 3) { envelopeString -> + val envelope = + testHelper.jsonSerializer.deserializeEnvelope(envelopeString.byteInputStream()) + ?: return@ensureEnvelopeReceived false + val txItem = + envelope.items.firstOrNull { it.header.type == io.sentry.SentryItemType.Transaction } + ?: return@ensureEnvelopeReceived false + val tx = + txItem.getTransaction(testHelper.jsonSerializer) ?: return@ensureEnvelopeReceived false + + tx.contexts.trace?.operation == "queue.process" && + tx.contexts.trace?.traceId?.toString() == producerTraceId + } + } + + @Test + fun `queue publish span has messaging attributes`() { + val restClient = testHelper.restClient + + restClient.produceKafkaMessage("attrs-test") + assertEquals(200, restClient.lastKnownStatusCode) + + testHelper.ensureTransactionReceived { transaction, _ -> + val span = transaction.spans.firstOrNull { it.op == "queue.publish" } + if (span == null) return@ensureTransactionReceived false + + val data = span.data ?: return@ensureTransactionReceived false + data["messaging.system"] == "kafka" && data["messaging.destination.name"] == "sentry-topic" + } + } + + @Test + fun `queue process transaction has messaging attributes`() { + val restClient = testHelper.restClient + + restClient.produceKafkaMessage("process-attrs-test") + assertEquals(200, restClient.lastKnownStatusCode) + + 
testHelper.ensureTransactionReceived { transaction, _ -> + if (!testHelper.doesTransactionHaveOp(transaction, "queue.process")) { + return@ensureTransactionReceived false + } + + val data = transaction.contexts.trace?.data ?: return@ensureTransactionReceived false + data["messaging.system"] == "kafka" && data["messaging.destination.name"] == "sentry-topic" + } + } +} diff --git a/sentry-spring-boot/build.gradle.kts b/sentry-spring-boot/build.gradle.kts index a81613e5e1e..f04e469e34b 100644 --- a/sentry-spring-boot/build.gradle.kts +++ b/sentry-spring-boot/build.gradle.kts @@ -40,11 +40,13 @@ dependencies { compileOnly(libs.springboot.starter.graphql) compileOnly(libs.springboot.starter.quartz) compileOnly(libs.springboot.starter.security) + compileOnly(libs.spring.kafka2) compileOnly(platform(SpringBootPlugin.BOM_COORDINATES)) compileOnly(Config.Libs.springWeb) compileOnly(Config.Libs.springWebflux) compileOnly(projects.sentryOpentelemetry.sentryOpentelemetryCore) compileOnly(projects.sentryGraphql) + compileOnly(projects.sentryKafka) compileOnly(projects.sentryQuartz) annotationProcessor(platform(SpringBootPlugin.BOM_COORDINATES)) @@ -59,6 +61,7 @@ dependencies { testImplementation(projects.sentryLogback) testImplementation(projects.sentryQuartz) testImplementation(projects.sentryApacheHttpClient5) + testImplementation(projects.sentryKafka) testImplementation(projects.sentryTestSupport) testImplementation(kotlin(Config.kotlinStdLib)) testImplementation(libs.kotlin.test.junit) @@ -71,6 +74,7 @@ dependencies { testImplementation(libs.springboot.starter.aop) testImplementation(libs.springboot.starter.quartz) testImplementation(libs.springboot.starter.security) + testImplementation(libs.spring.kafka2) testImplementation(libs.springboot.starter.test) testImplementation(libs.springboot.starter.web) testImplementation(libs.springboot.starter.webflux) diff --git a/sentry-spring-boot/src/main/java/io/sentry/spring/boot/SentryAutoConfiguration.java 
b/sentry-spring-boot/src/main/java/io/sentry/spring/boot/SentryAutoConfiguration.java index 99fd602f74b..c7d5a892e9f 100644 --- a/sentry-spring-boot/src/main/java/io/sentry/spring/boot/SentryAutoConfiguration.java +++ b/sentry-spring-boot/src/main/java/io/sentry/spring/boot/SentryAutoConfiguration.java @@ -31,6 +31,8 @@ import io.sentry.spring.checkin.SentryQuartzConfiguration; import io.sentry.spring.exception.SentryCaptureExceptionParameterPointcutConfiguration; import io.sentry.spring.exception.SentryExceptionParameterAdviceConfiguration; +import io.sentry.spring.kafka.SentryKafkaConsumerBeanPostProcessor; +import io.sentry.spring.kafka.SentryKafkaProducerBeanPostProcessor; import io.sentry.spring.opentelemetry.SentryOpenTelemetryAgentWithoutAutoInitConfiguration; import io.sentry.spring.opentelemetry.SentryOpenTelemetryNoAgentConfiguration; import io.sentry.spring.tracing.CombinedTransactionNameProvider; @@ -231,6 +233,34 @@ static class SentryCacheConfiguration { } } + @Configuration(proxyBeanMethods = false) + @ConditionalOnClass( + name = { + "org.springframework.kafka.core.KafkaTemplate", + "io.sentry.kafka.SentryKafkaProducer" + }) + @ConditionalOnProperty(name = "sentry.enable-queue-tracing", havingValue = "true") + @ConditionalOnMissingClass({ + "io.sentry.opentelemetry.SentryAutoConfigurationCustomizerProvider", + "io.sentry.opentelemetry.agent.AgentMarker" + }) + @Open + static class SentryKafkaQueueConfiguration { + + @Bean + public static @NotNull SentryKafkaProducerBeanPostProcessor + sentryKafkaProducerBeanPostProcessor() { + SentryIntegrationPackageStorage.getInstance().addIntegration("SpringKafka"); + return new SentryKafkaProducerBeanPostProcessor(); + } + + @Bean + public static @NotNull SentryKafkaConsumerBeanPostProcessor + sentryKafkaConsumerBeanPostProcessor() { + return new SentryKafkaConsumerBeanPostProcessor(); + } + } + @Configuration(proxyBeanMethods = false) @ConditionalOnClass(ProceedingJoinPoint.class) @ConditionalOnProperty( diff 
--git a/sentry-spring-boot/src/test/kotlin/io/sentry/spring/boot/SentryKafkaAutoConfigurationTest.kt b/sentry-spring-boot/src/test/kotlin/io/sentry/spring/boot/SentryKafkaAutoConfigurationTest.kt new file mode 100644 index 00000000000..fdf12bacf00 --- /dev/null +++ b/sentry-spring-boot/src/test/kotlin/io/sentry/spring/boot/SentryKafkaAutoConfigurationTest.kt @@ -0,0 +1,125 @@ +package io.sentry.spring.boot + +import io.sentry.kafka.SentryKafkaProducer +import io.sentry.opentelemetry.SentryAutoConfigurationCustomizerProvider +import io.sentry.opentelemetry.agent.AgentMarker +import io.sentry.spring.kafka.SentryKafkaConsumerBeanPostProcessor +import io.sentry.spring.kafka.SentryKafkaProducerBeanPostProcessor +import kotlin.test.Test +import org.assertj.core.api.Assertions.assertThat +import org.springframework.boot.autoconfigure.AutoConfigurations +import org.springframework.boot.test.context.FilteredClassLoader +import org.springframework.boot.test.context.runner.ApplicationContextRunner +import org.springframework.kafka.core.KafkaTemplate + +class SentryKafkaAutoConfigurationTest { + + private val contextRunner = + ApplicationContextRunner() + .withConfiguration(AutoConfigurations.of(SentryAutoConfiguration::class.java)) + .withPropertyValues( + "sentry.dsn=http://key@localhost/proj", + "sentry.traces-sample-rate=1.0", + "sentry.shutdownTimeoutMillis=0", + "sentry.sessionFlushTimeoutMillis=0", + "sentry.flushTimeoutMillis=0", + "sentry.readTimeoutMillis=50", + "sentry.connectionTimeoutMillis=50", + "sentry.send-modules=false", + "sentry.debug=false", + ) + + private val noOtelClassLoader = + FilteredClassLoader( + SentryAutoConfigurationCustomizerProvider::class.java, + AgentMarker::class.java, + ) + + private val noOtelCustomizerClassLoader = + FilteredClassLoader(SentryAutoConfigurationCustomizerProvider::class.java) + + private val noSentryKafkaClassLoader = + FilteredClassLoader( + SentryKafkaProducer::class.java, + 
SentryAutoConfigurationCustomizerProvider::class.java, + AgentMarker::class.java, + ) + + private val noSpringKafkaClassLoader = + FilteredClassLoader( + KafkaTemplate::class.java, + SentryAutoConfigurationCustomizerProvider::class.java, + AgentMarker::class.java, + ) + + @Test + fun `registers Kafka BPPs when queue tracing is enabled`() { + contextRunner + .withClassLoader(noOtelClassLoader) + .withPropertyValues("sentry.enable-queue-tracing=true") + .run { context -> + assertThat(context).hasSingleBean(SentryKafkaProducerBeanPostProcessor::class.java) + assertThat(context).hasSingleBean(SentryKafkaConsumerBeanPostProcessor::class.java) + } + } + + @Test + fun `does not register Kafka BPPs when queue tracing is disabled`() { + contextRunner.withClassLoader(noOtelClassLoader).run { context -> + assertThat(context).doesNotHaveBean(SentryKafkaProducerBeanPostProcessor::class.java) + assertThat(context).doesNotHaveBean(SentryKafkaConsumerBeanPostProcessor::class.java) + } + } + + @Test + fun `does not register Kafka BPPs when sentry-kafka is not present`() { + contextRunner + .withClassLoader(noSentryKafkaClassLoader) + .withPropertyValues("sentry.enable-queue-tracing=true") + .run { context -> + assertThat(context).doesNotHaveBean(SentryKafkaProducerBeanPostProcessor::class.java) + assertThat(context).doesNotHaveBean(SentryKafkaConsumerBeanPostProcessor::class.java) + } + } + + @Test + fun `does not register Kafka BPPs when spring-kafka is not present`() { + contextRunner + .withClassLoader(noSpringKafkaClassLoader) + .withPropertyValues("sentry.enable-queue-tracing=true") + .run { context -> + assertThat(context).doesNotHaveBean(SentryKafkaProducerBeanPostProcessor::class.java) + assertThat(context).doesNotHaveBean(SentryKafkaConsumerBeanPostProcessor::class.java) + } + } + + @Test + fun `does not register Kafka BPPs when queue tracing is explicitly false`() { + contextRunner + .withClassLoader(noOtelClassLoader) + 
.withPropertyValues("sentry.enable-queue-tracing=false") + .run { context -> + assertThat(context).doesNotHaveBean(SentryKafkaProducerBeanPostProcessor::class.java) + assertThat(context).doesNotHaveBean(SentryKafkaConsumerBeanPostProcessor::class.java) + } + } + + @Test + fun `does not register Kafka BPPs when OpenTelemetry agent is present`() { + contextRunner + .withClassLoader(noOtelCustomizerClassLoader) + .withPropertyValues("sentry.enable-queue-tracing=true") + .run { context -> + assertThat(context).doesNotHaveBean(SentryKafkaProducerBeanPostProcessor::class.java) + assertThat(context).doesNotHaveBean(SentryKafkaConsumerBeanPostProcessor::class.java) + } + } + + @Test + fun `does not register Kafka BPPs when OpenTelemetry integration is present`() { + contextRunner.withPropertyValues("sentry.enable-queue-tracing=true").run { context -> + assertThat(context).doesNotHaveBean(SentryKafkaProducerBeanPostProcessor::class.java) + assertThat(context).doesNotHaveBean(SentryKafkaConsumerBeanPostProcessor::class.java) + } + } +} diff --git a/sentry-spring/api/sentry-spring.api b/sentry-spring/api/sentry-spring.api index 7148277e2ef..4e1bea84288 100644 --- a/sentry-spring/api/sentry-spring.api +++ b/sentry-spring/api/sentry-spring.api @@ -234,6 +234,30 @@ public final class io/sentry/spring/graphql/SentrySpringSubscriptionHandler : io public fun onSubscriptionResult (Ljava/lang/Object;Lio/sentry/IScopes;Lio/sentry/graphql/ExceptionReporter;Lgraphql/execution/instrumentation/parameters/InstrumentationFieldFetchParameters;)Ljava/lang/Object; } +public final class io/sentry/spring/kafka/SentryKafkaConsumerBeanPostProcessor : org/springframework/beans/factory/config/BeanPostProcessor, org/springframework/core/PriorityOrdered { + public fun ()V + public fun getOrder ()I + public fun postProcessAfterInitialization (Ljava/lang/Object;Ljava/lang/String;)Ljava/lang/Object; +} + +public final class io/sentry/spring/kafka/SentryKafkaProducerBeanPostProcessor : 
org/springframework/beans/factory/config/BeanPostProcessor, org/springframework/core/PriorityOrdered { + public fun ()V + public fun getOrder ()I + public fun postProcessAfterInitialization (Ljava/lang/Object;Ljava/lang/String;)Ljava/lang/Object; +} + +public final class io/sentry/spring/kafka/SentryKafkaRecordInterceptor : org/springframework/kafka/listener/RecordInterceptor { + public fun (Lio/sentry/IScopes;)V + public fun (Lio/sentry/IScopes;Lorg/springframework/kafka/listener/RecordInterceptor;)V + public fun afterRecord (Lorg/apache/kafka/clients/consumer/ConsumerRecord;Lorg/apache/kafka/clients/consumer/Consumer;)V + public fun clearThreadState (Lorg/apache/kafka/clients/consumer/Consumer;)V + public fun failure (Lorg/apache/kafka/clients/consumer/ConsumerRecord;Ljava/lang/Exception;Lorg/apache/kafka/clients/consumer/Consumer;)V + public fun intercept (Lorg/apache/kafka/clients/consumer/ConsumerRecord;)Lorg/apache/kafka/clients/consumer/ConsumerRecord; + public fun intercept (Lorg/apache/kafka/clients/consumer/ConsumerRecord;Lorg/apache/kafka/clients/consumer/Consumer;)Lorg/apache/kafka/clients/consumer/ConsumerRecord; + public fun setupThreadState (Lorg/apache/kafka/clients/consumer/Consumer;)V + public fun success (Lorg/apache/kafka/clients/consumer/ConsumerRecord;Lorg/apache/kafka/clients/consumer/Consumer;)V +} + public class io/sentry/spring/opentelemetry/SentryOpenTelemetryAgentWithoutAutoInitConfiguration { public fun ()V public fun sentryOpenTelemetryOptionsConfiguration ()Lio/sentry/Sentry$OptionsConfiguration; diff --git a/sentry-spring/build.gradle.kts b/sentry-spring/build.gradle.kts index 57c0b9d9f31..884a4a4b802 100644 --- a/sentry-spring/build.gradle.kts +++ b/sentry-spring/build.gradle.kts @@ -29,6 +29,7 @@ dependencies { compileOnly(Config.Libs.aspectj) compileOnly(Config.Libs.springWebflux) compileOnly(projects.sentryGraphql) + compileOnly(projects.sentryKafka) compileOnly(projects.sentryQuartz) compileOnly(libs.jetbrains.annotations) 
compileOnly(libs.nopen.annotations) @@ -37,6 +38,7 @@ dependencies { compileOnly(libs.slf4j.api) compileOnly(libs.springboot.starter.graphql) compileOnly(libs.springboot.starter.quartz) + compileOnly(libs.spring.kafka2) compileOnly(projects.sentryOpentelemetry.sentryOpentelemetryAgentcustomization) compileOnly(projects.sentryOpentelemetry.sentryOpentelemetryBootstrap) @@ -47,6 +49,7 @@ dependencies { // tests testImplementation(projects.sentryTestSupport) testImplementation(projects.sentryGraphql) + testImplementation(projects.sentryKafka) testImplementation(kotlin(Config.kotlinStdLib)) testImplementation(libs.awaitility.kotlin) testImplementation(libs.graphql.java17) @@ -56,6 +59,7 @@ dependencies { testImplementation(libs.springboot.starter.aop) testImplementation(libs.springboot.starter.graphql) testImplementation(libs.springboot.starter.security) + testImplementation(libs.spring.kafka2) testImplementation(libs.springboot.starter.test) testImplementation(libs.springboot.starter.web) testImplementation(libs.springboot.starter.webflux) diff --git a/sentry-spring/src/main/java/io/sentry/spring/kafka/SentryKafkaConsumerBeanPostProcessor.java b/sentry-spring/src/main/java/io/sentry/spring/kafka/SentryKafkaConsumerBeanPostProcessor.java new file mode 100644 index 00000000000..7a3ba1caa27 --- /dev/null +++ b/sentry-spring/src/main/java/io/sentry/spring/kafka/SentryKafkaConsumerBeanPostProcessor.java @@ -0,0 +1,98 @@ +package io.sentry.spring.kafka; + +import io.sentry.ScopesAdapter; +import io.sentry.SentryLevel; +import java.lang.reflect.Field; +import org.jetbrains.annotations.ApiStatus; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; +import org.springframework.beans.BeansException; +import org.springframework.beans.factory.config.BeanPostProcessor; +import org.springframework.core.Ordered; +import org.springframework.core.PriorityOrdered; +import org.springframework.kafka.config.AbstractKafkaListenerContainerFactory; +import 
org.springframework.kafka.listener.RecordInterceptor; + +/** + * Registers {@link SentryKafkaRecordInterceptor} on {@link AbstractKafkaListenerContainerFactory} + * beans. If an existing {@link RecordInterceptor} is already set, it is composed as a delegate. + */ +@ApiStatus.Internal +public final class SentryKafkaConsumerBeanPostProcessor + implements BeanPostProcessor, PriorityOrdered { + + private static final @NotNull String RECORD_INTERCEPTOR_FIELD_NAME = "recordInterceptor"; + + private final @NotNull String recordInterceptorFieldName; + + public SentryKafkaConsumerBeanPostProcessor() { + this(RECORD_INTERCEPTOR_FIELD_NAME); + } + + SentryKafkaConsumerBeanPostProcessor(final @NotNull String recordInterceptorFieldName) { + this.recordInterceptorFieldName = recordInterceptorFieldName; + } + + private static final class InterceptorReadFailedException extends Exception { + private static final long serialVersionUID = 1L; + + InterceptorReadFailedException(final @NotNull Throwable cause) { + super(cause); + } + } + + @Override + @SuppressWarnings("unchecked") + public @NotNull Object postProcessAfterInitialization( + final @NotNull Object bean, final @NotNull String beanName) throws BeansException { + if (bean instanceof AbstractKafkaListenerContainerFactory) { + final @NotNull AbstractKafkaListenerContainerFactory factory = + (AbstractKafkaListenerContainerFactory) bean; + + final @Nullable RecordInterceptor existing; + try { + existing = getExistingInterceptor(factory); + } catch (InterceptorReadFailedException e) { + ScopesAdapter.getInstance() + .getOptions() + .getLogger() + .log( + SentryLevel.ERROR, + e, + "Sentry Kafka consumer tracing disabled for factory '%s' \u2014 could not read " + + "existing recordInterceptor via reflection. 
Refusing to install Sentry's " + + "interceptor to avoid overwriting a customer-configured RecordInterceptor.", + beanName); + return bean; + } + + if (existing instanceof SentryKafkaRecordInterceptor) { + return bean; + } + + @SuppressWarnings("rawtypes") + final RecordInterceptor sentryInterceptor = + new SentryKafkaRecordInterceptor<>(ScopesAdapter.getInstance(), existing); + factory.setRecordInterceptor(sentryInterceptor); + } + return bean; + } + + private @Nullable RecordInterceptor getExistingInterceptor( + final @NotNull AbstractKafkaListenerContainerFactory factory) + throws InterceptorReadFailedException { + try { + final @NotNull Field field = + AbstractKafkaListenerContainerFactory.class.getDeclaredField(recordInterceptorFieldName); + field.setAccessible(true); + return (RecordInterceptor) field.get(factory); + } catch (NoSuchFieldException | IllegalAccessException | RuntimeException e) { + throw new InterceptorReadFailedException(e); + } + } + + @Override + public int getOrder() { + return Ordered.LOWEST_PRECEDENCE; + } +} diff --git a/sentry-spring/src/main/java/io/sentry/spring/kafka/SentryKafkaProducerBeanPostProcessor.java b/sentry-spring/src/main/java/io/sentry/spring/kafka/SentryKafkaProducerBeanPostProcessor.java new file mode 100644 index 00000000000..7b3266a3510 --- /dev/null +++ b/sentry-spring/src/main/java/io/sentry/spring/kafka/SentryKafkaProducerBeanPostProcessor.java @@ -0,0 +1,76 @@ +package io.sentry.spring.kafka; + +import io.sentry.ScopesAdapter; +import io.sentry.SentryLevel; +import io.sentry.kafka.SentryKafkaProducer; +import org.apache.kafka.clients.producer.Producer; +import org.jetbrains.annotations.ApiStatus; +import org.jetbrains.annotations.NotNull; +import org.springframework.beans.BeansException; +import org.springframework.beans.factory.config.BeanPostProcessor; +import org.springframework.core.Ordered; +import org.springframework.core.PriorityOrdered; +import org.springframework.kafka.core.ProducerFactory; +import 
org.springframework.kafka.core.ProducerPostProcessor; + +/** + * Installs a {@link ProducerPostProcessor} on every {@link ProducerFactory} bean so that each + * {@link Producer} created by Spring Kafka is wrapped via {@link SentryKafkaProducer#wrap + * SentryKafkaProducer.wrap(Producer)}. + * + *
<p>The wrapper records a {@code queue.publish} span around each {@code send(...)} that finishes + * when the broker ack callback fires, giving a real producer-send lifecycle span. {@code + * KafkaTemplate} beans are left untouched, so all customer-configured listeners, interceptors and + * observation settings are preserved. + + *
<p>
Note: {@link ProducerFactory#addPostProcessor(ProducerPostProcessor)} is a default method on + * the interface that is a no-op unless overridden. Custom factories that do not extend {@code + * DefaultKafkaProducerFactory} will not receive Sentry producer instrumentation; a warning is + * logged at startup in that case. + */ +@ApiStatus.Internal +public final class SentryKafkaProducerBeanPostProcessor + implements BeanPostProcessor, PriorityOrdered { + + @Override + @SuppressWarnings({"unchecked", "rawtypes"}) + public @NotNull Object postProcessAfterInitialization( + final @NotNull Object bean, final @NotNull String beanName) throws BeansException { + if (bean instanceof ProducerFactory) { + final @NotNull ProducerFactory factory = (ProducerFactory) bean; + final @NotNull SentryProducerPostProcessor pp = new SentryProducerPostProcessor<>(); + factory.addPostProcessor(pp); + if (!factory.getPostProcessors().contains(pp)) { + ScopesAdapter.getInstance() + .getOptions() + .getLogger() + .log( + SentryLevel.WARNING, + "Sentry Kafka producer tracing not active for ProducerFactory '%s' (%s). " + + "addPostProcessor() was not honored — the factory may not extend " + + "DefaultKafkaProducerFactory. Wrap producers manually with " + + "SentryKafkaProducer.wrap(producer).", + beanName, + factory.getClass().getName()); + } + } + return bean; + } + + @Override + public int getOrder() { + return Ordered.LOWEST_PRECEDENCE; + } + + /** + * Marker {@link ProducerPostProcessor} that wraps the freshly created Kafka {@link Producer} via + * {@link SentryKafkaProducer#wrap}. 
+ */ + static final class SentryProducerPostProcessor implements ProducerPostProcessor { + @Override + public @NotNull Producer apply(final @NotNull Producer producer) { + return SentryKafkaProducer.wrap( + producer, ScopesAdapter.getInstance(), "auto.queue.spring.kafka.producer"); + } + } +} diff --git a/sentry-spring/src/main/java/io/sentry/spring/kafka/SentryKafkaRecordInterceptor.java b/sentry-spring/src/main/java/io/sentry/spring/kafka/SentryKafkaRecordInterceptor.java new file mode 100644 index 00000000000..8d848e40734 --- /dev/null +++ b/sentry-spring/src/main/java/io/sentry/spring/kafka/SentryKafkaRecordInterceptor.java @@ -0,0 +1,298 @@ +package io.sentry.spring.kafka; + +import io.sentry.BaggageHeader; +import io.sentry.DateUtils; +import io.sentry.IScopes; +import io.sentry.ISentryLifecycleToken; +import io.sentry.ITransaction; +import io.sentry.SentryLevel; +import io.sentry.SentryTraceHeader; +import io.sentry.SpanDataConvention; +import io.sentry.SpanStatus; +import io.sentry.TransactionContext; +import io.sentry.TransactionOptions; +import io.sentry.kafka.SentryKafkaProducer; +import io.sentry.util.SpanUtils; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.header.Header; +import org.jetbrains.annotations.ApiStatus; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; +import org.springframework.kafka.listener.RecordInterceptor; +import org.springframework.kafka.support.KafkaHeaders; + +/** + * A {@link RecordInterceptor} that creates {@code queue.process} transactions for incoming Kafka + * records with distributed tracing support. 
+ */ +@ApiStatus.Internal +@SuppressWarnings("deprecation") +public final class SentryKafkaRecordInterceptor implements RecordInterceptor { + + static final String TRACE_ORIGIN = "auto.queue.spring.kafka.consumer"; + + private final @NotNull IScopes scopes; + private final @Nullable RecordInterceptor delegate; + + private static final @NotNull ThreadLocal currentContext = + new ThreadLocal<>(); + + public SentryKafkaRecordInterceptor(final @NotNull IScopes scopes) { + this(scopes, null); + } + + public SentryKafkaRecordInterceptor( + final @NotNull IScopes scopes, final @Nullable RecordInterceptor delegate) { + this.scopes = scopes; + this.delegate = delegate; + } + + @Override + public @Nullable ConsumerRecord intercept(final @NotNull ConsumerRecord record) { + return intercept(record, null); + } + + @Override + public @Nullable ConsumerRecord intercept( + final @NotNull ConsumerRecord record, final @Nullable Consumer consumer) { + if (!scopes.getOptions().isEnableQueueTracing() || isIgnored()) { + return delegateIntercept(record, consumer); + } + + try { + finishStaleContext(); + + final @NotNull IScopes forkedScopes = scopes.forkedRootScopes("SentryKafkaRecordInterceptor"); + final @NotNull ISentryLifecycleToken lifecycleToken = forkedScopes.makeCurrent(); + currentContext.set(new SentryRecordContext(lifecycleToken, null)); + + final @Nullable TransactionContext transactionContext = continueTrace(forkedScopes, record); + + final @Nullable ITransaction transaction = + startTransaction(forkedScopes, record, transactionContext); + currentContext.set(new SentryRecordContext(lifecycleToken, transaction)); + } catch (Throwable t) { + scopes.getOptions().getLogger().log(SentryLevel.ERROR, "Unable to wrap Kafka consumer.", t); + } + return delegateIntercept(record, consumer); + } + + @Override + public void success( + final @NotNull ConsumerRecord record, final @NotNull Consumer consumer) { + try { + if (delegate != null) { + delegate.success(record, consumer); + } + } 
finally { + finishSpan(SpanStatus.OK, null); + } + } + + @Override + public void failure( + final @NotNull ConsumerRecord record, + final @NotNull Exception exception, + final @NotNull Consumer consumer) { + try { + if (delegate != null) { + delegate.failure(record, exception, consumer); + } + } finally { + finishSpan(SpanStatus.INTERNAL_ERROR, exception); + } + } + + @Override + public void afterRecord( + final @NotNull ConsumerRecord record, final @NotNull Consumer consumer) { + if (delegate != null) { + delegate.afterRecord(record, consumer); + } + } + + @Override + public void setupThreadState(final @NotNull Consumer consumer) { + if (delegate != null) { + delegate.setupThreadState(consumer); + } + } + + @Override + public void clearThreadState(final @NotNull Consumer consumer) { + try { + finishStaleContext(); + } finally { + if (delegate != null) { + delegate.clearThreadState(consumer); + } + } + } + + private boolean isIgnored() { + return SpanUtils.isIgnored(scopes.getOptions().getIgnoredSpanOrigins(), TRACE_ORIGIN); + } + + private @Nullable ConsumerRecord delegateIntercept( + final @NotNull ConsumerRecord record, final @Nullable Consumer consumer) { + if (delegate != null) { + return consumer != null ? 
delegate.intercept(record, consumer) : delegate.intercept(record); + } + return record; + } + + private @Nullable TransactionContext continueTrace( + final @NotNull IScopes forkedScopes, final @NotNull ConsumerRecord record) { + final @Nullable String sentryTrace = headerValue(record, SentryTraceHeader.SENTRY_TRACE_HEADER); + final @Nullable List baggageHeaders = + headerValues(record, BaggageHeader.BAGGAGE_HEADER); + return forkedScopes.continueTrace(sentryTrace, baggageHeaders); + } + + private @Nullable ITransaction startTransaction( + final @NotNull IScopes forkedScopes, + final @NotNull ConsumerRecord record, + final @Nullable TransactionContext transactionContext) { + if (!forkedScopes.getOptions().isTracingEnabled()) { + return null; + } + + final @NotNull TransactionContext txContext = + transactionContext != null + ? transactionContext + : new TransactionContext("queue.process", "queue.process"); + txContext.setName("queue.process"); + txContext.setOperation("queue.process"); + + final @NotNull TransactionOptions txOptions = new TransactionOptions(); + txOptions.setOrigin(TRACE_ORIGIN); + txOptions.setBindToScope(true); + + final @NotNull ITransaction transaction = forkedScopes.startTransaction(txContext, txOptions); + + if (transaction.isNoOp()) { + return null; + } + + transaction.setData(SpanDataConvention.MESSAGING_SYSTEM, "kafka"); + transaction.setData(SpanDataConvention.MESSAGING_DESTINATION_NAME, record.topic()); + + final @Nullable String messageId = headerValue(record, "messaging.message.id"); + if (messageId != null) { + transaction.setData(SpanDataConvention.MESSAGING_MESSAGE_ID, messageId); + } + + final int bodySize = record.serializedValueSize(); + if (bodySize >= 0) { + transaction.setData(SpanDataConvention.MESSAGING_MESSAGE_BODY_SIZE, bodySize); + } + + final @Nullable Integer retryCount = retryCount(record); + if (retryCount != null) { + transaction.setData(SpanDataConvention.MESSAGING_MESSAGE_RETRY_COUNT, retryCount); + } + + final 
@Nullable String enqueuedTimeStr = + headerValue(record, SentryKafkaProducer.SENTRY_ENQUEUED_TIME_HEADER); + if (enqueuedTimeStr != null) { + try { + final double enqueuedTimeSeconds = Double.parseDouble(enqueuedTimeStr); + final double nowSeconds = DateUtils.millisToSeconds(System.currentTimeMillis()); + final long latencyMs = (long) ((nowSeconds - enqueuedTimeSeconds) * 1000); + if (latencyMs >= 0) { + transaction.setData(SpanDataConvention.MESSAGING_MESSAGE_RECEIVE_LATENCY, latencyMs); + } + } catch (NumberFormatException ignored) { + // ignore malformed header + } + } + + return transaction; + } + + private @Nullable Integer retryCount(final @NotNull ConsumerRecord record) { + final @Nullable Header header = record.headers().lastHeader(KafkaHeaders.DELIVERY_ATTEMPT); + if (header == null) { + return null; + } + + final byte[] value = header.value(); + if (value == null || value.length != Integer.BYTES) { + return null; + } + + final int attempt = ByteBuffer.wrap(value).getInt(); + if (attempt <= 0) { + return null; + } + + return attempt - 1; + } + + private void finishStaleContext() { + if (currentContext.get() != null) { + finishSpan(SpanStatus.UNKNOWN, null); + } + } + + private void finishSpan(final @NotNull SpanStatus status, final @Nullable Throwable throwable) { + final @Nullable SentryRecordContext ctx = currentContext.get(); + if (ctx == null) { + return; + } + currentContext.remove(); + + try { + final @Nullable ITransaction transaction = ctx.transaction; + if (transaction != null) { + transaction.setStatus(status); + if (throwable != null) { + transaction.setThrowable(throwable); + } + transaction.finish(); + } + } finally { + ctx.lifecycleToken.close(); + } + } + + private @Nullable String headerValue( + final @NotNull ConsumerRecord record, final @NotNull String headerName) { + final @Nullable Header header = record.headers().lastHeader(headerName); + if (header == null || header.value() == null) { + return null; + } + return new 
String(header.value(), StandardCharsets.UTF_8); + } + + private @Nullable List headerValues( + final @NotNull ConsumerRecord record, final @NotNull String headerName) { + @Nullable List values = null; + for (final @NotNull Header header : record.headers().headers(headerName)) { + if (header.value() != null) { + if (values == null) { + values = new ArrayList<>(); + } + values.add(new String(header.value(), StandardCharsets.UTF_8)); + } + } + return values; + } + + private static final class SentryRecordContext { + final @NotNull ISentryLifecycleToken lifecycleToken; + final @Nullable ITransaction transaction; + + SentryRecordContext( + final @NotNull ISentryLifecycleToken lifecycleToken, + final @Nullable ITransaction transaction) { + this.lifecycleToken = lifecycleToken; + this.transaction = transaction; + } + } +} diff --git a/sentry-spring/src/test/kotlin/io/sentry/spring/kafka/SentryKafkaConsumerBeanPostProcessorTest.kt b/sentry-spring/src/test/kotlin/io/sentry/spring/kafka/SentryKafkaConsumerBeanPostProcessorTest.kt new file mode 100644 index 00000000000..8ff2bf0b935 --- /dev/null +++ b/sentry-spring/src/test/kotlin/io/sentry/spring/kafka/SentryKafkaConsumerBeanPostProcessorTest.kt @@ -0,0 +1,121 @@ +package io.sentry.spring.kafka + +import kotlin.test.Test +import kotlin.test.assertSame +import kotlin.test.assertTrue +import org.apache.kafka.clients.consumer.ConsumerRecord +import org.mockito.kotlin.mock +import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory +import org.springframework.kafka.core.ConsumerFactory +import org.springframework.kafka.listener.RecordInterceptor + +class SentryKafkaConsumerBeanPostProcessorTest { + + @Test + fun `wraps ConcurrentKafkaListenerContainerFactory with SentryKafkaRecordInterceptor`() { + val consumerFactory = mock>() + val factory = ConcurrentKafkaListenerContainerFactory() + factory.consumerFactory = consumerFactory + + val processor = SentryKafkaConsumerBeanPostProcessor() + 
processor.postProcessAfterInitialization(factory, "kafkaListenerContainerFactory") + + // Verify via reflection that the interceptor was set + val field = factory.javaClass.superclass.getDeclaredField("recordInterceptor") + field.isAccessible = true + val interceptor = field.get(factory) + assertTrue(interceptor is SentryKafkaRecordInterceptor<*, *>) + } + + @Test + fun `does not double-wrap when SentryKafkaRecordInterceptor already set`() { + val consumerFactory = mock>() + val factory = ConcurrentKafkaListenerContainerFactory() + factory.consumerFactory = consumerFactory + + val processor = SentryKafkaConsumerBeanPostProcessor() + // First wrap + processor.postProcessAfterInitialization(factory, "kafkaListenerContainerFactory") + + val field = factory.javaClass.superclass.getDeclaredField("recordInterceptor") + field.isAccessible = true + val firstInterceptor = field.get(factory) + + // Second wrap — should be idempotent + processor.postProcessAfterInitialization(factory, "kafkaListenerContainerFactory") + val secondInterceptor = field.get(factory) + + assertSame(firstInterceptor, secondInterceptor) + } + + @Test + fun `does not wrap non-factory beans`() { + val someBean = "not a factory" + val processor = SentryKafkaConsumerBeanPostProcessor() + + val result = processor.postProcessAfterInitialization(someBean, "someBean") + + assertSame(someBean, result) + } + + @Test + fun `chains existing customer RecordInterceptor as delegate`() { + val consumerFactory = mock>() + val factory = ConcurrentKafkaListenerContainerFactory() + factory.consumerFactory = consumerFactory + + val customerInterceptor = + object : RecordInterceptor { + override fun intercept( + record: ConsumerRecord + ): ConsumerRecord? 
= record + } + factory.setRecordInterceptor(customerInterceptor) + + val processor = SentryKafkaConsumerBeanPostProcessor() + processor.postProcessAfterInitialization(factory, "kafkaListenerContainerFactory") + + val field = factory.javaClass.superclass.getDeclaredField("recordInterceptor") + field.isAccessible = true + val installed = field.get(factory) + assertTrue( + installed is SentryKafkaRecordInterceptor<*, *>, + "expected SentryKafkaRecordInterceptor, got ${installed?.javaClass}", + ) + + val delegateField = SentryKafkaRecordInterceptor::class.java.getDeclaredField("delegate") + delegateField.isAccessible = true + assertSame( + customerInterceptor, + delegateField.get(installed), + "customer interceptor must be preserved as delegate", + ) + } + + @Test + fun `skips installation when reflection fails and preserves customer interceptor`() { + val consumerFactory = mock>() + val factory = ConcurrentKafkaListenerContainerFactory() + factory.consumerFactory = consumerFactory + val customerInterceptor = + object : RecordInterceptor { + override fun intercept( + record: ConsumerRecord + ): ConsumerRecord? 
= record + } + factory.setRecordInterceptor(customerInterceptor) + + val field = factory.javaClass.superclass.getDeclaredField("recordInterceptor") + field.isAccessible = true + assertSame(customerInterceptor, field.get(factory)) + + val processor = SentryKafkaConsumerBeanPostProcessor("missingRecordInterceptor") + processor.postProcessAfterInitialization(factory, "kafkaListenerContainerFactory") + + assertSame( + customerInterceptor, + field.get(factory), + "customer interceptor must remain installed when Sentry cannot read it", + ) + } +} diff --git a/sentry-spring/src/test/kotlin/io/sentry/spring/kafka/SentryKafkaProducerBeanPostProcessorTest.kt b/sentry-spring/src/test/kotlin/io/sentry/spring/kafka/SentryKafkaProducerBeanPostProcessorTest.kt new file mode 100644 index 00000000000..11a943307c8 --- /dev/null +++ b/sentry-spring/src/test/kotlin/io/sentry/spring/kafka/SentryKafkaProducerBeanPostProcessorTest.kt @@ -0,0 +1,95 @@ +package io.sentry.spring.kafka + +import kotlin.test.Test +import kotlin.test.assertEquals +import kotlin.test.assertSame +import kotlin.test.assertTrue +import org.apache.kafka.clients.producer.Producer +import org.mockito.kotlin.any +import org.mockito.kotlin.argumentCaptor +import org.mockito.kotlin.mock +import org.mockito.kotlin.verify +import org.mockito.kotlin.whenever +import org.springframework.kafka.core.DefaultKafkaProducerFactory +import org.springframework.kafka.core.ProducerFactory +import org.springframework.kafka.core.ProducerPostProcessor + +class SentryKafkaProducerBeanPostProcessorTest { + + @Test + fun `registers Sentry post-processor on ProducerFactory`() { + val factory = mock>() + val pp = SentryKafkaProducerBeanPostProcessor.SentryProducerPostProcessor() + whenever(factory.postProcessors).thenReturn(listOf(pp)) + val processor = SentryKafkaProducerBeanPostProcessor() + + processor.postProcessAfterInitialization(factory, "kafkaProducerFactory") + + val captor = argumentCaptor>() + 
verify(factory).addPostProcessor(captor.capture()) + assertTrue( + captor.firstValue is SentryKafkaProducerBeanPostProcessor.SentryProducerPostProcessor<*, *> + ) + } + + @Test + fun `does not throw when addPostProcessor is a no-op (default interface method)`() { + // Factory using the default no-op addPostProcessor / getPostProcessors + val factory = mock>() + whenever(factory.postProcessors).thenReturn(emptyList()) + val processor = SentryKafkaProducerBeanPostProcessor() + + // Should complete without throwing, and log a warning via ScopesAdapter + processor.postProcessAfterInitialization(factory, "myFactory") + + verify(factory).addPostProcessor(any()) + } + + @Test + fun `does not modify non-ProducerFactory beans`() { + val someBean = "not a producer factory" + val processor = SentryKafkaProducerBeanPostProcessor() + + val result = processor.postProcessAfterInitialization(someBean, "someBean") + + assertSame(someBean, result) + } + + @Test + fun `returns the same bean instance`() { + val factory = mock>() + val pp = SentryKafkaProducerBeanPostProcessor.SentryProducerPostProcessor() + whenever(factory.postProcessors).thenReturn(listOf(pp)) + val processor = SentryKafkaProducerBeanPostProcessor() + + val result = processor.postProcessAfterInitialization(factory, "kafkaProducerFactory") + + assertSame(factory, result, "BPP must return the same bean, not a replacement") + } + + @Test + fun `registered post-processor wraps producers via SentryKafkaProducer wrap`() { + val pp = SentryKafkaProducerBeanPostProcessor.SentryProducerPostProcessor() + val raw = mock>() + + val wrapped = pp.apply(raw) + + assertTrue(java.lang.reflect.Proxy.isProxyClass(wrapped.javaClass)) + } + + @Test + fun `integrates with DefaultKafkaProducerFactory addPostProcessor contract`() { + // Sanity check against the real Spring Kafka API surface — DefaultKafkaProducerFactory + // honors addPostProcessor and exposes it via getPostProcessors(). 
+    val factory = DefaultKafkaProducerFactory<String, String>(emptyMap())
+    val processor = SentryKafkaProducerBeanPostProcessor()
+
+    processor.postProcessAfterInitialization(factory, "kafkaProducerFactory")
+
+    assertEquals(1, factory.postProcessors.size)
+    assertTrue(
+      factory.postProcessors.first()
+        is SentryKafkaProducerBeanPostProcessor.SentryProducerPostProcessor<*, *>
+    )
+  }
+}
diff --git a/sentry-spring/src/test/kotlin/io/sentry/spring/kafka/SentryKafkaRecordInterceptorTest.kt b/sentry-spring/src/test/kotlin/io/sentry/spring/kafka/SentryKafkaRecordInterceptorTest.kt
new file mode 100644
index 00000000000..0fc5187b4d4
--- /dev/null
+++ b/sentry-spring/src/test/kotlin/io/sentry/spring/kafka/SentryKafkaRecordInterceptorTest.kt
@@ -0,0 +1,465 @@
+package io.sentry.spring.kafka
+
+import io.sentry.BaggageHeader
+import io.sentry.IScopes
+import io.sentry.ISentryLifecycleToken
+import io.sentry.Sentry
+import io.sentry.SentryOptions
+import io.sentry.SentryTraceHeader
+import io.sentry.SentryTracer
+import io.sentry.SpanDataConvention
+import io.sentry.TransactionContext
+import io.sentry.kafka.SentryKafkaProducer
+import io.sentry.test.initForTest
+import java.nio.ByteBuffer
+import java.nio.charset.StandardCharsets
+import java.util.Optional
+import kotlin.test.AfterTest
+import kotlin.test.BeforeTest
+import kotlin.test.Test
+import kotlin.test.assertEquals
+import kotlin.test.assertFailsWith
+import kotlin.test.assertNull
+import kotlin.test.assertTrue
+import org.apache.kafka.clients.consumer.Consumer
+import org.apache.kafka.clients.consumer.ConsumerRecord
+import org.apache.kafka.common.header.internals.RecordHeaders
+import org.apache.kafka.common.record.TimestampType
+import org.mockito.kotlin.any
+import org.mockito.kotlin.mock
+import org.mockito.kotlin.never
+import org.mockito.kotlin.times
+import org.mockito.kotlin.verify
+import org.mockito.kotlin.whenever
+import org.springframework.kafka.listener.RecordInterceptor
+import org.springframework.kafka.support.KafkaHeaders
+
+class SentryKafkaRecordInterceptorTest {
+
+  private lateinit var scopes: IScopes
+  private lateinit var forkedScopes: IScopes
+  private lateinit var options: SentryOptions
+  private lateinit var consumer: Consumer<String, String>
+  private lateinit var lifecycleToken: ISentryLifecycleToken
+  private lateinit var transaction: SentryTracer
+
+  @BeforeTest
+  fun setup() {
+    initForTest { it.dsn = "https://key@sentry.io/proj" }
+    scopes = mock()
+    consumer = mock()
+    lifecycleToken = mock()
+    options =
+      SentryOptions().apply {
+        dsn = "https://key@sentry.io/proj"
+        isEnableQueueTracing = true
+        tracesSampleRate = 1.0
+      }
+    whenever(scopes.options).thenReturn(options)
+    whenever(scopes.isEnabled).thenReturn(true)
+
+    forkedScopes = mock()
+    whenever(scopes.forkedRootScopes(any())).thenReturn(forkedScopes)
+    whenever(forkedScopes.options).thenReturn(options)
+    whenever(forkedScopes.makeCurrent()).thenReturn(lifecycleToken)
+
+    transaction = SentryTracer(TransactionContext("queue.process", "queue.process"), forkedScopes)
+    whenever(forkedScopes.startTransaction(any(), any()))
+      .thenReturn(transaction)
+  }
+
+  @AfterTest
+  fun teardown() {
+    Sentry.close()
+  }
+
+  private fun createRecord(
+    topic: String = "my-topic",
+    headers: RecordHeaders = RecordHeaders(),
+    serializedValueSize: Int = -1,
+  ): ConsumerRecord<String, String> {
+    return ConsumerRecord(
+      topic,
+      0,
+      0L,
+      System.currentTimeMillis(),
+      TimestampType.CREATE_TIME,
+      3,
+      serializedValueSize,
+      "key",
+      "value",
+      headers,
+      Optional.empty(),
+    )
+  }
+
+  private fun createRecordWithHeaders(
+    sentryTrace: String? = null,
+    baggage: String? = null,
+    baggageHeaders: List<String>? = null,
+    enqueuedTime: String? = null,
+    deliveryAttempt: Int? = null,
+  ): ConsumerRecord<String, String> {
+    val headers = RecordHeaders()
+    sentryTrace?.let {
+      headers.add(SentryTraceHeader.SENTRY_TRACE_HEADER, it.toByteArray(StandardCharsets.UTF_8))
+    }
+    baggage?.let {
+      headers.add(BaggageHeader.BAGGAGE_HEADER, it.toByteArray(StandardCharsets.UTF_8))
+    }
+    baggageHeaders?.forEach {
+      headers.add(BaggageHeader.BAGGAGE_HEADER, it.toByteArray(StandardCharsets.UTF_8))
+    }
+    enqueuedTime?.let {
+      headers.add(
+        SentryKafkaProducer.SENTRY_ENQUEUED_TIME_HEADER,
+        it.toByteArray(StandardCharsets.UTF_8),
+      )
+    }
+    deliveryAttempt?.let {
+      headers.add(
+        KafkaHeaders.DELIVERY_ATTEMPT,
+        ByteBuffer.allocate(Int.SIZE_BYTES).putInt(it).array(),
+      )
+    }
+    val record = ConsumerRecord("my-topic", 0, 0L, "key", "value")
+    headers.forEach { record.headers().add(it) }
+    return record
+  }
+
+  @Test
+  fun `intercept forks root scopes`() {
+    val interceptor = SentryKafkaRecordInterceptor(scopes)
+    val record = createRecord()
+
+    interceptor.intercept(record, consumer)
+
+    verify(scopes).forkedRootScopes("SentryKafkaRecordInterceptor")
+    verify(forkedScopes).makeCurrent()
+  }
+
+  @Test
+  fun `intercept continues trace from headers`() {
+    val interceptor = SentryKafkaRecordInterceptor(scopes)
+    val sentryTraceValue = "2722d9f6ec019ade60c776169d9a8904-cedf5b7571cb4972-1"
+    val record = createRecordWithHeaders(sentryTrace = sentryTraceValue)
+
+    interceptor.intercept(record, consumer)
+
+    verify(forkedScopes)
+      .continueTrace(org.mockito.kotlin.eq(sentryTraceValue), org.mockito.kotlin.isNull())
+  }
+
+  @Test
+  fun `intercept calls continueTrace with null when no headers`() {
+    val interceptor = SentryKafkaRecordInterceptor(scopes)
+    val record = createRecord()
+
+    interceptor.intercept(record, consumer)
+
+    verify(forkedScopes).continueTrace(org.mockito.kotlin.isNull(), org.mockito.kotlin.isNull())
+  }
+
+  @Test
+  fun `intercept passes all baggage headers to continueTrace`() {
+    val interceptor = SentryKafkaRecordInterceptor(scopes)
+    val sentryTraceValue = "2722d9f6ec019ade60c776169d9a8904-cedf5b7571cb4972-1"
+    val record =
+      createRecordWithHeaders(
+        sentryTrace = sentryTraceValue,
+        baggageHeaders = listOf("third=party", "sentry-sample_rate=1"),
+      )
+
+    interceptor.intercept(record, consumer)
+
+    verify(forkedScopes)
+      .continueTrace(
+        org.mockito.kotlin.eq(sentryTraceValue),
+        org.mockito.kotlin.eq(listOf("third=party", "sentry-sample_rate=1")),
+      )
+  }
+
+  @Test
+  fun `sets body size from serializedValueSize`() {
+    val interceptor = SentryKafkaRecordInterceptor(scopes)
+    val record = createRecord(serializedValueSize = 42)
+
+    interceptor.intercept(record, consumer)
+
+    assertEquals(42, transaction.data?.get(SpanDataConvention.MESSAGING_MESSAGE_BODY_SIZE))
+  }
+
+  @Test
+  fun `does not set body size when serializedValueSize is negative`() {
+    val interceptor = SentryKafkaRecordInterceptor(scopes)
+    val record = createRecord(serializedValueSize = -1)
+
+    interceptor.intercept(record, consumer)
+
+    assertNull(transaction.data?.get(SpanDataConvention.MESSAGING_MESSAGE_BODY_SIZE))
+  }
+
+  @Test
+  fun `sets retry count from delivery attempt header`() {
+    val interceptor = SentryKafkaRecordInterceptor(scopes)
+    val record = createRecordWithHeaders(deliveryAttempt = 3)
+
+    interceptor.intercept(record, consumer)
+
+    assertEquals(2, transaction.data?.get(SpanDataConvention.MESSAGING_MESSAGE_RETRY_COUNT))
+  }
+
+  @Test
+  fun `does not set retry count when delivery attempt header is missing`() {
+    val interceptor = SentryKafkaRecordInterceptor(scopes)
+    val record = createRecord()
+
+    interceptor.intercept(record, consumer)
+
+    assertNull(transaction.data?.get(SpanDataConvention.MESSAGING_MESSAGE_RETRY_COUNT))
+  }
+
+  @Test
+  fun `sets receive latency from enqueued time in epoch seconds`() {
+    val interceptor = SentryKafkaRecordInterceptor(scopes)
+    val enqueuedTime = (System.currentTimeMillis() / 1000.0 - 1.0).toString()
+    val record = createRecordWithHeaders(enqueuedTime = enqueuedTime)
+
+    interceptor.intercept(record, consumer)
+
+    val latency = transaction.data?.get(SpanDataConvention.MESSAGING_MESSAGE_RECEIVE_LATENCY)
+    assertTrue(latency is Long && latency >= 0)
+  }
+
+  @Test
+  fun `does not create span when queue tracing is disabled`() {
+    options.isEnableQueueTracing = false
+    val interceptor = SentryKafkaRecordInterceptor(scopes)
+    val record = createRecord()
+
+    val result = interceptor.intercept(record, consumer)
+
+    verify(scopes, never()).forkedRootScopes(any())
+    verify(forkedScopes, never()).makeCurrent()
+    assertEquals(record, result)
+  }
+
+  @Test
+  fun `does not create span when origin is ignored`() {
+    options.setIgnoredSpanOrigins(listOf(SentryKafkaRecordInterceptor.TRACE_ORIGIN))
+    val interceptor = SentryKafkaRecordInterceptor(scopes)
+    val record = createRecord()
+
+    val result = interceptor.intercept(record, consumer)
+
+    verify(scopes, never()).forkedRootScopes(any())
+    verify(forkedScopes, never()).makeCurrent()
+    assertEquals(record, result)
+  }
+
+  @Test
+  fun `delegates to existing interceptor`() {
+    val delegate = mock<RecordInterceptor<String, String>>()
+    val record = createRecord()
+    whenever(delegate.intercept(record, consumer)).thenReturn(record)
+
+    val interceptor = SentryKafkaRecordInterceptor(scopes, delegate)
+    interceptor.intercept(record, consumer)
+
+    verify(delegate).intercept(record, consumer)
+  }
+
+  @Test
+  fun `success finishes transaction and delegates`() {
+    val delegate = mock<RecordInterceptor<String, String>>()
+    val interceptor = SentryKafkaRecordInterceptor(scopes, delegate)
+    val record = createRecord()
+
+    interceptor.intercept(record, consumer)
+    interceptor.success(record, consumer)
+
+    verify(delegate).success(record, consumer)
+  }
+
+  @Test
+  fun `failure finishes transaction with error and delegates`() {
+    val delegate = mock<RecordInterceptor<String, String>>()
+    val interceptor = SentryKafkaRecordInterceptor(scopes, delegate)
+    val record = createRecord()
+    val exception = RuntimeException("processing failed")
+
+    interceptor.intercept(record, consumer)
+    interceptor.failure(record, exception, consumer)
+
+    verify(delegate).failure(record, exception, consumer)
+  }
+
+  @Test
+  fun `afterRecord delegates to existing interceptor`() {
+    val delegate = mock<RecordInterceptor<String, String>>()
+    val interceptor = SentryKafkaRecordInterceptor(scopes, delegate)
+    val record = createRecord()
+
+    interceptor.afterRecord(record, consumer)
+
+    verify(delegate).afterRecord(record, consumer)
+  }
+
+  @Test
+  fun `trace origin is set correctly`() {
+    assertEquals("auto.queue.spring.kafka.consumer", SentryKafkaRecordInterceptor.TRACE_ORIGIN)
+  }
+
+  @Test
+  fun `clearThreadState cleans up stale context`() {
+    val interceptor = SentryKafkaRecordInterceptor(scopes)
+    val record = createRecord()
+
+    interceptor.intercept(record, consumer)
+
+    interceptor.clearThreadState(consumer)
+
+    verify(lifecycleToken).close()
+  }
+
+  @Test
+  fun `clearThreadState is no-op when no context exists`() {
+    val interceptor = SentryKafkaRecordInterceptor(scopes)
+
+    // should not throw
+    interceptor.clearThreadState(consumer)
+  }
+
+  @Test
+  fun `setupThreadState delegates to existing interceptor`() {
+    val delegate = mock<RecordInterceptor<String, String>>()
+    val interceptor = SentryKafkaRecordInterceptor(scopes, delegate)
+
+    interceptor.setupThreadState(consumer)
+
+    verify(delegate).setupThreadState(consumer)
+  }
+
+  @Test
+  fun `setupThreadState is no-op without delegate`() {
+    val interceptor = SentryKafkaRecordInterceptor(scopes)
+
+    // should not throw
+    interceptor.setupThreadState(consumer)
+  }
+
+  @Test
+  fun `clearThreadState delegates to existing interceptor`() {
+    val delegate = mock<RecordInterceptor<String, String>>()
+    val interceptor = SentryKafkaRecordInterceptor(scopes, delegate)
+
+    interceptor.clearThreadState(consumer)
+
+    verify(delegate).clearThreadState(consumer)
+  }
+
+  @Test
+  fun `clearThreadState delegates to existing interceptor even when sentry cleanup throws`() {
+    val delegate = mock<RecordInterceptor<String, String>>()
+    whenever(lifecycleToken.close()).thenThrow(RuntimeException("boom"))
+    val interceptor = SentryKafkaRecordInterceptor(scopes, delegate)
+    val record = createRecord()
+
+    interceptor.intercept(record, consumer)
+
+    try {
+      interceptor.clearThreadState(consumer)
+    } catch (ignored: RuntimeException) {
+      // expected
+    }
+
+    verify(delegate).clearThreadState(consumer)
+  }
+
+  @Test
+  fun `full lifecycle intercept success clearThreadState closes token exactly once`() {
+    val delegate = mock<RecordInterceptor<String, String>>()
+    val record = createRecord()
+    whenever(delegate.intercept(record, consumer)).thenReturn(record)
+    val interceptor = SentryKafkaRecordInterceptor(scopes, delegate)
+
+    interceptor.setupThreadState(consumer)
+    interceptor.intercept(record, consumer)
+    interceptor.success(record, consumer)
+    interceptor.clearThreadState(consumer)
+
+    // token closed once by success(); clearThreadState must not re-close it
+    verify(lifecycleToken, times(1)).close()
+    assertTrue(transaction.isFinished)
+    // delegate hooks still delegated across the full lifecycle
+    verify(delegate).setupThreadState(consumer)
+    verify(delegate).success(record, consumer)
+    verify(delegate).clearThreadState(consumer)
+  }
+
+  @Test
+  fun `when delegate intercept returns null clearThreadState still finishes transaction and closes token`() {
+    val delegate = mock<RecordInterceptor<String, String>>()
+    val record = createRecord()
+    // delegate filters the record — per Spring Kafka contract, success/failure will not be invoked
+    whenever(delegate.intercept(record, consumer)).thenReturn(null)
+    val interceptor = SentryKafkaRecordInterceptor(scopes, delegate)
+
+    interceptor.setupThreadState(consumer)
+    val result = interceptor.intercept(record, consumer)
+    interceptor.clearThreadState(consumer)
+
+    assertNull(result)
+    verify(lifecycleToken, times(1)).close()
+    assertTrue(transaction.isFinished)
+    verify(delegate).clearThreadState(consumer)
+  }
+
+  @Test
+  fun `when delegate intercept throws clearThreadState still finishes transaction and closes token`() {
+    val delegate = mock<RecordInterceptor<String, String>>()
+    val record = createRecord()
+    val boom = RuntimeException("delegate boom")
+    whenever(delegate.intercept(record, consumer)).thenThrow(boom)
+    val interceptor = SentryKafkaRecordInterceptor(scopes, delegate)
+
+    interceptor.setupThreadState(consumer)
+    val thrown = assertFailsWith<RuntimeException> { interceptor.intercept(record, consumer) }
+    assertEquals(boom, thrown)
+
+    interceptor.clearThreadState(consumer)
+
+    verify(lifecycleToken, times(1)).close()
+    assertTrue(transaction.isFinished)
+    verify(delegate).clearThreadState(consumer)
+  }
+
+  @Test
+  fun `intercept cleans up stale context from previous record`() {
+    val lifecycleToken2 = mock<ISentryLifecycleToken>()
+    val forkedScopes2 = mock<IScopes>()
+    whenever(forkedScopes2.options).thenReturn(options)
+    whenever(forkedScopes2.makeCurrent()).thenReturn(lifecycleToken2)
+    val tx2 = SentryTracer(TransactionContext("queue.process", "queue.process"), forkedScopes2)
+    whenever(forkedScopes2.startTransaction(any(), any())).thenReturn(tx2)
+
+    var callCount = 0
+
+    val interceptor = SentryKafkaRecordInterceptor(scopes)
+    val record = createRecord()
+
+    whenever(scopes.forkedRootScopes(any())).thenAnswer {
+      callCount++
+      if (callCount == 1) forkedScopes else forkedScopes2
+    }
+
+    // First intercept sets up context
+    interceptor.intercept(record, consumer)
+
+    // Second intercept without success/failure — should clean up stale context first
+    interceptor.intercept(record, consumer)
+
+    // First lifecycle token should have been closed by the defensive cleanup
+    verify(lifecycleToken).close()
+  }
+}
diff --git a/test/system-test-runner.py b/test/system-test-runner.py
index 6c6a8604f94..6f886ee6311 100644
--- a/test/system-test-runner.py
+++ b/test/system-test-runner.py
@@ -70,6 +70,9 @@
 KAFKA_BOOTSTRAP_SERVERS = "localhost:9092"
 KAFKA_BROKER_REQUIRED_MODULES = {
     "sentry-samples-console",
+    "sentry-samples-spring-boot",
+    "sentry-samples-spring-boot-opentelemetry",
+    "sentry-samples-spring-boot-opentelemetry-noagent",
     "sentry-samples-spring-boot-jakarta",
     "sentry-samples-spring-boot-jakarta-opentelemetry",
     "sentry-samples-spring-boot-jakarta-opentelemetry-noagent",
@@ -78,6 +81,9 @@
     "sentry-samples-spring-boot-4-opentelemetry-noagent",
 }
 KAFKA_PROFILE_REQUIRED_MODULES = {
+    "sentry-samples-spring-boot",
+    "sentry-samples-spring-boot-opentelemetry",
+    "sentry-samples-spring-boot-opentelemetry-noagent",
     "sentry-samples-spring-boot-jakarta",
     "sentry-samples-spring-boot-jakarta-opentelemetry",
     "sentry-samples-spring-boot-jakarta-opentelemetry-noagent",