Skip to content
26 changes: 26 additions & 0 deletions java-bigquery/google-cloud-bigquery-jdbc/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,17 @@
<relocation>
<pattern>io</pattern>
<shadedPattern>com.google.bqjdbc.shaded.io</shadedPattern>
<excludes>
<!--
OpenTelemetry API and Context must remain unshaded to ensure interoperability.
Shading these would prevent the driver from participating in the application's
existing tracing context. We are aware that unshaded dependencies can lead to
version mismatches, but this is a necessary trade-off for the OpenTelemetry
integration to function correctly across different applications.
-->
<exclude>io.opentelemetry.api.*</exclude>
<exclude>io.opentelemetry.context.*</exclude>
</excludes>
</relocation>
</relocations>
<filters>
Expand Down Expand Up @@ -276,6 +287,16 @@
<artifactId>httpcore5</artifactId>
</dependency>

<!-- OpenTelemetry APIs (unshaded) -->
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-api</artifactId>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-context</artifactId>
</dependency>

<!-- Test Dependencies -->
<dependency>
<groupId>junit</groupId>
Expand Down Expand Up @@ -323,6 +344,11 @@
<artifactId>junit-platform-suite-engine</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-sdk-testing</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<profiles>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import com.google.cloud.bigquery.exception.BigQueryJdbcRuntimeException;
import com.google.cloud.bigquery.storage.v1.ArrowRecordBatch;
import com.google.cloud.bigquery.storage.v1.ArrowSchema;
import io.opentelemetry.context.Scope;
import java.io.IOException;
import java.math.BigDecimal;
import java.sql.Date;
Expand Down Expand Up @@ -239,29 +240,31 @@ public boolean next() throws SQLException {
|| this.currentBatchRowIndex == (this.vectorSchemaRoot.getRowCount() - 1)) {
/* Start of iteration or we have exhausted the current batch */
// Advance the cursor. Potentially blocking operation.
BigQueryArrowBatchWrapper batchWrapper = this.buffer.take();
if (batchWrapper.getException() != null) {
throw new BigQueryJdbcRuntimeException(batchWrapper.getException());
}
if (batchWrapper.isLast()) {
/* Marks the end of the records */
if (this.vectorSchemaRoot != null) {
// IMP: To avoid memory leak: clear vectorSchemaRoot as it still holds
// the last batch
this.vectorSchemaRoot.clear();
try (Scope scope = makeOriginalContextCurrent()) {
BigQueryArrowBatchWrapper batchWrapper = this.buffer.take();
if (batchWrapper.getException() != null) {
throw new BigQueryJdbcRuntimeException(batchWrapper.getException());
}
if (batchWrapper.isLast()) {
/* Marks the end of the records */
if (this.vectorSchemaRoot != null) {
// IMP: To avoid memory leak: clear vectorSchemaRoot as it still holds
// the last batch
this.vectorSchemaRoot.clear();
}
this.hasReachedEnd = true;
this.rowCount++;
return false;
}
this.hasReachedEnd = true;
// Valid batch, process it
ArrowRecordBatch arrowBatch = batchWrapper.getCurrentArrowBatch();
// Populates vectorSchemaRoot
this.arrowDeserializer.deserializeArrowBatch(arrowBatch);
// Pointing to the first row in this fresh batch
this.currentBatchRowIndex = 0;
this.rowCount++;
return false;
return true;
}
// Valid batch, process it
ArrowRecordBatch arrowBatch = batchWrapper.getCurrentArrowBatch();
// Populates vectorSchemaRoot
this.arrowDeserializer.deserializeArrowBatch(arrowBatch);
// Pointing to the first row in this fresh batch
this.currentBatchRowIndex = 0;
this.rowCount++;
return true;
}
// There are rows left in the current batch.
else if (this.currentBatchRowIndex < this.vectorSchemaRoot.getRowCount()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@
import com.google.cloud.bigquery.exception.BigQueryConversionException;
import com.google.cloud.bigquery.exception.BigQueryJdbcCoercionException;
import com.google.cloud.bigquery.exception.BigQueryJdbcCoercionNotFoundException;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanContext;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import java.io.InputStream;
import java.io.Reader;
import java.io.StringReader;
Expand Down Expand Up @@ -58,6 +62,7 @@ public abstract class BigQueryBaseResultSet extends BigQueryNoOpsResultSet
protected boolean isClosed = false;
protected boolean wasNull = false;
protected final BigQueryTypeCoercer bigQueryTypeCoercer = BigQueryTypeCoercionUtility.INSTANCE;
protected final SpanContext originalSpanContext;

protected BigQueryBaseResultSet(
BigQuery bigQuery, BigQueryStatement statement, Schema schema, boolean isNested) {
Expand All @@ -66,6 +71,11 @@ protected BigQueryBaseResultSet(
this.schema = schema;
this.schemaFieldList = schema != null ? schema.getFields() : null;
this.isNested = isNested;
this.originalSpanContext = Span.current().getSpanContext();
}

protected Scope makeOriginalContextCurrent() {
return Context.current().with(Span.wrap(this.originalSpanContext)).makeCurrent();
}

public QueryStatistics getQueryStatistics() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@
import com.google.cloud.bigquery.storage.v1.BigQueryWriteClient;
import com.google.cloud.bigquery.storage.v1.BigQueryWriteSettings;
import com.google.cloud.http.HttpTransportOptions;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.trace.Tracer;
import java.io.IOException;
import java.io.InputStream;
import java.sql.CallableStatement;
Expand Down Expand Up @@ -141,6 +143,11 @@ public class BigQueryConnection extends BigQueryNoOpsConnection {
Long connectionPoolSize;
Long listenerPoolSize;
String partnerToken;
Boolean enableGcpTraceExporter;
Boolean enableGcpLogExporter;
OpenTelemetry customOpenTelemetry;
Tracer tracer =
OpenTelemetry.noop().getTracer(BigQueryJdbcOpenTelemetry.INSTRUMENTATION_SCOPE_NAME);
DatabaseMetaData databaseMetaData;
Boolean reqGoogleDriveScope;

Expand Down Expand Up @@ -267,6 +274,9 @@ public class BigQueryConnection extends BigQueryNoOpsConnection {
this.partnerToken = ds.getPartnerToken();

this.headerProvider = createHeaderProvider();
this.enableGcpTraceExporter = ds.getEnableGcpTraceExporter();
this.enableGcpLogExporter = ds.getEnableGcpLogExporter();
this.customOpenTelemetry = ds.getCustomOpenTelemetry();
this.bigQuery = getBigQueryConnection();
}
}
Expand Down Expand Up @@ -1042,6 +1052,14 @@ private BigQuery getBigQueryConnection() {
bigQueryOptions.setTransportOptions(this.httpTransportOptions);
}

OpenTelemetry openTelemetry =
BigQueryJdbcOpenTelemetry.getOpenTelemetry(
this.enableGcpTraceExporter, this.enableGcpLogExporter, this.customOpenTelemetry);
if (this.enableGcpTraceExporter || this.customOpenTelemetry != null) {
this.tracer = BigQueryJdbcOpenTelemetry.getTracer(openTelemetry);
bigQueryOptions.setOpenTelemetryTracer(this.tracer);
}

BigQueryOptions options = bigQueryOptions.setHeaderProvider(this.headerProvider).build();
options.setDefaultJobCreationMode(
this.useStatelessQueryMode
Expand Down Expand Up @@ -1090,6 +1108,13 @@ private BigQueryReadClient getBigQueryReadClientConnection() throws IOException

bigQueryReadSettings.setTransportChannelProvider(activeProvider);

OpenTelemetry openTelemetry =
BigQueryJdbcOpenTelemetry.getOpenTelemetry(
this.enableGcpTraceExporter, this.enableGcpLogExporter, this.customOpenTelemetry);
if (this.enableGcpTraceExporter || this.customOpenTelemetry != null) {
bigQueryReadSettings.setOpenTelemetryTracerProvider(openTelemetry.getTracerProvider());
}

return BigQueryReadClient.create(bigQueryReadSettings.build());
}

Expand Down Expand Up @@ -1193,4 +1218,8 @@ public CallableStatement prepareCall(
}
return prepareCall(sql);
}

public Tracer getTracer() {
return this.tracer;
}
}
Loading
Loading