diff --git a/.cursor/rules/pr.mdc b/.cursor/rules/pr.mdc index 08a07511c6..e15c0a0a56 100644 --- a/.cursor/rules/pr.mdc +++ b/.cursor/rules/pr.mdc @@ -258,3 +258,5 @@ git push **Never merge into the collection branch.** Syncing only happens between stack PR branches. The collection branch is untouched until the user merges PRs through GitHub. Prefer merge over rebase — it preserves commit history, doesn't invalidate existing review comments, and avoids the need for force-pushing. Only rebase if explicitly requested. + +**Never amend or force-push stack branches.** Do not use `git commit --amend`, `--force`, or `--force-with-lease` on branches that are part of a stack. Amending a pushed commit requires a force-push, which can cause GitHub to auto-merge or auto-close other PRs in the stack. If a commit needs fixing, add a new fixup commit instead. diff --git a/CHANGELOG.md b/CHANGELOG.md index 6bd3b12794..5ce9b04b72 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,9 @@ ### Features +- Add `sentry-kafka` module for Kafka queue instrumentation without Spring ([#5288](https://github.com/getsentry/sentry-java/pull/5288)) +- Add Kafka queue tracing for Spring Boot 3 ([#5254](https://github.com/getsentry/sentry-java/pull/5254)), ([#5255](https://github.com/getsentry/sentry-java/pull/5255)), ([#5256](https://github.com/getsentry/sentry-java/pull/5256)) +- Add `enableQueueTracing` option and messaging span data conventions ([#5250](https://github.com/getsentry/sentry-java/pull/5250)) - Prevent cross-organization trace continuation ([#5136](https://github.com/getsentry/sentry-java/pull/5136)) - By default, the SDK now extracts the organization ID from the DSN (e.g. `o123.ingest.sentry.io`) and compares it with the `sentry-org_id` value in incoming baggage headers. When the two differ, the SDK starts a fresh trace instead of continuing the foreign one. This guards against accidentally linking traces across organizations. 
- New option `enableStrictTraceContinuation` (default `false`): when enabled, both the SDK's org ID **and** the incoming baggage org ID must be present and match for a trace to be continued. Traces with a missing org ID on either side are rejected. Configurable via code (`setStrictTraceContinuation(true)`), `sentry.properties` (`enable-strict-trace-continuation=true`), Android manifest (`io.sentry.strict-trace-continuation.enabled`), or Spring Boot (`sentry.strict-trace-continuation=true`). diff --git a/README.md b/README.md index 25fedc8217..72737932c5 100644 --- a/README.md +++ b/README.md @@ -35,6 +35,7 @@ Sentry SDK for Java and Android | sentry | ![Maven Central Version](https://img.shields.io/maven-central/v/io.sentry/sentry?style=for-the-badge&logo=sentry&color=green) | 21 | | sentry-jul | ![Maven Central Version](https://img.shields.io/maven-central/v/io.sentry/sentry-jul?style=for-the-badge&logo=sentry&color=green) | | sentry-jdbc | ![Maven Central Version](https://img.shields.io/maven-central/v/io.sentry/sentry-jdbc?style=for-the-badge&logo=sentry&color=green) | +| sentry-kafka | ![Maven Central Version](https://img.shields.io/maven-central/v/io.sentry/sentry-kafka?style=for-the-badge&logo=sentry&color=green) | | sentry-apollo | ![Maven Central Version](https://img.shields.io/maven-central/v/io.sentry/sentry-apollo?style=for-the-badge&logo=sentry&color=green) | 21 | | sentry-apollo-3 | ![Maven Central Version](https://img.shields.io/maven-central/v/io.sentry/sentry-apollo-3?style=for-the-badge&logo=sentry&color=green) | 21 | | sentry-apollo-4 | ![Maven Central Version](https://img.shields.io/maven-central/v/io.sentry/sentry-apollo-4?style=for-the-badge&logo=sentry&color=green) | 21 | diff --git a/buildSrc/src/main/java/Config.kt b/buildSrc/src/main/java/Config.kt index b5d1dafeb7..0e353f1c5e 100644 --- a/buildSrc/src/main/java/Config.kt +++ b/buildSrc/src/main/java/Config.kt @@ -80,6 +80,7 @@ object Config { val SENTRY_JCACHE_SDK_NAME = 
"$SENTRY_JAVA_SDK_NAME.jcache" val SENTRY_QUARTZ_SDK_NAME = "$SENTRY_JAVA_SDK_NAME.quartz" val SENTRY_JDBC_SDK_NAME = "$SENTRY_JAVA_SDK_NAME.jdbc" + val SENTRY_KAFKA_SDK_NAME = "$SENTRY_JAVA_SDK_NAME.kafka" val SENTRY_OPENFEATURE_SDK_NAME = "$SENTRY_JAVA_SDK_NAME.openfeature" val SENTRY_LAUNCHDARKLY_SERVER_SDK_NAME = "$SENTRY_JAVA_SDK_NAME.launchdarkly-server" val SENTRY_LAUNCHDARKLY_ANDROID_SDK_NAME = "$SENTRY_ANDROID_SDK_NAME.launchdarkly" diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index eb7ab86e4b..2238800c53 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -183,6 +183,8 @@ springboot3-starter-security = { module = "org.springframework.boot:spring-boot- springboot3-starter-jdbc = { module = "org.springframework.boot:spring-boot-starter-jdbc", version.ref = "springboot3" } springboot3-starter-actuator = { module = "org.springframework.boot:spring-boot-starter-actuator", version.ref = "springboot3" } springboot3-starter-cache = { module = "org.springframework.boot:spring-boot-starter-cache", version.ref = "springboot3" } +spring-kafka3 = { module = "org.springframework.kafka:spring-kafka", version = "3.3.5" } +kafka-clients = { module = "org.apache.kafka:kafka-clients", version = "3.8.1" } springboot4-otel = { module = "io.opentelemetry.instrumentation:opentelemetry-spring-boot-starter", version.ref = "otelInstrumentation" } springboot4-resttestclient = { module = "org.springframework.boot:spring-boot-resttestclient", version.ref = "springboot4" } springboot4-starter = { module = "org.springframework.boot:spring-boot-starter", version.ref = "springboot4" } diff --git a/sentry-kafka/README.md b/sentry-kafka/README.md new file mode 100644 index 0000000000..ef4b531985 --- /dev/null +++ b/sentry-kafka/README.md @@ -0,0 +1,5 @@ +# sentry-kafka + +This module provides Kafka-native queue instrumentation for applications using `kafka-clients` directly. 
+ +Spring users should use `sentry-spring-boot-jakarta` / `sentry-spring-jakarta`, which provide higher-fidelity consumer instrumentation via Spring Kafka hooks. diff --git a/sentry-kafka/api/sentry-kafka.api b/sentry-kafka/api/sentry-kafka.api new file mode 100644 index 0000000000..30faaa1256 --- /dev/null +++ b/sentry-kafka/api/sentry-kafka.api @@ -0,0 +1,25 @@ +public final class io/sentry/kafka/BuildConfig { + public static final field SENTRY_KAFKA_SDK_NAME Ljava/lang/String; + public static final field VERSION_NAME Ljava/lang/String; +} + +public final class io/sentry/kafka/SentryKafkaConsumerInterceptor : org/apache/kafka/clients/consumer/ConsumerInterceptor { + public static final field TRACE_ORIGIN Ljava/lang/String; + public fun (Lio/sentry/IScopes;)V + public fun close ()V + public fun configure (Ljava/util/Map;)V + public fun onCommit (Ljava/util/Map;)V + public fun onConsume (Lorg/apache/kafka/clients/consumer/ConsumerRecords;)Lorg/apache/kafka/clients/consumer/ConsumerRecords; +} + +public final class io/sentry/kafka/SentryKafkaProducerInterceptor : org/apache/kafka/clients/producer/ProducerInterceptor { + public static final field SENTRY_ENQUEUED_TIME_HEADER Ljava/lang/String; + public static final field TRACE_ORIGIN Ljava/lang/String; + public fun (Lio/sentry/IScopes;)V + public fun (Lio/sentry/IScopes;Ljava/lang/String;)V + public fun close ()V + public fun configure (Ljava/util/Map;)V + public fun onAcknowledgement (Lorg/apache/kafka/clients/producer/RecordMetadata;Ljava/lang/Exception;)V + public fun onSend (Lorg/apache/kafka/clients/producer/ProducerRecord;)Lorg/apache/kafka/clients/producer/ProducerRecord; +} + diff --git a/sentry-kafka/build.gradle.kts b/sentry-kafka/build.gradle.kts new file mode 100644 index 0000000000..ee3ba0d4a6 --- /dev/null +++ b/sentry-kafka/build.gradle.kts @@ -0,0 +1,83 @@ +import net.ltgt.gradle.errorprone.errorprone +import org.jetbrains.kotlin.gradle.tasks.KotlinCompile + +plugins { + `java-library` + 
id("io.sentry.javadoc") + alias(libs.plugins.kotlin.jvm) + jacoco + alias(libs.plugins.errorprone) + alias(libs.plugins.gradle.versions) + alias(libs.plugins.buildconfig) +} + +tasks.withType<KotlinCompile>().configureEach { + compilerOptions.jvmTarget = org.jetbrains.kotlin.gradle.dsl.JvmTarget.JVM_1_8 +} + +dependencies { + api(projects.sentry) + compileOnly(libs.kafka.clients) + compileOnly(libs.jetbrains.annotations) + compileOnly(libs.nopen.annotations) + + errorprone(libs.errorprone.core) + errorprone(libs.nopen.checker) + errorprone(libs.nullaway) + + // tests + testImplementation(projects.sentryTestSupport) + testImplementation(kotlin(Config.kotlinStdLib)) + testImplementation(libs.kotlin.test.junit) + testImplementation(libs.mockito.kotlin) + testImplementation(libs.mockito.inline) + testImplementation(libs.kafka.clients) +} + +configure<SourceSetContainer> { test { java.srcDir("src/test/java") } } + +jacoco { toolVersion = libs.versions.jacoco.get() } + +tasks.jacocoTestReport { + reports { + xml.required.set(true) + html.required.set(false) + } +} + +tasks { + jacocoTestCoverageVerification { + violationRules { rule { limit { minimum = Config.QualityPlugins.Jacoco.minimumCoverage } } } + } + check { + dependsOn(jacocoTestCoverageVerification) + dependsOn(jacocoTestReport) + } +} + +tasks.withType<JavaCompile>().configureEach { + options.errorprone { + check("NullAway", net.ltgt.gradle.errorprone.CheckSeverity.ERROR) + option("NullAway:AnnotatedPackages", "io.sentry") + } +} + +buildConfig { + useJavaOutput() + packageName("io.sentry.kafka") + buildConfigField("String", "SENTRY_KAFKA_SDK_NAME", "\"${Config.Sentry.SENTRY_KAFKA_SDK_NAME}\"") + buildConfigField("String", "VERSION_NAME", "\"${project.version}\"") +} + +tasks.jar { + manifest { + attributes( + "Sentry-Version-Name" to project.version, + "Sentry-SDK-Name" to Config.Sentry.SENTRY_KAFKA_SDK_NAME, + "Sentry-SDK-Package-Name" to "maven:io.sentry:sentry-kafka", + "Implementation-Vendor" to "Sentry", + "Implementation-Title" to project.name, + 
"Implementation-Version" to project.version, + ) + } +} diff --git a/sentry-kafka/src/main/java/io/sentry/kafka/SentryKafkaConsumerInterceptor.java b/sentry-kafka/src/main/java/io/sentry/kafka/SentryKafkaConsumerInterceptor.java new file mode 100644 index 0000000000..caa773352e --- /dev/null +++ b/sentry-kafka/src/main/java/io/sentry/kafka/SentryKafkaConsumerInterceptor.java @@ -0,0 +1,95 @@ +package io.sentry.kafka; + +import io.sentry.BaggageHeader; +import io.sentry.IScopes; +import io.sentry.ITransaction; +import io.sentry.SentryTraceHeader; +import io.sentry.SpanDataConvention; +import io.sentry.SpanStatus; +import io.sentry.TransactionContext; +import io.sentry.TransactionOptions; +import java.nio.charset.StandardCharsets; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import org.apache.kafka.clients.consumer.ConsumerInterceptor; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.header.Header; +import org.jetbrains.annotations.ApiStatus; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; + +@ApiStatus.Internal +public final class SentryKafkaConsumerInterceptor implements ConsumerInterceptor { + + public static final @NotNull String TRACE_ORIGIN = "auto.queue.kafka.consumer"; + + private final @NotNull IScopes scopes; + + public SentryKafkaConsumerInterceptor(final @NotNull IScopes scopes) { + this.scopes = scopes; + } + + @Override + public @NotNull ConsumerRecords onConsume(final @NotNull ConsumerRecords records) { + if (!scopes.getOptions().isEnableQueueTracing() || records.isEmpty()) { + return records; + } + + final @NotNull ConsumerRecord firstRecord = records.iterator().next(); + + try { + final @Nullable TransactionContext continued = continueTrace(firstRecord); + final @NotNull 
TransactionContext txContext = + continued != null ? continued : new TransactionContext("queue.receive", "queue.receive"); + txContext.setName("queue.receive"); + txContext.setOperation("queue.receive"); + + final @NotNull TransactionOptions txOptions = new TransactionOptions(); + txOptions.setOrigin(TRACE_ORIGIN); + txOptions.setBindToScope(false); + + final @NotNull ITransaction transaction = scopes.startTransaction(txContext, txOptions); + if (!transaction.isNoOp()) { + transaction.setData(SpanDataConvention.MESSAGING_SYSTEM, "kafka"); + transaction.setData(SpanDataConvention.MESSAGING_DESTINATION_NAME, firstRecord.topic()); + transaction.setData("messaging.batch.message.count", records.count()); + transaction.setStatus(SpanStatus.OK); + transaction.finish(); + } + } catch (Throwable ignored) { + // Instrumentation must never break the customer's Kafka poll loop. + } + + return records; + } + + @Override + public void onCommit(final @NotNull Map offsets) {} + + @Override + public void close() {} + + @Override + public void configure(final @Nullable Map configs) {} + + private @Nullable TransactionContext continueTrace(final @NotNull ConsumerRecord record) { + final @Nullable String sentryTrace = headerValue(record, SentryTraceHeader.SENTRY_TRACE_HEADER); + final @Nullable String baggage = headerValue(record, BaggageHeader.BAGGAGE_HEADER); + final @Nullable List baggageHeaders = + baggage != null ? 
Collections.singletonList(baggage) : null; + return scopes.continueTrace(sentryTrace, baggageHeaders); + } + + private @Nullable String headerValue( + final @NotNull ConsumerRecord record, final @NotNull String headerName) { + final @Nullable Header header = record.headers().lastHeader(headerName); + if (header == null || header.value() == null) { + return null; + } + return new String(header.value(), StandardCharsets.UTF_8); + } +} diff --git a/sentry-kafka/src/main/java/io/sentry/kafka/SentryKafkaProducerInterceptor.java b/sentry-kafka/src/main/java/io/sentry/kafka/SentryKafkaProducerInterceptor.java new file mode 100644 index 0000000000..c6b3184b39 --- /dev/null +++ b/sentry-kafka/src/main/java/io/sentry/kafka/SentryKafkaProducerInterceptor.java @@ -0,0 +1,109 @@ +package io.sentry.kafka; + +import io.sentry.BaggageHeader; +import io.sentry.DateUtils; +import io.sentry.IScopes; +import io.sentry.ISpan; +import io.sentry.SentryTraceHeader; +import io.sentry.SpanDataConvention; +import io.sentry.SpanOptions; +import io.sentry.SpanStatus; +import io.sentry.util.TracingUtils; +import java.nio.charset.StandardCharsets; +import java.util.Map; +import org.apache.kafka.clients.producer.ProducerInterceptor; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.header.Headers; +import org.jetbrains.annotations.ApiStatus; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; + +@ApiStatus.Internal +public final class SentryKafkaProducerInterceptor implements ProducerInterceptor { + + public static final @NotNull String TRACE_ORIGIN = "auto.queue.kafka.producer"; + public static final @NotNull String SENTRY_ENQUEUED_TIME_HEADER = "sentry-task-enqueued-time"; + + private final @NotNull IScopes scopes; + private final @NotNull String traceOrigin; + + public SentryKafkaProducerInterceptor(final @NotNull IScopes scopes) { + this(scopes, TRACE_ORIGIN); + 
} + + public SentryKafkaProducerInterceptor( + final @NotNull IScopes scopes, final @NotNull String traceOrigin) { + this.scopes = scopes; + this.traceOrigin = traceOrigin; + } + + @Override + public @NotNull ProducerRecord onSend(final @NotNull ProducerRecord record) { + if (!scopes.getOptions().isEnableQueueTracing()) { + return record; + } + + final @Nullable ISpan activeSpan = scopes.getSpan(); + if (activeSpan == null || activeSpan.isNoOp()) { + return record; + } + + try { + final @NotNull SpanOptions spanOptions = new SpanOptions(); + spanOptions.setOrigin(traceOrigin); + final @NotNull ISpan span = + activeSpan.startChild("queue.publish", record.topic(), spanOptions); + if (span.isNoOp()) { + return record; + } + + span.setData(SpanDataConvention.MESSAGING_SYSTEM, "kafka"); + span.setData(SpanDataConvention.MESSAGING_DESTINATION_NAME, record.topic()); + + injectHeaders(record.headers(), span); + + span.setStatus(SpanStatus.OK); + span.finish(); + } catch (Throwable ignored) { + // Instrumentation must never break the customer's Kafka send. 
+ } + + return record; + } + + @Override + public void onAcknowledgement( + final @Nullable RecordMetadata metadata, final @Nullable Exception exception) {} + + @Override + public void close() {} + + @Override + public void configure(final @Nullable Map configs) {} + + private void injectHeaders(final @NotNull Headers headers, final @NotNull ISpan span) { + final @Nullable TracingUtils.TracingHeaders tracingHeaders = + TracingUtils.trace(scopes, null, span); + if (tracingHeaders != null) { + final @NotNull SentryTraceHeader sentryTraceHeader = tracingHeaders.getSentryTraceHeader(); + headers.remove(sentryTraceHeader.getName()); + headers.add( + sentryTraceHeader.getName(), + sentryTraceHeader.getValue().getBytes(StandardCharsets.UTF_8)); + + final @Nullable BaggageHeader baggageHeader = tracingHeaders.getBaggageHeader(); + if (baggageHeader != null) { + headers.remove(baggageHeader.getName()); + headers.add( + baggageHeader.getName(), baggageHeader.getValue().getBytes(StandardCharsets.UTF_8)); + } + } + + headers.remove(SENTRY_ENQUEUED_TIME_HEADER); + headers.add( + SENTRY_ENQUEUED_TIME_HEADER, + String.valueOf(DateUtils.millisToSeconds(System.currentTimeMillis())) + .getBytes(StandardCharsets.UTF_8)); + } +} diff --git a/sentry-kafka/src/test/kotlin/io/sentry/kafka/SentryKafkaConsumerInterceptorTest.kt b/sentry-kafka/src/test/kotlin/io/sentry/kafka/SentryKafkaConsumerInterceptorTest.kt new file mode 100644 index 0000000000..daee640793 --- /dev/null +++ b/sentry-kafka/src/test/kotlin/io/sentry/kafka/SentryKafkaConsumerInterceptorTest.kt @@ -0,0 +1,72 @@ +package io.sentry.kafka + +import io.sentry.IScopes +import io.sentry.ITransaction +import io.sentry.SentryOptions +import io.sentry.TransactionContext +import io.sentry.TransactionOptions +import kotlin.test.Test +import kotlin.test.assertSame +import org.apache.kafka.clients.consumer.ConsumerRecord +import org.apache.kafka.clients.consumer.ConsumerRecords +import 
org.apache.kafka.clients.consumer.OffsetAndMetadata +import org.apache.kafka.common.TopicPartition +import org.mockito.kotlin.any +import org.mockito.kotlin.mock +import org.mockito.kotlin.never +import org.mockito.kotlin.verify +import org.mockito.kotlin.whenever + +class SentryKafkaConsumerInterceptorTest { + + @Test + fun `does nothing when queue tracing is disabled`() { + val scopes = mock<IScopes>() + val options = SentryOptions().apply { isEnableQueueTracing = false } + whenever(scopes.options).thenReturn(options) + + val interceptor = SentryKafkaConsumerInterceptor(scopes) + val records = singleRecordBatch() + + val result = interceptor.onConsume(records) + + assertSame(records, result) + verify(scopes, never()).startTransaction(any<TransactionContext>(), any<TransactionOptions>()) + } + + @Test + fun `starts and finishes queue receive transaction for consumed batch`() { + val scopes = mock<IScopes>() + val options = SentryOptions().apply { isEnableQueueTracing = true } + val transaction = mock<ITransaction>() + + whenever(scopes.options).thenReturn(options) + whenever(scopes.continueTrace(any(), any())).thenReturn(null) + whenever(scopes.startTransaction(any<TransactionContext>(), any<TransactionOptions>())) + .thenReturn(transaction) + whenever(transaction.isNoOp).thenReturn(false) + + val interceptor = SentryKafkaConsumerInterceptor(scopes) + + interceptor.onConsume(singleRecordBatch()) + + verify(scopes).startTransaction(any<TransactionContext>(), any<TransactionOptions>()) + verify(transaction).setData("messaging.system", "kafka") + verify(transaction).setData("messaging.destination.name", "my-topic") + verify(transaction).setData("messaging.batch.message.count", 1) + verify(transaction).finish() + } + + @Test + fun `commit callback is no-op`() { + val interceptor = SentryKafkaConsumerInterceptor(mock<IScopes>()) + + interceptor.onCommit(mapOf(TopicPartition("my-topic", 0) to OffsetAndMetadata(1))) + } + + private fun singleRecordBatch(): ConsumerRecords<String, String> { + val partition = TopicPartition("my-topic", 0) + val record = ConsumerRecord("my-topic", 0, 0L, "key", "value") + return ConsumerRecords(mapOf(partition to 
listOf(record))) + } +} diff --git a/sentry-kafka/src/test/kotlin/io/sentry/kafka/SentryKafkaProducerInterceptorTest.kt b/sentry-kafka/src/test/kotlin/io/sentry/kafka/SentryKafkaProducerInterceptorTest.kt new file mode 100644 index 0000000000..99b487c1c0 --- /dev/null +++ b/sentry-kafka/src/test/kotlin/io/sentry/kafka/SentryKafkaProducerInterceptorTest.kt @@ -0,0 +1,98 @@ +package io.sentry.kafka + +import io.sentry.IScopes +import io.sentry.Sentry +import io.sentry.SentryOptions +import io.sentry.SentryTraceHeader +import io.sentry.SentryTracer +import io.sentry.TransactionContext +import io.sentry.test.initForTest +import java.nio.charset.StandardCharsets +import kotlin.test.AfterTest +import kotlin.test.BeforeTest +import kotlin.test.Test +import kotlin.test.assertEquals +import kotlin.test.assertNotNull +import kotlin.test.assertSame +import kotlin.test.assertTrue +import org.apache.kafka.clients.producer.ProducerRecord +import org.mockito.kotlin.mock +import org.mockito.kotlin.whenever + +class SentryKafkaProducerInterceptorTest { + + private lateinit var scopes: IScopes + private lateinit var options: SentryOptions + + @BeforeTest + fun setup() { + initForTest { it.dsn = "https://key@sentry.io/proj" } + scopes = mock() + options = + SentryOptions().apply { + dsn = "https://key@sentry.io/proj" + isEnableQueueTracing = true + } + whenever(scopes.options).thenReturn(options) + } + + @AfterTest + fun teardown() { + Sentry.close() + } + + private fun createTransaction(): SentryTracer { + val tx = SentryTracer(TransactionContext("tx", "op"), scopes) + whenever(scopes.span).thenReturn(tx) + return tx + } + + @Test + fun `creates queue publish span and injects headers`() { + val tx = createTransaction() + val interceptor = SentryKafkaProducerInterceptor(scopes) + val record = ProducerRecord("my-topic", "key", "value") + + interceptor.onSend(record) + + assertEquals(1, tx.spans.size) + val span = tx.spans.first() + assertEquals("queue.publish", span.operation) + 
assertEquals("my-topic", span.description) + assertEquals("kafka", span.data["messaging.system"]) + assertEquals("my-topic", span.data["messaging.destination.name"]) + assertEquals(SentryKafkaProducerInterceptor.TRACE_ORIGIN, span.spanContext.origin) + assertTrue(span.isFinished) + + val sentryTraceHeader = record.headers().lastHeader(SentryTraceHeader.SENTRY_TRACE_HEADER) + assertNotNull(sentryTraceHeader) + + val enqueuedTimeHeader = + record.headers().lastHeader(SentryKafkaProducerInterceptor.SENTRY_ENQUEUED_TIME_HEADER) + assertNotNull(enqueuedTimeHeader) + val enqueuedTime = String(enqueuedTimeHeader.value(), StandardCharsets.UTF_8).toDouble() + assertTrue(enqueuedTime > 0) + } + + @Test + fun `does not create span when queue tracing is disabled`() { + val tx = createTransaction() + options.isEnableQueueTracing = false + val interceptor = SentryKafkaProducerInterceptor(scopes) + + interceptor.onSend(ProducerRecord("my-topic", "key", "value")) + + assertEquals(0, tx.spans.size) + } + + @Test + fun `returns original record when no active span`() { + whenever(scopes.span).thenReturn(null) + val interceptor = SentryKafkaProducerInterceptor(scopes) + val record = ProducerRecord("my-topic", "key", "value") + + val result = interceptor.onSend(record) + + assertSame(record, result) + } +} diff --git a/sentry-samples/sentry-samples-spring-boot-jakarta-opentelemetry-noagent/build.gradle.kts b/sentry-samples/sentry-samples-spring-boot-jakarta-opentelemetry-noagent/build.gradle.kts index 86914467a6..87909294cd 100644 --- a/sentry-samples/sentry-samples-spring-boot-jakarta-opentelemetry-noagent/build.gradle.kts +++ b/sentry-samples/sentry-samples-spring-boot-jakarta-opentelemetry-noagent/build.gradle.kts @@ -52,6 +52,10 @@ dependencies { implementation(projects.sentryAsyncProfiler) implementation(projects.sentryOpentelemetry.sentryOpentelemetryAgentlessSpring) + // kafka + implementation(libs.spring.kafka3) + implementation(projects.sentryKafka) + // cache tracing 
implementation(libs.springboot3.starter.cache) implementation(libs.caffeine) diff --git a/sentry-samples/sentry-samples-spring-boot-jakarta-opentelemetry-noagent/src/main/java/io/sentry/samples/spring/boot/jakarta/KafkaConsumer.java b/sentry-samples/sentry-samples-spring-boot-jakarta-opentelemetry-noagent/src/main/java/io/sentry/samples/spring/boot/jakarta/KafkaConsumer.java new file mode 100644 index 0000000000..8287d9a05a --- /dev/null +++ b/sentry-samples/sentry-samples-spring-boot-jakarta-opentelemetry-noagent/src/main/java/io/sentry/samples/spring/boot/jakarta/KafkaConsumer.java @@ -0,0 +1,19 @@ +package io.sentry.samples.spring.boot.jakarta; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.context.annotation.Profile; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.stereotype.Component; + +@Component +@Profile("kafka") +public class KafkaConsumer { + + private static final Logger logger = LoggerFactory.getLogger(KafkaConsumer.class); + + @KafkaListener(topics = "sentry-topic", groupId = "sentry-sample-group") + public void listen(String message) { + logger.info("Received message: {}", message); + } +} diff --git a/sentry-samples/sentry-samples-spring-boot-jakarta-opentelemetry-noagent/src/main/java/io/sentry/samples/spring/boot/jakarta/KafkaController.java b/sentry-samples/sentry-samples-spring-boot-jakarta-opentelemetry-noagent/src/main/java/io/sentry/samples/spring/boot/jakarta/KafkaController.java new file mode 100644 index 0000000000..b65236c919 --- /dev/null +++ b/sentry-samples/sentry-samples-spring-boot-jakarta-opentelemetry-noagent/src/main/java/io/sentry/samples/spring/boot/jakarta/KafkaController.java @@ -0,0 +1,26 @@ +package io.sentry.samples.spring.boot.jakarta; + +import org.springframework.context.annotation.Profile; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.web.bind.annotation.GetMapping; +import 
org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestParam; +import org.springframework.web.bind.annotation.RestController; + +@RestController +@Profile("kafka") +@RequestMapping("/kafka") +public class KafkaController { + + private final KafkaTemplate kafkaTemplate; + + public KafkaController(KafkaTemplate kafkaTemplate) { + this.kafkaTemplate = kafkaTemplate; + } + + @GetMapping("/produce") + String produce(@RequestParam(defaultValue = "hello from sentry!") String message) { + kafkaTemplate.send("sentry-topic", message); + return "Message sent: " + message; + } +} diff --git a/sentry-samples/sentry-samples-spring-boot-jakarta-opentelemetry-noagent/src/main/resources/application-kafka.properties b/sentry-samples/sentry-samples-spring-boot-jakarta-opentelemetry-noagent/src/main/resources/application-kafka.properties new file mode 100644 index 0000000000..fe79e3faca --- /dev/null +++ b/sentry-samples/sentry-samples-spring-boot-jakarta-opentelemetry-noagent/src/main/resources/application-kafka.properties @@ -0,0 +1,13 @@ +# Kafka — activate with: --spring.profiles.active=kafka +sentry.enable-queue-tracing=true + +spring.autoconfigure.exclude= +spring.kafka.bootstrap-servers=localhost:9092 +spring.kafka.consumer.group-id=sentry-sample-group +spring.kafka.consumer.auto-offset-reset=earliest +spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer +spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer +spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer +spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer + +logging.level.org.apache.kafka=warn diff --git a/sentry-samples/sentry-samples-spring-boot-jakarta-opentelemetry-noagent/src/main/resources/application.properties 
b/sentry-samples/sentry-samples-spring-boot-jakarta-opentelemetry-noagent/src/main/resources/application.properties index a3a59d290b..ff8897ad68 100644 --- a/sentry-samples/sentry-samples-spring-boot-jakarta-opentelemetry-noagent/src/main/resources/application.properties +++ b/sentry-samples/sentry-samples-spring-boot-jakarta-opentelemetry-noagent/src/main/resources/application.properties @@ -35,6 +35,9 @@ spring.graphql.graphiql.enabled=true spring.graphql.websocket.path=/graphql spring.quartz.job-store-type=memory +# Kafka is only active with the 'kafka' profile (--spring.profiles.active=kafka) +spring.autoconfigure.exclude=org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration + # Cache tracing sentry.enable-cache-tracing=true spring.cache.cache-names=todos diff --git a/sentry-samples/sentry-samples-spring-boot-jakarta-opentelemetry/build.gradle.kts b/sentry-samples/sentry-samples-spring-boot-jakarta-opentelemetry/build.gradle.kts index 37d7a94eec..0f20925f78 100644 --- a/sentry-samples/sentry-samples-spring-boot-jakarta-opentelemetry/build.gradle.kts +++ b/sentry-samples/sentry-samples-spring-boot-jakarta-opentelemetry/build.gradle.kts @@ -56,6 +56,10 @@ dependencies { implementation(libs.otel) implementation(projects.sentryAsyncProfiler) + // kafka + implementation(libs.spring.kafka3) + implementation(projects.sentryKafka) + // cache tracing implementation(libs.springboot3.starter.cache) implementation(libs.caffeine) diff --git a/sentry-samples/sentry-samples-spring-boot-jakarta-opentelemetry/src/main/java/io/sentry/samples/spring/boot/jakarta/KafkaConsumer.java b/sentry-samples/sentry-samples-spring-boot-jakarta-opentelemetry/src/main/java/io/sentry/samples/spring/boot/jakarta/KafkaConsumer.java new file mode 100644 index 0000000000..8287d9a05a --- /dev/null +++ b/sentry-samples/sentry-samples-spring-boot-jakarta-opentelemetry/src/main/java/io/sentry/samples/spring/boot/jakarta/KafkaConsumer.java @@ -0,0 +1,19 @@ +package 
io.sentry.samples.spring.boot.jakarta; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.context.annotation.Profile; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.stereotype.Component; + +@Component +@Profile("kafka") +public class KafkaConsumer { + + private static final Logger logger = LoggerFactory.getLogger(KafkaConsumer.class); + + @KafkaListener(topics = "sentry-topic", groupId = "sentry-sample-group") + public void listen(String message) { + logger.info("Received message: {}", message); + } +} diff --git a/sentry-samples/sentry-samples-spring-boot-jakarta-opentelemetry/src/main/java/io/sentry/samples/spring/boot/jakarta/KafkaController.java b/sentry-samples/sentry-samples-spring-boot-jakarta-opentelemetry/src/main/java/io/sentry/samples/spring/boot/jakarta/KafkaController.java new file mode 100644 index 0000000000..b65236c919 --- /dev/null +++ b/sentry-samples/sentry-samples-spring-boot-jakarta-opentelemetry/src/main/java/io/sentry/samples/spring/boot/jakarta/KafkaController.java @@ -0,0 +1,26 @@ +package io.sentry.samples.spring.boot.jakarta; + +import org.springframework.context.annotation.Profile; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestParam; +import org.springframework.web.bind.annotation.RestController; + +@RestController +@Profile("kafka") +@RequestMapping("/kafka") +public class KafkaController { + + private final KafkaTemplate kafkaTemplate; + + public KafkaController(KafkaTemplate kafkaTemplate) { + this.kafkaTemplate = kafkaTemplate; + } + + @GetMapping("/produce") + String produce(@RequestParam(defaultValue = "hello from sentry!") String message) { + kafkaTemplate.send("sentry-topic", message); + return "Message sent: " + message; + } +} diff --git 
a/sentry-samples/sentry-samples-spring-boot-jakarta-opentelemetry/src/main/resources/application-kafka.properties b/sentry-samples/sentry-samples-spring-boot-jakarta-opentelemetry/src/main/resources/application-kafka.properties new file mode 100644 index 0000000000..fe79e3faca --- /dev/null +++ b/sentry-samples/sentry-samples-spring-boot-jakarta-opentelemetry/src/main/resources/application-kafka.properties @@ -0,0 +1,13 @@ +# Kafka — activate with: --spring.profiles.active=kafka +sentry.enable-queue-tracing=true + +spring.autoconfigure.exclude= +spring.kafka.bootstrap-servers=localhost:9092 +spring.kafka.consumer.group-id=sentry-sample-group +spring.kafka.consumer.auto-offset-reset=earliest +spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer +spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer +spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer +spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer + +logging.level.org.apache.kafka=warn diff --git a/sentry-samples/sentry-samples-spring-boot-jakarta-opentelemetry/src/main/resources/application.properties b/sentry-samples/sentry-samples-spring-boot-jakarta-opentelemetry/src/main/resources/application.properties index 12a9ca1726..d19e874624 100644 --- a/sentry-samples/sentry-samples-spring-boot-jakarta-opentelemetry/src/main/resources/application.properties +++ b/sentry-samples/sentry-samples-spring-boot-jakarta-opentelemetry/src/main/resources/application.properties @@ -35,6 +35,9 @@ spring.graphql.graphiql.enabled=true spring.graphql.websocket.path=/graphql spring.quartz.job-store-type=memory +# Kafka is only active with the 'kafka' profile (--spring.profiles.active=kafka) +spring.autoconfigure.exclude=org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration + # Cache tracing sentry.enable-cache-tracing=true 
spring.cache.cache-names=todos diff --git a/sentry-samples/sentry-samples-spring-boot-jakarta/build.gradle.kts b/sentry-samples/sentry-samples-spring-boot-jakarta/build.gradle.kts index a945b87109..d58c3b53d7 100644 --- a/sentry-samples/sentry-samples-spring-boot-jakarta/build.gradle.kts +++ b/sentry-samples/sentry-samples-spring-boot-jakarta/build.gradle.kts @@ -59,6 +59,10 @@ dependencies { implementation(libs.springboot3.starter.cache) implementation(libs.caffeine) + // kafka + implementation(libs.spring.kafka3) + implementation(projects.sentryKafka) + // OpenFeature SDK implementation(libs.openfeature) diff --git a/sentry-samples/sentry-samples-spring-boot-jakarta/src/main/java/io/sentry/samples/spring/boot/jakarta/KafkaConsumer.java b/sentry-samples/sentry-samples-spring-boot-jakarta/src/main/java/io/sentry/samples/spring/boot/jakarta/KafkaConsumer.java new file mode 100644 index 0000000000..8287d9a05a --- /dev/null +++ b/sentry-samples/sentry-samples-spring-boot-jakarta/src/main/java/io/sentry/samples/spring/boot/jakarta/KafkaConsumer.java @@ -0,0 +1,19 @@ +package io.sentry.samples.spring.boot.jakarta; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.context.annotation.Profile; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.stereotype.Component; + +@Component +@Profile("kafka") +public class KafkaConsumer { + + private static final Logger logger = LoggerFactory.getLogger(KafkaConsumer.class); + + @KafkaListener(topics = "sentry-topic", groupId = "sentry-sample-group") + public void listen(String message) { + logger.info("Received message: {}", message); + } +} diff --git a/sentry-samples/sentry-samples-spring-boot-jakarta/src/main/java/io/sentry/samples/spring/boot/jakarta/KafkaController.java b/sentry-samples/sentry-samples-spring-boot-jakarta/src/main/java/io/sentry/samples/spring/boot/jakarta/KafkaController.java new file mode 100644 index 0000000000..b65236c919 --- /dev/null 
+++ b/sentry-samples/sentry-samples-spring-boot-jakarta/src/main/java/io/sentry/samples/spring/boot/jakarta/KafkaController.java @@ -0,0 +1,26 @@ +package io.sentry.samples.spring.boot.jakarta; + +import org.springframework.context.annotation.Profile; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestParam; +import org.springframework.web.bind.annotation.RestController; + +@RestController +@Profile("kafka") +@RequestMapping("/kafka") +public class KafkaController { + + private final KafkaTemplate kafkaTemplate; + + public KafkaController(KafkaTemplate kafkaTemplate) { + this.kafkaTemplate = kafkaTemplate; + } + + @GetMapping("/produce") + String produce(@RequestParam(defaultValue = "hello from sentry!") String message) { + kafkaTemplate.send("sentry-topic", message); + return "Message sent: " + message; + } +} diff --git a/sentry-samples/sentry-samples-spring-boot-jakarta/src/main/resources/application-kafka.properties b/sentry-samples/sentry-samples-spring-boot-jakarta/src/main/resources/application-kafka.properties new file mode 100644 index 0000000000..71e517b82a --- /dev/null +++ b/sentry-samples/sentry-samples-spring-boot-jakarta/src/main/resources/application-kafka.properties @@ -0,0 +1,11 @@ +# Kafka — activate with: --spring.profiles.active=kafka +sentry.enable-queue-tracing=true + +spring.autoconfigure.exclude= +spring.kafka.bootstrap-servers=localhost:9092 +spring.kafka.consumer.group-id=sentry-sample-group +spring.kafka.consumer.auto-offset-reset=earliest +spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer +spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer +spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer 
+spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer diff --git a/sentry-samples/sentry-samples-spring-boot-jakarta/src/main/resources/application.properties b/sentry-samples/sentry-samples-spring-boot-jakarta/src/main/resources/application.properties index 60b92d369d..6a3dfb063b 100644 --- a/sentry-samples/sentry-samples-spring-boot-jakarta/src/main/resources/application.properties +++ b/sentry-samples/sentry-samples-spring-boot-jakarta/src/main/resources/application.properties @@ -37,6 +37,10 @@ spring.quartz.job-store-type=memory # Cache tracing sentry.enable-cache-tracing=true + +# Kafka is only active with the 'kafka' profile (--spring.profiles.active=kafka) +spring.autoconfigure.exclude=org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration + spring.cache.cache-names=todos spring.cache.caffeine.spec=maximumSize=500,expireAfterAccess=600s diff --git a/sentry-samples/sentry-samples-spring-boot-jakarta/src/test/kotlin/io/sentry/systemtest/KafkaQueueSystemTest.kt b/sentry-samples/sentry-samples-spring-boot-jakarta/src/test/kotlin/io/sentry/systemtest/KafkaQueueSystemTest.kt new file mode 100644 index 0000000000..43781cf2c5 --- /dev/null +++ b/sentry-samples/sentry-samples-spring-boot-jakarta/src/test/kotlin/io/sentry/systemtest/KafkaQueueSystemTest.kt @@ -0,0 +1,117 @@ +package io.sentry.systemtest + +import io.sentry.systemtest.util.TestHelper +import kotlin.test.Test +import kotlin.test.assertEquals +import org.junit.Before + +/** + * System tests for Kafka queue instrumentation. 
+ * + * Requires: + * - The sample app running with `--spring.profiles.active=kafka` + * - A Kafka broker at localhost:9092 + * - The mock Sentry server at localhost:8000 + */ +class KafkaQueueSystemTest { + lateinit var testHelper: TestHelper + + @Before + fun setup() { + testHelper = TestHelper("http://localhost:8080") + testHelper.reset() + } + + @Test + fun `producer endpoint creates queue publish span`() { + val restClient = testHelper.restClient + + restClient.produceKafkaMessage("test-message") + assertEquals(200, restClient.lastKnownStatusCode) + + testHelper.ensureTransactionReceived { transaction, _ -> + testHelper.doesTransactionContainSpanWithOp(transaction, "queue.publish") + } + } + + @Test + fun `consumer creates queue process transaction`() { + val restClient = testHelper.restClient + + restClient.produceKafkaMessage("test-consumer-message") + assertEquals(200, restClient.lastKnownStatusCode) + + // The consumer runs asynchronously, so wait for the queue.process transaction + testHelper.ensureTransactionReceived { transaction, _ -> + testHelper.doesTransactionHaveOp(transaction, "queue.process") + } + } + + @Test + fun `producer and consumer share same trace`() { + val restClient = testHelper.restClient + + restClient.produceKafkaMessage("trace-test-message") + assertEquals(200, restClient.lastKnownStatusCode) + + // Capture the trace ID from the producer transaction (has queue.publish span) + var producerTraceId: String? 
= null + testHelper.ensureTransactionReceived { transaction, _ -> + if (testHelper.doesTransactionContainSpanWithOp(transaction, "queue.publish")) { + producerTraceId = transaction.contexts.trace?.traceId?.toString() + true + } else { + false + } + } + + // Verify the consumer transaction has the same trace ID + // Use retryCount=3 since the consumer may take a moment to process + testHelper.ensureEnvelopeReceived(retryCount = 3) { envelopeString -> + val envelope = + testHelper.jsonSerializer.deserializeEnvelope(envelopeString.byteInputStream()) + ?: return@ensureEnvelopeReceived false + val txItem = + envelope.items.firstOrNull { it.header.type == io.sentry.SentryItemType.Transaction } + ?: return@ensureEnvelopeReceived false + val tx = + txItem.getTransaction(testHelper.jsonSerializer) ?: return@ensureEnvelopeReceived false + + tx.contexts.trace?.operation == "queue.process" && + tx.contexts.trace?.traceId?.toString() == producerTraceId + } + } + + @Test + fun `queue publish span has messaging attributes`() { + val restClient = testHelper.restClient + + restClient.produceKafkaMessage("attrs-test") + assertEquals(200, restClient.lastKnownStatusCode) + + testHelper.ensureTransactionReceived { transaction, _ -> + val span = transaction.spans.firstOrNull { it.op == "queue.publish" } + if (span == null) return@ensureTransactionReceived false + + val data = span.data ?: return@ensureTransactionReceived false + data["messaging.system"] == "kafka" && data["messaging.destination.name"] == "sentry-topic" + } + } + + @Test + fun `queue process transaction has messaging attributes`() { + val restClient = testHelper.restClient + + restClient.produceKafkaMessage("process-attrs-test") + assertEquals(200, restClient.lastKnownStatusCode) + + testHelper.ensureTransactionReceived { transaction, _ -> + if (!testHelper.doesTransactionHaveOp(transaction, "queue.process")) { + return@ensureTransactionReceived false + } + + val data = transaction.contexts.trace?.data ?: 
return@ensureTransactionReceived false + data["messaging.system"] == "kafka" && data["messaging.destination.name"] == "sentry-topic" + } + } +} diff --git a/sentry-spring-boot-jakarta/build.gradle.kts b/sentry-spring-boot-jakarta/build.gradle.kts index 0416651924..36b7dad3cc 100644 --- a/sentry-spring-boot-jakarta/build.gradle.kts +++ b/sentry-spring-boot-jakarta/build.gradle.kts @@ -40,6 +40,7 @@ dependencies { compileOnly(projects.sentryGraphql) compileOnly(projects.sentryGraphql22) compileOnly(projects.sentryQuartz) + compileOnly(libs.spring.kafka3) compileOnly(Config.Libs.springWeb) compileOnly(Config.Libs.springWebflux) compileOnly(libs.context.propagation) @@ -70,6 +71,7 @@ dependencies { testImplementation(projects.sentryApacheHttpClient5) testImplementation(projects.sentryGraphql) testImplementation(projects.sentryGraphql22) + testImplementation(projects.sentryKafka) testImplementation(projects.sentryOpentelemetry.sentryOpentelemetryCore) testImplementation(projects.sentryOpentelemetry.sentryOpentelemetryAgent) testImplementation(projects.sentryOpentelemetry.sentryOpentelemetryAgentcustomization) @@ -90,6 +92,7 @@ dependencies { testImplementation(libs.springboot3.starter) testImplementation(libs.springboot3.starter.aop) testImplementation(libs.springboot3.starter.graphql) + testImplementation(libs.spring.kafka3) testImplementation(libs.springboot3.starter.quartz) testImplementation(libs.springboot3.starter.security) testImplementation(libs.springboot3.starter.test) diff --git a/sentry-spring-boot-jakarta/src/main/java/io/sentry/spring/boot/jakarta/SentryAutoConfiguration.java b/sentry-spring-boot-jakarta/src/main/java/io/sentry/spring/boot/jakarta/SentryAutoConfiguration.java index ef57868ad8..0499df95b1 100644 --- a/sentry-spring-boot-jakarta/src/main/java/io/sentry/spring/boot/jakarta/SentryAutoConfiguration.java +++ b/sentry-spring-boot-jakarta/src/main/java/io/sentry/spring/boot/jakarta/SentryAutoConfiguration.java @@ -31,6 +31,8 @@ import 
io.sentry.spring.jakarta.checkin.SentryQuartzConfiguration; import io.sentry.spring.jakarta.exception.SentryCaptureExceptionParameterPointcutConfiguration; import io.sentry.spring.jakarta.exception.SentryExceptionParameterAdviceConfiguration; +import io.sentry.spring.jakarta.kafka.SentryKafkaConsumerBeanPostProcessor; +import io.sentry.spring.jakarta.kafka.SentryKafkaProducerBeanPostProcessor; import io.sentry.spring.jakarta.opentelemetry.SentryOpenTelemetryAgentWithoutAutoInitConfiguration; import io.sentry.spring.jakarta.opentelemetry.SentryOpenTelemetryNoAgentConfiguration; import io.sentry.spring.jakarta.tracing.CombinedTransactionNameProvider; @@ -75,6 +77,7 @@ import org.springframework.core.annotation.Order; import org.springframework.core.env.Environment; import org.springframework.graphql.execution.DataFetcherExceptionResolverAdapter; +import org.springframework.kafka.core.KafkaTemplate; import org.springframework.scheduling.quartz.SchedulerFactoryBean; import org.springframework.security.core.context.SecurityContextHolder; import org.springframework.web.client.RestClient; @@ -246,6 +249,27 @@ static class SentryCacheConfiguration { } } + @Configuration(proxyBeanMethods = false) + @ConditionalOnClass(KafkaTemplate.class) + @ConditionalOnProperty(name = "sentry.enable-queue-tracing", havingValue = "true") + @ConditionalOnMissingClass("io.sentry.opentelemetry.SentryAutoConfigurationCustomizerProvider") + @Open + static class SentryKafkaQueueConfiguration { + + @Bean + public static @NotNull SentryKafkaProducerBeanPostProcessor + sentryKafkaProducerBeanPostProcessor() { + SentryIntegrationPackageStorage.getInstance().addIntegration("SpringKafka"); + return new SentryKafkaProducerBeanPostProcessor(); + } + + @Bean + public static @NotNull SentryKafkaConsumerBeanPostProcessor + sentryKafkaConsumerBeanPostProcessor() { + return new SentryKafkaConsumerBeanPostProcessor(); + } + } + @Configuration(proxyBeanMethods = false) 
@ConditionalOnClass(ProceedingJoinPoint.class) @ConditionalOnProperty( diff --git a/sentry-spring-boot-jakarta/src/test/kotlin/io/sentry/spring/boot/jakarta/SentryKafkaAutoConfigurationTest.kt b/sentry-spring-boot-jakarta/src/test/kotlin/io/sentry/spring/boot/jakarta/SentryKafkaAutoConfigurationTest.kt new file mode 100644 index 0000000000..c0963580f3 --- /dev/null +++ b/sentry-spring-boot-jakarta/src/test/kotlin/io/sentry/spring/boot/jakarta/SentryKafkaAutoConfigurationTest.kt @@ -0,0 +1,70 @@ +package io.sentry.spring.boot.jakarta + +import io.sentry.opentelemetry.SentryAutoConfigurationCustomizerProvider +import io.sentry.spring.jakarta.kafka.SentryKafkaConsumerBeanPostProcessor +import io.sentry.spring.jakarta.kafka.SentryKafkaProducerBeanPostProcessor +import kotlin.test.Test +import org.assertj.core.api.Assertions.assertThat +import org.springframework.boot.autoconfigure.AutoConfigurations +import org.springframework.boot.test.context.FilteredClassLoader +import org.springframework.boot.test.context.runner.ApplicationContextRunner + +class SentryKafkaAutoConfigurationTest { + + private val contextRunner = + ApplicationContextRunner() + .withConfiguration(AutoConfigurations.of(SentryAutoConfiguration::class.java)) + .withPropertyValues( + "sentry.dsn=http://key@localhost/proj", + "sentry.traces-sample-rate=1.0", + "sentry.shutdownTimeoutMillis=0", + "sentry.sessionFlushTimeoutMillis=0", + "sentry.flushTimeoutMillis=0", + "sentry.readTimeoutMillis=50", + "sentry.connectionTimeoutMillis=50", + "sentry.send-modules=false", + "sentry.debug=false", + ) + + /** Hide the OTel customizer so conditions evaluate as "no OTel present". 
*/ + private val noOtelClassLoader = + FilteredClassLoader(SentryAutoConfigurationCustomizerProvider::class.java) + + @Test + fun `registers Kafka BPPs when queue tracing is enabled`() { + contextRunner + .withClassLoader(noOtelClassLoader) + .withPropertyValues("sentry.enable-queue-tracing=true") + .run { context -> + assertThat(context).hasSingleBean(SentryKafkaProducerBeanPostProcessor::class.java) + assertThat(context).hasSingleBean(SentryKafkaConsumerBeanPostProcessor::class.java) + } + } + + @Test + fun `does not register Kafka BPPs when queue tracing is disabled`() { + contextRunner.withClassLoader(noOtelClassLoader).run { context -> + assertThat(context).doesNotHaveBean(SentryKafkaProducerBeanPostProcessor::class.java) + assertThat(context).doesNotHaveBean(SentryKafkaConsumerBeanPostProcessor::class.java) + } + } + + @Test + fun `does not register Kafka BPPs when queue tracing is explicitly false`() { + contextRunner + .withClassLoader(noOtelClassLoader) + .withPropertyValues("sentry.enable-queue-tracing=false") + .run { context -> + assertThat(context).doesNotHaveBean(SentryKafkaProducerBeanPostProcessor::class.java) + assertThat(context).doesNotHaveBean(SentryKafkaConsumerBeanPostProcessor::class.java) + } + } + + @Test + fun `does not register Kafka BPPs when OpenTelemetry integration is present`() { + contextRunner.withPropertyValues("sentry.enable-queue-tracing=true").run { context -> + assertThat(context).doesNotHaveBean(SentryKafkaProducerBeanPostProcessor::class.java) + assertThat(context).doesNotHaveBean(SentryKafkaConsumerBeanPostProcessor::class.java) + } + } +} diff --git a/sentry-spring-jakarta/api/sentry-spring-jakarta.api b/sentry-spring-jakarta/api/sentry-spring-jakarta.api index fe634da6f4..edfa6399d7 100644 --- a/sentry-spring-jakarta/api/sentry-spring-jakarta.api +++ b/sentry-spring-jakarta/api/sentry-spring-jakarta.api @@ -244,6 +244,28 @@ public final class io/sentry/spring/jakarta/graphql/SentrySpringSubscriptionHand public fun 
onSubscriptionResult (Ljava/lang/Object;Lio/sentry/IScopes;Lio/sentry/graphql/ExceptionReporter;Lgraphql/execution/instrumentation/parameters/InstrumentationFieldFetchParameters;)Ljava/lang/Object; } +public final class io/sentry/spring/jakarta/kafka/SentryKafkaConsumerBeanPostProcessor : org/springframework/beans/factory/config/BeanPostProcessor, org/springframework/core/PriorityOrdered { + public fun ()V + public fun getOrder ()I + public fun postProcessAfterInitialization (Ljava/lang/Object;Ljava/lang/String;)Ljava/lang/Object; +} + +public final class io/sentry/spring/jakarta/kafka/SentryKafkaProducerBeanPostProcessor : org/springframework/beans/factory/config/BeanPostProcessor, org/springframework/core/PriorityOrdered { + public fun ()V + public fun getOrder ()I + public fun postProcessAfterInitialization (Ljava/lang/Object;Ljava/lang/String;)Ljava/lang/Object; +} + +public final class io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptor : org/springframework/kafka/listener/RecordInterceptor { + public fun (Lio/sentry/IScopes;)V + public fun (Lio/sentry/IScopes;Lorg/springframework/kafka/listener/RecordInterceptor;)V + public fun afterRecord (Lorg/apache/kafka/clients/consumer/ConsumerRecord;Lorg/apache/kafka/clients/consumer/Consumer;)V + public fun clearThreadState (Lorg/apache/kafka/clients/consumer/Consumer;)V + public fun failure (Lorg/apache/kafka/clients/consumer/ConsumerRecord;Ljava/lang/Exception;Lorg/apache/kafka/clients/consumer/Consumer;)V + public fun intercept (Lorg/apache/kafka/clients/consumer/ConsumerRecord;Lorg/apache/kafka/clients/consumer/Consumer;)Lorg/apache/kafka/clients/consumer/ConsumerRecord; + public fun success (Lorg/apache/kafka/clients/consumer/ConsumerRecord;Lorg/apache/kafka/clients/consumer/Consumer;)V +} + public class io/sentry/spring/jakarta/opentelemetry/SentryOpenTelemetryAgentWithoutAutoInitConfiguration { public fun ()V public fun sentryOpenTelemetryOptionsConfiguration ()Lio/sentry/Sentry$OptionsConfiguration; 
diff --git a/sentry-spring-jakarta/build.gradle.kts b/sentry-spring-jakarta/build.gradle.kts index f1920e2451..cbf2e5346b 100644 --- a/sentry-spring-jakarta/build.gradle.kts +++ b/sentry-spring-jakarta/build.gradle.kts @@ -29,6 +29,7 @@ tasks.withType().configureEach { dependencies { api(projects.sentry) + compileOnly(projects.sentryKafka) compileOnly(platform(SpringBootPlugin.BOM_COORDINATES)) compileOnly(Config.Libs.springWeb) compileOnly(Config.Libs.springAop) @@ -41,6 +42,7 @@ dependencies { compileOnly(libs.servlet.jakarta.api) compileOnly(libs.slf4j.api) compileOnly(libs.springboot3.starter.graphql) + compileOnly(libs.spring.kafka3) compileOnly(libs.springboot3.starter.quartz) compileOnly(Config.Libs.springWebflux) @@ -58,6 +60,7 @@ dependencies { // tests testImplementation(projects.sentryTestSupport) testImplementation(projects.sentryGraphql) + testImplementation(projects.sentryKafka) testImplementation(kotlin(Config.kotlinStdLib)) testImplementation(libs.awaitility.kotlin) testImplementation(libs.context.propagation) @@ -68,6 +71,7 @@ dependencies { testImplementation(libs.springboot3.starter.aop) testImplementation(libs.springboot3.starter.graphql) testImplementation(libs.springboot3.starter.security) + testImplementation(libs.spring.kafka3) testImplementation(libs.springboot3.starter.test) testImplementation(libs.springboot3.starter.web) testImplementation(libs.springboot3.starter.webflux) diff --git a/sentry-spring-jakarta/src/main/java/io/sentry/spring/jakarta/kafka/SentryKafkaConsumerBeanPostProcessor.java b/sentry-spring-jakarta/src/main/java/io/sentry/spring/jakarta/kafka/SentryKafkaConsumerBeanPostProcessor.java new file mode 100644 index 0000000000..f272a575cb --- /dev/null +++ b/sentry-spring-jakarta/src/main/java/io/sentry/spring/jakarta/kafka/SentryKafkaConsumerBeanPostProcessor.java @@ -0,0 +1,71 @@ +package io.sentry.spring.jakarta.kafka; + +import io.sentry.ScopesAdapter; +import io.sentry.SentryLevel; +import java.lang.reflect.Field; 
+import org.jetbrains.annotations.ApiStatus; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; +import org.springframework.beans.BeansException; +import org.springframework.beans.factory.config.BeanPostProcessor; +import org.springframework.core.Ordered; +import org.springframework.core.PriorityOrdered; +import org.springframework.kafka.config.AbstractKafkaListenerContainerFactory; +import org.springframework.kafka.listener.RecordInterceptor; + +/** + * Registers {@link SentryKafkaRecordInterceptor} on {@link AbstractKafkaListenerContainerFactory} + * beans. If an existing {@link RecordInterceptor} is already set, it is composed as a delegate. + */ +@ApiStatus.Internal +public final class SentryKafkaConsumerBeanPostProcessor + implements BeanPostProcessor, PriorityOrdered { + + @Override + @SuppressWarnings("unchecked") + public @NotNull Object postProcessAfterInitialization( + final @NotNull Object bean, final @NotNull String beanName) throws BeansException { + if (bean instanceof AbstractKafkaListenerContainerFactory) { + final @NotNull AbstractKafkaListenerContainerFactory factory = + (AbstractKafkaListenerContainerFactory) bean; + + final @Nullable RecordInterceptor existing = getExistingInterceptor(factory); + if (existing instanceof SentryKafkaRecordInterceptor) { + return bean; + } + + @SuppressWarnings("rawtypes") + final RecordInterceptor sentryInterceptor = + new SentryKafkaRecordInterceptor<>(ScopesAdapter.getInstance(), existing); + factory.setRecordInterceptor(sentryInterceptor); + } + return bean; + } + + @SuppressWarnings("unchecked") + private @Nullable RecordInterceptor getExistingInterceptor( + final @NotNull AbstractKafkaListenerContainerFactory factory) { + try { + final @NotNull Field field = + AbstractKafkaListenerContainerFactory.class.getDeclaredField("recordInterceptor"); + field.setAccessible(true); + return (RecordInterceptor) field.get(factory); + } catch (NoSuchFieldException | IllegalAccessException 
e) { + ScopesAdapter.getInstance() + .getOptions() + .getLogger() + .log( + SentryLevel.WARNING, + "Unable to read existing recordInterceptor from " + + "AbstractKafkaListenerContainerFactory via reflection. " + + "If you had a custom RecordInterceptor, it may not be chained with Sentry's interceptor.", + e); + return null; + } + } + + @Override + public int getOrder() { + return Ordered.LOWEST_PRECEDENCE; + } +} diff --git a/sentry-spring-jakarta/src/main/java/io/sentry/spring/jakarta/kafka/SentryKafkaProducerBeanPostProcessor.java b/sentry-spring-jakarta/src/main/java/io/sentry/spring/jakarta/kafka/SentryKafkaProducerBeanPostProcessor.java new file mode 100644 index 0000000000..4ce6a7c5ed --- /dev/null +++ b/sentry-spring-jakarta/src/main/java/io/sentry/spring/jakarta/kafka/SentryKafkaProducerBeanPostProcessor.java @@ -0,0 +1,84 @@ +package io.sentry.spring.jakarta.kafka; + +import io.sentry.ScopesAdapter; +import io.sentry.SentryLevel; +import io.sentry.kafka.SentryKafkaProducerInterceptor; +import java.lang.reflect.Field; +import org.apache.kafka.clients.producer.ProducerInterceptor; +import org.jetbrains.annotations.ApiStatus; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; +import org.springframework.beans.BeansException; +import org.springframework.beans.factory.config.BeanPostProcessor; +import org.springframework.core.Ordered; +import org.springframework.core.PriorityOrdered; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.support.CompositeProducerInterceptor; + +/** + * Sets a {@link SentryKafkaProducerInterceptor} on {@link KafkaTemplate} beans via {@link + * KafkaTemplate#setProducerInterceptor(ProducerInterceptor)}. The original bean is not replaced. + * + *

If the template already has a {@link ProducerInterceptor}, both are composed using {@link + * CompositeProducerInterceptor}. Reading the existing interceptor requires reflection (no public + * getter in Spring Kafka 3.x); if reflection fails, a warning is logged and only the Sentry + * interceptor is set. + */ +@ApiStatus.Internal +public final class SentryKafkaProducerBeanPostProcessor + implements BeanPostProcessor, PriorityOrdered { + + @Override + @SuppressWarnings("unchecked") + public @NotNull Object postProcessAfterInitialization( + final @NotNull Object bean, final @NotNull String beanName) throws BeansException { + if (bean instanceof KafkaTemplate) { + final @NotNull KafkaTemplate template = (KafkaTemplate) bean; + final @Nullable ProducerInterceptor existing = getExistingInterceptor(template); + + if (existing instanceof SentryKafkaProducerInterceptor) { + return bean; + } + + @SuppressWarnings("rawtypes") + final SentryKafkaProducerInterceptor sentryInterceptor = + new SentryKafkaProducerInterceptor<>( + ScopesAdapter.getInstance(), "auto.queue.spring_jakarta.kafka.producer"); + + if (existing != null) { + @SuppressWarnings("rawtypes") + final CompositeProducerInterceptor composite = + new CompositeProducerInterceptor(sentryInterceptor, existing); + template.setProducerInterceptor(composite); + } else { + template.setProducerInterceptor(sentryInterceptor); + } + } + return bean; + } + + @SuppressWarnings("unchecked") + private @Nullable ProducerInterceptor getExistingInterceptor( + final @NotNull KafkaTemplate template) { + try { + final @NotNull Field field = KafkaTemplate.class.getDeclaredField("producerInterceptor"); + field.setAccessible(true); + return (ProducerInterceptor) field.get(template); + } catch (NoSuchFieldException | IllegalAccessException e) { + ScopesAdapter.getInstance() + .getOptions() + .getLogger() + .log( + SentryLevel.WARNING, + "Unable to read existing producerInterceptor from KafkaTemplate via reflection. 
" + + "If you had a custom ProducerInterceptor, it may be overwritten by Sentry's interceptor.", + e); + return null; + } + } + + @Override + public int getOrder() { + return Ordered.LOWEST_PRECEDENCE; + } +} diff --git a/sentry-spring-jakarta/src/main/java/io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptor.java b/sentry-spring-jakarta/src/main/java/io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptor.java new file mode 100644 index 0000000000..9cfda3c237 --- /dev/null +++ b/sentry-spring-jakarta/src/main/java/io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptor.java @@ -0,0 +1,257 @@ +package io.sentry.spring.jakarta.kafka; + +import io.sentry.BaggageHeader; +import io.sentry.DateUtils; +import io.sentry.IScopes; +import io.sentry.ISentryLifecycleToken; +import io.sentry.ITransaction; +import io.sentry.Sentry; +import io.sentry.SentryTraceHeader; +import io.sentry.SpanDataConvention; +import io.sentry.SpanStatus; +import io.sentry.TransactionContext; +import io.sentry.TransactionOptions; +import io.sentry.kafka.SentryKafkaProducerInterceptor; +import io.sentry.util.SpanUtils; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.Collections; +import java.util.List; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.header.Header; +import org.jetbrains.annotations.ApiStatus; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; +import org.springframework.kafka.listener.RecordInterceptor; +import org.springframework.kafka.support.KafkaHeaders; + +/** + * A {@link RecordInterceptor} that creates {@code queue.process} transactions for incoming Kafka + * records with distributed tracing support. 
+ */ +@ApiStatus.Internal +public final class SentryKafkaRecordInterceptor implements RecordInterceptor { + + static final String TRACE_ORIGIN = "auto.queue.spring_jakarta.kafka.consumer"; + + private final @NotNull IScopes scopes; + private final @Nullable RecordInterceptor delegate; + + private static final @NotNull ThreadLocal currentContext = + new ThreadLocal<>(); + + public SentryKafkaRecordInterceptor(final @NotNull IScopes scopes) { + this(scopes, null); + } + + public SentryKafkaRecordInterceptor( + final @NotNull IScopes scopes, final @Nullable RecordInterceptor delegate) { + this.scopes = scopes; + this.delegate = delegate; + } + + @Override + public @Nullable ConsumerRecord intercept( + final @NotNull ConsumerRecord record, final @NotNull Consumer consumer) { + if (!scopes.getOptions().isEnableQueueTracing() || isIgnored()) { + return delegateIntercept(record, consumer); + } + + finishStaleContext(); + + final @NotNull IScopes forkedScopes = Sentry.forkedRootScopes("SentryKafkaRecordInterceptor"); + final @NotNull ISentryLifecycleToken lifecycleToken = forkedScopes.makeCurrent(); + + final @Nullable TransactionContext transactionContext = continueTrace(forkedScopes, record); + + final @Nullable ITransaction transaction = + startTransaction(forkedScopes, record, transactionContext); + currentContext.set(new SentryRecordContext(lifecycleToken, transaction)); + + return delegateIntercept(record, consumer); + } + + @Override + public void success( + final @NotNull ConsumerRecord record, final @NotNull Consumer consumer) { + try { + if (delegate != null) { + delegate.success(record, consumer); + } + } finally { + finishSpan(SpanStatus.OK, null); + } + } + + @Override + public void failure( + final @NotNull ConsumerRecord record, + final @NotNull Exception exception, + final @NotNull Consumer consumer) { + try { + if (delegate != null) { + delegate.failure(record, exception, consumer); + } + } finally { + finishSpan(SpanStatus.INTERNAL_ERROR, exception); + } 
+ } + + @Override + public void afterRecord( + final @NotNull ConsumerRecord record, final @NotNull Consumer consumer) { + if (delegate != null) { + delegate.afterRecord(record, consumer); + } + } + + @Override + public void clearThreadState(final @NotNull Consumer consumer) { + finishStaleContext(); + } + + private boolean isIgnored() { + return SpanUtils.isIgnored(scopes.getOptions().getIgnoredSpanOrigins(), TRACE_ORIGIN); + } + + private @Nullable ConsumerRecord delegateIntercept( + final @NotNull ConsumerRecord record, final @NotNull Consumer consumer) { + if (delegate != null) { + return delegate.intercept(record, consumer); + } + return record; + } + + private @Nullable TransactionContext continueTrace( + final @NotNull IScopes forkedScopes, final @NotNull ConsumerRecord record) { + final @Nullable String sentryTrace = headerValue(record, SentryTraceHeader.SENTRY_TRACE_HEADER); + final @Nullable String baggage = headerValue(record, BaggageHeader.BAGGAGE_HEADER); + final @Nullable List baggageHeaders = + baggage != null ? Collections.singletonList(baggage) : null; + return forkedScopes.continueTrace(sentryTrace, baggageHeaders); + } + + private @Nullable ITransaction startTransaction( + final @NotNull IScopes forkedScopes, + final @NotNull ConsumerRecord record, + final @Nullable TransactionContext transactionContext) { + if (!forkedScopes.getOptions().isTracingEnabled()) { + return null; + } + + final @NotNull TransactionContext txContext = + transactionContext != null + ? 
transactionContext + : new TransactionContext("queue.process", "queue.process"); + txContext.setName("queue.process"); + txContext.setOperation("queue.process"); + + final @NotNull TransactionOptions txOptions = new TransactionOptions(); + txOptions.setOrigin(TRACE_ORIGIN); + txOptions.setBindToScope(true); + + final @NotNull ITransaction transaction = forkedScopes.startTransaction(txContext, txOptions); + + if (transaction.isNoOp()) { + return null; + } + + transaction.setData(SpanDataConvention.MESSAGING_SYSTEM, "kafka"); + transaction.setData(SpanDataConvention.MESSAGING_DESTINATION_NAME, record.topic()); + + final @Nullable String messageId = headerValue(record, "messaging.message.id"); + if (messageId != null) { + transaction.setData(SpanDataConvention.MESSAGING_MESSAGE_ID, messageId); + } + + final @Nullable Integer retryCount = retryCount(record); + if (retryCount != null) { + transaction.setData(SpanDataConvention.MESSAGING_MESSAGE_RETRY_COUNT, retryCount); + } + + final @Nullable String enqueuedTimeStr = + headerValue(record, SentryKafkaProducerInterceptor.SENTRY_ENQUEUED_TIME_HEADER); + if (enqueuedTimeStr != null) { + try { + final double enqueuedTimeSeconds = Double.parseDouble(enqueuedTimeStr); + final double nowSeconds = DateUtils.millisToSeconds(System.currentTimeMillis()); + final long latencyMs = (long) ((nowSeconds - enqueuedTimeSeconds) * 1000); + if (latencyMs >= 0) { + transaction.setData(SpanDataConvention.MESSAGING_MESSAGE_RECEIVE_LATENCY, latencyMs); + } + } catch (NumberFormatException ignored) { + // ignore malformed header + } + } + + return transaction; + } + + private @Nullable Integer retryCount(final @NotNull ConsumerRecord record) { + final @Nullable Header header = record.headers().lastHeader(KafkaHeaders.DELIVERY_ATTEMPT); + if (header == null) { + return null; + } + + final byte[] value = header.value(); + if (value == null || value.length != Integer.BYTES) { + return null; + } + + final int attempt = 
ByteBuffer.wrap(value).getInt(); + if (attempt <= 0) { + return null; + } + + return attempt - 1; + } + + private void finishStaleContext() { + if (currentContext.get() != null) { + finishSpan(SpanStatus.UNKNOWN, null); + } + } + + private void finishSpan(final @NotNull SpanStatus status, final @Nullable Throwable throwable) { + final @Nullable SentryRecordContext ctx = currentContext.get(); + if (ctx == null) { + return; + } + currentContext.remove(); + + try { + final @Nullable ITransaction transaction = ctx.transaction; + if (transaction != null) { + transaction.setStatus(status); + if (throwable != null) { + transaction.setThrowable(throwable); + } + transaction.finish(); + } + } finally { + ctx.lifecycleToken.close(); + } + } + + private @Nullable String headerValue( + final @NotNull ConsumerRecord record, final @NotNull String headerName) { + final @Nullable Header header = record.headers().lastHeader(headerName); + if (header == null || header.value() == null) { + return null; + } + return new String(header.value(), StandardCharsets.UTF_8); + } + + private static final class SentryRecordContext { + final @NotNull ISentryLifecycleToken lifecycleToken; + final @Nullable ITransaction transaction; + + SentryRecordContext( + final @NotNull ISentryLifecycleToken lifecycleToken, + final @Nullable ITransaction transaction) { + this.lifecycleToken = lifecycleToken; + this.transaction = transaction; + } + } +} diff --git a/sentry-spring-jakarta/src/test/kotlin/io/sentry/spring/jakarta/kafka/SentryKafkaConsumerBeanPostProcessorTest.kt b/sentry-spring-jakarta/src/test/kotlin/io/sentry/spring/jakarta/kafka/SentryKafkaConsumerBeanPostProcessorTest.kt new file mode 100644 index 0000000000..8595cb9ae7 --- /dev/null +++ b/sentry-spring-jakarta/src/test/kotlin/io/sentry/spring/jakarta/kafka/SentryKafkaConsumerBeanPostProcessorTest.kt @@ -0,0 +1,58 @@ +package io.sentry.spring.jakarta.kafka + +import kotlin.test.Test +import kotlin.test.assertSame +import 
kotlin.test.assertTrue +import org.mockito.kotlin.mock +import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory +import org.springframework.kafka.core.ConsumerFactory + +class SentryKafkaConsumerBeanPostProcessorTest { + + @Test + fun `wraps ConcurrentKafkaListenerContainerFactory with SentryKafkaRecordInterceptor`() { + val consumerFactory = mock>() + val factory = ConcurrentKafkaListenerContainerFactory() + factory.consumerFactory = consumerFactory + + val processor = SentryKafkaConsumerBeanPostProcessor() + processor.postProcessAfterInitialization(factory, "kafkaListenerContainerFactory") + + // Verify via reflection that the interceptor was set + val field = factory.javaClass.superclass.getDeclaredField("recordInterceptor") + field.isAccessible = true + val interceptor = field.get(factory) + assertTrue(interceptor is SentryKafkaRecordInterceptor<*, *>) + } + + @Test + fun `does not double-wrap when SentryKafkaRecordInterceptor already set`() { + val consumerFactory = mock>() + val factory = ConcurrentKafkaListenerContainerFactory() + factory.consumerFactory = consumerFactory + + val processor = SentryKafkaConsumerBeanPostProcessor() + // First wrap + processor.postProcessAfterInitialization(factory, "kafkaListenerContainerFactory") + + val field = factory.javaClass.superclass.getDeclaredField("recordInterceptor") + field.isAccessible = true + val firstInterceptor = field.get(factory) + + // Second wrap — should be idempotent + processor.postProcessAfterInitialization(factory, "kafkaListenerContainerFactory") + val secondInterceptor = field.get(factory) + + assertSame(firstInterceptor, secondInterceptor) + } + + @Test + fun `does not wrap non-factory beans`() { + val someBean = "not a factory" + val processor = SentryKafkaConsumerBeanPostProcessor() + + val result = processor.postProcessAfterInitialization(someBean, "someBean") + + assertSame(someBean, result) + } +} diff --git 
a/sentry-spring-jakarta/src/test/kotlin/io/sentry/spring/jakarta/kafka/SentryKafkaProducerBeanPostProcessorTest.kt b/sentry-spring-jakarta/src/test/kotlin/io/sentry/spring/jakarta/kafka/SentryKafkaProducerBeanPostProcessorTest.kt new file mode 100644 index 0000000000..f0247178f2 --- /dev/null +++ b/sentry-spring-jakarta/src/test/kotlin/io/sentry/spring/jakarta/kafka/SentryKafkaProducerBeanPostProcessorTest.kt @@ -0,0 +1,79 @@ +package io.sentry.spring.jakarta.kafka + +import io.sentry.kafka.SentryKafkaProducerInterceptor +import kotlin.test.Test +import kotlin.test.assertSame +import kotlin.test.assertTrue +import org.apache.kafka.clients.producer.ProducerInterceptor +import org.mockito.kotlin.mock +import org.springframework.kafka.core.KafkaTemplate +import org.springframework.kafka.core.ProducerFactory +import org.springframework.kafka.support.CompositeProducerInterceptor + +class SentryKafkaProducerBeanPostProcessorTest { + + private fun readInterceptor(template: KafkaTemplate<*, *>): Any? 
{ + val field = KafkaTemplate::class.java.getDeclaredField("producerInterceptor") + field.isAccessible = true + return field.get(template) + } + + @Test + fun `sets SentryKafkaProducerInterceptor on KafkaTemplate`() { + val template = KafkaTemplate(mock>()) + val processor = SentryKafkaProducerBeanPostProcessor() + + processor.postProcessAfterInitialization(template, "kafkaTemplate") + + assertTrue(readInterceptor(template) is SentryKafkaProducerInterceptor<*, *>) + } + + @Test + fun `does not double-wrap when SentryKafkaProducerInterceptor already set`() { + val template = KafkaTemplate(mock>()) + val processor = SentryKafkaProducerBeanPostProcessor() + + processor.postProcessAfterInitialization(template, "kafkaTemplate") + val firstInterceptor = readInterceptor(template) + + processor.postProcessAfterInitialization(template, "kafkaTemplate") + val secondInterceptor = readInterceptor(template) + + assertSame(firstInterceptor, secondInterceptor) + } + + @Test + fun `does not modify non-KafkaTemplate beans`() { + val someBean = "not a kafka template" + val processor = SentryKafkaProducerBeanPostProcessor() + + val result = processor.postProcessAfterInitialization(someBean, "someBean") + + assertSame(someBean, result) + } + + @Test + fun `returns the same bean instance`() { + val template = KafkaTemplate(mock>()) + val processor = SentryKafkaProducerBeanPostProcessor() + + val result = processor.postProcessAfterInitialization(template, "kafkaTemplate") + + assertSame(template, result, "BPP should return the same bean, not a replacement") + } + + @Test + fun `composes with existing customer interceptor using CompositeProducerInterceptor`() { + val template = KafkaTemplate(mock>()) + val customerInterceptor = mock>() + template.setProducerInterceptor(customerInterceptor) + + val processor = SentryKafkaProducerBeanPostProcessor() + processor.postProcessAfterInitialization(template, "kafkaTemplate") + + assertTrue( + readInterceptor(template) is 
CompositeProducerInterceptor<*, *>, + "Should use CompositeProducerInterceptor when existing interceptor is present", + ) + } +} diff --git a/sentry-spring-jakarta/src/test/kotlin/io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptorTest.kt b/sentry-spring-jakarta/src/test/kotlin/io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptorTest.kt new file mode 100644 index 0000000000..1239b4007e --- /dev/null +++ b/sentry-spring-jakarta/src/test/kotlin/io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptorTest.kt @@ -0,0 +1,315 @@ +package io.sentry.spring.jakarta.kafka + +import io.sentry.BaggageHeader +import io.sentry.IScopes +import io.sentry.ISentryLifecycleToken +import io.sentry.Sentry +import io.sentry.SentryOptions +import io.sentry.SentryTraceHeader +import io.sentry.SentryTracer +import io.sentry.SpanDataConvention +import io.sentry.TransactionContext +import io.sentry.kafka.SentryKafkaProducerInterceptor +import io.sentry.test.initForTest +import java.nio.ByteBuffer +import java.nio.charset.StandardCharsets +import kotlin.test.AfterTest +import kotlin.test.BeforeTest +import kotlin.test.Test +import kotlin.test.assertEquals +import kotlin.test.assertNull +import kotlin.test.assertTrue +import org.apache.kafka.clients.consumer.Consumer +import org.apache.kafka.clients.consumer.ConsumerRecord +import org.apache.kafka.common.header.internals.RecordHeaders +import org.mockito.Mockito +import org.mockito.kotlin.any +import org.mockito.kotlin.mock +import org.mockito.kotlin.never +import org.mockito.kotlin.verify +import org.mockito.kotlin.whenever +import org.springframework.kafka.listener.RecordInterceptor +import org.springframework.kafka.support.KafkaHeaders + +class SentryKafkaRecordInterceptorTest { + + private lateinit var scopes: IScopes + private lateinit var forkedScopes: IScopes + private lateinit var options: SentryOptions + private lateinit var consumer: Consumer + private lateinit var lifecycleToken: ISentryLifecycleToken + private 
lateinit var transaction: SentryTracer + + @BeforeTest + fun setup() { + initForTest { it.dsn = "https://key@sentry.io/proj" } + scopes = mock() + consumer = mock() + lifecycleToken = mock() + options = + SentryOptions().apply { + dsn = "https://key@sentry.io/proj" + isEnableQueueTracing = true + tracesSampleRate = 1.0 + } + whenever(scopes.options).thenReturn(options) + whenever(scopes.isEnabled).thenReturn(true) + + forkedScopes = mock() + whenever(forkedScopes.options).thenReturn(options) + whenever(forkedScopes.makeCurrent()).thenReturn(lifecycleToken) + + transaction = SentryTracer(TransactionContext("queue.process", "queue.process"), forkedScopes) + whenever(forkedScopes.startTransaction(any(), any())) + .thenReturn(transaction) + } + + @AfterTest + fun teardown() { + Sentry.close() + } + + private fun withMockSentry(closure: () -> T): T = + Mockito.mockStatic(Sentry::class.java).use { + it.`when` { Sentry.forkedRootScopes(any()) }.thenReturn(forkedScopes) + it.`when` { Sentry.getCurrentScopes() }.thenReturn(scopes) + closure.invoke() + } + + private fun createRecord( + topic: String = "my-topic", + headers: RecordHeaders = RecordHeaders(), + ): ConsumerRecord { + val record = ConsumerRecord(topic, 0, 0L, "key", "value") + headers.forEach { record.headers().add(it) } + return record + } + + private fun createRecordWithHeaders( + sentryTrace: String? = null, + baggage: String? = null, + enqueuedTime: String? = null, + deliveryAttempt: Int? 
= null, + ): ConsumerRecord { + val headers = RecordHeaders() + sentryTrace?.let { + headers.add(SentryTraceHeader.SENTRY_TRACE_HEADER, it.toByteArray(StandardCharsets.UTF_8)) + } + baggage?.let { + headers.add(BaggageHeader.BAGGAGE_HEADER, it.toByteArray(StandardCharsets.UTF_8)) + } + enqueuedTime?.let { + headers.add( + SentryKafkaProducerInterceptor.SENTRY_ENQUEUED_TIME_HEADER, + it.toByteArray(StandardCharsets.UTF_8), + ) + } + deliveryAttempt?.let { + headers.add( + KafkaHeaders.DELIVERY_ATTEMPT, + ByteBuffer.allocate(Int.SIZE_BYTES).putInt(it).array(), + ) + } + val record = ConsumerRecord("my-topic", 0, 0L, "key", "value") + headers.forEach { record.headers().add(it) } + return record + } + + @Test + fun `intercept forks root scopes`() { + val interceptor = SentryKafkaRecordInterceptor(scopes) + val record = createRecord() + + withMockSentry { interceptor.intercept(record, consumer) } + + verify(forkedScopes).makeCurrent() + } + + @Test + fun `intercept continues trace from headers`() { + val interceptor = SentryKafkaRecordInterceptor(scopes) + val sentryTraceValue = "2722d9f6ec019ade60c776169d9a8904-cedf5b7571cb4972-1" + val record = createRecordWithHeaders(sentryTrace = sentryTraceValue) + + withMockSentry { interceptor.intercept(record, consumer) } + + verify(forkedScopes) + .continueTrace(org.mockito.kotlin.eq(sentryTraceValue), org.mockito.kotlin.isNull()) + } + + @Test + fun `intercept calls continueTrace with null when no headers`() { + val interceptor = SentryKafkaRecordInterceptor(scopes) + val record = createRecord() + + withMockSentry { interceptor.intercept(record, consumer) } + + verify(forkedScopes).continueTrace(org.mockito.kotlin.isNull(), org.mockito.kotlin.isNull()) + } + + @Test + fun `sets retry count from delivery attempt header`() { + val interceptor = SentryKafkaRecordInterceptor(scopes) + val record = createRecordWithHeaders(deliveryAttempt = 3) + + withMockSentry { interceptor.intercept(record, consumer) } + + assertEquals(2, 
transaction.data?.get(SpanDataConvention.MESSAGING_MESSAGE_RETRY_COUNT)) + } + + @Test + fun `does not set retry count when delivery attempt header is missing`() { + val interceptor = SentryKafkaRecordInterceptor(scopes) + val record = createRecord() + + withMockSentry { interceptor.intercept(record, consumer) } + + assertNull(transaction.data?.get(SpanDataConvention.MESSAGING_MESSAGE_RETRY_COUNT)) + } + + @Test + fun `sets receive latency from enqueued time in epoch seconds`() { + val interceptor = SentryKafkaRecordInterceptor(scopes) + val enqueuedTime = (System.currentTimeMillis() / 1000.0 - 1.0).toString() + val record = createRecordWithHeaders(enqueuedTime = enqueuedTime) + + withMockSentry { interceptor.intercept(record, consumer) } + + val latency = transaction.data?.get(SpanDataConvention.MESSAGING_MESSAGE_RECEIVE_LATENCY) + assertTrue(latency is Long && latency >= 0) + } + + @Test + fun `does not create span when queue tracing is disabled`() { + options.isEnableQueueTracing = false + val interceptor = SentryKafkaRecordInterceptor(scopes) + val record = createRecord() + + val result = interceptor.intercept(record, consumer) + + verify(forkedScopes, never()).makeCurrent() + assertEquals(record, result) + } + + @Test + fun `does not create span when origin is ignored`() { + options.setIgnoredSpanOrigins(listOf(SentryKafkaRecordInterceptor.TRACE_ORIGIN)) + val interceptor = SentryKafkaRecordInterceptor(scopes) + val record = createRecord() + + val result = interceptor.intercept(record, consumer) + + verify(forkedScopes, never()).makeCurrent() + assertEquals(record, result) + } + + @Test + fun `delegates to existing interceptor`() { + val delegate = mock>() + val record = createRecord() + whenever(delegate.intercept(record, consumer)).thenReturn(record) + + val interceptor = SentryKafkaRecordInterceptor(scopes, delegate) + withMockSentry { interceptor.intercept(record, consumer) } + + verify(delegate).intercept(record, consumer) + } + + @Test + fun `success 
finishes transaction and delegates`() { + val delegate = mock>() + val interceptor = SentryKafkaRecordInterceptor(scopes, delegate) + val record = createRecord() + + withMockSentry { interceptor.intercept(record, consumer) } + interceptor.success(record, consumer) + + verify(delegate).success(record, consumer) + } + + @Test + fun `failure finishes transaction with error and delegates`() { + val delegate = mock>() + val interceptor = SentryKafkaRecordInterceptor(scopes, delegate) + val record = createRecord() + val exception = RuntimeException("processing failed") + + withMockSentry { interceptor.intercept(record, consumer) } + interceptor.failure(record, exception, consumer) + + verify(delegate).failure(record, exception, consumer) + } + + @Test + fun `afterRecord delegates to existing interceptor`() { + val delegate = mock>() + val interceptor = SentryKafkaRecordInterceptor(scopes, delegate) + val record = createRecord() + + interceptor.afterRecord(record, consumer) + + verify(delegate).afterRecord(record, consumer) + } + + @Test + fun `trace origin is set correctly`() { + assertEquals( + "auto.queue.spring_jakarta.kafka.consumer", + SentryKafkaRecordInterceptor.TRACE_ORIGIN, + ) + } + + @Test + fun `clearThreadState cleans up stale context`() { + val interceptor = SentryKafkaRecordInterceptor(scopes) + val record = createRecord() + + withMockSentry { interceptor.intercept(record, consumer) } + + interceptor.clearThreadState(consumer) + + verify(lifecycleToken).close() + } + + @Test + fun `clearThreadState is no-op when no context exists`() { + val interceptor = SentryKafkaRecordInterceptor(scopes) + + // should not throw + interceptor.clearThreadState(consumer) + } + + @Test + fun `intercept cleans up stale context from previous record`() { + val lifecycleToken2 = mock() + val forkedScopes2 = mock() + whenever(forkedScopes2.options).thenReturn(options) + whenever(forkedScopes2.makeCurrent()).thenReturn(lifecycleToken2) + val tx2 = 
SentryTracer(TransactionContext("queue.process", "queue.process"), forkedScopes2) + whenever(forkedScopes2.startTransaction(any(), any())).thenReturn(tx2) + + var callCount = 0 + + val interceptor = SentryKafkaRecordInterceptor(scopes) + val record = createRecord() + + Mockito.mockStatic(Sentry::class.java).use { mockSentry -> + mockSentry.`when` { Sentry.getCurrentScopes() }.thenReturn(scopes) + mockSentry + .`when` { Sentry.forkedRootScopes(any()) } + .thenAnswer { + callCount++ + if (callCount == 1) forkedScopes else forkedScopes2 + } + + // First intercept sets up context + interceptor.intercept(record, consumer) + + // Second intercept without success/failure — should clean up stale context first + interceptor.intercept(record, consumer) + } + + // First lifecycle token should have been closed by the defensive cleanup + verify(lifecycleToken).close() + } +} diff --git a/sentry-system-test-support/api/sentry-system-test-support.api b/sentry-system-test-support/api/sentry-system-test-support.api index 83a9f288d0..1cbec85751 100644 --- a/sentry-system-test-support/api/sentry-system-test-support.api +++ b/sentry-system-test-support/api/sentry-system-test-support.api @@ -560,6 +560,8 @@ public final class io/sentry/systemtest/util/RestTestClient : io/sentry/systemte public final fun getTodo (J)Lio/sentry/systemtest/Todo; public final fun getTodoRestClient (J)Lio/sentry/systemtest/Todo; public final fun getTodoWebclient (J)Lio/sentry/systemtest/Todo; + public final fun produceKafkaMessage (Ljava/lang/String;)Ljava/lang/String; + public static synthetic fun produceKafkaMessage$default (Lio/sentry/systemtest/util/RestTestClient;Ljava/lang/String;ILjava/lang/Object;)Ljava/lang/String; public final fun saveCachedTodo (Lio/sentry/systemtest/Todo;)Lio/sentry/systemtest/Todo; } diff --git a/sentry-system-test-support/src/main/kotlin/io/sentry/systemtest/util/RestTestClient.kt b/sentry-system-test-support/src/main/kotlin/io/sentry/systemtest/util/RestTestClient.kt index 
da552ff93b..b9dc0f3cca 100644 --- a/sentry-system-test-support/src/main/kotlin/io/sentry/systemtest/util/RestTestClient.kt +++ b/sentry-system-test-support/src/main/kotlin/io/sentry/systemtest/util/RestTestClient.kt @@ -81,6 +81,12 @@ class RestTestClient(private val backendBaseUrl: String) : LoggingInsecureRestCl return response?.body?.string() } + fun produceKafkaMessage(message: String = "hello from sentry!"): String? { + val request = Request.Builder().url("$backendBaseUrl/kafka/produce?message=$message") + + return callTyped(request, true) + } + fun getCountMetric(): String? { val request = Request.Builder().url("$backendBaseUrl/metric/count") diff --git a/sentry/api/sentry.api b/sentry/api/sentry.api index b9cbb2ae1b..9e5f09320b 100644 --- a/sentry/api/sentry.api +++ b/sentry/api/sentry.api @@ -529,6 +529,7 @@ public final class io/sentry/ExternalOptions { public fun isEnableLogs ()Ljava/lang/Boolean; public fun isEnableMetrics ()Ljava/lang/Boolean; public fun isEnablePrettySerializationOutput ()Ljava/lang/Boolean; + public fun isEnableQueueTracing ()Ljava/lang/Boolean; public fun isEnableSpotlight ()Ljava/lang/Boolean; public fun isEnabled ()Ljava/lang/Boolean; public fun isForceInit ()Ljava/lang/Boolean; @@ -548,6 +549,7 @@ public final class io/sentry/ExternalOptions { public fun setEnableLogs (Ljava/lang/Boolean;)V public fun setEnableMetrics (Ljava/lang/Boolean;)V public fun setEnablePrettySerializationOutput (Ljava/lang/Boolean;)V + public fun setEnableQueueTracing (Ljava/lang/Boolean;)V public fun setEnableSpotlight (Ljava/lang/Boolean;)V public fun setEnableUncaughtExceptionHandler (Ljava/lang/Boolean;)V public fun setEnabled (Ljava/lang/Boolean;)V @@ -3688,6 +3690,7 @@ public class io/sentry/SentryOptions { public fun isEnableEventSizeLimiting ()Z public fun isEnableExternalConfiguration ()Z public fun isEnablePrettySerializationOutput ()Z + public fun isEnableQueueTracing ()Z public fun isEnableScopePersistence ()Z public fun isEnableScreenTracking 
()Z public fun isEnableShutdownHook ()Z @@ -3748,6 +3751,7 @@ public class io/sentry/SentryOptions { public fun setEnableEventSizeLimiting (Z)V public fun setEnableExternalConfiguration (Z)V public fun setEnablePrettySerializationOutput (Z)V + public fun setEnableQueueTracing (Z)V public fun setEnableScopePersistence (Z)V public fun setEnableScreenTracking (Z)V public fun setEnableShutdownHook (Z)V @@ -4392,6 +4396,12 @@ public abstract interface class io/sentry/SpanDataConvention { public static final field HTTP_RESPONSE_CONTENT_LENGTH_KEY Ljava/lang/String; public static final field HTTP_START_TIMESTAMP Ljava/lang/String; public static final field HTTP_STATUS_CODE_KEY Ljava/lang/String; + public static final field MESSAGING_DESTINATION_NAME Ljava/lang/String; + public static final field MESSAGING_MESSAGE_BODY_SIZE Ljava/lang/String; + public static final field MESSAGING_MESSAGE_ID Ljava/lang/String; + public static final field MESSAGING_MESSAGE_RECEIVE_LATENCY Ljava/lang/String; + public static final field MESSAGING_MESSAGE_RETRY_COUNT Ljava/lang/String; + public static final field MESSAGING_SYSTEM Ljava/lang/String; public static final field PROFILER_ID Ljava/lang/String; public static final field THREAD_ID Ljava/lang/String; public static final field THREAD_NAME Ljava/lang/String; diff --git a/sentry/src/main/java/io/sentry/ExternalOptions.java b/sentry/src/main/java/io/sentry/ExternalOptions.java index e992c04466..4e44ea422e 100644 --- a/sentry/src/main/java/io/sentry/ExternalOptions.java +++ b/sentry/src/main/java/io/sentry/ExternalOptions.java @@ -58,6 +58,7 @@ public final class ExternalOptions { private @Nullable Boolean enableBackpressureHandling; private @Nullable Boolean enableDatabaseTransactionTracing; private @Nullable Boolean enableCacheTracing; + private @Nullable Boolean enableQueueTracing; private @Nullable Boolean globalHubMode; private @Nullable Boolean forceInit; private @Nullable Boolean captureOpenTelemetryEvents; @@ -168,6 +169,8 @@ public 
final class ExternalOptions { options.setEnableCacheTracing(propertiesProvider.getBooleanProperty("enable-cache-tracing")); + options.setEnableQueueTracing(propertiesProvider.getBooleanProperty("enable-queue-tracing")); + options.setGlobalHubMode(propertiesProvider.getBooleanProperty("global-hub-mode")); options.setCaptureOpenTelemetryEvents( @@ -541,6 +544,14 @@ public void setEnableCacheTracing(final @Nullable Boolean enableCacheTracing) { return enableCacheTracing; } + public void setEnableQueueTracing(final @Nullable Boolean enableQueueTracing) { + this.enableQueueTracing = enableQueueTracing; + } + + public @Nullable Boolean isEnableQueueTracing() { + return enableQueueTracing; + } + public void setGlobalHubMode(final @Nullable Boolean globalHubMode) { this.globalHubMode = globalHubMode; } diff --git a/sentry/src/main/java/io/sentry/SentryOptions.java b/sentry/src/main/java/io/sentry/SentryOptions.java index 86086f8816..819789678e 100644 --- a/sentry/src/main/java/io/sentry/SentryOptions.java +++ b/sentry/src/main/java/io/sentry/SentryOptions.java @@ -508,6 +508,9 @@ public class SentryOptions { /** Whether cache operations (get, put, remove, flush) should be traced. */ private boolean enableCacheTracing = false; + /** Whether queue operations (publish, process) should be traced. */ + private boolean enableQueueTracing = false; + /** Date provider to retrieve the current date from. */ @ApiStatus.Internal private final @NotNull LazyEvaluator dateProvider = @@ -2704,6 +2707,24 @@ public void setEnableCacheTracing(boolean enableCacheTracing) { this.enableCacheTracing = enableCacheTracing; } + /** + * Whether queue operations (publish, process) should be traced. + * + * @return true if queue operations should be traced + */ + public boolean isEnableQueueTracing() { + return enableQueueTracing; + } + + /** + * Whether queue operations (publish, process) should be traced. 
+ * + * @param enableQueueTracing true if queue operations should be traced + */ + public void setEnableQueueTracing(boolean enableQueueTracing) { + this.enableQueueTracing = enableQueueTracing; + } + /** * Whether Sentry is enabled. * @@ -3545,6 +3566,9 @@ public void merge(final @NotNull ExternalOptions options) { if (options.isEnableCacheTracing() != null) { setEnableCacheTracing(options.isEnableCacheTracing()); } + if (options.isEnableQueueTracing() != null) { + setEnableQueueTracing(options.isEnableQueueTracing()); + } if (options.getMaxRequestBodySize() != null) { setMaxRequestBodySize(options.getMaxRequestBodySize()); } diff --git a/sentry/src/main/java/io/sentry/SpanDataConvention.java b/sentry/src/main/java/io/sentry/SpanDataConvention.java index 647c0dacdd..047a235422 100644 --- a/sentry/src/main/java/io/sentry/SpanDataConvention.java +++ b/sentry/src/main/java/io/sentry/SpanDataConvention.java @@ -30,4 +30,10 @@ public interface SpanDataConvention { String CACHE_KEY = "cache.key"; String CACHE_OPERATION = "cache.operation"; String CACHE_WRITE = "cache.write"; + String MESSAGING_SYSTEM = "messaging.system"; + String MESSAGING_DESTINATION_NAME = "messaging.destination.name"; + String MESSAGING_MESSAGE_ID = "messaging.message.id"; + String MESSAGING_MESSAGE_RETRY_COUNT = "messaging.message.retry.count"; + String MESSAGING_MESSAGE_BODY_SIZE = "messaging.message.body.size"; + String MESSAGING_MESSAGE_RECEIVE_LATENCY = "messaging.message.receive.latency"; } diff --git a/sentry/src/main/java/io/sentry/util/SpanUtils.java b/sentry/src/main/java/io/sentry/util/SpanUtils.java index cad4d48365..c324feed84 100644 --- a/sentry/src/main/java/io/sentry/util/SpanUtils.java +++ b/sentry/src/main/java/io/sentry/util/SpanUtils.java @@ -40,6 +40,10 @@ public final class SpanUtils { origins.add("auto.http.spring7.resttemplate"); origins.add("auto.http.openfeign"); origins.add("auto.http.ktor-client"); + origins.add("auto.queue.spring_jakarta.kafka.producer"); + 
origins.add("auto.queue.spring_jakarta.kafka.consumer"); + origins.add("auto.queue.kafka.producer"); + origins.add("auto.queue.kafka.consumer"); } if (SentryOpenTelemetryMode.AGENT == mode) { diff --git a/sentry/src/test/java/io/sentry/ExternalOptionsTest.kt b/sentry/src/test/java/io/sentry/ExternalOptionsTest.kt index 5463035555..fee707d31f 100644 --- a/sentry/src/test/java/io/sentry/ExternalOptionsTest.kt +++ b/sentry/src/test/java/io/sentry/ExternalOptionsTest.kt @@ -345,6 +345,20 @@ class ExternalOptionsTest { } } + @Test + fun `creates options with enableQueueTracing set to true`() { + withPropertiesFile("enable-queue-tracing=true") { options -> + assertTrue(options.isEnableQueueTracing == true) + } + } + + @Test + fun `creates options with enableQueueTracing set to false`() { + withPropertiesFile("enable-queue-tracing=false") { options -> + assertTrue(options.isEnableQueueTracing == false) + } + } + @Test fun `creates options with cron defaults`() { withPropertiesFile( diff --git a/sentry/src/test/java/io/sentry/SentryOptionsTest.kt b/sentry/src/test/java/io/sentry/SentryOptionsTest.kt index da014b30f7..e18438707b 100644 --- a/sentry/src/test/java/io/sentry/SentryOptionsTest.kt +++ b/sentry/src/test/java/io/sentry/SentryOptionsTest.kt @@ -708,6 +708,11 @@ class SentryOptionsTest { assertFalse(SentryOptions().isEnableCacheTracing) } + @Test + fun `when options are initialized, enableQueueTracing is set to false by default`() { + assertFalse(SentryOptions().isEnableQueueTracing) + } + @Test fun `when options are initialized, metrics is enabled by default`() { assertTrue(SentryOptions().metrics.isEnabled) @@ -1018,6 +1023,23 @@ class SentryOptionsTest { assertEquals("original", options.orgId) } + @Test + fun `merging options applies enableQueueTracing`() { + val externalOptions = ExternalOptions() + externalOptions.setEnableQueueTracing(true) + val options = SentryOptions() + options.merge(externalOptions) + assertTrue(options.isEnableQueueTracing) + } + + @Test 
+ fun `merging options preserves enableQueueTracing default when not set`() { + val externalOptions = ExternalOptions() + val options = SentryOptions() + options.merge(externalOptions) + assertFalse(options.isEnableQueueTracing) + } + @Test fun `getEffectiveOrgId prefers explicit orgId over DSN`() { val options = SentryOptions() diff --git a/settings.gradle.kts b/settings.gradle.kts index 8d431d5fbd..4b1c606bc6 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -58,6 +58,7 @@ include( "sentry-graphql-22", "sentry-graphql-core", "sentry-jdbc", + "sentry-kafka", "sentry-opentelemetry:sentry-opentelemetry-bootstrap", "sentry-opentelemetry:sentry-opentelemetry-core", "sentry-opentelemetry:sentry-opentelemetry-agentcustomization",