feat(components): support Opentelemetry baggage#22705
Conversation
|
🌟 Thank you for your contribution to the Apache Camel project! 🌟 🐫 Apache Camel Committers, please review the following items:
|
Closes CAMEL-23349
ea3d8a5 to
7fc9650
Compare
|
🧪 CI tested the following changed modules:
All tested modules (9 modules)
|
|
This solution to baggage management has a severe restriction, which unfortunately makes it unusable for us: baggage set via the header is not actually set until the next span is created, just as the doc says:
The only reason the unit tests work, is because they explicitly set traceProcessors=true, which cause technical spans to be created for each and every processor in the route. Such technical spans are IMHO generally not acceptable, since they pollute the tracing information with low-level, technical details. Take the BaggageSettingTest as example, as is now: tst.setTraceProcessors(true);
...
from("direct:start") // 1. Route span CREATED here - no OTEL_BAGGAGE_ headers yet
.setHeader("OTEL_BAGGAGE_tenant.id", constant("1234")) // 2. Header set AFTER route span created
.routeId("start")
.log("A message") // 3. Log processor creates a new span → Header is added to baggage
.process(new Processor() { // 4. Processor creates a new span → Header is added to baggage
@Override
public void process(Exchange exchange) throws Exception {
exchange.getIn().setHeader("operation", "fake");
assertEquals("1234", Baggage.current().getEntryValue("tenant.id")); // This succeeds
}
})
.to("log:info"); // 5. Log processor creates a new span → Header is added to baggageBut if traceProcessor=false (which it is by default, for good reasons), this is what happens: tst.setTraceProcessors(false);
...
from("direct:start") // 1. Route span CREATED here - no OTEL_BAGGAGE_ headers yet
.setHeader("OTEL_BAGGAGE_tenant.id", constant("1234")) // 2. Header set AFTER route span created
.routeId("start")
.log("A message") // 3. Logs within route span's context → baggage is EMPTY
.process(new Processor() { // 4. Processor within route span's context → baggage is EMPTY
@Override
public void process(Exchange exchange) throws Exception {
exchange.getIn().setHeader("operation", "fake");
assertEquals("1234", Baggage.current().getEntryValue("tenant.id")); // This fails
}
})
.to("log:info"); // 5. Logs within route span's context → baggage is EMPTYThis means all other, legitimate use of Baggage.current() such as log appenders will no see the baggage and hence an important correlation between traces and logs is broken. Baggage propagation might still work, since most producer components such as I don't think this is a very good solution. When setting a baggage entry programmatically, it should be added to the current span, not any eventual future spans. Since baggage is an OTEL-specific concept, its handling must comply with the semantics of the OTEL api and sdk. I hope we can give it a second round of thoughts. |
|
Hello. No, the What it happens when you turn the flag on: The same, but with the traceprocessors flag off: you can see that the |
|
In BaggageSettingTest, I change these two lines:
and get mvn test -Dtest=BaggageSettingTest#testRouteProgrammaticBaggage
...
[INFO] Running org.apache.camel.opentelemetry2.BaggageSettingTest
[ERROR] Tests run: 1, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 10.38 s <<< FAILURE! -- in org.apache.camel.opentelemetry2.BaggageSettingTest
[ERROR] org.apache.camel.opentelemetry2.BaggageSettingTest.testRouteProgrammaticBaggage -- Time elapsed: 10.19 s <<< FAILURE!
org.opentest4j.AssertionFailedError: expected: <1234> but was: <null>
at org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:158)
at org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:139)
at org.junit.jupiter.api.AssertEquals.failNotEqual(AssertEquals.java:201)
at org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:184)
at org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:179)
at org.junit.jupiter.api.Assertions.assertEquals(Assertions.java:1188)
at org.apache.camel.opentelemetry2.BaggageSettingTest$1$1.process(BaggageSettingTest.java:98)
at org.apache.camel.support.processor.DelegateSyncProcessor.process(DelegateSyncProcessor.java:67)
at org.apache.camel.telemetry.TraceProcessorsInterceptStrategy$TraceProcessor.process(TraceProcessorsInterceptStrategy.java:84)
at org.apache.camel.support.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:117)
at org.apache.camel.processor.errorhandler.RedeliveryErrorHandler$SimpleTask.handleFirst(RedeliveryErrorHandler.java:442)
at org.apache.camel.processor.errorhandler.RedeliveryErrorHandler$SimpleTask.run(RedeliveryErrorHandler.java:418)
at org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.doRun(DefaultReactiveExecutor.java:202)
at org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.executeReactiveWork(DefaultReactiveExecutor.java:192)
at org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.tryExecuteReactiveWork(DefaultReactiveExecutor.java:169)
at org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.schedule(DefaultReactiveExecutor.java:143)
at org.apache.camel.impl.engine.DefaultReactiveExecutor.scheduleMain(DefaultReactiveExecutor.java:59)
at org.apache.camel.processor.Pipeline.process(Pipeline.java:162)
at org.apache.camel.impl.engine.CamelInternalProcessor.processNonTransacted(CamelInternalProcessor.java:385)
at org.apache.camel.impl.engine.CamelInternalProcessor.process(CamelInternalProcessor.java:361)
at org.apache.camel.component.direct.DirectProducer.process(DirectProducer.java:103)
at org.apache.camel.impl.engine.SharedCamelInternalProcessor.processNonTransacted(SharedCamelInternalProcessor.java:156)
at org.apache.camel.impl.engine.SharedCamelInternalProcessor.process(SharedCamelInternalProcessor.java:133)
at org.apache.camel.impl.engine.SharedCamelInternalProcessor$1.process(SharedCamelInternalProcessor.java:89)
at org.apache.camel.impl.engine.DefaultAsyncProcessorAwaitManager.process(DefaultAsyncProcessorAwaitManager.java:81)
at org.apache.camel.impl.engine.SharedCamelInternalProcessor.process(SharedCamelInternalProcessor.java:86)
at org.apache.camel.support.cache.DefaultProducerCache.send(DefaultProducerCache.java:180)
at org.apache.camel.impl.engine.DefaultProducerTemplate.send(DefaultProducerTemplate.java:175)
at org.apache.camel.impl.engine.DefaultProducerTemplate.send(DefaultProducerTemplate.java:171)
at org.apache.camel.impl.engine.DefaultProducerTemplate.send(DefaultProducerTemplate.java:152)
at org.apache.camel.impl.engine.DefaultProducerTemplate.sendBody(DefaultProducerTemplate.java:186)
at org.apache.camel.impl.engine.DefaultProducerTemplate.sendBody(DefaultProducerTemplate.java:194)
at org.apache.camel.opentelemetry2.BaggageSettingTest.testRouteProgrammaticBaggage(BaggageSettingTest.java:56)
Then change |
That is because to:log (as most other producer components) creates a new span, and hence the baggage header is copied to the new span before it is logged. |
|
That's exactly how it is supposed to work. See the design proposal: https://github.com/apache/camel/blob/main/proposals/tracing.adoc#tracing-storage |
|
Sure, I think it makes perfect sense for producer components to create a new span. I was just trying to explain why the baggage is in your The reason this is important is that we (and I think many others) rely on the ability to easily correlate all telemetry signals (especially traces and logs). Hence any baggage set (either from the incoming message via the With OpenTelemetry instrumentation, the Log4j or Logback appenders handles this automatically: With the current solution, this is broken if here is a dedicated test for log ContextData with baggage: package org.apache.camel.opentelemetry2;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import io.opentelemetry.api.trace.Tracer;
import java.io.IOException;
import org.apache.camel.CamelContext;
import org.apache.camel.CamelContextAware;
import org.apache.camel.RoutesBuilder;
import org.apache.camel.builder.RouteBuilder;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.core.LoggerContext;
import org.junit.jupiter.api.Test;
public class BaggageLoggingTest extends OpenTelemetryTracerTestSupport {
Tracer tracer = otelExtension.getOpenTelemetry().getTracer("baggageLogging");
@Override
protected CamelContext createCamelContext() throws Exception {
OpenTelemetryTracer tst = new OpenTelemetryTracer();
tst.setTracer(tracer);
tst.setContextPropagators(otelExtension.getOpenTelemetry().getPropagators());
tst.setTraceProcessors(false);
CamelContext context = super.createCamelContext();
CamelContextAware.trySetCamelContext(tst, context);
tst.init(context);
return context;
}
@Test
void testRouteProgrammaticBaggageLogging() throws IOException {
LoggerContext ctx = (LoggerContext) LogManager.getContext(false);
TestAppender logAppender = (TestAppender) ctx.getConfiguration().getAppender("test");
logAppender.clear();
template.sendBody("direct:start", "my-body");
checkLogs(logAppender);
}
private static void checkLogs(TestAppender logAppender) {
assertTrue(logAppender.getEvents().size() == 2);
var toLogEvent = logAppender.getEvents().get(1);
assertTrue(toLogEvent.getMessage().toString().contains("my-body"));
assertTrue(toLogEvent.getContextData().containsKey("baggage.tenant.id"));
assertEquals("1234", toLogEvent.getContextData().getValue("baggage.tenant.id"));
var logEvent = logAppender.getEvents().get(0);
assertTrue(logEvent.getMessage().toString().contains("A message"));
assertTrue(logEvent.getContextData().containsKey("baggage.tenant.id"));
assertEquals("1234", logEvent.getContextData().getValue("baggage.tenant.id"));
}
@Override
protected RoutesBuilder createRouteBuilder() {
return new RouteBuilder() {
@Override
public void configure() {
from("direct:start")
.setHeader("OTEL_BAGGAGE_tenant.id", constant("1234"))
.routeId("start")
.log("A message")
.to("log:info");
}
};
}
}with a dedicated TestAppender for the log event assertions: package org.apache.camel.opentelemetry2;
import java.util.*;
import org.apache.logging.log4j.core.*;
import org.apache.logging.log4j.core.appender.AbstractAppender;
import org.apache.logging.log4j.core.config.plugins.*;
@Plugin(name = "TestAppender", category = Core.CATEGORY_NAME, elementType = Appender.ELEMENT_TYPE)
public class TestAppender extends AbstractAppender {
private final List<LogEvent> events = new ArrayList<>();
protected TestAppender(String name) {
super(name, null, null, true, null);
}
@PluginFactory
public static TestAppender createAppender(@PluginAttribute("name") String name) {
return new TestAppender(name);
}
@Override
public void append(LogEvent event) {
events.add(event.toImmutable());
}
public List<LogEvent> getEvents() {
return events;
}
public void clear() {
events.clear();
}
}and the following additions to slf4j.properties: packages=org.apache.camel.opentelemetry2
appender.test.type = TestAppender
appender.test.name = test
rootLogger.appenderRef.test.ref=test
...For this to work, additional dependencies must be added, as well as an additional system property for the test: <dependencies>
...
<dependency>
<groupId>io.opentelemetry.instrumentation</groupId>
<artifactId>opentelemetry-log4j-context-data-2.17-autoconfigure</artifactId>
<version>${opentelemetry-log4j2-version}</version>
<scope>runtime</scope>
</dependency>
...
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>${log4j2-version}</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<systemPropertyVariables>
<otel.instrumentation.log4j-context-data.add-baggage>true</otel.instrumentation.log4j-context-data.add-baggage>
</systemPropertyVariables>
</configuration>
</plugin>
</plugins>
</build>The test fails: mvn test -Dtest=BaggageLoggingTest
...
[INFO] -------------------------------------------------------
[INFO] T E S T S
[INFO] -------------------------------------------------------
[INFO] Running org.apache.camel.opentelemetry2.BaggageLoggingTest
[ERROR] Tests run: 1, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 0.392 s <<< FAILURE! -- in org.apache.camel.opentelemetry2.BaggageLoggingTest
[ERROR] org.apache.camel.opentelemetry2.BaggageLoggingTest.testRouteProgrammaticBaggageLogging -- Time elapsed: 0.200 s <<< FAILURE!
org.opentest4j.AssertionFailedError: expected: <true> but was: <false>
at org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:158)
at org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:139)
at org.junit.jupiter.api.AssertTrue.failNotTrue(AssertTrue.java:69)
at org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:41)
at org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:35)
at org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:195)
at org.apache.camel.opentelemetry2.BaggageLoggingTest.checkLogs(BaggageLoggingTest.java:68)
at org.apache.camel.opentelemetry2.BaggageLoggingTest.testRouteProgrammaticBaggageLogging(BaggageLoggingTest.java:57)showing that the baggage is present in the Change to |
|
Sorry, I missed to include the |
That's because If it makes sense we could include an |
|
So the traceProcessors=true is indeed a requirement for this to work, can we agree on that? |
Sure. It's a requirement if you need to include the baggage in a processor such as |
|
I don't think it is our specific use case. Isn't it quite common for custom processors to use the logging api? If such a custom processor is not running in a separate span, any baggage will not lost in the logs. public class CustomProcessor {
private static final Logger LOG = LoggerFactory.getLogger(CustomProcessor.class);
public void customLogic() {
// ...
LOG.info("A message"); // This logging would not include baggage, if traceProcessors=false
}
} from("direct:start")
.setHeader("OTEL_BAGGAGE_tenant.id", constant("1234"))
.routeId("start")
.bean(new CustomProcessor())Being able to explicitly list (using |
Baggageis a way to attach key-value metadata to a request and carry it across service boundaries. In the context of OpenTelemetry, baggage travels along with the context (like trace/span), but it's meant for custom data you define, not telemetry internals. Camel allows you to programmatically provide anyBaggageinformation via header settings. Whenever the component finds an header defined asOTEL_BAGGAGE_xyzit will consider it as a baggage variable namedxyz. For example, in Java DSL:Closes CAMEL-23349
Description
Target
mainbranch)Tracking
Apache Camel coding standards and style
mvn clean install -DskipTestslocally from root folder and I have committed all auto-generated changes.