diff --git a/java-bigquery/google-cloud-bigquery-jdbc/pom.xml b/java-bigquery/google-cloud-bigquery-jdbc/pom.xml
index 4ce42dd0ea6c..f2c75f9bd5cf 100644
--- a/java-bigquery/google-cloud-bigquery-jdbc/pom.xml
+++ b/java-bigquery/google-cloud-bigquery-jdbc/pom.xml
@@ -132,6 +132,17 @@
io
com.google.bqjdbc.shaded.io
+
+
+ io.opentelemetry.api.*
+ io.opentelemetry.context.*
+
@@ -276,6 +287,16 @@
httpcore5
+
+
+ io.opentelemetry
+ opentelemetry-api
+
+
+ io.opentelemetry
+ opentelemetry-context
+
+
junit
@@ -323,6 +344,11 @@
junit-platform-suite-engine
test
+
+ io.opentelemetry
+ opentelemetry-sdk-testing
+ test
+
diff --git a/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryArrowResultSet.java b/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryArrowResultSet.java
index af041dc2a649..d15e6deb2ee4 100644
--- a/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryArrowResultSet.java
+++ b/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryArrowResultSet.java
@@ -27,6 +27,7 @@
import com.google.cloud.bigquery.exception.BigQueryJdbcRuntimeException;
import com.google.cloud.bigquery.storage.v1.ArrowRecordBatch;
import com.google.cloud.bigquery.storage.v1.ArrowSchema;
+import io.opentelemetry.context.Scope;
import java.io.IOException;
import java.math.BigDecimal;
import java.sql.Date;
@@ -239,29 +240,31 @@ public boolean next() throws SQLException {
|| this.currentBatchRowIndex == (this.vectorSchemaRoot.getRowCount() - 1)) {
/* Start of iteration or we have exhausted the current batch */
// Advance the cursor. Potentially blocking operation.
- BigQueryArrowBatchWrapper batchWrapper = this.buffer.take();
- if (batchWrapper.getException() != null) {
- throw new BigQueryJdbcRuntimeException(batchWrapper.getException());
- }
- if (batchWrapper.isLast()) {
- /* Marks the end of the records */
- if (this.vectorSchemaRoot != null) {
- // IMP: To avoid memory leak: clear vectorSchemaRoot as it still holds
- // the last batch
- this.vectorSchemaRoot.clear();
+ try (Scope scope = makeOriginalContextCurrent()) {
+ BigQueryArrowBatchWrapper batchWrapper = this.buffer.take();
+ if (batchWrapper.getException() != null) {
+ throw new BigQueryJdbcRuntimeException(batchWrapper.getException());
+ }
+ if (batchWrapper.isLast()) {
+ /* Marks the end of the records */
+ if (this.vectorSchemaRoot != null) {
+ // IMP: To avoid memory leak: clear vectorSchemaRoot as it still holds
+ // the last batch
+ this.vectorSchemaRoot.clear();
+ }
+ this.hasReachedEnd = true;
+ this.rowCount++;
+ return false;
}
- this.hasReachedEnd = true;
+ // Valid batch, process it
+ ArrowRecordBatch arrowBatch = batchWrapper.getCurrentArrowBatch();
+ // Populates vectorSchemaRoot
+ this.arrowDeserializer.deserializeArrowBatch(arrowBatch);
+ // Pointing to the first row in this fresh batch
+ this.currentBatchRowIndex = 0;
this.rowCount++;
- return false;
+ return true;
}
- // Valid batch, process it
- ArrowRecordBatch arrowBatch = batchWrapper.getCurrentArrowBatch();
- // Populates vectorSchemaRoot
- this.arrowDeserializer.deserializeArrowBatch(arrowBatch);
- // Pointing to the first row in this fresh batch
- this.currentBatchRowIndex = 0;
- this.rowCount++;
- return true;
}
// There are rows left in the current batch.
else if (this.currentBatchRowIndex < this.vectorSchemaRoot.getRowCount()) {
diff --git a/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryBaseResultSet.java b/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryBaseResultSet.java
index d63b72bb6c93..1fedd00e74e3 100644
--- a/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryBaseResultSet.java
+++ b/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryBaseResultSet.java
@@ -27,6 +27,10 @@
import com.google.cloud.bigquery.exception.BigQueryConversionException;
import com.google.cloud.bigquery.exception.BigQueryJdbcCoercionException;
import com.google.cloud.bigquery.exception.BigQueryJdbcCoercionNotFoundException;
+import io.opentelemetry.api.trace.Span;
+import io.opentelemetry.api.trace.SpanContext;
+import io.opentelemetry.context.Context;
+import io.opentelemetry.context.Scope;
import java.io.InputStream;
import java.io.Reader;
import java.io.StringReader;
@@ -58,6 +62,7 @@ public abstract class BigQueryBaseResultSet extends BigQueryNoOpsResultSet
protected boolean isClosed = false;
protected boolean wasNull = false;
protected final BigQueryTypeCoercer bigQueryTypeCoercer = BigQueryTypeCoercionUtility.INSTANCE;
+ protected final SpanContext originalSpanContext;
protected BigQueryBaseResultSet(
BigQuery bigQuery, BigQueryStatement statement, Schema schema, boolean isNested) {
@@ -66,6 +71,11 @@ protected BigQueryBaseResultSet(
this.schema = schema;
this.schemaFieldList = schema != null ? schema.getFields() : null;
this.isNested = isNested;
+ this.originalSpanContext = Span.current().getSpanContext();
+ }
+
+ protected Scope makeOriginalContextCurrent() {
+ return Context.current().with(Span.wrap(this.originalSpanContext)).makeCurrent();
}
public QueryStatistics getQueryStatistics() {
diff --git a/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryConnection.java b/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryConnection.java
index 822b4e282f74..c703cee798a2 100644
--- a/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryConnection.java
+++ b/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryConnection.java
@@ -41,6 +41,8 @@
import com.google.cloud.bigquery.storage.v1.BigQueryWriteClient;
import com.google.cloud.bigquery.storage.v1.BigQueryWriteSettings;
import com.google.cloud.http.HttpTransportOptions;
+import io.opentelemetry.api.OpenTelemetry;
+import io.opentelemetry.api.trace.Tracer;
import java.io.IOException;
import java.io.InputStream;
import java.sql.CallableStatement;
@@ -141,6 +143,11 @@ public class BigQueryConnection extends BigQueryNoOpsConnection {
Long connectionPoolSize;
Long listenerPoolSize;
String partnerToken;
+ Boolean enableGcpTraceExporter;
+ Boolean enableGcpLogExporter;
+ OpenTelemetry customOpenTelemetry;
+ Tracer tracer =
+ OpenTelemetry.noop().getTracer(BigQueryJdbcOpenTelemetry.INSTRUMENTATION_SCOPE_NAME);
DatabaseMetaData databaseMetaData;
Boolean reqGoogleDriveScope;
@@ -267,6 +274,9 @@ public class BigQueryConnection extends BigQueryNoOpsConnection {
this.partnerToken = ds.getPartnerToken();
this.headerProvider = createHeaderProvider();
+ this.enableGcpTraceExporter = ds.getEnableGcpTraceExporter();
+ this.enableGcpLogExporter = ds.getEnableGcpLogExporter();
+ this.customOpenTelemetry = ds.getCustomOpenTelemetry();
this.bigQuery = getBigQueryConnection();
}
}
@@ -1042,6 +1052,14 @@ private BigQuery getBigQueryConnection() {
bigQueryOptions.setTransportOptions(this.httpTransportOptions);
}
+ OpenTelemetry openTelemetry =
+ BigQueryJdbcOpenTelemetry.getOpenTelemetry(
+ this.enableGcpTraceExporter, this.enableGcpLogExporter, this.customOpenTelemetry);
+ if (this.enableGcpTraceExporter || this.customOpenTelemetry != null) {
+ this.tracer = BigQueryJdbcOpenTelemetry.getTracer(openTelemetry);
+ bigQueryOptions.setOpenTelemetryTracer(this.tracer);
+ }
+
BigQueryOptions options = bigQueryOptions.setHeaderProvider(this.headerProvider).build();
options.setDefaultJobCreationMode(
this.useStatelessQueryMode
@@ -1090,6 +1108,13 @@ private BigQueryReadClient getBigQueryReadClientConnection() throws IOException
bigQueryReadSettings.setTransportChannelProvider(activeProvider);
+ OpenTelemetry openTelemetry =
+ BigQueryJdbcOpenTelemetry.getOpenTelemetry(
+ this.enableGcpTraceExporter, this.enableGcpLogExporter, this.customOpenTelemetry);
+ if (this.enableGcpTraceExporter || this.customOpenTelemetry != null) {
+ bigQueryReadSettings.setOpenTelemetryTracerProvider(openTelemetry.getTracerProvider());
+ }
+
return BigQueryReadClient.create(bigQueryReadSettings.build());
}
@@ -1193,4 +1218,8 @@ public CallableStatement prepareCall(
}
return prepareCall(sql);
}
+
+ public Tracer getTracer() {
+ return this.tracer;
+ }
}
diff --git a/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryDatabaseMetaData.java b/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryDatabaseMetaData.java
index e81601bd77ab..3a01a4ae0bae 100644
--- a/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryDatabaseMetaData.java
+++ b/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryDatabaseMetaData.java
@@ -42,6 +42,13 @@
import com.google.cloud.bigquery.TableDefinition;
import com.google.cloud.bigquery.TableId;
import com.google.cloud.bigquery.exception.BigQueryJdbcException;
+import com.google.common.annotations.VisibleForTesting;
+import io.opentelemetry.api.trace.Span;
+import io.opentelemetry.api.trace.SpanContext;
+import io.opentelemetry.api.trace.StatusCode;
+import io.opentelemetry.api.trace.Tracer;
+import io.opentelemetry.context.Context;
+import io.opentelemetry.context.Scope;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
@@ -1708,8 +1715,16 @@ Comparator defineGetProcedureColumnsComparator(FieldList resultS
@Override
public ResultSet getTables(
- String catalog, String schemaPattern, String tableNamePattern, String[] types) {
+ String catalog, String schemaPattern, String tableNamePattern, String[] types)
+ throws SQLException {
+ return withTracing(
+ "BigQueryDatabaseMetaData.getTables",
+ () -> getTablesImpl(catalog, schemaPattern, tableNamePattern, types));
+ }
+ private ResultSet getTablesImpl(
+ String catalog, String schemaPattern, String tableNamePattern, String[] types)
+ throws SQLException {
Tuple effectiveIdentifiers =
determineEffectiveCatalogAndSchema(catalog, schemaPattern);
String effectiveCatalog = effectiveIdentifiers.x();
@@ -1727,161 +1742,195 @@ public ResultSet getTables(
"getTables called for catalog: %s, schemaPattern: %s, tableNamePattern: %s, types: %s",
effectiveCatalog, effectiveSchemaPattern, tableNamePattern, Arrays.toString(types));
+ final Schema resultSchema = defineGetTablesSchema();
+ final BlockingQueue queue =
+ new LinkedBlockingQueue<>(DEFAULT_QUEUE_CAPACITY);
+
+ Thread fetcherThread =
+ runGetTablesTaskAsync(
+ effectiveCatalog, effectiveSchemaPattern, tableNamePattern, types, resultSchema, queue);
+
+ BigQueryJsonResultSet resultSet =
+ BigQueryJsonResultSet.of(resultSchema, -1, queue, null, new Thread[] {fetcherThread});
+
+ LOG.info("Started background thread for getTables");
+ return resultSet;
+ }
+
+ @VisibleForTesting
+ Thread runGetTablesTaskAsync(
+ String effectiveCatalog,
+ String effectiveSchemaPattern,
+ String tableNamePattern,
+ String[] types,
+ Schema resultSchema,
+ BlockingQueue queue)
+ throws SQLException {
+
final Pattern schemaRegex = compileSqlLikePattern(effectiveSchemaPattern);
final Pattern tableNameRegex = compileSqlLikePattern(tableNamePattern);
final Set requestedTypes =
(types == null || types.length == 0) ? null : new HashSet<>(Arrays.asList(types));
- final Schema resultSchema = defineGetTablesSchema();
final FieldList resultSchemaFields = resultSchema.getFields();
-
- final BlockingQueue queue =
- new LinkedBlockingQueue<>(DEFAULT_QUEUE_CAPACITY);
final List collectedResults = Collections.synchronizedList(new ArrayList<>());
final String catalogParam = effectiveCatalog;
final String schemaParam = effectiveSchemaPattern;
-
+ SpanContext parentSpanContext = Span.current().getSpanContext();
Runnable tableFetcher =
() -> {
- ExecutorService apiExecutor = null;
- ExecutorService tableProcessorExecutor = null;
- final FieldList localResultSchemaFields = resultSchemaFields;
- final List>> apiFutures = new ArrayList<>();
- final List> processingFutures = new ArrayList<>();
+ Span backgroundSpan =
+ this.connection
+ .getTracer()
+ .spanBuilder("BigQueryDatabaseMetaData.getTables.background")
+ .setNoParent()
+ .addLink(parentSpanContext)
+ .startSpan();
+
+ try (Scope scope = backgroundSpan.makeCurrent()) {
+ ExecutorService apiExecutor = null;
+ ExecutorService tableProcessorExecutor = null;
+ final FieldList localResultSchemaFields = resultSchemaFields;
+ final List>> apiFutures = new ArrayList<>();
+ final List> processingFutures = new ArrayList<>();
- try {
- List datasetsToScan =
- findMatchingBigQueryObjects(
- "Dataset",
- () ->
- bigquery.listDatasets(
- catalogParam, DatasetListOption.pageSize(DEFAULT_PAGE_SIZE)),
- (name) -> bigquery.getDataset(DatasetId.of(catalogParam, name)),
- (ds) -> ds.getDatasetId().getDataset(),
- schemaParam,
- schemaRegex,
- LOG);
+ try {
+ List datasetsToScan =
+ findMatchingBigQueryObjects(
+ "Dataset",
+ () ->
+ bigquery.listDatasets(
+ catalogParam, DatasetListOption.pageSize(DEFAULT_PAGE_SIZE)),
+ (name) -> bigquery.getDataset(DatasetId.of(catalogParam, name)),
+ (ds) -> ds.getDatasetId().getDataset(),
+ schemaParam,
+ schemaRegex,
+ LOG);
- if (datasetsToScan.isEmpty()) {
- LOG.info("Fetcher thread found no matching datasets. Returning empty resultset.");
- return;
- }
+ if (datasetsToScan.isEmpty()) {
+ LOG.info("Fetcher thread found no matching datasets. Returning empty resultset.");
+ return;
+ }
- apiExecutor = Executors.newFixedThreadPool(API_EXECUTOR_POOL_SIZE);
- tableProcessorExecutor = Executors.newFixedThreadPool(this.metadataFetchThreadCount);
+ apiExecutor = Executors.newFixedThreadPool(API_EXECUTOR_POOL_SIZE);
+ tableProcessorExecutor = Executors.newFixedThreadPool(this.metadataFetchThreadCount);
- LOG.fine("Submitting parallel findMatchingTables tasks...");
- for (Dataset dataset : datasetsToScan) {
- if (Thread.currentThread().isInterrupted()) {
- LOG.warning("Table fetcher interrupted during dataset iteration.");
- break;
+ LOG.fine("Submitting parallel findMatchingTables tasks...");
+ for (Dataset dataset : datasetsToScan) {
+ if (Thread.currentThread().isInterrupted()) {
+ LOG.warning("Table fetcher interrupted during dataset iteration.");
+ break;
+ }
+
+ final DatasetId currentDatasetId = dataset.getDatasetId();
+ Callable> apiCallable =
+ () ->
+ findMatchingBigQueryObjects(
+ "Table",
+ () ->
+ bigquery.listTables(
+ currentDatasetId, TableListOption.pageSize(DEFAULT_PAGE_SIZE)),
+ (name) ->
+ bigquery.getTable(
+ TableId.of(
+ currentDatasetId.getProject(),
+ currentDatasetId.getDataset(),
+ name)),
+ (tbl) -> tbl.getTableId().getTable(),
+ tableNamePattern,
+ tableNameRegex,
+ LOG);
+
+ Callable> wrappedApiCallable = Context.current().wrap(apiCallable);
+ Future> apiFuture = apiExecutor.submit(wrappedApiCallable);
+ apiFutures.add(apiFuture);
}
+ LOG.fine("Finished submitting " + apiFutures.size() + " findMatchingTables tasks.");
+ apiExecutor.shutdown();
- final DatasetId currentDatasetId = dataset.getDatasetId();
- Callable> apiCallable =
- () ->
- findMatchingBigQueryObjects(
- "Table",
+ LOG.fine("Processing results from findMatchingTables tasks...");
+ for (Future> apiFuture : apiFutures) {
+ if (Thread.currentThread().isInterrupted()) {
+ LOG.warning("Table fetcher interrupted while processing API futures.");
+ break;
+ }
+ try {
+ List tablesResult = apiFuture.get();
+ if (tablesResult != null) {
+ for (Table table : tablesResult) {
+ if (Thread.currentThread().isInterrupted()) break;
+
+ final Table currentTable = table;
+ Runnable processRunnable =
() ->
- bigquery.listTables(
- currentDatasetId, TableListOption.pageSize(DEFAULT_PAGE_SIZE)),
- (name) ->
- bigquery.getTable(
- TableId.of(
- currentDatasetId.getProject(),
- currentDatasetId.getDataset(),
- name)),
- (tbl) -> tbl.getTableId().getTable(),
- tableNamePattern,
- tableNameRegex,
- LOG);
- Future> apiFuture = apiExecutor.submit(apiCallable);
- apiFutures.add(apiFuture);
- }
- LOG.fine("Finished submitting " + apiFutures.size() + " findMatchingTables tasks.");
- apiExecutor.shutdown();
+ processTableInfo(
+ currentTable,
+ requestedTypes,
+ collectedResults,
+ localResultSchemaFields);
+ Runnable wrappedProcessRunnable = Context.current().wrap(processRunnable);
+ Future> processFuture =
+ tableProcessorExecutor.submit(wrappedProcessRunnable);
+ processingFutures.add(processFuture);
+ }
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ LOG.warning("Fetcher thread interrupted while waiting for API future result.");
+ break;
+ } catch (ExecutionException e) {
+ LOG.warning(
+ "Error executing findMatchingTables task: "
+ + e.getMessage()
+ + ". Cause: "
+ + e.getCause());
+ } catch (CancellationException e) {
+ LOG.warning("A findMatchingTables task was cancelled.");
+ }
+ }
+
+ LOG.fine(
+ "Finished submitting " + processingFutures.size() + " processTableInfo tasks.");
- LOG.fine("Processing results from findMatchingTables tasks...");
- for (Future> apiFuture : apiFutures) {
if (Thread.currentThread().isInterrupted()) {
- LOG.warning("Table fetcher interrupted while processing API futures.");
- break;
+ LOG.warning(
+ "Fetcher interrupted before waiting for processing tasks; cancelling remaining.");
+ processingFutures.forEach(f -> f.cancel(true));
+ } else {
+ LOG.fine("Waiting for processTableInfo tasks to complete...");
+ waitForTasksCompletion(processingFutures);
+ LOG.fine("All processTableInfo tasks completed.");
}
- try {
- List tablesResult = apiFuture.get();
- if (tablesResult != null) {
- for (Table table : tablesResult) {
- if (Thread.currentThread().isInterrupted()) break;
- final Table currentTable = table;
- Future> processFuture =
- tableProcessorExecutor.submit(
- () ->
- processTableInfo(
- currentTable,
- requestedTypes,
- collectedResults,
- localResultSchemaFields));
- processingFutures.add(processFuture);
- }
- }
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- LOG.warning("Fetcher thread interrupted while waiting for API future result.");
- break;
- } catch (ExecutionException e) {
- LOG.warning(
- "Error executing findMatchingTables task: "
- + e.getMessage()
- + ". Cause: "
- + e.getCause());
- } catch (CancellationException e) {
- LOG.warning("A findMatchingTables task was cancelled.");
+ if (!Thread.currentThread().isInterrupted()) {
+ Comparator comparator =
+ defineGetTablesComparator(localResultSchemaFields);
+ sortResults(collectedResults, comparator, "getTables", LOG);
}
- }
- LOG.fine(
- "Finished submitting " + processingFutures.size() + " processTableInfo tasks.");
+ if (!Thread.currentThread().isInterrupted()) {
+ populateQueue(collectedResults, queue, localResultSchemaFields);
+ }
- if (Thread.currentThread().isInterrupted()) {
- LOG.warning(
- "Fetcher interrupted before waiting for processing tasks; cancelling remaining.");
+ } catch (Throwable t) {
+ LOG.severe("Unexpected error in table fetcher runnable: " + t.getMessage());
+ apiFutures.forEach(f -> f.cancel(true));
processingFutures.forEach(f -> f.cancel(true));
- } else {
- LOG.fine("Waiting for processTableInfo tasks to complete...");
- waitForTasksCompletion(processingFutures);
- LOG.fine("All processTableInfo tasks completed.");
+ } finally {
+ signalEndOfData(queue, localResultSchemaFields);
+ shutdownExecutor(apiExecutor);
+ shutdownExecutor(tableProcessorExecutor);
+ LOG.info("Table fetcher thread finished.");
}
-
- if (!Thread.currentThread().isInterrupted()) {
- Comparator comparator =
- defineGetTablesComparator(localResultSchemaFields);
- sortResults(collectedResults, comparator, "getTables", LOG);
- }
-
- if (!Thread.currentThread().isInterrupted()) {
- populateQueue(collectedResults, queue, localResultSchemaFields);
- }
-
- } catch (Throwable t) {
- LOG.severe("Unexpected error in table fetcher runnable: " + t.getMessage());
- apiFutures.forEach(f -> f.cancel(true));
- processingFutures.forEach(f -> f.cancel(true));
} finally {
- signalEndOfData(queue, localResultSchemaFields);
- shutdownExecutor(apiExecutor);
- shutdownExecutor(tableProcessorExecutor);
- LOG.info("Table fetcher thread finished.");
+ backgroundSpan.end();
}
};
- Thread fetcherThread = new Thread(tableFetcher, "getTables-fetcher-" + effectiveCatalog);
- BigQueryJsonResultSet resultSet =
- BigQueryJsonResultSet.of(resultSchema, -1, queue, null, new Thread[] {fetcherThread});
-
+ Runnable wrappedTableFetcher = Context.current().wrap(tableFetcher);
+ Thread fetcherThread = new Thread(wrappedTableFetcher, "getTables-fetcher-" + effectiveCatalog);
fetcherThread.start();
- LOG.info("Started background thread for getTables");
- return resultSet;
+ return fetcherThread;
}
Schema defineGetTablesSchema() {
@@ -1995,16 +2044,19 @@ Comparator defineGetTablesComparator(FieldList resultSchemaField
}
@Override
- public ResultSet getSchemas() {
+ public ResultSet getSchemas() throws SQLException {
LOG.info("getSchemas() called");
return getSchemas(null, null);
}
@Override
- public ResultSet getCatalogs() {
- LOG.info("getCatalogs() called");
+ public ResultSet getCatalogs() throws SQLException {
+ return withTracing("BigQueryDatabaseMetaData.getCatalogs", () -> getCatalogsImpl());
+ }
+ private ResultSet getCatalogsImpl() throws SQLException {
+ LOG.info("getCatalogs() called");
final List accessibleCatalogs = getAccessibleCatalogNames();
final Schema catalogsSchema = defineGetCatalogsSchema();
final FieldList schemaFields = catalogsSchema.getFields();
@@ -2073,8 +2125,16 @@ static List prepareGetTableTypesRows(Schema schema) {
@Override
public ResultSet getColumns(
- String catalog, String schemaPattern, String tableNamePattern, String columnNamePattern) {
+ String catalog, String schemaPattern, String tableNamePattern, String columnNamePattern)
+ throws SQLException {
+ return withTracing(
+ "BigQueryDatabaseMetaData.getColumns",
+ () -> getColumnsImpl(catalog, schemaPattern, tableNamePattern, columnNamePattern));
+ }
+ private ResultSet getColumnsImpl(
+ String catalog, String schemaPattern, String tableNamePattern, String columnNamePattern)
+ throws SQLException {
Tuple effectiveIdentifiers =
determineEffectiveCatalogAndSchema(catalog, schemaPattern);
String effectiveCatalog = effectiveIdentifiers.x();
@@ -2094,120 +2154,160 @@ public ResultSet getColumns(
+ " columnNamePattern: %s",
effectiveCatalog, effectiveSchemaPattern, tableNamePattern, columnNamePattern);
+ final Schema resultSchema = defineGetColumnsSchema();
+ final BlockingQueue queue =
+ new LinkedBlockingQueue<>(DEFAULT_QUEUE_CAPACITY);
+
+ Thread fetcherThread =
+ runGetColumnsTaskAsync(
+ effectiveCatalog,
+ effectiveSchemaPattern,
+ tableNamePattern,
+ columnNamePattern,
+ resultSchema,
+ queue);
+
+ BigQueryJsonResultSet resultSet =
+ BigQueryJsonResultSet.of(resultSchema, -1, queue, null, new Thread[] {fetcherThread});
+
+ LOG.info("Started background thread for getColumns");
+ return resultSet;
+ }
+
+ @VisibleForTesting
+ Thread runGetColumnsTaskAsync(
+ String effectiveCatalog,
+ String effectiveSchemaPattern,
+ String tableNamePattern,
+ String columnNamePattern,
+ Schema resultSchema,
+ BlockingQueue queue)
+ throws SQLException {
+
Pattern schemaRegex = compileSqlLikePattern(effectiveSchemaPattern);
Pattern tableNameRegex = compileSqlLikePattern(tableNamePattern);
Pattern columnNameRegex = compileSqlLikePattern(columnNamePattern);
- final Schema resultSchema = defineGetColumnsSchema();
final FieldList resultSchemaFields = resultSchema.getFields();
- final BlockingQueue queue =
- new LinkedBlockingQueue<>(DEFAULT_QUEUE_CAPACITY);
final List collectedResults = Collections.synchronizedList(new ArrayList<>());
final String catalogParam = effectiveCatalog;
final String schemaParam = effectiveSchemaPattern;
+ SpanContext parentSpanContext = Span.current().getSpanContext();
Runnable columnFetcher =
() -> {
- ExecutorService columnExecutor = null;
- final List> taskFutures = new ArrayList<>();
- final FieldList localResultSchemaFields = resultSchemaFields;
-
- try {
- List datasetsToScan =
- findMatchingBigQueryObjects(
- "Dataset",
- () ->
- bigquery.listDatasets(
- catalogParam, DatasetListOption.pageSize(DEFAULT_PAGE_SIZE)),
- (name) -> bigquery.getDataset(DatasetId.of(catalogParam, name)),
- (ds) -> ds.getDatasetId().getDataset(),
- schemaParam,
- schemaRegex,
- LOG);
+ Span backgroundSpan =
+ this.connection
+ .getTracer()
+ .spanBuilder("BigQueryDatabaseMetaData.getColumns.background")
+ .setNoParent()
+ .addLink(parentSpanContext)
+ .startSpan();
+
+ try (Scope scope = backgroundSpan.makeCurrent()) {
+ ExecutorService columnExecutor = null;
+ final List> taskFutures = new ArrayList<>();
+ final FieldList localResultSchemaFields = resultSchemaFields;
- if (datasetsToScan.isEmpty()) {
- LOG.info("Fetcher thread found no matching datasets. Returning empty resultset.");
- return;
- }
-
- columnExecutor = Executors.newFixedThreadPool(this.metadataFetchThreadCount);
-
- for (Dataset dataset : datasetsToScan) {
- if (Thread.currentThread().isInterrupted()) {
- LOG.warning("Fetcher interrupted during dataset iteration.");
- break;
- }
-
- DatasetId datasetId = dataset.getDatasetId();
- LOG.info("Processing dataset: " + datasetId.getDataset());
-
- List tablesToScan =
+ try {
+ List datasetsToScan =
findMatchingBigQueryObjects(
- "Table",
+ "Dataset",
() ->
- bigquery.listTables(
- datasetId, TableListOption.pageSize(DEFAULT_PAGE_SIZE)),
- (name) ->
- bigquery.getTable(
- TableId.of(datasetId.getProject(), datasetId.getDataset(), name)),
- (tbl) -> tbl.getTableId().getTable(),
- tableNamePattern,
- tableNameRegex,
+ bigquery.listDatasets(
+ catalogParam, DatasetListOption.pageSize(DEFAULT_PAGE_SIZE)),
+ (name) -> bigquery.getDataset(DatasetId.of(catalogParam, name)),
+ (ds) -> ds.getDatasetId().getDataset(),
+ schemaParam,
+ schemaRegex,
LOG);
- for (Table table : tablesToScan) {
+ if (datasetsToScan.isEmpty()) {
+ LOG.info("Fetcher thread found no matching datasets. Returning empty resultset.");
+ return;
+ }
+
+ columnExecutor = Executors.newFixedThreadPool(this.metadataFetchThreadCount);
+
+ for (Dataset dataset : datasetsToScan) {
if (Thread.currentThread().isInterrupted()) {
- LOG.warning(
- "Fetcher interrupted during table iteration for dataset "
- + datasetId.getDataset());
+ LOG.warning("Fetcher interrupted during dataset iteration.");
break;
}
- TableId tableId = table.getTableId();
- LOG.fine("Submitting task for table: " + tableId);
- final Table finalTable = table;
- Future> future =
- columnExecutor.submit(
+ DatasetId datasetId = dataset.getDatasetId();
+ LOG.info("Processing dataset: " + datasetId.getDataset());
+
+ List tablesToScan =
+ findMatchingBigQueryObjects(
+ "Table",
() ->
- processTableColumns(
- finalTable,
- columnNameRegex,
- collectedResults,
- localResultSchemaFields));
- taskFutures.add(future);
+ bigquery.listTables(
+ datasetId, TableListOption.pageSize(DEFAULT_PAGE_SIZE)),
+ (name) ->
+ bigquery.getTable(
+ TableId.of(datasetId.getProject(), datasetId.getDataset(), name)),
+ (tbl) -> tbl.getTableId().getTable(),
+ tableNamePattern,
+ tableNameRegex,
+ LOG);
+
+ for (Table table : tablesToScan) {
+ if (Thread.currentThread().isInterrupted()) {
+ LOG.warning(
+ "Fetcher interrupted during table iteration for dataset "
+ + datasetId.getDataset());
+ break;
+ }
+
+ TableId tableId = table.getTableId();
+ LOG.fine("Submitting task for table: " + tableId);
+ final Table finalTable = table;
+
+ Runnable columnTask =
+ () ->
+ processTableColumns(
+ finalTable,
+ columnNameRegex,
+ collectedResults,
+ localResultSchemaFields);
+ Runnable wrappedColumnTask = Context.current().wrap(columnTask);
+ Future> future = columnExecutor.submit(wrappedColumnTask);
+ taskFutures.add(future);
+ }
+ if (Thread.currentThread().isInterrupted()) break;
}
- if (Thread.currentThread().isInterrupted()) break;
- }
- waitForTasksCompletion(taskFutures);
+ waitForTasksCompletion(taskFutures);
- if (!Thread.currentThread().isInterrupted()) {
- Comparator comparator =
- defineGetColumnsComparator(localResultSchemaFields);
- sortResults(collectedResults, comparator, "getColumns", LOG);
- }
+ if (!Thread.currentThread().isInterrupted()) {
+ Comparator comparator =
+ defineGetColumnsComparator(localResultSchemaFields);
+ sortResults(collectedResults, comparator, "getColumns", LOG);
+ }
- if (!Thread.currentThread().isInterrupted()) {
- populateQueue(collectedResults, queue, localResultSchemaFields);
- }
+ if (!Thread.currentThread().isInterrupted()) {
+ populateQueue(collectedResults, queue, localResultSchemaFields);
+ }
- } catch (Throwable t) {
- LOG.severe("Unexpected error in column fetcher runnable: " + t.getMessage());
- taskFutures.forEach(f -> f.cancel(true));
+ } catch (Throwable t) {
+ LOG.severe("Unexpected error in column fetcher runnable: " + t.getMessage());
+ taskFutures.forEach(f -> f.cancel(true));
+ } finally {
+ signalEndOfData(queue, localResultSchemaFields);
+ shutdownExecutor(columnExecutor);
+ LOG.info("Column fetcher thread finished.");
+ }
} finally {
- signalEndOfData(queue, localResultSchemaFields);
- shutdownExecutor(columnExecutor);
- LOG.info("Column fetcher thread finished.");
+ backgroundSpan.end();
}
};
- Thread fetcherThread = new Thread(columnFetcher, "getColumns-fetcher-" + effectiveCatalog);
- BigQueryJsonResultSet resultSet =
- BigQueryJsonResultSet.of(resultSchema, -1, queue, null, new Thread[] {fetcherThread});
-
+ Runnable wrappedColumnFetcher = Context.current().wrap(columnFetcher);
+ Thread fetcherThread =
+ new Thread(wrappedColumnFetcher, "getColumns-fetcher-" + effectiveCatalog);
fetcherThread.start();
- LOG.info("Started background thread for getColumns");
- return resultSet;
+ return fetcherThread;
}
private void processTableColumns(
@@ -2274,7 +2374,7 @@ private void processTableColumns(
}
}
- private Schema defineGetColumnsSchema() {
+ Schema defineGetColumnsSchema() {
List fields = new ArrayList<>(24);
fields.add(
Field.newBuilder("TABLE_CAT", StandardSQLTypeName.STRING)
@@ -3626,7 +3726,12 @@ public RowIdLifetime getRowIdLifetime() {
}
@Override
- public ResultSet getSchemas(String catalog, String schemaPattern) {
+ public ResultSet getSchemas(String catalog, String schemaPattern) throws SQLException {
+ return withTracing(
+ "BigQueryDatabaseMetaData.getSchemas", () -> getSchemasImpl(catalog, schemaPattern));
+ }
+
+ private ResultSet getSchemasImpl(String catalog, String schemaPattern) throws SQLException {
if ((catalog != null && catalog.isEmpty())
|| (schemaPattern != null && schemaPattern.isEmpty())) {
LOG.warning("Returning empty ResultSet as catalog or schemaPattern is an empty string.");
@@ -3635,99 +3740,126 @@ public ResultSet getSchemas(String catalog, String schemaPattern) {
LOG.info("getSchemas called for catalog: %s, schemaPattern: %s", catalog, schemaPattern);
- final Pattern schemaRegex = compileSqlLikePattern(schemaPattern);
final Schema resultSchema = defineGetSchemasSchema();
- final FieldList resultSchemaFields = resultSchema.getFields();
-
final BlockingQueue queue =
new LinkedBlockingQueue<>(DEFAULT_QUEUE_CAPACITY);
+
+ Thread fetcherThread = runGetSchemasTaskAsync(catalog, schemaPattern, resultSchema, queue);
+
+ BigQueryJsonResultSet resultSet =
+ BigQueryJsonResultSet.of(resultSchema, -1, queue, null, new Thread[] {fetcherThread});
+
+ LOG.info("Started background thread for getSchemas");
+ return resultSet;
+ }
+
+ @VisibleForTesting
+ Thread runGetSchemasTaskAsync(
+ String catalog,
+ String schemaPattern,
+ Schema resultSchema,
+ BlockingQueue queue)
+ throws SQLException {
+
+ final Pattern schemaRegex = compileSqlLikePattern(schemaPattern);
+ final FieldList resultSchemaFields = resultSchema.getFields();
final List collectedResults = Collections.synchronizedList(new ArrayList<>());
final String catalogParam = catalog;
+ SpanContext parentSpanContext = Span.current().getSpanContext();
Runnable schemaFetcher =
() -> {
- final FieldList localResultSchemaFields = resultSchemaFields;
- List projectsToScanList = new ArrayList<>();
-
- if (catalogParam != null) {
- projectsToScanList.add(catalogParam);
- } else {
- projectsToScanList.addAll(getAccessibleCatalogNames());
- }
-
- if (projectsToScanList.isEmpty()) {
- LOG.info(
- "No valid projects to scan (primary, specified, or additional). Returning empty"
- + " resultset.");
- return;
- }
-
- try {
- for (String currentProjectToScan : projectsToScanList) {
- if (Thread.currentThread().isInterrupted()) {
- LOG.warning(
- "Schema fetcher interrupted during project iteration for project: "
- + currentProjectToScan);
- break;
- }
- LOG.info("Fetching schemas for project: " + currentProjectToScan);
- List datasetsInProject =
- findMatchingBigQueryObjects(
- "Dataset",
- () ->
- bigquery.listDatasets(
- currentProjectToScan,
- BigQuery.DatasetListOption.pageSize(DEFAULT_PAGE_SIZE)),
- (name) -> bigquery.getDataset(DatasetId.of(currentProjectToScan, name)),
- (ds) -> ds.getDatasetId().getDataset(),
- schemaPattern,
- schemaRegex,
- LOG);
+ Span backgroundSpan =
+ this.connection
+ .getTracer()
+ .spanBuilder("BigQueryDatabaseMetaData.getSchemas.background")
+ .setNoParent()
+ .addLink(parentSpanContext)
+ .startSpan();
+
+ try (Scope scope = backgroundSpan.makeCurrent()) {
+ final FieldList localResultSchemaFields = resultSchemaFields;
+ List projectsToScanList = new ArrayList<>();
+
+ if (catalogParam != null) {
+ projectsToScanList.add(catalogParam);
+ } else {
+ projectsToScanList.addAll(getAccessibleCatalogNames());
+ }
- if (datasetsInProject.isEmpty() || Thread.currentThread().isInterrupted()) {
- LOG.info(
- "Fetcher thread found no matching datasets in project: "
- + currentProjectToScan);
- continue;
- }
+ if (projectsToScanList.isEmpty()) {
+ LOG.info(
+ "No valid projects to scan (primary, specified, or additional). Returning empty"
+ + " resultset.");
+ return;
+ }
- LOG.fine("Processing found datasets for project: " + currentProjectToScan);
- for (Dataset dataset : datasetsInProject) {
+ try {
+ for (String currentProjectToScan : projectsToScanList) {
if (Thread.currentThread().isInterrupted()) {
LOG.warning(
- "Schema fetcher interrupted during dataset iteration for project: "
+ "Schema fetcher interrupted during project iteration for project: "
+ currentProjectToScan);
break;
}
- processSchemaInfo(dataset, collectedResults, localResultSchemaFields);
+ LOG.info("Fetching schemas for project: " + currentProjectToScan);
+ List datasetsInProject =
+ findMatchingBigQueryObjects(
+ "Dataset",
+ () ->
+ bigquery.listDatasets(
+ currentProjectToScan,
+ BigQuery.DatasetListOption.pageSize(DEFAULT_PAGE_SIZE)),
+ (name) -> bigquery.getDataset(DatasetId.of(currentProjectToScan, name)),
+ (ds) -> ds.getDatasetId().getDataset(),
+ schemaPattern,
+ schemaRegex,
+ LOG);
+
+ if (datasetsInProject.isEmpty() || Thread.currentThread().isInterrupted()) {
+ LOG.info(
+ "Fetcher thread found no matching datasets in project: "
+ + currentProjectToScan);
+ continue;
+ }
+
+ LOG.fine("Processing found datasets for project: " + currentProjectToScan);
+ for (Dataset dataset : datasetsInProject) {
+ if (Thread.currentThread().isInterrupted()) {
+ LOG.warning(
+ "Schema fetcher interrupted during dataset iteration for project: "
+ + currentProjectToScan);
+ break;
+ }
+ processSchemaInfo(dataset, collectedResults, localResultSchemaFields);
+ }
}
- }
- if (!Thread.currentThread().isInterrupted()) {
- Comparator comparator =
- defineGetSchemasComparator(localResultSchemaFields);
- sortResults(collectedResults, comparator, "getSchemas", LOG);
- }
+ if (!Thread.currentThread().isInterrupted()) {
+ Comparator comparator =
+ defineGetSchemasComparator(localResultSchemaFields);
+ sortResults(collectedResults, comparator, "getSchemas", LOG);
+ }
- if (!Thread.currentThread().isInterrupted()) {
- populateQueue(collectedResults, queue, localResultSchemaFields);
- }
+ if (!Thread.currentThread().isInterrupted()) {
+ populateQueue(collectedResults, queue, localResultSchemaFields);
+ }
- } catch (Throwable t) {
- LOG.severe("Unexpected error in schema fetcher runnable: " + t.getMessage());
+ } catch (Throwable t) {
+ LOG.severe("Unexpected error in schema fetcher runnable: " + t.getMessage());
+ } finally {
+ signalEndOfData(queue, localResultSchemaFields);
+ LOG.info("Schema fetcher thread finished.");
+ }
} finally {
- signalEndOfData(queue, localResultSchemaFields);
- LOG.info("Schema fetcher thread finished.");
+ backgroundSpan.end();
}
};
- Thread fetcherThread = new Thread(schemaFetcher, "getSchemas-fetcher-" + catalog);
- BigQueryJsonResultSet resultSet =
- BigQueryJsonResultSet.of(resultSchema, -1, queue, null, new Thread[] {fetcherThread});
-
+ Runnable wrappedFetcher = Context.current().wrap(schemaFetcher);
+ Thread fetcherThread = new Thread(wrappedFetcher, "getSchemas-fetcher-" + catalog);
fetcherThread.start();
- LOG.info("Started background thread for getSchemas");
- return resultSet;
+ return fetcherThread;
}
Schema defineGetSchemasSchema() {
@@ -5304,4 +5436,23 @@ private void loadDriverVersionProperties() {
throw ex;
}
}
+
+ private interface TracedMetadataOperation {
+ T run() throws SQLException;
+ }
+
+ private T withTracing(String spanName, TracedMetadataOperation operation)
+ throws SQLException {
+ Tracer tracer = this.connection.getTracer();
+ Span span = tracer.spanBuilder(spanName).startSpan();
+ try (Scope scope = span.makeCurrent()) {
+ return operation.run();
+ } catch (Exception ex) {
+ span.recordException(ex);
+ span.setStatus(StatusCode.ERROR, ex.getMessage());
+ throw ex;
+ } finally {
+ span.end();
+ }
+ }
}
diff --git a/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryDriver.java b/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryDriver.java
index e62d93fca0ed..a83bdc5093e6 100644
--- a/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryDriver.java
+++ b/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryDriver.java
@@ -20,6 +20,7 @@
import com.google.cloud.bigquery.exception.BigQueryJdbcRuntimeException;
import io.grpc.LoadBalancerRegistry;
import io.grpc.internal.PickFirstLoadBalancerProvider;
+import io.opentelemetry.api.OpenTelemetry;
import java.io.IOException;
import java.sql.Connection;
import java.sql.Driver;
@@ -127,9 +128,12 @@ public Connection connect(String url, Properties info) throws SQLException {
LOG.finest("++enter++");
try {
if (acceptsURL(url)) {
- // strip 'jdbc:' from the URL, add any extra properties
+ Properties connectInfo = info == null ? new Properties() : (Properties) info.clone();
+ Object customOpenTelemetryObj = connectInfo.remove("customOpenTelemetry");
+
String connectionUri =
- BigQueryJdbcUrlUtility.appendPropertiesToURL(url.substring(5), this.toString(), info);
+ BigQueryJdbcUrlUtility.appendPropertiesToURL(
+ url.substring(5), this.toString(), connectInfo);
try {
BigQueryJdbcUrlUtility.parseUrl(connectionUri);
} catch (BigQueryJdbcRuntimeException e) {
@@ -137,6 +141,9 @@ public Connection connect(String url, Properties info) throws SQLException {
}
DataSource ds = DataSource.fromUrl(connectionUri);
+ if (customOpenTelemetryObj instanceof OpenTelemetry) {
+ ds.setCustomOpenTelemetry((OpenTelemetry) customOpenTelemetryObj);
+ }
// LogLevel
String logLevelStr = ds.getLogLevel();
diff --git a/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryJdbcOpenTelemetry.java b/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryJdbcOpenTelemetry.java
new file mode 100644
index 000000000000..181e15629c5b
--- /dev/null
+++ b/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryJdbcOpenTelemetry.java
@@ -0,0 +1,52 @@
+/*
+ * Copyright 2026 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.bigquery.jdbc;
+
+import io.opentelemetry.api.OpenTelemetry;
+import io.opentelemetry.api.trace.Tracer;
+
+public class BigQueryJdbcOpenTelemetry {
+
+ static final String INSTRUMENTATION_SCOPE_NAME = "com.google.cloud.bigquery.jdbc";
+
+ private BigQueryJdbcOpenTelemetry() {}
+
+ /**
+ * Initializes or returns the OpenTelemetry instance based on hybrid logic: prefers
+ * customOpenTelemetry if provided; otherwise falls back to an auto-configured GCP exporter if requested.
+ */
+ public static OpenTelemetry getOpenTelemetry(
+ boolean enableGcpTraceExporter,
+ boolean enableGcpLogExporter,
+ OpenTelemetry customOpenTelemetry) {
+ if (customOpenTelemetry != null) {
+ return customOpenTelemetry;
+ }
+
+ if (enableGcpTraceExporter || enableGcpLogExporter) {
+ // TODO(b/491238299): Initialize and return GCP-specific auto-configured SDK
+ return OpenTelemetry.noop();
+ }
+
+ return OpenTelemetry.noop();
+ }
+
+ /** Gets a Tracer for the JDBC driver instrumentation scope. */
+ public static Tracer getTracer(OpenTelemetry openTelemetry) {
+ return openTelemetry.getTracer(INSTRUMENTATION_SCOPE_NAME);
+ }
+}
diff --git a/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryJdbcUrlUtility.java b/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryJdbcUrlUtility.java
index afc300a6d97c..89a215c6c689 100644
--- a/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryJdbcUrlUtility.java
+++ b/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryJdbcUrlUtility.java
@@ -159,6 +159,10 @@ protected boolean removeEldestEntry(Map.Entry> eldes
static final int DEFAULT_SWA_APPEND_ROW_COUNT_VALUE = 1000;
static final String SWA_ACTIVATION_ROW_COUNT_PROPERTY_NAME = "SWA_ActivationRowCount";
static final int DEFAULT_SWA_ACTIVATION_ROW_COUNT_VALUE = 3;
+ static final String ENABLE_GCP_TRACE_EXPORTER_PROPERTY_NAME = "enableGcpTraceExporter";
+ static final boolean DEFAULT_ENABLE_GCP_TRACE_EXPORTER_VALUE = false;
+ static final String ENABLE_GCP_LOG_EXPORTER_PROPERTY_NAME = "enableGcpLogExporter";
+ static final boolean DEFAULT_ENABLE_GCP_LOG_EXPORTER_VALUE = false;
private static final BigQueryJdbcCustomLogger LOG =
new BigQueryJdbcCustomLogger(BigQueryJdbcUrlUtility.class.getName());
static final String FILTER_TABLES_ON_DEFAULT_DATASET_PROPERTY_NAME =
@@ -604,6 +608,18 @@ protected boolean removeEldestEntry(Map.Entry> eldes
.setDescription(
"Reason for the request, which is passed as the x-goog-request-reason"
+ " header.")
+ .build(),
+ BigQueryConnectionProperty.newBuilder()
+ .setName(ENABLE_GCP_TRACE_EXPORTER_PROPERTY_NAME)
+ .setDescription(
+ "Enables or disables the GCP OpenTelemetry trace exporter. Disabled by default.")
+ .setDefaultValue(String.valueOf(DEFAULT_ENABLE_GCP_TRACE_EXPORTER_VALUE))
+ .build(),
+ BigQueryConnectionProperty.newBuilder()
+ .setName(ENABLE_GCP_LOG_EXPORTER_PROPERTY_NAME)
+ .setDescription(
+ "Enables or disables the GCP OpenTelemetry log exporter. Disabled by default.")
+ .setDefaultValue(String.valueOf(DEFAULT_ENABLE_GCP_LOG_EXPORTER_VALUE))
.build())));
private static final List NETWORK_PROPERTIES =
diff --git a/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryStatement.java b/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryStatement.java
index c2bcdc96a6c1..6c9de7377ebf 100644
--- a/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryStatement.java
+++ b/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryStatement.java
@@ -57,6 +57,14 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterators;
import com.google.common.util.concurrent.Uninterruptibles;
+import io.opentelemetry.api.common.AttributeKey;
+import io.opentelemetry.api.trace.Span;
+import io.opentelemetry.api.trace.SpanBuilder;
+import io.opentelemetry.api.trace.SpanContext;
+import io.opentelemetry.api.trace.StatusCode;
+import io.opentelemetry.api.trace.Tracer;
+import io.opentelemetry.context.Context;
+import io.opentelemetry.context.Scope;
import java.lang.ref.ReferenceQueue;
import java.sql.Connection;
import java.sql.ResultSet;
@@ -239,7 +247,12 @@ public ResultSet executeQuery(String sql) throws SQLException {
BigQueryJdbcMdc.registerInstance(this.connection, this.connectionId)) {
LOG.finest("++enter++");
checkClosed();
- return executeQueryImpl(sql);
+ return withTracing(
+ "BigQueryStatement.executeQuery",
+ (span) -> {
+ span.setAttribute("db.statement", sql);
+ return executeQueryImpl(sql);
+ });
}
}
@@ -266,7 +279,12 @@ public long executeLargeUpdate(String sql) throws SQLException {
BigQueryJdbcMdc.registerInstance(this.connection, this.connectionId)) {
LOG.finest("++enter++");
checkClosed();
- return executeLargeUpdateImpl(sql);
+ return withTracing(
+ "BigQueryStatement.executeLargeUpdate",
+ (span) -> {
+ span.setAttribute("db.statement", sql);
+ return executeLargeUpdateImpl(sql);
+ });
}
}
@@ -312,7 +330,12 @@ public boolean execute(String sql) throws SQLException {
BigQueryJdbcMdc.registerInstance(this.connection, this.connectionId)) {
LOG.finest("++enter++");
checkClosed();
- return executeImpl(sql);
+ return withTracing(
+ "BigQueryStatement.execute",
+ (span) -> {
+ span.setAttribute("db.statement", sql);
+ return executeImpl(sql);
+ });
}
}
@@ -853,81 +876,85 @@ Thread populateArrowBufferedQueue(
LOG.finest("++enter++");
Runnable arrowStreamProcessor =
- () -> {
- long rowsRead = 0;
- int retryCount = 0;
- try {
- // Use the first stream to perform reading.
- String streamName = readSession.getStreams(0).getName();
-
- while (true) {
- try {
- ReadRowsRequest readRowsRequest =
- ReadRowsRequest.newBuilder()
- .setReadStream(streamName)
- .setOffset(rowsRead)
- .build();
-
- // Process each block of rows as they arrive and decode using our simple row reader.
- com.google.api.gax.rpc.ServerStream stream =
- bqReadClient.readRowsCallable().call(readRowsRequest);
- for (ReadRowsResponse response : stream) {
- if (Thread.currentThread().isInterrupted() || queryTaskExecutor.isShutdown()) {
- break;
- }
-
- ArrowRecordBatch currentBatch = response.getArrowRecordBatch();
- Uninterruptibles.putUninterruptibly(
- arrowBatchWrapperBlockingQueue, BigQueryArrowBatchWrapper.of(currentBatch));
- rowsRead += response.getRowCount();
- }
- break;
- } catch (com.google.api.gax.rpc.ApiException e) {
- if (e.getStatusCode().getCode()
- == com.google.api.gax.rpc.StatusCode.Code.NOT_FOUND) {
- LOG.warning("Read session expired or not found: %s", e.getMessage());
- enqueueError(arrowBatchWrapperBlockingQueue, e);
- break;
- }
- if (retryCount >= MAX_RETRY_COUNT) {
- LOG.log(
- Level.SEVERE,
- "\n"
- + Thread.currentThread().getName()
- + " Interrupted @ arrowStreamProcessor, max retries exceeded",
- e);
- enqueueError(arrowBatchWrapperBlockingQueue, e);
- break;
- }
- retryCount++;
- LOG.warning(
- "Connection interrupted during arrow stream read, retrying. attempt: %d",
- retryCount);
- Thread.sleep(RETRY_DELAY_MS);
- }
+ Context.current()
+ .wrap(
+ () ->
+ processArrowStream(readSession, arrowBatchWrapperBlockingQueue, bqReadClient));
+
+ Thread populateBufferWorker = JDBC_THREAD_FACTORY.newThread(arrowStreamProcessor);
+ populateBufferWorker.start();
+ return populateBufferWorker;
+ }
+
+ private void processArrowStream(
+ ReadSession readSession,
+ BlockingQueue arrowBatchWrapperBlockingQueue,
+ BigQueryReadClient bqReadClient) {
+ long rowsRead = 0;
+ int retryCount = 0;
+ try {
+ // Use the first stream to perform reading.
+ String streamName = readSession.getStreams(0).getName();
+
+ while (true) {
+ try {
+ ReadRowsRequest readRowsRequest =
+ ReadRowsRequest.newBuilder().setReadStream(streamName).setOffset(rowsRead).build();
+
+ // Process each block of rows as they arrive and decode using our simple row
+ // reader.
+ com.google.api.gax.rpc.ServerStream stream =
+ bqReadClient.readRowsCallable().call(readRowsRequest);
+ for (ReadRowsResponse response : stream) {
+ if (Thread.currentThread().isInterrupted() || queryTaskExecutor.isShutdown()) {
+ break;
}
- } catch (InterruptedException e) {
- LOG.log(
- Level.WARNING,
- "\n" + Thread.currentThread().getName() + " Interrupted @ arrowStreamProcessor",
- e);
+ ArrowRecordBatch currentBatch = response.getArrowRecordBatch();
+ Uninterruptibles.putUninterruptibly(
+ arrowBatchWrapperBlockingQueue, BigQueryArrowBatchWrapper.of(currentBatch));
+ rowsRead += response.getRowCount();
+ }
+ break;
+ } catch (com.google.api.gax.rpc.ApiException e) {
+ if (e.getStatusCode().getCode() == com.google.api.gax.rpc.StatusCode.Code.NOT_FOUND) {
+ LOG.warning("Read session expired or not found: %s", e.getMessage());
enqueueError(arrowBatchWrapperBlockingQueue, e);
- Thread.currentThread().interrupt();
- } catch (Exception e) {
+ break;
+ }
+ if (retryCount >= MAX_RETRY_COUNT) {
LOG.log(
- Level.WARNING,
- "\n" + Thread.currentThread().getName() + " Error @ arrowStreamProcessor",
+ Level.SEVERE,
+ "\n"
+ + Thread.currentThread().getName()
+ + " Interrupted @ arrowStreamProcessor, max retries exceeded",
e);
enqueueError(arrowBatchWrapperBlockingQueue, e);
- } finally { // logic needed for graceful shutdown
- enqueueEndOfStream(arrowBatchWrapperBlockingQueue);
+ break;
}
- };
+ retryCount++;
+ LOG.warning(
+ "Connection interrupted during arrow stream read, retrying. attempt: %d", retryCount);
+ Thread.sleep(RETRY_DELAY_MS);
+ }
+ }
- Thread populateBufferWorker = JDBC_THREAD_FACTORY.newThread(arrowStreamProcessor);
- populateBufferWorker.start();
- return populateBufferWorker;
+ } catch (InterruptedException e) {
+ LOG.log(
+ Level.WARNING,
+ "\n" + Thread.currentThread().getName() + " Interrupted @ arrowStreamProcessor",
+ e);
+ enqueueError(arrowBatchWrapperBlockingQueue, e);
+ Thread.currentThread().interrupt();
+ } catch (Exception e) {
+ LOG.log(
+ Level.WARNING,
+ "\n" + Thread.currentThread().getName() + " Error @ arrowStreamProcessor",
+ e);
+ enqueueError(arrowBatchWrapperBlockingQueue, e);
+ } finally { // logic needed for graceful shutdown
+ enqueueEndOfStream(arrowBatchWrapperBlockingQueue);
+ }
}
/** Executes SQL query using either fast query path or read API */
@@ -1068,57 +1095,18 @@ Thread runNextPageTaskAsync(
BlockingQueue> rpcResponseQueue,
BlockingQueue bigQueryFieldValueListWrapperBlockingQueue) {
LOG.finest("++enter++");
- // parse and put the first page in the pageCache before the other pages are parsed from the RPC
- // calls
populateFirstPage(result, rpcResponseQueue);
- // This thread makes the RPC calls and paginates
Runnable nextPageTask =
- () -> {
- String currentPageToken = firstPageToken;
- TableResult currentResults = result;
- TableId destinationTable = null;
- if (firstPageToken != null) {
- destinationTable = getDestinationTable(jobId);
- }
-
- try {
- while (currentPageToken != null) {
- // do not process further pages and shutdown
- if (Thread.currentThread().isInterrupted() || queryTaskExecutor.isShutdown()) {
- LOG.warning(
- "%s Interrupted @ runNextPageTaskAsync", Thread.currentThread().getName());
- break;
- }
-
- long startTime = System.nanoTime();
- currentResults =
- this.bigQuery.listTableData(
- destinationTable,
- TableDataListOption.pageSize(querySettings.getMaxResultPerPage()),
- TableDataListOption.pageToken(currentPageToken));
-
- currentPageToken = currentResults.getNextPageToken();
- // this will be parsed asynchronously without blocking the current
- // thread
- Uninterruptibles.putUninterruptibly(rpcResponseQueue, Tuple.of(currentResults, true));
- LOG.fine(
- "Fetched %d results from the server in %d ms.",
- querySettings.getMaxResultPerPage(),
- (int) ((System.nanoTime() - startTime) / 1000000));
- }
- } catch (Exception ex) {
- Uninterruptibles.putUninterruptibly(
- bigQueryFieldValueListWrapperBlockingQueue,
- BigQueryFieldValueListWrapper.ofError(new BigQueryJdbcRuntimeException(ex)));
- } finally {
- // this will stop the parseDataTask as well when the pagination
- // completes
- Uninterruptibles.putUninterruptibly(rpcResponseQueue, Tuple.of(null, false));
- }
- // We cannot do queryTaskExecutor.shutdownNow() here as populate buffer method may not
- // have finished processing the records and even that will be interrupted
- };
+ Context.current()
+ .wrap(
+ () ->
+ fetchNextPages(
+ firstPageToken,
+ jobId,
+ rpcResponseQueue,
+ bigQueryFieldValueListWrapperBlockingQueue,
+ result));
Thread nextPageWorker = JDBC_THREAD_FACTORY.newThread(nextPageTask);
nextPageWorker.start();
@@ -1137,63 +1125,11 @@ Thread parseAndPopulateRpcDataAsync(
LOG.finest("++enter++");
Runnable populateBufferRunnable =
- () -> { // producer thread populating the buffer
- try {
- Iterable fieldValueLists;
- // as we have to process the first page
- boolean hasRows = true;
- while (hasRows) {
- try {
- Tuple nextPageTuple = rpcResponseQueue.take();
- if (nextPageTuple.x() != null) {
- fieldValueLists = nextPageTuple.x().getValues();
- } else {
- fieldValueLists = null;
- }
- hasRows = nextPageTuple.y();
-
- } catch (InterruptedException e) {
- LOG.log(Level.WARNING, "\n" + Thread.currentThread().getName() + " Interrupted", e);
- // Thread might get interrupted while calling the Cancel method, which is
- // expected, so logging this instead of throwing the exception back
- break;
- }
-
- if (Thread.currentThread().isInterrupted()
- || queryTaskExecutor.isShutdown()
- || fieldValueLists == null) {
- // do not process further pages and shutdown (outerloop)
- break;
- }
-
- long startTime = System.nanoTime();
- long results = 0;
- for (FieldValueList fieldValueList : fieldValueLists) {
-
- if (Thread.currentThread().isInterrupted() || queryTaskExecutor.isShutdown()) {
- // do not process further pages and shutdown (inner loop)
- break;
- }
- Uninterruptibles.putUninterruptibly(
- bigQueryFieldValueListWrapperBlockingQueue,
- BigQueryFieldValueListWrapper.of(schema.getFields(), fieldValueList));
- results += 1;
- }
- LOG.fine(
- "Processed %d results in %d ms.",
- results, (int) ((System.nanoTime() - startTime) / 1000000));
- }
-
- } catch (Exception ex) {
- LOG.log(
- Level.WARNING,
- "\n" + Thread.currentThread().getName() + " Error @ populateBufferAsync",
- ex);
- enqueueBufferError(bigQueryFieldValueListWrapperBlockingQueue, ex);
- } finally {
- enqueueBufferEndOfStream(bigQueryFieldValueListWrapperBlockingQueue);
- }
- };
+ Context.current()
+ .wrap(
+ () ->
+ parseAndPopulateRpcData(
+ schema, bigQueryFieldValueListWrapperBlockingQueue, rpcResponseQueue));
Thread populateBufferWorker = JDBC_THREAD_FACTORY.newThread(populateBufferRunnable);
populateBufferWorker.start();
@@ -1448,13 +1384,31 @@ public void clearBatch() {
@Override
public int[] executeBatch() throws SQLException {
LOG.finest("++enter++");
+ return withTracing(
+ "BigQueryStatement.executeBatch",
+ (span) -> {
+ span.setAttribute("db.statement.count", this.batchQueries.size());
+ span.setAttribute(
+ AttributeKey.stringArrayKey("db.batch.statements"),
+ new ArrayList<>(this.batchQueries));
+
+ StringBuilder sb = new StringBuilder();
+ for (String query : this.batchQueries) {
+ sb.append(query);
+ }
+ String combinedQueries = sb.toString();
+
+ return executeBatchImpl(combinedQueries);
+ });
+ }
+
+ private int[] executeBatchImpl(String combinedQueries) throws SQLException {
int[] result = new int[this.batchQueries.size()];
if (this.batchQueries.isEmpty()) {
return result;
}
try {
- String combinedQueries = String.join("", this.batchQueries);
QueryJobConfiguration.Builder jobConfiguration = getJobConfig(combinedQueries);
jobConfiguration.setPriority(QueryJobConfiguration.Priority.BATCH);
runQuery(combinedQueries, jobConfiguration.build());
@@ -1622,4 +1576,143 @@ private void enqueueBufferError(BlockingQueue que
private void enqueueBufferEndOfStream(BlockingQueue queue) {
Uninterruptibles.putUninterruptibly(queue, BigQueryFieldValueListWrapper.of(null, null, true));
}
+
+ @FunctionalInterface
+ private interface TracedOperation {
+ T run(Span span) throws SQLException;
+ }
+
+ private T withTracing(String spanName, TracedOperation operation) throws SQLException {
+ Tracer tracer = this.connection.getTracer();
+ Span span = tracer.spanBuilder(spanName).startSpan();
+ try (Scope scope = span.makeCurrent()) {
+ return operation.run(span);
+ } catch (SQLException | RuntimeException ex) {
+ span.recordException(ex);
+ span.setStatus(StatusCode.ERROR, ex.getMessage());
+ throw ex;
+ } finally {
+ span.end();
+ }
+ }
+
+ private void fetchNextPages(
+ String firstPageToken,
+ JobId jobId,
+ BlockingQueue> rpcResponseQueue,
+ BlockingQueue bigQueryFieldValueListWrapperBlockingQueue,
+ TableResult result) {
+ SpanContext parentSpanContext = Span.current().getSpanContext();
+ String currentPageToken = firstPageToken;
+ TableResult currentResults = result;
+ TableId destinationTable = null;
+ if (firstPageToken != null) {
+ destinationTable = getDestinationTable(jobId);
+ }
+
+ try {
+ Tracer tracer = this.connection.getTracer();
+ while (currentPageToken != null) {
+ if (Thread.currentThread().isInterrupted() || queryTaskExecutor.isShutdown()) {
+ LOG.warning("%s Interrupted @ runNextPageTaskAsync", Thread.currentThread().getName());
+ break;
+ }
+
+ SpanBuilder spanBuilder = tracer.spanBuilder("BigQueryStatement.pagination");
+ if (parentSpanContext.isValid()) {
+ spanBuilder.addLink(parentSpanContext);
+ }
+ Span paginationSpan = spanBuilder.startSpan();
+ try (Scope scope = paginationSpan.makeCurrent()) {
+ paginationSpan.setAttribute("db.pagination.page_token", currentPageToken);
+
+ long startTime = System.nanoTime();
+ currentResults =
+ this.bigQuery.listTableData(
+ destinationTable,
+ TableDataListOption.pageSize(querySettings.getMaxResultPerPage()),
+ TableDataListOption.pageToken(currentPageToken));
+
+ long duration = (System.nanoTime() - startTime) / 1000000;
+ paginationSpan.setAttribute("db.pagination.duration_ms", duration);
+ paginationSpan.setAttribute(
+ "db.pagination.rows_fetched", querySettings.getMaxResultPerPage());
+
+ currentPageToken = currentResults.getNextPageToken();
+ Uninterruptibles.putUninterruptibly(rpcResponseQueue, Tuple.of(currentResults, true));
+ LOG.fine(
+ "Fetched %d results from the server in %d ms.",
+ querySettings.getMaxResultPerPage(), (int) duration);
+ } catch (Exception e) {
+ paginationSpan.recordException(e);
+ paginationSpan.setStatus(StatusCode.ERROR, e.getMessage());
+ throw e;
+ } finally {
+ paginationSpan.end();
+ }
+ }
+ } catch (Exception ex) {
+ Uninterruptibles.putUninterruptibly(
+ bigQueryFieldValueListWrapperBlockingQueue,
+ BigQueryFieldValueListWrapper.ofError(new BigQueryJdbcRuntimeException(ex)));
+ } finally {
+ Uninterruptibles.putUninterruptibly(rpcResponseQueue, Tuple.of(null, false));
+ }
+ }
+
+ private void parseAndPopulateRpcData(
+ Schema schema,
+ BlockingQueue bigQueryFieldValueListWrapperBlockingQueue,
+ BlockingQueue> rpcResponseQueue) {
+ try {
+ Iterable fieldValueLists;
+ boolean hasRows = true;
+ while (hasRows) {
+ try {
+ Tuple nextPageTuple = rpcResponseQueue.take();
+ if (nextPageTuple.x() != null) {
+ fieldValueLists = nextPageTuple.x().getValues();
+ } else {
+ fieldValueLists = null;
+ }
+ hasRows = nextPageTuple.y();
+
+ } catch (InterruptedException e) {
+ LOG.log(Level.WARNING, "\n" + Thread.currentThread().getName() + " Interrupted", e);
+ break;
+ }
+
+ if (Thread.currentThread().isInterrupted()
+ || queryTaskExecutor.isShutdown()
+ || fieldValueLists == null) {
+ break;
+ }
+
+ long startTime = System.nanoTime();
+ long results = 0;
+ for (FieldValueList fieldValueList : fieldValueLists) {
+
+ if (Thread.currentThread().isInterrupted() || queryTaskExecutor.isShutdown()) {
+ break;
+ }
+ Uninterruptibles.putUninterruptibly(
+ bigQueryFieldValueListWrapperBlockingQueue,
+ BigQueryFieldValueListWrapper.of(schema.getFields(), fieldValueList));
+ results += 1;
+ }
+ LOG.fine(
+ "Processed %d results in %d ms.",
+ results, (int) ((System.nanoTime() - startTime) / 1000000));
+ }
+
+ } catch (Exception ex) {
+ LOG.log(
+ Level.WARNING,
+ "\n" + Thread.currentThread().getName() + " Error @ populateBufferAsync",
+ ex);
+ enqueueBufferError(bigQueryFieldValueListWrapperBlockingQueue, ex);
+ } finally {
+ enqueueBufferEndOfStream(bigQueryFieldValueListWrapperBlockingQueue);
+ }
+ }
}
diff --git a/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/DataSource.java b/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/DataSource.java
index 02142363866e..9c0224005b9e 100644
--- a/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/DataSource.java
+++ b/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/DataSource.java
@@ -20,6 +20,7 @@
import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
+import io.opentelemetry.api.OpenTelemetry;
import java.io.PrintWriter;
import java.sql.Connection;
import java.sql.DriverManager;
@@ -113,6 +114,9 @@ public class DataSource implements javax.sql.DataSource {
private String privateServiceConnect;
private Long connectionPoolSize;
private Long listenerPoolSize;
+ private Boolean enableGcpTraceExporter;
+ private Boolean enableGcpLogExporter;
+ private OpenTelemetry customOpenTelemetry;
// Make sure the JDBC driver class is loaded.
static {
@@ -324,6 +328,18 @@ public class DataSource implements javax.sql.DataSource {
.put(
BigQueryJdbcUrlUtility.LISTENER_POOL_SIZE_PROPERTY_NAME,
(ds, val) -> ds.setListenerPoolSize(Long.parseLong(val)))
+ .put(
+ BigQueryJdbcUrlUtility.ENABLE_GCP_TRACE_EXPORTER_PROPERTY_NAME,
+ (ds, val) ->
+ ds.setEnableGcpTraceExporter(
+ BigQueryJdbcUrlUtility.convertIntToBoolean(
+ val, BigQueryJdbcUrlUtility.ENABLE_GCP_TRACE_EXPORTER_PROPERTY_NAME)))
+ .put(
+ BigQueryJdbcUrlUtility.ENABLE_GCP_LOG_EXPORTER_PROPERTY_NAME,
+ (ds, val) ->
+ ds.setEnableGcpLogExporter(
+ BigQueryJdbcUrlUtility.convertIntToBoolean(
+ val, BigQueryJdbcUrlUtility.ENABLE_GCP_LOG_EXPORTER_PROPERTY_NAME)))
.build();
public static DataSource fromUrl(String url) {
@@ -375,7 +391,11 @@ public Connection getConnection() throws SQLException {
throw new BigQueryJdbcException(
"The URL " + getURL() + " is invalid. Please specify a valid Connection URL. ");
}
- return DriverManager.getConnection(getURL(), createProperties());
+ Properties props = createProperties();
+ if (this.customOpenTelemetry != null) {
+ props.put("customOpenTelemetry", this.customOpenTelemetry);
+ }
+ return DriverManager.getConnection(getURL(), props);
}
private Properties createProperties() {
@@ -616,6 +636,16 @@ private Properties createProperties() {
BigQueryJdbcUrlUtility.LISTENER_POOL_SIZE_PROPERTY_NAME,
String.valueOf(this.listenerPoolSize));
}
+ if (this.enableGcpTraceExporter != null) {
+ connectionProperties.setProperty(
+ BigQueryJdbcUrlUtility.ENABLE_GCP_TRACE_EXPORTER_PROPERTY_NAME,
+ String.valueOf(this.enableGcpTraceExporter));
+ }
+ if (this.enableGcpLogExporter != null) {
+ connectionProperties.setProperty(
+ BigQueryJdbcUrlUtility.ENABLE_GCP_LOG_EXPORTER_PROPERTY_NAME,
+ String.valueOf(this.enableGcpLogExporter));
+ }
return connectionProperties;
}
@@ -737,6 +767,34 @@ public void setListenerPoolSize(Long listenerPoolSize) {
this.listenerPoolSize = listenerPoolSize;
}
+ public Boolean getEnableGcpTraceExporter() {
+ return enableGcpTraceExporter != null
+ ? enableGcpTraceExporter
+ : BigQueryJdbcUrlUtility.DEFAULT_ENABLE_GCP_TRACE_EXPORTER_VALUE;
+ }
+
+ public void setEnableGcpTraceExporter(Boolean enableGcpTraceExporter) {
+ this.enableGcpTraceExporter = enableGcpTraceExporter;
+ }
+
+ public Boolean getEnableGcpLogExporter() {
+ return enableGcpLogExporter != null
+ ? enableGcpLogExporter
+ : BigQueryJdbcUrlUtility.DEFAULT_ENABLE_GCP_LOG_EXPORTER_VALUE;
+ }
+
+ public void setEnableGcpLogExporter(Boolean enableGcpLogExporter) {
+ this.enableGcpLogExporter = enableGcpLogExporter;
+ }
+
+ public OpenTelemetry getCustomOpenTelemetry() {
+ return customOpenTelemetry;
+ }
+
+ public void setCustomOpenTelemetry(OpenTelemetry customOpenTelemetry) {
+ this.customOpenTelemetry = customOpenTelemetry;
+ }
+
public void setHighThroughputMinTableSize(Integer highThroughputMinTableSize) {
this.highThroughputMinTableSize = highThroughputMinTableSize;
}
diff --git a/java-bigquery/google-cloud-bigquery-jdbc/src/test/java/com/google/cloud/bigquery/jdbc/BigQueryDatabaseMetaDataTest.java b/java-bigquery/google-cloud-bigquery-jdbc/src/test/java/com/google/cloud/bigquery/jdbc/BigQueryDatabaseMetaDataTest.java
index 4d108ee54f8b..c3fec1b4b5cc 100644
--- a/java-bigquery/google-cloud-bigquery-jdbc/src/test/java/com/google/cloud/bigquery/jdbc/BigQueryDatabaseMetaDataTest.java
+++ b/java-bigquery/google-cloud-bigquery-jdbc/src/test/java/com/google/cloud/bigquery/jdbc/BigQueryDatabaseMetaDataTest.java
@@ -30,6 +30,12 @@
import com.google.api.gax.paging.Page;
import com.google.cloud.bigquery.*;
import com.google.cloud.bigquery.BigQuery.RoutineListOption;
+import io.opentelemetry.api.trace.Span;
+import io.opentelemetry.api.trace.StatusCode;
+import io.opentelemetry.api.trace.Tracer;
+import io.opentelemetry.context.Scope;
+import io.opentelemetry.sdk.testing.junit5.OpenTelemetryExtension;
+import io.opentelemetry.sdk.trace.data.SpanData;
import java.io.IOException;
import java.io.InputStream;
import java.sql.DatabaseMetaData;
@@ -39,16 +45,29 @@
import java.sql.Statement;
import java.sql.Types;
import java.util.*;
+import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
import java.util.regex.Pattern;
+import java.util.stream.Stream;
+import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInstance;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
public class BigQueryDatabaseMetaDataTest {
+ @RegisterExtension
+ public static final OpenTelemetryExtension otelTesting = OpenTelemetryExtension.create();
+
private BigQueryConnection bigQueryConnection;
private BigQueryDatabaseMetaData dbMetadata;
private BigQuery bigqueryClient;
@@ -62,6 +81,20 @@ public void setUp() throws SQLException {
when(bigQueryConnection.getConnectionUrl()).thenReturn("jdbc:bigquery://test-project");
when(bigQueryConnection.getBigQuery()).thenReturn(bigqueryClient);
when(bigQueryConnection.createStatement()).thenReturn(mockStatement);
+ when(bigQueryConnection.getTracer())
+ .thenReturn(
+ otelTesting
+ .getOpenTelemetry()
+ .getTracer(BigQueryJdbcOpenTelemetry.INSTRUMENTATION_SCOPE_NAME));
+
+ Page<Dataset> datasetPageMock = mock(Page.class);
+ when(bigqueryClient.listDatasets(anyString(), any())).thenReturn(datasetPageMock);
+
+ Page<Table> tablePageMock = mock(Page.class);
+ when(bigqueryClient.listTables(any(DatasetId.class), any())).thenReturn(tablePageMock);
+
+ Table mockTable = mock(Table.class);
+ when(bigqueryClient.getTable(any(TableId.class))).thenReturn(mockTable);
dbMetadata = new BigQueryDatabaseMetaData(bigQueryConnection);
}
@@ -2917,7 +2950,7 @@ public void testPrepareGetCatalogsRows() {
}
@Test
- public void testGetSchemas_NoArgs_DelegatesCorrectly() {
+ public void testGetSchemas_NoArgs_DelegatesCorrectly() throws Exception {
BigQueryDatabaseMetaData spiedDbMetadata = spy(dbMetadata);
ResultSet mockResultSet = mock(ResultSet.class);
doReturn(mockResultSet).when(spiedDbMetadata).getSchemas(null, null);
@@ -3206,4 +3239,96 @@ public void testSupportsResultSetConcurrency() throws SQLException {
public void testGetSQLStateType() throws SQLException {
assertEquals(DatabaseMetaData.sqlStateSQL, dbMetadata.getSQLStateType());
}
+
+ @ParameterizedTest
+ @MethodSource("metadataOperationProvider")
+ public void testMetadataOperation_generatesSpan(
+ MetadataOperation operation, String expectedSpanName) throws Exception {
+ operation.run();
+
+ SpanData span =
+ OpenTelemetryTestUtility.findSpanByName(otelTesting.getSpans(), expectedSpanName);
+ OpenTelemetryTestUtility.assertSpanStatus(span, StatusCode.UNSET);
+ }
+
+ @FunctionalInterface
+ interface MetadataOperation {
+ void run() throws SQLException;
+ }
+
+ Stream<Arguments> metadataOperationProvider() {
+ return Stream.of(
+ Arguments.of(
+ (MetadataOperation) () -> dbMetadata.getCatalogs(),
+ "BigQueryDatabaseMetaData.getCatalogs"),
+ Arguments.of(
+ (MetadataOperation) () -> dbMetadata.getSchemas("catalog", "schema"),
+ "BigQueryDatabaseMetaData.getSchemas"),
+ Arguments.of(
+ (MetadataOperation)
+ () -> dbMetadata.getTables("catalog", "schema", "table", new String[] {"TABLE"}),
+ "BigQueryDatabaseMetaData.getTables"),
+ Arguments.of(
+ (MetadataOperation) () -> dbMetadata.getColumns("catalog", "schema", "table", "column"),
+ "BigQueryDatabaseMetaData.getColumns"));
+ }
+
+ @ParameterizedTest
+ @MethodSource("asyncMetadataOperationProvider")
+ public void testAsyncMetadataOperation_createsDetachedLinkedSpan(
+ AsyncMetadataOperation operation, String expectedSpanName) throws Exception {
+ BlockingQueue<BigQueryFieldValueListWrapper> queue = new LinkedBlockingQueue<>();
+
+ Tracer testTracer = otelTesting.getOpenTelemetry().getTracer("test");
+ Span parentSpan = testTracer.spanBuilder("parent-span").startSpan();
+
+ try (Scope scope = parentSpan.makeCurrent()) {
+ Thread workerThread = operation.run(queue);
+
+ Assertions.assertNotNull(workerThread, "Worker thread should not be null");
+ workerThread.join();
+
+ OpenTelemetryTestUtility.assertSpanLinkedToParent(
+ otelTesting.getSpans(), expectedSpanName, parentSpan);
+ } finally {
+ parentSpan.end();
+ }
+ }
+
+ @FunctionalInterface
+ interface AsyncMetadataOperation {
+ Thread run(BlockingQueue<BigQueryFieldValueListWrapper> queue) throws Exception;
+ }
+
+ Stream<Arguments> asyncMetadataOperationProvider() {
+ return Stream.of(
+ Arguments.of(
+ (AsyncMetadataOperation)
+ (q) ->
+ dbMetadata.runGetTablesTaskAsync(
+ "catalog",
+ "schema",
+ "table",
+ new String[] {"TABLE"},
+ dbMetadata.defineGetTablesSchema(),
+ q),
+ "BigQueryDatabaseMetaData.getTables.background"),
+ Arguments.of(
+ (AsyncMetadataOperation)
+ (q) ->
+ dbMetadata.runGetColumnsTaskAsync(
+ "catalog",
+ "schema",
+ "table",
+ "column",
+ dbMetadata.defineGetColumnsSchema(),
+ q),
+ "BigQueryDatabaseMetaData.getColumns.background"),
+ Arguments.of(
+ (AsyncMetadataOperation)
+ (q) ->
+ dbMetadata.runGetSchemasTaskAsync(
+ "catalog", "schema", dbMetadata.defineGetSchemasSchema(), q),
+ "BigQueryDatabaseMetaData.getSchemas.background"));
+ }
}
diff --git a/java-bigquery/google-cloud-bigquery-jdbc/src/test/java/com/google/cloud/bigquery/jdbc/BigQueryDriverTest.java b/java-bigquery/google-cloud-bigquery-jdbc/src/test/java/com/google/cloud/bigquery/jdbc/BigQueryDriverTest.java
index 85eb254038f3..2fc1ec7f8f87 100644
--- a/java-bigquery/google-cloud-bigquery-jdbc/src/test/java/com/google/cloud/bigquery/jdbc/BigQueryDriverTest.java
+++ b/java-bigquery/google-cloud-bigquery-jdbc/src/test/java/com/google/cloud/bigquery/jdbc/BigQueryDriverTest.java
@@ -16,7 +16,9 @@
package com.google.cloud.bigquery.jdbc;
import static com.google.common.truth.Truth.assertThat;
+import static org.mockito.Mockito.mock;
+import io.opentelemetry.api.OpenTelemetry;
import java.sql.Connection;
import java.sql.DriverPropertyInfo;
import java.sql.SQLException;
@@ -104,4 +106,21 @@ public void testConnectWithInvalidUrlChainsNoException() throws SQLException {
new Properties());
assertThat(connection.isClosed()).isFalse();
}
+
+ @Test
+ public void testConnect_withCustomOpenTelemetry_injectsIntoDataSource() throws SQLException {
+ OpenTelemetry mockOtel = mock(OpenTelemetry.class);
+ Properties props = new Properties();
+ props.put("customOpenTelemetry", mockOtel);
+
+ // Connect using standard URL setup but pass the SDK via Properties
+ Connection connection =
+ bigQueryDriver.connect(
+ "jdbc:bigquery://https://www.googleapis.com/bigquery/v2:443;"
+ + "OAuthType=2;ProjectId=MyBigQueryProject;OAuthAccessToken=redacted;",
+ props);
+
+ assertThat(connection).isNotNull();
+ assertThat(connection.isClosed()).isFalse();
+ }
}
diff --git a/java-bigquery/google-cloud-bigquery-jdbc/src/test/java/com/google/cloud/bigquery/jdbc/BigQueryJdbcOpenTelemetryTest.java b/java-bigquery/google-cloud-bigquery-jdbc/src/test/java/com/google/cloud/bigquery/jdbc/BigQueryJdbcOpenTelemetryTest.java
new file mode 100644
index 000000000000..24f77abdd071
--- /dev/null
+++ b/java-bigquery/google-cloud-bigquery-jdbc/src/test/java/com/google/cloud/bigquery/jdbc/BigQueryJdbcOpenTelemetryTest.java
@@ -0,0 +1,60 @@
+/*
+ * Copyright 2026 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.bigquery.jdbc;
+
+import static com.google.common.truth.Truth.assertThat;
+import static org.mockito.Mockito.mock;
+
+import io.opentelemetry.api.OpenTelemetry;
+import io.opentelemetry.api.trace.Tracer;
+import org.junit.jupiter.api.Test;
+
+public class BigQueryJdbcOpenTelemetryTest {
+
+ @Test
+ public void testGetOpenTelemetry_withCustomSdk_returnsCustom() {
+ OpenTelemetry mockCustomOtel = mock(OpenTelemetry.class);
+
+ OpenTelemetry result = BigQueryJdbcOpenTelemetry.getOpenTelemetry(false, false, mockCustomOtel);
+
+ assertThat(result).isSameInstanceAs(mockCustomOtel);
+ }
+
+ @Test
+ public void testGetOpenTelemetry_withCustomSdkAndFlags_returnsCustom() {
+ OpenTelemetry mockCustomOtel = mock(OpenTelemetry.class);
+
+ // Custom SDK always takes precedence over individual flags
+ OpenTelemetry result = BigQueryJdbcOpenTelemetry.getOpenTelemetry(true, true, mockCustomOtel);
+
+ assertThat(result).isSameInstanceAs(mockCustomOtel);
+ }
+
+ @Test
+ public void testGetOpenTelemetry_noFlags_returnsNoop() {
+ OpenTelemetry result = BigQueryJdbcOpenTelemetry.getOpenTelemetry(false, false, null);
+
+ assertThat(result).isSameInstanceAs(OpenTelemetry.noop());
+ }
+
+ @Test
+ public void testGetTracer_respectsScopeName() {
+ Tracer result = BigQueryJdbcOpenTelemetry.getTracer(OpenTelemetry.noop());
+
+ assertThat(result).isNotNull();
+ }
+}
diff --git a/java-bigquery/google-cloud-bigquery-jdbc/src/test/java/com/google/cloud/bigquery/jdbc/BigQueryStatementTest.java b/java-bigquery/google-cloud-bigquery-jdbc/src/test/java/com/google/cloud/bigquery/jdbc/BigQueryStatementTest.java
index f3971bd71150..cffde5d4daec 100644
--- a/java-bigquery/google-cloud-bigquery-jdbc/src/test/java/com/google/cloud/bigquery/jdbc/BigQueryStatementTest.java
+++ b/java-bigquery/google-cloud-bigquery-jdbc/src/test/java/com/google/cloud/bigquery/jdbc/BigQueryStatementTest.java
@@ -27,8 +27,10 @@
import static org.mockito.Mockito.verify;
import com.google.cloud.ServiceOptions;
+import com.google.cloud.Tuple;
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQuery.QueryResultsOption;
+import com.google.cloud.bigquery.BigQuery.TableDataListOption;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.Field;
import com.google.cloud.bigquery.FieldList;
@@ -52,27 +54,47 @@
import com.google.cloud.bigquery.storage.v1.ReadSession;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Maps;
+import io.opentelemetry.api.common.AttributeKey;
+import io.opentelemetry.api.trace.Span;
+import io.opentelemetry.api.trace.StatusCode;
+import io.opentelemetry.api.trace.Tracer;
+import io.opentelemetry.context.Scope;
+import io.opentelemetry.sdk.testing.junit5.OpenTelemetryExtension;
+import io.opentelemetry.sdk.trace.data.SpanData;
import java.io.IOException;
import java.sql.ResultSet;
import java.sql.SQLException;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.stream.Stream;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.BitVector;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.IntVector;
import org.apache.arrow.vector.VectorSchemaRoot;
+import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInstance;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
public class BigQueryStatementTest {
+ @RegisterExtension
+ public static final OpenTelemetryExtension otelTesting = OpenTelemetryExtension.create();
+
private BigQueryConnection bigQueryConnection;
private static final String PROJECT = "project";
@@ -124,9 +146,38 @@ private Job getJobMock(
return job;
}
+ private TableResult setupMockQueryResults(JobId jobId, StatementType type, Long affectedRows)
+ throws Exception {
+ doReturn(true).when(bigQueryConnection).getUseStatelessQueryMode();
+ TableResult tableResultMock = mock(TableResult.class);
+ doReturn(jobId).when(tableResultMock).getJobId();
+ doReturn(Schema.of()).when(tableResultMock).getSchema();
+ doReturn(tableResultMock)
+ .when(bigquery)
+ .queryWithTimeout(any(QueryJobConfiguration.class), any(), any());
+
+ Job jobMock = getJobMock(tableResultMock, null, type);
+ if (affectedRows != null) {
+ JobStatistics.QueryStatistics stats = (JobStatistics.QueryStatistics) jobMock.getStatistics();
+ doReturn(affectedRows).when(stats).getNumDmlAffectedRows();
+ }
+ doReturn(jobMock).when(bigquery).getJob(any(JobId.class));
+ doReturn(jobMock).when(jobMock).waitFor();
+
+ Job dryRunJobMock = getJobMock(null, null, type);
+ doReturn(dryRunJobMock).when(bigquery).create(any(JobInfo.class));
+ return tableResultMock;
+ }
+
@BeforeEach
public void setUp() throws IOException, SQLException {
bigQueryConnection = mock(BigQueryConnection.class);
+ doReturn(
+ otelTesting
+ .getOpenTelemetry()
+ .getTracer(BigQueryJdbcOpenTelemetry.INSTRUMENTATION_SCOPE_NAME))
+ .when(bigQueryConnection)
+ .getTracer();
rpcFactoryMock = mock(BigQueryRpcFactory.class);
bigquery = mock(BigQuery.class);
bigQueryConnection.bigQuery = bigquery;
@@ -449,21 +500,11 @@ public void testCloseCancelsJob() throws SQLException, InterruptedException {
}
@Test
- public void testCancelWithJoblessQuery() throws SQLException, InterruptedException {
- doReturn(true).when(bigQueryConnection).getUseStatelessQueryMode();
+ public void testCancelWithJoblessQuery() throws Exception {
+ TableResult tableResultMock = setupMockQueryResults(null, StatementType.SELECT, null);
BigQueryStatement joblessStatement = new BigQueryStatement(bigQueryConnection);
BigQueryStatement joblessStatementSpy = Mockito.spy(joblessStatement);
- TableResult tableResultMock = mock(TableResult.class);
- doReturn(null).when(tableResultMock).getJobId();
-
- doReturn(tableResultMock)
- .when(bigquery)
- .queryWithTimeout(any(QueryJobConfiguration.class), any(), any());
-
- Job dryRunJobMock = getJobMock(null, null, StatementType.SELECT);
- doReturn(dryRunJobMock).when(bigquery).create(any(JobInfo.class));
-
BigQueryJsonResultSet resultSetMock = mock(BigQueryJsonResultSet.class);
doReturn(resultSetMock).when(joblessStatementSpy).processJsonResultSet(tableResultMock);
@@ -480,4 +521,115 @@ public void testCancelWithJoblessQuery() throws SQLException, InterruptedExcepti
// And no backend cancellation was attempted
verify(bigquery, Mockito.never()).cancel(any(JobId.class));
}
+
+ @Test
+ public void testFetchNextPages_addsLinkToParent() throws Exception {
+ Tracer testTracer = otelTesting.getOpenTelemetry().getTracer("test");
+ Span parentSpan = testTracer.spanBuilder("parent-span").startSpan();
+
+ try (Scope scope = parentSpan.makeCurrent()) {
+
+ BlockingQueue<Tuple<TableResult, Boolean>> rpcResponseQueue = new LinkedBlockingDeque<>();
+ BlockingQueue<BigQueryFieldValueListWrapper> bigQueryFieldValueListWrapperBlockingQueue =
+ new LinkedBlockingDeque<>();
+ TableResult mockResult = mock(TableResult.class);
+ JobId mockJobId = JobId.of("job");
+
+ Job mockJob = mock(Job.class);
+ QueryJobConfiguration realConfig =
+ QueryJobConfiguration.newBuilder("SELECT 1")
+ .setDestinationTable(TableId.of("project", "dataset", "table"))
+ .build();
+ doReturn(mockJob).when(bigquery).getJob(any(JobId.class));
+ doReturn(realConfig).when(mockJob).getConfiguration();
+
+ TableResult mockNextResult = mock(TableResult.class);
+ doReturn(mockNextResult)
+ .when(bigquery)
+ .listTableData(
+ any(TableId.class), any(TableDataListOption.class), any(TableDataListOption.class));
+ doReturn(null).when(mockNextResult).getNextPageToken();
+
+ Thread workerThread =
+ bigQueryStatement.runNextPageTaskAsync(
+ mockResult,
+ "token",
+ mockJobId,
+ rpcResponseQueue,
+ bigQueryFieldValueListWrapperBlockingQueue);
+
+ Assertions.assertNotNull(workerThread, "Worker thread should not be null");
+ workerThread.join();
+
+ OpenTelemetryTestUtility.assertSpanLinkedToParent(
+ otelTesting.getSpans(), "BigQueryStatement.pagination", parentSpan);
+ } finally {
+ parentSpan.end();
+ }
+ }
+
+ @ParameterizedTest
+ @MethodSource("statementOperationProvider")
+ public void testExecuteOperation_generatesSpan(
+ StatementOperation operation,
+ String expectedSpanName,
+ StatementType type,
+ Map<AttributeKey<?>, Object> expectedAttributes)
+ throws Exception {
+ setupMockQueryResults(JobId.of("job"), type, 1L);
+ operation.run();
+
+ SpanData span =
+ OpenTelemetryTestUtility.findSpanByName(otelTesting.getSpans(), expectedSpanName);
+ OpenTelemetryTestUtility.assertSpanStatus(span, StatusCode.UNSET);
+
+ if (expectedAttributes != null) {
+ for (Map.Entry<AttributeKey<?>, Object> entry : expectedAttributes.entrySet()) {
+ OpenTelemetryTestUtility.assertSpanHasAttribute(
+ span, (AttributeKey