diff --git a/java-bigquery/google-cloud-bigquery-jdbc/pom.xml b/java-bigquery/google-cloud-bigquery-jdbc/pom.xml index 4ce42dd0ea6c..f2c75f9bd5cf 100644 --- a/java-bigquery/google-cloud-bigquery-jdbc/pom.xml +++ b/java-bigquery/google-cloud-bigquery-jdbc/pom.xml @@ -132,6 +132,17 @@ io com.google.bqjdbc.shaded.io + + + io.opentelemetry.api.* + io.opentelemetry.context.* + @@ -276,6 +287,16 @@ httpcore5 + + + io.opentelemetry + opentelemetry-api + + + io.opentelemetry + opentelemetry-context + + junit @@ -323,6 +344,11 @@ junit-platform-suite-engine test + + io.opentelemetry + opentelemetry-sdk-testing + test + diff --git a/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryArrowResultSet.java b/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryArrowResultSet.java index af041dc2a649..d15e6deb2ee4 100644 --- a/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryArrowResultSet.java +++ b/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryArrowResultSet.java @@ -27,6 +27,7 @@ import com.google.cloud.bigquery.exception.BigQueryJdbcRuntimeException; import com.google.cloud.bigquery.storage.v1.ArrowRecordBatch; import com.google.cloud.bigquery.storage.v1.ArrowSchema; +import io.opentelemetry.context.Scope; import java.io.IOException; import java.math.BigDecimal; import java.sql.Date; @@ -239,29 +240,31 @@ public boolean next() throws SQLException { || this.currentBatchRowIndex == (this.vectorSchemaRoot.getRowCount() - 1)) { /* Start of iteration or we have exhausted the current batch */ // Advance the cursor. Potentially blocking operation. - BigQueryArrowBatchWrapper batchWrapper = this.buffer.take(); - if (batchWrapper.getException() != null) { - throw new BigQueryJdbcRuntimeException(batchWrapper.getException()); - } - if (batchWrapper.isLast()) { - /* Marks the end of the records */ - if (this.vectorSchemaRoot != null) { - // IMP: To avoid memory leak: clear vectorSchemaRoot as it still holds - // the last batch - this.vectorSchemaRoot.clear(); + try (Scope scope = makeOriginalContextCurrent()) { + BigQueryArrowBatchWrapper batchWrapper = this.buffer.take(); + if (batchWrapper.getException() != null) { + throw new BigQueryJdbcRuntimeException(batchWrapper.getException()); + } + if (batchWrapper.isLast()) { + /* Marks the end of the records */ + if (this.vectorSchemaRoot != null) { + // IMP: To avoid memory leak: clear vectorSchemaRoot as it still holds + // the last batch + this.vectorSchemaRoot.clear(); + } + this.hasReachedEnd = true; + this.rowCount++; + return false; } - this.hasReachedEnd = true; + // Valid batch, process it + ArrowRecordBatch arrowBatch = batchWrapper.getCurrentArrowBatch(); + // Populates vectorSchemaRoot + this.arrowDeserializer.deserializeArrowBatch(arrowBatch); + // Pointing to the first row in this fresh batch + this.currentBatchRowIndex = 0; this.rowCount++; - return false; + return true; } - // Valid batch, process it - ArrowRecordBatch arrowBatch = batchWrapper.getCurrentArrowBatch(); - // Populates vectorSchemaRoot - this.arrowDeserializer.deserializeArrowBatch(arrowBatch); - // Pointing to the first row in this fresh batch - this.currentBatchRowIndex = 0; - this.rowCount++; - return true; } // There are rows left in the current batch. else if (this.currentBatchRowIndex < this.vectorSchemaRoot.getRowCount()) { diff --git a/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryBaseResultSet.java b/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryBaseResultSet.java index d63b72bb6c93..1fedd00e74e3 100644 --- a/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryBaseResultSet.java +++ b/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryBaseResultSet.java @@ -27,6 +27,10 @@ import com.google.cloud.bigquery.exception.BigQueryConversionException; import com.google.cloud.bigquery.exception.BigQueryJdbcCoercionException; import com.google.cloud.bigquery.exception.BigQueryJdbcCoercionNotFoundException; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.SpanContext; +import io.opentelemetry.context.Context; +import io.opentelemetry.context.Scope; import java.io.InputStream; import java.io.Reader; import java.io.StringReader; @@ -58,6 +62,7 @@ public abstract class BigQueryBaseResultSet extends BigQueryNoOpsResultSet protected boolean isClosed = false; protected boolean wasNull = false; protected final BigQueryTypeCoercer bigQueryTypeCoercer = BigQueryTypeCoercionUtility.INSTANCE; + protected final SpanContext originalSpanContext; protected BigQueryBaseResultSet( BigQuery bigQuery, BigQueryStatement statement, Schema schema, boolean isNested) { @@ -66,6 +71,11 @@ protected BigQueryBaseResultSet( this.schema = schema; this.schemaFieldList = schema != null ? schema.getFields() : null; this.isNested = isNested; + this.originalSpanContext = Span.current().getSpanContext(); + } + + protected Scope makeOriginalContextCurrent() { + return Context.current().with(Span.wrap(this.originalSpanContext)).makeCurrent(); } public QueryStatistics getQueryStatistics() { diff --git a/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryConnection.java b/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryConnection.java index 822b4e282f74..c703cee798a2 100644 --- a/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryConnection.java +++ b/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryConnection.java @@ -41,6 +41,8 @@ import com.google.cloud.bigquery.storage.v1.BigQueryWriteClient; import com.google.cloud.bigquery.storage.v1.BigQueryWriteSettings; import com.google.cloud.http.HttpTransportOptions; +import io.opentelemetry.api.OpenTelemetry; +import io.opentelemetry.api.trace.Tracer; import java.io.IOException; import java.io.InputStream; import java.sql.CallableStatement; @@ -141,6 +143,11 @@ public class BigQueryConnection extends BigQueryNoOpsConnection { Long connectionPoolSize; Long listenerPoolSize; String partnerToken; + Boolean enableGcpTraceExporter; + Boolean enableGcpLogExporter; + OpenTelemetry customOpenTelemetry; + Tracer tracer = + OpenTelemetry.noop().getTracer(BigQueryJdbcOpenTelemetry.INSTRUMENTATION_SCOPE_NAME); DatabaseMetaData databaseMetaData; Boolean reqGoogleDriveScope; @@ -267,6 +274,9 @@ public class BigQueryConnection extends BigQueryNoOpsConnection { this.partnerToken = ds.getPartnerToken(); this.headerProvider = createHeaderProvider(); + this.enableGcpTraceExporter = ds.getEnableGcpTraceExporter(); + this.enableGcpLogExporter = ds.getEnableGcpLogExporter(); + this.customOpenTelemetry = ds.getCustomOpenTelemetry(); this.bigQuery = getBigQueryConnection(); } } @@ -1042,6 +1052,14 @@ private BigQuery getBigQueryConnection() { bigQueryOptions.setTransportOptions(this.httpTransportOptions); } + OpenTelemetry openTelemetry = + BigQueryJdbcOpenTelemetry.getOpenTelemetry( + this.enableGcpTraceExporter, this.enableGcpLogExporter, this.customOpenTelemetry); + if (this.enableGcpTraceExporter || this.customOpenTelemetry != null) { + this.tracer = BigQueryJdbcOpenTelemetry.getTracer(openTelemetry); + bigQueryOptions.setOpenTelemetryTracer(this.tracer); + } + BigQueryOptions options = bigQueryOptions.setHeaderProvider(this.headerProvider).build(); options.setDefaultJobCreationMode( this.useStatelessQueryMode @@ -1090,6 +1108,13 @@ private BigQueryReadClient getBigQueryReadClientConnection() throws IOException bigQueryReadSettings.setTransportChannelProvider(activeProvider); + OpenTelemetry openTelemetry = + BigQueryJdbcOpenTelemetry.getOpenTelemetry( + this.enableGcpTraceExporter, this.enableGcpLogExporter, this.customOpenTelemetry); + if (this.enableGcpTraceExporter || this.customOpenTelemetry != null) { + bigQueryReadSettings.setOpenTelemetryTracerProvider(openTelemetry.getTracerProvider()); + } + return BigQueryReadClient.create(bigQueryReadSettings.build()); } @@ -1193,4 +1218,8 @@ public CallableStatement prepareCall( } return prepareCall(sql); } + + public Tracer getTracer() { + return this.tracer; + } } diff --git a/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryDatabaseMetaData.java b/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryDatabaseMetaData.java index e81601bd77ab..3a01a4ae0bae 100644 --- a/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryDatabaseMetaData.java +++ b/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryDatabaseMetaData.java @@ -42,6 +42,13 @@ import com.google.cloud.bigquery.TableDefinition; import com.google.cloud.bigquery.TableId; import com.google.cloud.bigquery.exception.BigQueryJdbcException; +import com.google.common.annotations.VisibleForTesting; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.SpanContext; +import io.opentelemetry.api.trace.StatusCode; +import io.opentelemetry.api.trace.Tracer; +import io.opentelemetry.context.Context; +import io.opentelemetry.context.Scope; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStream; @@ -1708,8 +1715,16 @@ Comparator defineGetProcedureColumnsComparator(FieldList resultS @Override public ResultSet getTables( - String catalog, String schemaPattern, String tableNamePattern, String[] types) { + String catalog, String schemaPattern, String tableNamePattern, String[] types) + throws SQLException { + return withTracing( + "BigQueryDatabaseMetaData.getTables", + () -> getTablesImpl(catalog, schemaPattern, tableNamePattern, types)); + } + private ResultSet getTablesImpl( + String catalog, String schemaPattern, String tableNamePattern, String[] types) + throws SQLException { Tuple effectiveIdentifiers = determineEffectiveCatalogAndSchema(catalog, schemaPattern); String effectiveCatalog = effectiveIdentifiers.x(); @@ -1727,161 +1742,195 @@ public ResultSet getTables( "getTables called for catalog: %s, schemaPattern: %s, tableNamePattern: %s, types: %s", effectiveCatalog, effectiveSchemaPattern, tableNamePattern, Arrays.toString(types)); + final Schema resultSchema = defineGetTablesSchema(); + final BlockingQueue queue = + new LinkedBlockingQueue<>(DEFAULT_QUEUE_CAPACITY); + + Thread fetcherThread = + runGetTablesTaskAsync( + effectiveCatalog, effectiveSchemaPattern, tableNamePattern, types, resultSchema, queue); + + BigQueryJsonResultSet resultSet = + BigQueryJsonResultSet.of(resultSchema, -1, queue, null, new Thread[] {fetcherThread}); + + LOG.info("Started background thread for getTables"); + return resultSet; + } + + @VisibleForTesting + Thread runGetTablesTaskAsync( + String effectiveCatalog, + String effectiveSchemaPattern, + String tableNamePattern, + String[] types, + Schema resultSchema, + BlockingQueue queue) + throws SQLException { + final Pattern schemaRegex = compileSqlLikePattern(effectiveSchemaPattern); final Pattern tableNameRegex = compileSqlLikePattern(tableNamePattern); final Set requestedTypes = (types == null || types.length == 0) ? null : new HashSet<>(Arrays.asList(types)); - final Schema resultSchema = defineGetTablesSchema(); final FieldList resultSchemaFields = resultSchema.getFields(); - - final BlockingQueue queue = - new LinkedBlockingQueue<>(DEFAULT_QUEUE_CAPACITY); final List collectedResults = Collections.synchronizedList(new ArrayList<>()); final String catalogParam = effectiveCatalog; final String schemaParam = effectiveSchemaPattern; - + SpanContext parentSpanContext = Span.current().getSpanContext(); Runnable tableFetcher = () -> { - ExecutorService apiExecutor = null; - ExecutorService tableProcessorExecutor = null; - final FieldList localResultSchemaFields = resultSchemaFields; - final List>> apiFutures = new ArrayList<>(); - final List> processingFutures = new ArrayList<>(); + Span backgroundSpan = + this.connection + .getTracer() + .spanBuilder("BigQueryDatabaseMetaData.getTables.background") + .setNoParent() + .addLink(parentSpanContext) + .startSpan(); + + try (Scope scope = backgroundSpan.makeCurrent()) { + ExecutorService apiExecutor = null; + ExecutorService tableProcessorExecutor = null; + final FieldList localResultSchemaFields = resultSchemaFields; + final List>> apiFutures = new ArrayList<>(); + final List> processingFutures = new ArrayList<>(); - try { - List datasetsToScan = - findMatchingBigQueryObjects( - "Dataset", - () -> - bigquery.listDatasets( - catalogParam, DatasetListOption.pageSize(DEFAULT_PAGE_SIZE)), - (name) -> bigquery.getDataset(DatasetId.of(catalogParam, name)), - (ds) -> ds.getDatasetId().getDataset(), - schemaParam, - schemaRegex, - LOG); + try { + List datasetsToScan = + findMatchingBigQueryObjects( + "Dataset", + () -> + bigquery.listDatasets( + catalogParam, DatasetListOption.pageSize(DEFAULT_PAGE_SIZE)), + (name) -> bigquery.getDataset(DatasetId.of(catalogParam, name)), + (ds) -> ds.getDatasetId().getDataset(), + schemaParam, + schemaRegex, + LOG); - if (datasetsToScan.isEmpty()) { - LOG.info("Fetcher thread found no matching datasets. Returning empty resultset."); - return; - } + if (datasetsToScan.isEmpty()) { + LOG.info("Fetcher thread found no matching datasets. Returning empty resultset."); + return; + } - apiExecutor = Executors.newFixedThreadPool(API_EXECUTOR_POOL_SIZE); - tableProcessorExecutor = Executors.newFixedThreadPool(this.metadataFetchThreadCount); + apiExecutor = Executors.newFixedThreadPool(API_EXECUTOR_POOL_SIZE); + tableProcessorExecutor = Executors.newFixedThreadPool(this.metadataFetchThreadCount); - LOG.fine("Submitting parallel findMatchingTables tasks..."); - for (Dataset dataset : datasetsToScan) { - if (Thread.currentThread().isInterrupted()) { - LOG.warning("Table fetcher interrupted during dataset iteration."); - break; + LOG.fine("Submitting parallel findMatchingTables tasks..."); + for (Dataset dataset : datasetsToScan) { + if (Thread.currentThread().isInterrupted()) { + LOG.warning("Table fetcher interrupted during dataset iteration."); + break; + } + + final DatasetId currentDatasetId = dataset.getDatasetId(); + Callable> apiCallable = + () -> + findMatchingBigQueryObjects( + "Table", + () -> + bigquery.listTables( + currentDatasetId, TableListOption.pageSize(DEFAULT_PAGE_SIZE)), + (name) -> + bigquery.getTable( + TableId.of( + currentDatasetId.getProject(), + currentDatasetId.getDataset(), + name)), + (tbl) -> tbl.getTableId().getTable(), + tableNamePattern, + tableNameRegex, + LOG); + + Callable> wrappedApiCallable = Context.current().wrap(apiCallable); + Future> apiFuture = apiExecutor.submit(wrappedApiCallable); + apiFutures.add(apiFuture); } + LOG.fine("Finished submitting " + apiFutures.size() + " findMatchingTables tasks."); + apiExecutor.shutdown(); - final DatasetId currentDatasetId = dataset.getDatasetId(); - Callable> apiCallable = - () -> - findMatchingBigQueryObjects( - "Table", + LOG.fine("Processing results from findMatchingTables tasks..."); + for (Future> apiFuture : apiFutures) { + if (Thread.currentThread().isInterrupted()) { + LOG.warning("Table fetcher interrupted while processing API futures."); + break; + } + try { + List tablesResult = apiFuture.get(); + if (tablesResult != null) { + for (Table table : tablesResult) { + if (Thread.currentThread().isInterrupted()) break; + + final Table currentTable = table; + Runnable processRunnable = () -> - bigquery.listTables( - currentDatasetId, TableListOption.pageSize(DEFAULT_PAGE_SIZE)), - (name) -> - bigquery.getTable( - TableId.of( - currentDatasetId.getProject(), - currentDatasetId.getDataset(), - name)), - (tbl) -> tbl.getTableId().getTable(), - tableNamePattern, - tableNameRegex, - LOG); - Future> apiFuture = apiExecutor.submit(apiCallable); - apiFutures.add(apiFuture); - } - LOG.fine("Finished submitting " + apiFutures.size() + " findMatchingTables tasks."); - apiExecutor.shutdown(); + processTableInfo( + currentTable, + requestedTypes, + collectedResults, + localResultSchemaFields); + Runnable wrappedProcessRunnable = Context.current().wrap(processRunnable); + Future processFuture = + tableProcessorExecutor.submit(wrappedProcessRunnable); + processingFutures.add(processFuture); + } + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + LOG.warning("Fetcher thread interrupted while waiting for API future result."); + break; + } catch (ExecutionException e) { + LOG.warning( + "Error executing findMatchingTables task: " + + e.getMessage() + + ". Cause: " + + e.getCause()); + } catch (CancellationException e) { + LOG.warning("A findMatchingTables task was cancelled."); + } + } + + LOG.fine( + "Finished submitting " + processingFutures.size() + " processTableInfo tasks."); - LOG.fine("Processing results from findMatchingTables tasks..."); - for (Future> apiFuture : apiFutures) { if (Thread.currentThread().isInterrupted()) { - LOG.warning("Table fetcher interrupted while processing API futures."); - break; + LOG.warning( + "Fetcher interrupted before waiting for processing tasks; cancelling remaining."); + processingFutures.forEach(f -> f.cancel(true)); + } else { + LOG.fine("Waiting for processTableInfo tasks to complete..."); + waitForTasksCompletion(processingFutures); + LOG.fine("All processTableInfo tasks completed."); } - try { - List
tablesResult = apiFuture.get(); - if (tablesResult != null) { - for (Table table : tablesResult) { - if (Thread.currentThread().isInterrupted()) break; - final Table currentTable = table; - Future processFuture = - tableProcessorExecutor.submit( - () -> - processTableInfo( - currentTable, - requestedTypes, - collectedResults, - localResultSchemaFields)); - processingFutures.add(processFuture); - } - } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - LOG.warning("Fetcher thread interrupted while waiting for API future result."); - break; - } catch (ExecutionException e) { - LOG.warning( - "Error executing findMatchingTables task: " - + e.getMessage() - + ". Cause: " - + e.getCause()); - } catch (CancellationException e) { - LOG.warning("A findMatchingTables task was cancelled."); + if (!Thread.currentThread().isInterrupted()) { + Comparator comparator = + defineGetTablesComparator(localResultSchemaFields); + sortResults(collectedResults, comparator, "getTables", LOG); } - } - LOG.fine( - "Finished submitting " + processingFutures.size() + " processTableInfo tasks."); + if (!Thread.currentThread().isInterrupted()) { + populateQueue(collectedResults, queue, localResultSchemaFields); + } - if (Thread.currentThread().isInterrupted()) { - LOG.warning( - "Fetcher interrupted before waiting for processing tasks; cancelling remaining."); + } catch (Throwable t) { + LOG.severe("Unexpected error in table fetcher runnable: " + t.getMessage()); + apiFutures.forEach(f -> f.cancel(true)); processingFutures.forEach(f -> f.cancel(true)); - } else { - LOG.fine("Waiting for processTableInfo tasks to complete..."); - waitForTasksCompletion(processingFutures); - LOG.fine("All processTableInfo tasks completed."); + } finally { + signalEndOfData(queue, localResultSchemaFields); + shutdownExecutor(apiExecutor); + shutdownExecutor(tableProcessorExecutor); + LOG.info("Table fetcher thread finished."); } - - if (!Thread.currentThread().isInterrupted()) { - Comparator comparator = - defineGetTablesComparator(localResultSchemaFields); - sortResults(collectedResults, comparator, "getTables", LOG); - } - - if (!Thread.currentThread().isInterrupted()) { - populateQueue(collectedResults, queue, localResultSchemaFields); - } - - } catch (Throwable t) { - LOG.severe("Unexpected error in table fetcher runnable: " + t.getMessage()); - apiFutures.forEach(f -> f.cancel(true)); - processingFutures.forEach(f -> f.cancel(true)); } finally { - signalEndOfData(queue, localResultSchemaFields); - shutdownExecutor(apiExecutor); - shutdownExecutor(tableProcessorExecutor); - LOG.info("Table fetcher thread finished."); + backgroundSpan.end(); } }; - Thread fetcherThread = new Thread(tableFetcher, "getTables-fetcher-" + effectiveCatalog); - BigQueryJsonResultSet resultSet = - BigQueryJsonResultSet.of(resultSchema, -1, queue, null, new Thread[] {fetcherThread}); - + Runnable wrappedTableFetcher = Context.current().wrap(tableFetcher); + Thread fetcherThread = new Thread(wrappedTableFetcher, "getTables-fetcher-" + effectiveCatalog); fetcherThread.start(); - LOG.info("Started background thread for getTables"); - return resultSet; + return fetcherThread; } Schema defineGetTablesSchema() { @@ -1995,16 +2044,19 @@ Comparator defineGetTablesComparator(FieldList resultSchemaField } @Override - public ResultSet getSchemas() { + public ResultSet getSchemas() throws SQLException { LOG.info("getSchemas() called"); return getSchemas(null, null); } @Override - public ResultSet getCatalogs() { - LOG.info("getCatalogs() called"); + public ResultSet getCatalogs() throws SQLException { + return withTracing("BigQueryDatabaseMetaData.getCatalogs", () -> getCatalogsImpl()); + } + private ResultSet getCatalogsImpl() throws SQLException { + LOG.info("getCatalogs() called"); final List accessibleCatalogs = getAccessibleCatalogNames(); final Schema catalogsSchema = defineGetCatalogsSchema(); final FieldList schemaFields = catalogsSchema.getFields(); @@ -2073,8 +2125,16 @@ static List prepareGetTableTypesRows(Schema schema) { @Override public ResultSet getColumns( - String catalog, String schemaPattern, String tableNamePattern, String columnNamePattern) { + String catalog, String schemaPattern, String tableNamePattern, String columnNamePattern) + throws SQLException { + return withTracing( + "BigQueryDatabaseMetaData.getColumns", + () -> getColumnsImpl(catalog, schemaPattern, tableNamePattern, columnNamePattern)); + } + private ResultSet getColumnsImpl( + String catalog, String schemaPattern, String tableNamePattern, String columnNamePattern) + throws SQLException { Tuple effectiveIdentifiers = determineEffectiveCatalogAndSchema(catalog, schemaPattern); String effectiveCatalog = effectiveIdentifiers.x(); @@ -2094,120 +2154,160 @@ public ResultSet getColumns( + " columnNamePattern: %s", effectiveCatalog, effectiveSchemaPattern, tableNamePattern, columnNamePattern); + final Schema resultSchema = defineGetColumnsSchema(); + final BlockingQueue queue = + new LinkedBlockingQueue<>(DEFAULT_QUEUE_CAPACITY); + + Thread fetcherThread = + runGetColumnsTaskAsync( + effectiveCatalog, + effectiveSchemaPattern, + tableNamePattern, + columnNamePattern, + resultSchema, + queue); + + BigQueryJsonResultSet resultSet = + BigQueryJsonResultSet.of(resultSchema, -1, queue, null, new Thread[] {fetcherThread}); + + LOG.info("Started background thread for getColumns"); + return resultSet; + } + + @VisibleForTesting + Thread runGetColumnsTaskAsync( + String effectiveCatalog, + String effectiveSchemaPattern, + String tableNamePattern, + String columnNamePattern, + Schema resultSchema, + BlockingQueue queue) + throws SQLException { + Pattern schemaRegex = compileSqlLikePattern(effectiveSchemaPattern); Pattern tableNameRegex = compileSqlLikePattern(tableNamePattern); Pattern columnNameRegex = compileSqlLikePattern(columnNamePattern); - final Schema resultSchema = defineGetColumnsSchema(); final FieldList resultSchemaFields = resultSchema.getFields(); - final BlockingQueue queue = - new LinkedBlockingQueue<>(DEFAULT_QUEUE_CAPACITY); final List collectedResults = Collections.synchronizedList(new ArrayList<>()); final String catalogParam = effectiveCatalog; final String schemaParam = effectiveSchemaPattern; + SpanContext parentSpanContext = Span.current().getSpanContext(); Runnable columnFetcher = () -> { - ExecutorService columnExecutor = null; - final List> taskFutures = new ArrayList<>(); - final FieldList localResultSchemaFields = resultSchemaFields; - - try { - List datasetsToScan = - findMatchingBigQueryObjects( - "Dataset", - () -> - bigquery.listDatasets( - catalogParam, DatasetListOption.pageSize(DEFAULT_PAGE_SIZE)), - (name) -> bigquery.getDataset(DatasetId.of(catalogParam, name)), - (ds) -> ds.getDatasetId().getDataset(), - schemaParam, - schemaRegex, - LOG); + Span backgroundSpan = + this.connection + .getTracer() + .spanBuilder("BigQueryDatabaseMetaData.getColumns.background") + .setNoParent() + .addLink(parentSpanContext) + .startSpan(); + + try (Scope scope = backgroundSpan.makeCurrent()) { + ExecutorService columnExecutor = null; + final List> taskFutures = new ArrayList<>(); + final FieldList localResultSchemaFields = resultSchemaFields; - if (datasetsToScan.isEmpty()) { - LOG.info("Fetcher thread found no matching datasets. Returning empty resultset."); - return; - } - - columnExecutor = Executors.newFixedThreadPool(this.metadataFetchThreadCount); - - for (Dataset dataset : datasetsToScan) { - if (Thread.currentThread().isInterrupted()) { - LOG.warning("Fetcher interrupted during dataset iteration."); - break; - } - - DatasetId datasetId = dataset.getDatasetId(); - LOG.info("Processing dataset: " + datasetId.getDataset()); - - List
tablesToScan = + try { + List datasetsToScan = findMatchingBigQueryObjects( - "Table", + "Dataset", () -> - bigquery.listTables( - datasetId, TableListOption.pageSize(DEFAULT_PAGE_SIZE)), - (name) -> - bigquery.getTable( - TableId.of(datasetId.getProject(), datasetId.getDataset(), name)), - (tbl) -> tbl.getTableId().getTable(), - tableNamePattern, - tableNameRegex, + bigquery.listDatasets( + catalogParam, DatasetListOption.pageSize(DEFAULT_PAGE_SIZE)), + (name) -> bigquery.getDataset(DatasetId.of(catalogParam, name)), + (ds) -> ds.getDatasetId().getDataset(), + schemaParam, + schemaRegex, LOG); - for (Table table : tablesToScan) { + if (datasetsToScan.isEmpty()) { + LOG.info("Fetcher thread found no matching datasets. Returning empty resultset."); + return; + } + + columnExecutor = Executors.newFixedThreadPool(this.metadataFetchThreadCount); + + for (Dataset dataset : datasetsToScan) { if (Thread.currentThread().isInterrupted()) { - LOG.warning( - "Fetcher interrupted during table iteration for dataset " - + datasetId.getDataset()); + LOG.warning("Fetcher interrupted during dataset iteration."); break; } - TableId tableId = table.getTableId(); - LOG.fine("Submitting task for table: " + tableId); - final Table finalTable = table; - Future future = - columnExecutor.submit( + DatasetId datasetId = dataset.getDatasetId(); + LOG.info("Processing dataset: " + datasetId.getDataset()); + + List
tablesToScan = + findMatchingBigQueryObjects( + "Table", () -> - processTableColumns( - finalTable, - columnNameRegex, - collectedResults, - localResultSchemaFields)); - taskFutures.add(future); + bigquery.listTables( + datasetId, TableListOption.pageSize(DEFAULT_PAGE_SIZE)), + (name) -> + bigquery.getTable( + TableId.of(datasetId.getProject(), datasetId.getDataset(), name)), + (tbl) -> tbl.getTableId().getTable(), + tableNamePattern, + tableNameRegex, + LOG); + + for (Table table : tablesToScan) { + if (Thread.currentThread().isInterrupted()) { + LOG.warning( + "Fetcher interrupted during table iteration for dataset " + + datasetId.getDataset()); + break; + } + + TableId tableId = table.getTableId(); + LOG.fine("Submitting task for table: " + tableId); + final Table finalTable = table; + + Runnable columnTask = + () -> + processTableColumns( + finalTable, + columnNameRegex, + collectedResults, + localResultSchemaFields); + Runnable wrappedColumnTask = Context.current().wrap(columnTask); + Future future = columnExecutor.submit(wrappedColumnTask); + taskFutures.add(future); + } + if (Thread.currentThread().isInterrupted()) break; } - if (Thread.currentThread().isInterrupted()) break; - } - waitForTasksCompletion(taskFutures); + waitForTasksCompletion(taskFutures); - if (!Thread.currentThread().isInterrupted()) { - Comparator comparator = - defineGetColumnsComparator(localResultSchemaFields); - sortResults(collectedResults, comparator, "getColumns", LOG); - } + if (!Thread.currentThread().isInterrupted()) { + Comparator comparator = + defineGetColumnsComparator(localResultSchemaFields); + sortResults(collectedResults, comparator, "getColumns", LOG); + } - if (!Thread.currentThread().isInterrupted()) { - populateQueue(collectedResults, queue, localResultSchemaFields); - } + if (!Thread.currentThread().isInterrupted()) { + populateQueue(collectedResults, queue, localResultSchemaFields); + } - } catch (Throwable t) { - LOG.severe("Unexpected error in column fetcher runnable: " + t.getMessage()); - taskFutures.forEach(f -> f.cancel(true)); + } catch (Throwable t) { + LOG.severe("Unexpected error in column fetcher runnable: " + t.getMessage()); + taskFutures.forEach(f -> f.cancel(true)); + } finally { + signalEndOfData(queue, localResultSchemaFields); + shutdownExecutor(columnExecutor); + LOG.info("Column fetcher thread finished."); + } } finally { - signalEndOfData(queue, localResultSchemaFields); - shutdownExecutor(columnExecutor); - LOG.info("Column fetcher thread finished."); + backgroundSpan.end(); } }; - Thread fetcherThread = new Thread(columnFetcher, "getColumns-fetcher-" + effectiveCatalog); - BigQueryJsonResultSet resultSet = - BigQueryJsonResultSet.of(resultSchema, -1, queue, null, new Thread[] {fetcherThread}); - + Runnable wrappedColumnFetcher = Context.current().wrap(columnFetcher); + Thread fetcherThread = + new Thread(wrappedColumnFetcher, "getColumns-fetcher-" + effectiveCatalog); fetcherThread.start(); - LOG.info("Started background thread for getColumns"); - return resultSet; + return fetcherThread; } private void processTableColumns( @@ -2274,7 +2374,7 @@ private void processTableColumns( } } - private Schema defineGetColumnsSchema() { + Schema defineGetColumnsSchema() { List fields = new ArrayList<>(24); fields.add( Field.newBuilder("TABLE_CAT", StandardSQLTypeName.STRING) @@ -3626,7 +3726,12 @@ public RowIdLifetime getRowIdLifetime() { } @Override - public ResultSet getSchemas(String catalog, String schemaPattern) { + public ResultSet getSchemas(String catalog, String schemaPattern) throws SQLException { + return withTracing( + "BigQueryDatabaseMetaData.getSchemas", () -> getSchemasImpl(catalog, schemaPattern)); + } + + private ResultSet getSchemasImpl(String catalog, String schemaPattern) throws SQLException { if ((catalog != null && catalog.isEmpty()) || (schemaPattern != null && schemaPattern.isEmpty())) { LOG.warning("Returning empty ResultSet as catalog or schemaPattern is an empty string."); @@ -3635,99 +3740,126 @@ public ResultSet getSchemas(String catalog, String schemaPattern) { LOG.info("getSchemas called for catalog: %s, schemaPattern: %s", catalog, schemaPattern); - final Pattern schemaRegex = compileSqlLikePattern(schemaPattern); final Schema resultSchema = defineGetSchemasSchema(); - final FieldList resultSchemaFields = resultSchema.getFields(); - final BlockingQueue queue = new LinkedBlockingQueue<>(DEFAULT_QUEUE_CAPACITY); + + Thread fetcherThread = runGetSchemasTaskAsync(catalog, schemaPattern, resultSchema, queue); + + BigQueryJsonResultSet resultSet = + BigQueryJsonResultSet.of(resultSchema, -1, queue, null, new Thread[] {fetcherThread}); + + LOG.info("Started background thread for getSchemas"); + return resultSet; + } + + @VisibleForTesting + Thread runGetSchemasTaskAsync( + String catalog, + String schemaPattern, + Schema resultSchema, + BlockingQueue queue) + throws SQLException { + + final Pattern schemaRegex = compileSqlLikePattern(schemaPattern); + final FieldList resultSchemaFields = resultSchema.getFields(); final List collectedResults = Collections.synchronizedList(new ArrayList<>()); final String catalogParam = catalog; + SpanContext parentSpanContext = Span.current().getSpanContext(); Runnable schemaFetcher = () -> { - final FieldList localResultSchemaFields = resultSchemaFields; - List projectsToScanList = new ArrayList<>(); - - if (catalogParam != null) { - projectsToScanList.add(catalogParam); - } else { - projectsToScanList.addAll(getAccessibleCatalogNames()); - } - - if (projectsToScanList.isEmpty()) { - LOG.info( - "No valid projects to scan (primary, specified, or additional). Returning empty" - + " resultset."); - return; - } - - try { - for (String currentProjectToScan : projectsToScanList) { - if (Thread.currentThread().isInterrupted()) { - LOG.warning( - "Schema fetcher interrupted during project iteration for project: " - + currentProjectToScan); - break; - } - LOG.info("Fetching schemas for project: " + currentProjectToScan); - List datasetsInProject = - findMatchingBigQueryObjects( - "Dataset", - () -> - bigquery.listDatasets( - currentProjectToScan, - BigQuery.DatasetListOption.pageSize(DEFAULT_PAGE_SIZE)), - (name) -> bigquery.getDataset(DatasetId.of(currentProjectToScan, name)), - (ds) -> ds.getDatasetId().getDataset(), - schemaPattern, - schemaRegex, - LOG); + Span backgroundSpan = + this.connection + .getTracer() + .spanBuilder("BigQueryDatabaseMetaData.getSchemas.background") + .setNoParent() + .addLink(parentSpanContext) + .startSpan(); + + try (Scope scope = backgroundSpan.makeCurrent()) { + final FieldList localResultSchemaFields = resultSchemaFields; + List projectsToScanList = new ArrayList<>(); + + if (catalogParam != null) { + projectsToScanList.add(catalogParam); + } else { + projectsToScanList.addAll(getAccessibleCatalogNames()); + } - if (datasetsInProject.isEmpty() || Thread.currentThread().isInterrupted()) { - LOG.info( - "Fetcher thread found no matching datasets in project: " - + currentProjectToScan); - continue; - } + if (projectsToScanList.isEmpty()) { + LOG.info( + "No valid projects to scan (primary, specified, or additional). Returning empty" + + " resultset."); + return; + } - LOG.fine("Processing found datasets for project: " + currentProjectToScan); - for (Dataset dataset : datasetsInProject) { + try { + for (String currentProjectToScan : projectsToScanList) { if (Thread.currentThread().isInterrupted()) { LOG.warning( - "Schema fetcher interrupted during dataset iteration for project: " + "Schema fetcher interrupted during project iteration for project: " + currentProjectToScan); break; } - processSchemaInfo(dataset, collectedResults, localResultSchemaFields); + LOG.info("Fetching schemas for project: " + currentProjectToScan); + List datasetsInProject = + findMatchingBigQueryObjects( + "Dataset", + () -> + bigquery.listDatasets( + currentProjectToScan, + BigQuery.DatasetListOption.pageSize(DEFAULT_PAGE_SIZE)), + (name) -> bigquery.getDataset(DatasetId.of(currentProjectToScan, name)), + (ds) -> ds.getDatasetId().getDataset(), + schemaPattern, + schemaRegex, + LOG); + + if (datasetsInProject.isEmpty() || Thread.currentThread().isInterrupted()) { + LOG.info( + "Fetcher thread found no matching datasets in project: " + + currentProjectToScan); + continue; + } + + LOG.fine("Processing found datasets for project: " + currentProjectToScan); + for (Dataset dataset : datasetsInProject) { + if (Thread.currentThread().isInterrupted()) { + LOG.warning( + "Schema fetcher interrupted during dataset iteration for project: " + + currentProjectToScan); + break; + } + processSchemaInfo(dataset, collectedResults, localResultSchemaFields); + } } - } - if (!Thread.currentThread().isInterrupted()) { - Comparator comparator = - defineGetSchemasComparator(localResultSchemaFields); - sortResults(collectedResults, comparator, "getSchemas", LOG); - } + if (!Thread.currentThread().isInterrupted()) { + Comparator comparator = + defineGetSchemasComparator(localResultSchemaFields); + sortResults(collectedResults, comparator, "getSchemas", LOG); + } - if (!Thread.currentThread().isInterrupted()) { - populateQueue(collectedResults, queue, localResultSchemaFields); - } + if (!Thread.currentThread().isInterrupted()) { + populateQueue(collectedResults, queue, localResultSchemaFields); + } - } catch (Throwable t) { - LOG.severe("Unexpected error in schema fetcher runnable: " + t.getMessage()); + } catch (Throwable t) { + LOG.severe("Unexpected error in schema fetcher runnable: " + t.getMessage()); + } finally { + signalEndOfData(queue, localResultSchemaFields); + LOG.info("Schema fetcher thread finished."); + } } finally { - signalEndOfData(queue, localResultSchemaFields); - LOG.info("Schema fetcher thread finished."); + backgroundSpan.end(); } }; - Thread fetcherThread = new Thread(schemaFetcher, "getSchemas-fetcher-" + catalog); - BigQueryJsonResultSet resultSet = - BigQueryJsonResultSet.of(resultSchema, -1, queue, null, new Thread[] {fetcherThread}); - + Runnable wrappedFetcher = Context.current().wrap(schemaFetcher); + Thread fetcherThread = new Thread(wrappedFetcher, "getSchemas-fetcher-" + catalog); fetcherThread.start(); - LOG.info("Started background thread for getSchemas"); - return resultSet; + return fetcherThread; } Schema defineGetSchemasSchema() { @@ -5304,4 +5436,23 @@ private void loadDriverVersionProperties() { throw ex; } } + + private interface TracedMetadataOperation { + T run() throws SQLException; + } + + private T withTracing(String spanName, TracedMetadataOperation operation) + throws SQLException { + Tracer tracer = this.connection.getTracer(); + Span span = tracer.spanBuilder(spanName).startSpan(); + try (Scope scope = span.makeCurrent()) { + return operation.run(); + } catch (Exception ex) { + span.recordException(ex); + span.setStatus(StatusCode.ERROR, ex.getMessage()); + throw ex; + } finally { + span.end(); + } + } } diff --git a/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryDriver.java b/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryDriver.java index e62d93fca0ed..a83bdc5093e6 100644 --- a/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryDriver.java +++ b/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryDriver.java @@ -20,6 +20,7 @@ import com.google.cloud.bigquery.exception.BigQueryJdbcRuntimeException; import io.grpc.LoadBalancerRegistry; import io.grpc.internal.PickFirstLoadBalancerProvider; +import io.opentelemetry.api.OpenTelemetry; import java.io.IOException; import java.sql.Connection; import java.sql.Driver; @@ -127,9 +128,12 @@ public Connection connect(String url, Properties info) throws SQLException { LOG.finest("++enter++"); try { if (acceptsURL(url)) { - // strip 'jdbc:' from the URL, add any extra properties + Properties connectInfo = info == null ? new Properties() : (Properties) info.clone(); + Object customOpenTelemetryObj = connectInfo.remove("customOpenTelemetry"); + String connectionUri = - BigQueryJdbcUrlUtility.appendPropertiesToURL(url.substring(5), this.toString(), info); + BigQueryJdbcUrlUtility.appendPropertiesToURL( + url.substring(5), this.toString(), connectInfo); try { BigQueryJdbcUrlUtility.parseUrl(connectionUri); } catch (BigQueryJdbcRuntimeException e) { @@ -137,6 +141,9 @@ public Connection connect(String url, Properties info) throws SQLException { } DataSource ds = DataSource.fromUrl(connectionUri); + if (customOpenTelemetryObj instanceof OpenTelemetry) { + ds.setCustomOpenTelemetry((OpenTelemetry) customOpenTelemetryObj); + } // LogLevel String logLevelStr = ds.getLogLevel(); diff --git a/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryJdbcOpenTelemetry.java b/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryJdbcOpenTelemetry.java new file mode 100644 index 000000000000..181e15629c5b --- /dev/null +++ b/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryJdbcOpenTelemetry.java @@ -0,0 +1,52 @@ +/* + * Copyright 2026 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.bigquery.jdbc; + +import io.opentelemetry.api.OpenTelemetry; +import io.opentelemetry.api.trace.Tracer; + +public class BigQueryJdbcOpenTelemetry { + + static final String INSTRUMENTATION_SCOPE_NAME = "com.google.cloud.bigquery.jdbc"; + + private BigQueryJdbcOpenTelemetry() {} + + /** + * Initializes or returns the OpenTelemetry instance based on hybrid logic. Prefer + * customOpenTelemetry if provided; fallback to an auto-configured GCP exporter if requested. + */ + public static OpenTelemetry getOpenTelemetry( + boolean enableGcpTraceExporter, + boolean enableGcpLogExporter, + OpenTelemetry customOpenTelemetry) { + if (customOpenTelemetry != null) { + return customOpenTelemetry; + } + + if (enableGcpTraceExporter || enableGcpLogExporter) { + // TODO(b/491238299): Initialize and return GCP-specific auto-configured SDK + return OpenTelemetry.noop(); + } + + return OpenTelemetry.noop(); + } + + /** Gets a Tracer for the JDBC driver instrumentation scope. */ + public static Tracer getTracer(OpenTelemetry openTelemetry) { + return openTelemetry.getTracer(INSTRUMENTATION_SCOPE_NAME); + } +} diff --git a/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryJdbcUrlUtility.java b/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryJdbcUrlUtility.java index afc300a6d97c..89a215c6c689 100644 --- a/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryJdbcUrlUtility.java +++ b/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryJdbcUrlUtility.java @@ -159,6 +159,10 @@ protected boolean removeEldestEntry(Map.Entry> eldes static final int DEFAULT_SWA_APPEND_ROW_COUNT_VALUE = 1000; static final String SWA_ACTIVATION_ROW_COUNT_PROPERTY_NAME = "SWA_ActivationRowCount"; static final int DEFAULT_SWA_ACTIVATION_ROW_COUNT_VALUE = 3; + static final String ENABLE_GCP_TRACE_EXPORTER_PROPERTY_NAME = "enableGcpTraceExporter"; + static final boolean DEFAULT_ENABLE_GCP_TRACE_EXPORTER_VALUE = false; + static final String ENABLE_GCP_LOG_EXPORTER_PROPERTY_NAME = "enableGcpLogExporter"; + static final boolean DEFAULT_ENABLE_GCP_LOG_EXPORTER_VALUE = false; private static final BigQueryJdbcCustomLogger LOG = new BigQueryJdbcCustomLogger(BigQueryJdbcUrlUtility.class.getName()); static final String FILTER_TABLES_ON_DEFAULT_DATASET_PROPERTY_NAME = @@ -604,6 +608,18 @@ protected boolean removeEldestEntry(Map.Entry> eldes .setDescription( "Reason for the request, which is passed as the x-goog-request-reason" + " header.") + .build(), + BigQueryConnectionProperty.newBuilder() + .setName(ENABLE_GCP_TRACE_EXPORTER_PROPERTY_NAME) + .setDescription( + "Enables or disables GCP OpenTelemetry Trace exporter. Disabled by default.") + .setDefaultValue(String.valueOf(DEFAULT_ENABLE_GCP_TRACE_EXPORTER_VALUE)) + .build(), + BigQueryConnectionProperty.newBuilder() + .setName(ENABLE_GCP_LOG_EXPORTER_PROPERTY_NAME) + .setDescription( + "Enables or disables GCP OpenTelemetry Log exporter. Disabled by default.") + .setDefaultValue(String.valueOf(DEFAULT_ENABLE_GCP_LOG_EXPORTER_VALUE)) .build()))); private static final List NETWORK_PROPERTIES = diff --git a/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryStatement.java b/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryStatement.java index c2bcdc96a6c1..6c9de7377ebf 100644 --- a/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryStatement.java +++ b/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryStatement.java @@ -57,6 +57,14 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterators; import com.google.common.util.concurrent.Uninterruptibles; +import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.SpanBuilder; +import io.opentelemetry.api.trace.SpanContext; +import io.opentelemetry.api.trace.StatusCode; +import io.opentelemetry.api.trace.Tracer; +import io.opentelemetry.context.Context; +import io.opentelemetry.context.Scope; import java.lang.ref.ReferenceQueue; import java.sql.Connection; import java.sql.ResultSet; @@ -239,7 +247,12 @@ public ResultSet executeQuery(String sql) throws SQLException { BigQueryJdbcMdc.registerInstance(this.connection, this.connectionId)) { LOG.finest("++enter++"); checkClosed(); - return executeQueryImpl(sql); + return withTracing( + "BigQueryStatement.executeQuery", + (span) -> { + span.setAttribute("db.statement", sql); + return executeQueryImpl(sql); + }); } } @@ -266,7 +279,12 @@ public long executeLargeUpdate(String sql) throws SQLException { BigQueryJdbcMdc.registerInstance(this.connection, this.connectionId)) { LOG.finest("++enter++"); checkClosed(); - return executeLargeUpdateImpl(sql); + return withTracing( + "BigQueryStatement.executeLargeUpdate", + (span) -> { + span.setAttribute("db.statement", sql); + return executeLargeUpdateImpl(sql); + }); } } @@ -312,7 +330,12 @@ public boolean execute(String sql) throws SQLException { BigQueryJdbcMdc.registerInstance(this.connection, this.connectionId)) { LOG.finest("++enter++"); checkClosed(); - return executeImpl(sql); + return withTracing( + "BigQueryStatement.execute", + (span) -> { + span.setAttribute("db.statement", sql); + return executeImpl(sql); + }); } } @@ -853,81 +876,85 @@ Thread populateArrowBufferedQueue( LOG.finest("++enter++"); Runnable arrowStreamProcessor = - () -> { - long rowsRead = 0; - int retryCount = 0; - try { - // Use the first stream to perform reading. - String streamName = readSession.getStreams(0).getName(); - - while (true) { - try { - ReadRowsRequest readRowsRequest = - ReadRowsRequest.newBuilder() - .setReadStream(streamName) - .setOffset(rowsRead) - .build(); - - // Process each block of rows as they arrive and decode using our simple row reader. - com.google.api.gax.rpc.ServerStream stream = - bqReadClient.readRowsCallable().call(readRowsRequest); - for (ReadRowsResponse response : stream) { - if (Thread.currentThread().isInterrupted() || queryTaskExecutor.isShutdown()) { - break; - } - - ArrowRecordBatch currentBatch = response.getArrowRecordBatch(); - Uninterruptibles.putUninterruptibly( - arrowBatchWrapperBlockingQueue, BigQueryArrowBatchWrapper.of(currentBatch)); - rowsRead += response.getRowCount(); - } - break; - } catch (com.google.api.gax.rpc.ApiException e) { - if (e.getStatusCode().getCode() - == com.google.api.gax.rpc.StatusCode.Code.NOT_FOUND) { - LOG.warning("Read session expired or not found: %s", e.getMessage()); - enqueueError(arrowBatchWrapperBlockingQueue, e); - break; - } - if (retryCount >= MAX_RETRY_COUNT) { - LOG.log( - Level.SEVERE, - "\n" - + Thread.currentThread().getName() - + " Interrupted @ arrowStreamProcessor, max retries exceeded", - e); - enqueueError(arrowBatchWrapperBlockingQueue, e); - break; - } - retryCount++; - LOG.warning( - "Connection interrupted during arrow stream read, retrying. attempt: %d", - retryCount); - Thread.sleep(RETRY_DELAY_MS); - } + Context.current() + .wrap( + () -> + processArrowStream(readSession, arrowBatchWrapperBlockingQueue, bqReadClient)); + + Thread populateBufferWorker = JDBC_THREAD_FACTORY.newThread(arrowStreamProcessor); + populateBufferWorker.start(); + return populateBufferWorker; + } + + private void processArrowStream( + ReadSession readSession, + BlockingQueue arrowBatchWrapperBlockingQueue, + BigQueryReadClient bqReadClient) { + long rowsRead = 0; + int retryCount = 0; + try { + // Use the first stream to perform reading. + String streamName = readSession.getStreams(0).getName(); + + while (true) { + try { + ReadRowsRequest readRowsRequest = + ReadRowsRequest.newBuilder().setReadStream(streamName).setOffset(rowsRead).build(); + + // Process each block of rows as they arrive and decode using our simple row + // reader. + com.google.api.gax.rpc.ServerStream stream = + bqReadClient.readRowsCallable().call(readRowsRequest); + for (ReadRowsResponse response : stream) { + if (Thread.currentThread().isInterrupted() || queryTaskExecutor.isShutdown()) { + break; } - } catch (InterruptedException e) { - LOG.log( - Level.WARNING, - "\n" + Thread.currentThread().getName() + " Interrupted @ arrowStreamProcessor", - e); + ArrowRecordBatch currentBatch = response.getArrowRecordBatch(); + Uninterruptibles.putUninterruptibly( + arrowBatchWrapperBlockingQueue, BigQueryArrowBatchWrapper.of(currentBatch)); + rowsRead += response.getRowCount(); + } + break; + } catch (com.google.api.gax.rpc.ApiException e) { + if (e.getStatusCode().getCode() == com.google.api.gax.rpc.StatusCode.Code.NOT_FOUND) { + LOG.warning("Read session expired or not found: %s", e.getMessage()); enqueueError(arrowBatchWrapperBlockingQueue, e); - Thread.currentThread().interrupt(); - } catch (Exception e) { + break; + } + if (retryCount >= MAX_RETRY_COUNT) { LOG.log( - Level.WARNING, - "\n" + Thread.currentThread().getName() + " Error @ arrowStreamProcessor", + Level.SEVERE, + "\n" + + Thread.currentThread().getName() + + " Interrupted @ arrowStreamProcessor, max retries exceeded", e); enqueueError(arrowBatchWrapperBlockingQueue, e); - } finally { // logic needed for graceful shutdown - enqueueEndOfStream(arrowBatchWrapperBlockingQueue); + break; } - }; + retryCount++; + LOG.warning( + "Connection interrupted during arrow stream read, retrying. attempt: %d", retryCount); + Thread.sleep(RETRY_DELAY_MS); + } + } - Thread populateBufferWorker = JDBC_THREAD_FACTORY.newThread(arrowStreamProcessor); - populateBufferWorker.start(); - return populateBufferWorker; + } catch (InterruptedException e) { + LOG.log( + Level.WARNING, + "\n" + Thread.currentThread().getName() + " Interrupted @ arrowStreamProcessor", + e); + enqueueError(arrowBatchWrapperBlockingQueue, e); + Thread.currentThread().interrupt(); + } catch (Exception e) { + LOG.log( + Level.WARNING, + "\n" + Thread.currentThread().getName() + " Error @ arrowStreamProcessor", + e); + enqueueError(arrowBatchWrapperBlockingQueue, e); + } finally { // logic needed for graceful shutdown + enqueueEndOfStream(arrowBatchWrapperBlockingQueue); + } } /** Executes SQL query using either fast query path or read API */ @@ -1068,57 +1095,18 @@ Thread runNextPageTaskAsync( BlockingQueue> rpcResponseQueue, BlockingQueue bigQueryFieldValueListWrapperBlockingQueue) { LOG.finest("++enter++"); - // parse and put the first page in the pageCache before the other pages are parsed from the RPC - // calls populateFirstPage(result, rpcResponseQueue); - // This thread makes the RPC calls and paginates Runnable nextPageTask = - () -> { - String currentPageToken = firstPageToken; - TableResult currentResults = result; - TableId destinationTable = null; - if (firstPageToken != null) { - destinationTable = getDestinationTable(jobId); - } - - try { - while (currentPageToken != null) { - // do not process further pages and shutdown - if (Thread.currentThread().isInterrupted() || queryTaskExecutor.isShutdown()) { - LOG.warning( - "%s Interrupted @ runNextPageTaskAsync", Thread.currentThread().getName()); - break; - } - - long startTime = System.nanoTime(); - currentResults = - this.bigQuery.listTableData( - destinationTable, - TableDataListOption.pageSize(querySettings.getMaxResultPerPage()), - TableDataListOption.pageToken(currentPageToken)); - - currentPageToken = currentResults.getNextPageToken(); - // this will be parsed asynchronously without blocking the current - // thread - Uninterruptibles.putUninterruptibly(rpcResponseQueue, Tuple.of(currentResults, true)); - LOG.fine( - "Fetched %d results from the server in %d ms.", - querySettings.getMaxResultPerPage(), - (int) ((System.nanoTime() - startTime) / 1000000)); - } - } catch (Exception ex) { - Uninterruptibles.putUninterruptibly( - bigQueryFieldValueListWrapperBlockingQueue, - BigQueryFieldValueListWrapper.ofError(new BigQueryJdbcRuntimeException(ex))); - } finally { - // this will stop the parseDataTask as well when the pagination - // completes - Uninterruptibles.putUninterruptibly(rpcResponseQueue, Tuple.of(null, false)); - } - // We cannot do queryTaskExecutor.shutdownNow() here as populate buffer method may not - // have finished processing the records and even that will be interrupted - }; + Context.current() + .wrap( + () -> + fetchNextPages( + firstPageToken, + jobId, + rpcResponseQueue, + bigQueryFieldValueListWrapperBlockingQueue, + result)); Thread nextPageWorker = JDBC_THREAD_FACTORY.newThread(nextPageTask); nextPageWorker.start(); @@ -1137,63 +1125,11 @@ Thread parseAndPopulateRpcDataAsync( LOG.finest("++enter++"); Runnable populateBufferRunnable = - () -> { // producer thread populating the buffer - try { - Iterable fieldValueLists; - // as we have to process the first page - boolean hasRows = true; - while (hasRows) { - try { - Tuple nextPageTuple = rpcResponseQueue.take(); - if (nextPageTuple.x() != null) { - fieldValueLists = nextPageTuple.x().getValues(); - } else { - fieldValueLists = null; - } - hasRows = nextPageTuple.y(); - - } catch (InterruptedException e) { - LOG.log(Level.WARNING, "\n" + Thread.currentThread().getName() + " Interrupted", e); - // Thread might get interrupted while calling the Cancel method, which is - // expected, so logging this instead of throwing the exception back - break; - } - - if (Thread.currentThread().isInterrupted() - || queryTaskExecutor.isShutdown() - || fieldValueLists == null) { - // do not process further pages and shutdown (outerloop) - break; - } - - long startTime = System.nanoTime(); - long results = 0; - for (FieldValueList fieldValueList : fieldValueLists) { - - if (Thread.currentThread().isInterrupted() || queryTaskExecutor.isShutdown()) { - // do not process further pages and shutdown (inner loop) - break; - } - Uninterruptibles.putUninterruptibly( - bigQueryFieldValueListWrapperBlockingQueue, - BigQueryFieldValueListWrapper.of(schema.getFields(), fieldValueList)); - results += 1; - } - LOG.fine( - "Processed %d results in %d ms.", - results, (int) ((System.nanoTime() - startTime) / 1000000)); - } - - } catch (Exception ex) { - LOG.log( - Level.WARNING, - "\n" + Thread.currentThread().getName() + " Error @ populateBufferAsync", - ex); - enqueueBufferError(bigQueryFieldValueListWrapperBlockingQueue, ex); - } finally { - enqueueBufferEndOfStream(bigQueryFieldValueListWrapperBlockingQueue); - } - }; + Context.current() + .wrap( + () -> + parseAndPopulateRpcData( + schema, bigQueryFieldValueListWrapperBlockingQueue, rpcResponseQueue)); Thread populateBufferWorker = JDBC_THREAD_FACTORY.newThread(populateBufferRunnable); populateBufferWorker.start(); @@ -1448,13 +1384,31 @@ public void clearBatch() { @Override public int[] executeBatch() throws SQLException { LOG.finest("++enter++"); + return withTracing( + "BigQueryStatement.executeBatch", + (span) -> { + span.setAttribute("db.statement.count", this.batchQueries.size()); + span.setAttribute( + AttributeKey.stringArrayKey("db.batch.statements"), + new ArrayList<>(this.batchQueries)); + + StringBuilder sb = new StringBuilder(); + for (String query : this.batchQueries) { + sb.append(query); + } + String combinedQueries = sb.toString(); + + return executeBatchImpl(combinedQueries); + }); + } + + private int[] executeBatchImpl(String combinedQueries) throws SQLException { int[] result = new int[this.batchQueries.size()]; if (this.batchQueries.isEmpty()) { return result; } try { - String combinedQueries = String.join("", this.batchQueries); QueryJobConfiguration.Builder jobConfiguration = getJobConfig(combinedQueries); jobConfiguration.setPriority(QueryJobConfiguration.Priority.BATCH); runQuery(combinedQueries, jobConfiguration.build()); @@ -1622,4 +1576,143 @@ private void enqueueBufferError(BlockingQueue que private void enqueueBufferEndOfStream(BlockingQueue queue) { Uninterruptibles.putUninterruptibly(queue, BigQueryFieldValueListWrapper.of(null, null, true)); } + + @FunctionalInterface + private interface TracedOperation { + T run(Span span) throws SQLException; + } + + private T withTracing(String spanName, TracedOperation operation) throws SQLException { + Tracer tracer = this.connection.getTracer(); + Span span = tracer.spanBuilder(spanName).startSpan(); + try (Scope scope = span.makeCurrent()) { + return operation.run(span); + } catch (SQLException | RuntimeException ex) { + span.recordException(ex); + span.setStatus(StatusCode.ERROR, ex.getMessage()); + throw ex; + } finally { + span.end(); + } + } + + private void fetchNextPages( + String firstPageToken, + JobId jobId, + BlockingQueue> rpcResponseQueue, + BlockingQueue bigQueryFieldValueListWrapperBlockingQueue, + TableResult result) { + SpanContext parentSpanContext = Span.current().getSpanContext(); + String currentPageToken = firstPageToken; + TableResult currentResults = result; + TableId destinationTable = null; + if (firstPageToken != null) { + destinationTable = getDestinationTable(jobId); + } + + try { + Tracer tracer = this.connection.getTracer(); + while (currentPageToken != null) { + if (Thread.currentThread().isInterrupted() || queryTaskExecutor.isShutdown()) { + LOG.warning("%s Interrupted @ runNextPageTaskAsync", Thread.currentThread().getName()); + break; + } + + SpanBuilder spanBuilder = tracer.spanBuilder("BigQueryStatement.pagination"); + if (parentSpanContext.isValid()) { + spanBuilder.addLink(parentSpanContext); + } + Span paginationSpan = spanBuilder.startSpan(); + try (Scope scope = paginationSpan.makeCurrent()) { + paginationSpan.setAttribute("db.pagination.page_token", currentPageToken); + + long startTime = System.nanoTime(); + currentResults = + this.bigQuery.listTableData( + destinationTable, + TableDataListOption.pageSize(querySettings.getMaxResultPerPage()), + TableDataListOption.pageToken(currentPageToken)); + + long duration = (System.nanoTime() - startTime) / 1000000; + paginationSpan.setAttribute("db.pagination.duration_ms", duration); + paginationSpan.setAttribute( + "db.pagination.rows_fetched", querySettings.getMaxResultPerPage()); + + currentPageToken = currentResults.getNextPageToken(); + Uninterruptibles.putUninterruptibly(rpcResponseQueue, Tuple.of(currentResults, true)); + LOG.fine( + "Fetched %d results from the server in %d ms.", + querySettings.getMaxResultPerPage(), (int) duration); + } catch (Exception e) { + paginationSpan.recordException(e); + paginationSpan.setStatus(StatusCode.ERROR, e.getMessage()); + throw e; + } finally { + paginationSpan.end(); + } + } + } catch (Exception ex) { + Uninterruptibles.putUninterruptibly( + bigQueryFieldValueListWrapperBlockingQueue, + BigQueryFieldValueListWrapper.ofError(new BigQueryJdbcRuntimeException(ex))); + } finally { + Uninterruptibles.putUninterruptibly(rpcResponseQueue, Tuple.of(null, false)); + } + } + + private void parseAndPopulateRpcData( + Schema schema, + BlockingQueue bigQueryFieldValueListWrapperBlockingQueue, + BlockingQueue> rpcResponseQueue) { + try { + Iterable fieldValueLists; + boolean hasRows = true; + while (hasRows) { + try { + Tuple nextPageTuple = rpcResponseQueue.take(); + if (nextPageTuple.x() != null) { + fieldValueLists = nextPageTuple.x().getValues(); + } else { + fieldValueLists = null; + } + hasRows = nextPageTuple.y(); + + } catch (InterruptedException e) { + LOG.log(Level.WARNING, "\n" + Thread.currentThread().getName() + " Interrupted", e); + break; + } + + if (Thread.currentThread().isInterrupted() + || queryTaskExecutor.isShutdown() + || fieldValueLists == null) { + break; + } + + long startTime = System.nanoTime(); + long results = 0; + for (FieldValueList fieldValueList : fieldValueLists) { + + if (Thread.currentThread().isInterrupted() || queryTaskExecutor.isShutdown()) { + break; + } + Uninterruptibles.putUninterruptibly( + bigQueryFieldValueListWrapperBlockingQueue, + BigQueryFieldValueListWrapper.of(schema.getFields(), fieldValueList)); + results += 1; + } + LOG.fine( + "Processed %d results in %d ms.", + results, (int) ((System.nanoTime() - startTime) / 1000000)); + } + + } catch (Exception ex) { + LOG.log( + Level.WARNING, + "\n" + Thread.currentThread().getName() + " Error @ populateBufferAsync", + ex); + enqueueBufferError(bigQueryFieldValueListWrapperBlockingQueue, ex); + } finally { + enqueueBufferEndOfStream(bigQueryFieldValueListWrapperBlockingQueue); + } + } } diff --git a/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/DataSource.java b/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/DataSource.java index 02142363866e..9c0224005b9e 100644 --- a/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/DataSource.java +++ b/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/DataSource.java @@ -20,6 +20,7 @@ import com.google.common.base.Joiner; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; +import io.opentelemetry.api.OpenTelemetry; import java.io.PrintWriter; import java.sql.Connection; import java.sql.DriverManager; @@ -113,6 +114,9 @@ public class DataSource implements javax.sql.DataSource { private String privateServiceConnect; private Long connectionPoolSize; private Long listenerPoolSize; + private Boolean enableGcpTraceExporter; + private Boolean enableGcpLogExporter; + private OpenTelemetry customOpenTelemetry; // Make sure the JDBC driver class is loaded. static { @@ -324,6 +328,18 @@ public class DataSource implements javax.sql.DataSource { .put( BigQueryJdbcUrlUtility.LISTENER_POOL_SIZE_PROPERTY_NAME, (ds, val) -> ds.setListenerPoolSize(Long.parseLong(val))) + .put( + BigQueryJdbcUrlUtility.ENABLE_GCP_TRACE_EXPORTER_PROPERTY_NAME, + (ds, val) -> + ds.setEnableGcpTraceExporter( + BigQueryJdbcUrlUtility.convertIntToBoolean( + val, BigQueryJdbcUrlUtility.ENABLE_GCP_TRACE_EXPORTER_PROPERTY_NAME))) + .put( + BigQueryJdbcUrlUtility.ENABLE_GCP_LOG_EXPORTER_PROPERTY_NAME, + (ds, val) -> + ds.setEnableGcpLogExporter( + BigQueryJdbcUrlUtility.convertIntToBoolean( + val, BigQueryJdbcUrlUtility.ENABLE_GCP_LOG_EXPORTER_PROPERTY_NAME))) .build(); public static DataSource fromUrl(String url) { @@ -375,7 +391,11 @@ public Connection getConnection() throws SQLException { throw new BigQueryJdbcException( "The URL " + getURL() + " is invalid. Please specify a valid Connection URL. "); } - return DriverManager.getConnection(getURL(), createProperties()); + Properties props = createProperties(); + if (this.customOpenTelemetry != null) { + props.put("customOpenTelemetry", this.customOpenTelemetry); + } + return DriverManager.getConnection(getURL(), props); } private Properties createProperties() { @@ -616,6 +636,16 @@ private Properties createProperties() { BigQueryJdbcUrlUtility.LISTENER_POOL_SIZE_PROPERTY_NAME, String.valueOf(this.listenerPoolSize)); } + if (this.enableGcpTraceExporter != null) { + connectionProperties.setProperty( + BigQueryJdbcUrlUtility.ENABLE_GCP_TRACE_EXPORTER_PROPERTY_NAME, + String.valueOf(this.enableGcpTraceExporter)); + } + if (this.enableGcpLogExporter != null) { + connectionProperties.setProperty( + BigQueryJdbcUrlUtility.ENABLE_GCP_LOG_EXPORTER_PROPERTY_NAME, + String.valueOf(this.enableGcpLogExporter)); + } return connectionProperties; } @@ -737,6 +767,34 @@ public void setListenerPoolSize(Long listenerPoolSize) { this.listenerPoolSize = listenerPoolSize; } + public Boolean getEnableGcpTraceExporter() { + return enableGcpTraceExporter != null + ? enableGcpTraceExporter + : BigQueryJdbcUrlUtility.DEFAULT_ENABLE_GCP_TRACE_EXPORTER_VALUE; + } + + public void setEnableGcpTraceExporter(Boolean enableGcpTraceExporter) { + this.enableGcpTraceExporter = enableGcpTraceExporter; + } + + public Boolean getEnableGcpLogExporter() { + return enableGcpLogExporter != null + ? enableGcpLogExporter + : BigQueryJdbcUrlUtility.DEFAULT_ENABLE_GCP_LOG_EXPORTER_VALUE; + } + + public void setEnableGcpLogExporter(Boolean enableGcpLogExporter) { + this.enableGcpLogExporter = enableGcpLogExporter; + } + + public OpenTelemetry getCustomOpenTelemetry() { + return customOpenTelemetry; + } + + public void setCustomOpenTelemetry(OpenTelemetry customOpenTelemetry) { + this.customOpenTelemetry = customOpenTelemetry; + } + public void setHighThroughputMinTableSize(Integer highThroughputMinTableSize) { this.highThroughputMinTableSize = highThroughputMinTableSize; } diff --git a/java-bigquery/google-cloud-bigquery-jdbc/src/test/java/com/google/cloud/bigquery/jdbc/BigQueryDatabaseMetaDataTest.java b/java-bigquery/google-cloud-bigquery-jdbc/src/test/java/com/google/cloud/bigquery/jdbc/BigQueryDatabaseMetaDataTest.java index 4d108ee54f8b..c3fec1b4b5cc 100644 --- a/java-bigquery/google-cloud-bigquery-jdbc/src/test/java/com/google/cloud/bigquery/jdbc/BigQueryDatabaseMetaDataTest.java +++ b/java-bigquery/google-cloud-bigquery-jdbc/src/test/java/com/google/cloud/bigquery/jdbc/BigQueryDatabaseMetaDataTest.java @@ -30,6 +30,12 @@ import com.google.api.gax.paging.Page; import com.google.cloud.bigquery.*; import com.google.cloud.bigquery.BigQuery.RoutineListOption; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.StatusCode; +import io.opentelemetry.api.trace.Tracer; +import io.opentelemetry.context.Scope; +import io.opentelemetry.sdk.testing.junit5.OpenTelemetryExtension; +import io.opentelemetry.sdk.trace.data.SpanData; import java.io.IOException; import java.io.InputStream; import java.sql.DatabaseMetaData; @@ -39,16 +45,29 @@ import java.sql.Statement; import java.sql.Types; import java.util.*; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingQueue; import java.util.regex.Pattern; +import java.util.stream.Stream; +import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInstance; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; +@TestInstance(TestInstance.Lifecycle.PER_CLASS) public class BigQueryDatabaseMetaDataTest { + @RegisterExtension + public static final OpenTelemetryExtension otelTesting = OpenTelemetryExtension.create(); + private BigQueryConnection bigQueryConnection; private BigQueryDatabaseMetaData dbMetadata; private BigQuery bigqueryClient; @@ -62,6 +81,20 @@ public void setUp() throws SQLException { when(bigQueryConnection.getConnectionUrl()).thenReturn("jdbc:bigquery://test-project"); when(bigQueryConnection.getBigQuery()).thenReturn(bigqueryClient); when(bigQueryConnection.createStatement()).thenReturn(mockStatement); + when(bigQueryConnection.getTracer()) + .thenReturn( + otelTesting + .getOpenTelemetry() + .getTracer(BigQueryJdbcOpenTelemetry.INSTRUMENTATION_SCOPE_NAME)); + + Page datasetPageMock = mock(Page.class); + when(bigqueryClient.listDatasets(anyString(), any())).thenReturn(datasetPageMock); + + Page
tablePageMock = mock(Page.class); + when(bigqueryClient.listTables(any(DatasetId.class), any())).thenReturn(tablePageMock); + + Table mockTable = mock(Table.class); + when(bigqueryClient.getTable(any(TableId.class))).thenReturn(mockTable); dbMetadata = new BigQueryDatabaseMetaData(bigQueryConnection); } @@ -2917,7 +2950,7 @@ public void testPrepareGetCatalogsRows() { } @Test - public void testGetSchemas_NoArgs_DelegatesCorrectly() { + public void testGetSchemas_NoArgs_DelegatesCorrectly() throws Exception { BigQueryDatabaseMetaData spiedDbMetadata = spy(dbMetadata); ResultSet mockResultSet = mock(ResultSet.class); doReturn(mockResultSet).when(spiedDbMetadata).getSchemas(null, null); @@ -3206,4 +3239,96 @@ public void testSupportsResultSetConcurrency() throws SQLException { public void testGetSQLStateType() throws SQLException { assertEquals(DatabaseMetaData.sqlStateSQL, dbMetadata.getSQLStateType()); } + + @ParameterizedTest + @MethodSource("metadataOperationProvider") + public void testMetadataOperation_generatesSpan( + MetadataOperation operation, String expectedSpanName) throws Exception { + operation.run(); + + SpanData span = + OpenTelemetryTestUtility.findSpanByName(otelTesting.getSpans(), expectedSpanName); + OpenTelemetryTestUtility.assertSpanStatus(span, StatusCode.UNSET); + } + + @FunctionalInterface + interface MetadataOperation { + void run() throws SQLException; + } + + Stream metadataOperationProvider() { + return Stream.of( + Arguments.of( + (MetadataOperation) () -> dbMetadata.getCatalogs(), + "BigQueryDatabaseMetaData.getCatalogs"), + Arguments.of( + (MetadataOperation) () -> dbMetadata.getSchemas("catalog", "schema"), + "BigQueryDatabaseMetaData.getSchemas"), + Arguments.of( + (MetadataOperation) + () -> dbMetadata.getTables("catalog", "schema", "table", new String[] {"TABLE"}), + "BigQueryDatabaseMetaData.getTables"), + Arguments.of( + (MetadataOperation) () -> dbMetadata.getColumns("catalog", "schema", "table", "column"), + "BigQueryDatabaseMetaData.getColumns")); + } + + @ParameterizedTest + @MethodSource("asyncMetadataOperationProvider") + public void testAsyncMetadataOperation_createsDetachedLinkedSpan( + AsyncMetadataOperation operation, String expectedSpanName) throws Exception { + BlockingQueue queue = new LinkedBlockingQueue<>(); + + Tracer testTracer = otelTesting.getOpenTelemetry().getTracer("test"); + Span parentSpan = testTracer.spanBuilder("parent-span").startSpan(); + + try (Scope scope = parentSpan.makeCurrent()) { + Thread workerThread = operation.run(queue); + + Assertions.assertNotNull(workerThread, "Worker thread should not be null"); + workerThread.join(); + + OpenTelemetryTestUtility.assertSpanLinkedToParent( + otelTesting.getSpans(), expectedSpanName, parentSpan); + } finally { + parentSpan.end(); + } + } + + @FunctionalInterface + interface AsyncMetadataOperation { + Thread run(BlockingQueue queue) throws Exception; + } + + Stream asyncMetadataOperationProvider() { + return Stream.of( + Arguments.of( + (AsyncMetadataOperation) + (q) -> + dbMetadata.runGetTablesTaskAsync( + "catalog", + "schema", + "table", + new String[] {"TABLE"}, + dbMetadata.defineGetTablesSchema(), + q), + "BigQueryDatabaseMetaData.getTables.background"), + Arguments.of( + (AsyncMetadataOperation) + (q) -> + dbMetadata.runGetColumnsTaskAsync( + "catalog", + "schema", + "table", + "column", + dbMetadata.defineGetColumnsSchema(), + q), + "BigQueryDatabaseMetaData.getColumns.background"), + Arguments.of( + (AsyncMetadataOperation) + (q) -> + dbMetadata.runGetSchemasTaskAsync( + "catalog", "schema", dbMetadata.defineGetSchemasSchema(), q), + "BigQueryDatabaseMetaData.getSchemas.background")); + } } diff --git a/java-bigquery/google-cloud-bigquery-jdbc/src/test/java/com/google/cloud/bigquery/jdbc/BigQueryDriverTest.java b/java-bigquery/google-cloud-bigquery-jdbc/src/test/java/com/google/cloud/bigquery/jdbc/BigQueryDriverTest.java index 85eb254038f3..2fc1ec7f8f87 100644 --- a/java-bigquery/google-cloud-bigquery-jdbc/src/test/java/com/google/cloud/bigquery/jdbc/BigQueryDriverTest.java +++ b/java-bigquery/google-cloud-bigquery-jdbc/src/test/java/com/google/cloud/bigquery/jdbc/BigQueryDriverTest.java @@ -16,7 +16,9 @@ package com.google.cloud.bigquery.jdbc; import static com.google.common.truth.Truth.assertThat; +import static org.mockito.Mockito.mock; +import io.opentelemetry.api.OpenTelemetry; import java.sql.Connection; import java.sql.DriverPropertyInfo; import java.sql.SQLException; @@ -104,4 +106,21 @@ public void testConnectWithInvalidUrlChainsNoException() throws SQLException { new Properties()); assertThat(connection.isClosed()).isFalse(); } + + @Test + public void testConnect_withCustomOpenTelemetry_injectsIntoDataSource() throws SQLException { + OpenTelemetry mockOtel = mock(OpenTelemetry.class); + Properties props = new Properties(); + props.put("customOpenTelemetry", mockOtel); + + // Connect using standard URL setup but pass the SDK via Properties + Connection connection = + bigQueryDriver.connect( + "jdbc:bigquery://https://www.googleapis.com/bigquery/v2:443;" + + "OAuthType=2;ProjectId=MyBigQueryProject;OAuthAccessToken=redacted;", + props); + + assertThat(connection).isNotNull(); + assertThat(connection.isClosed()).isFalse(); + } } diff --git a/java-bigquery/google-cloud-bigquery-jdbc/src/test/java/com/google/cloud/bigquery/jdbc/BigQueryJdbcOpenTelemetryTest.java b/java-bigquery/google-cloud-bigquery-jdbc/src/test/java/com/google/cloud/bigquery/jdbc/BigQueryJdbcOpenTelemetryTest.java new file mode 100644 index 000000000000..24f77abdd071 --- /dev/null +++ b/java-bigquery/google-cloud-bigquery-jdbc/src/test/java/com/google/cloud/bigquery/jdbc/BigQueryJdbcOpenTelemetryTest.java @@ -0,0 +1,60 @@ +/* + * Copyright 2026 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.bigquery.jdbc; + +import static com.google.common.truth.Truth.assertThat; +import static org.mockito.Mockito.mock; + +import io.opentelemetry.api.OpenTelemetry; +import io.opentelemetry.api.trace.Tracer; +import org.junit.jupiter.api.Test; + +public class BigQueryJdbcOpenTelemetryTest { + + @Test + public void testGetOpenTelemetry_withCustomSdk_returnsCustom() { + OpenTelemetry mockCustomOtel = mock(OpenTelemetry.class); + + OpenTelemetry result = BigQueryJdbcOpenTelemetry.getOpenTelemetry(false, false, mockCustomOtel); + + assertThat(result).isSameInstanceAs(mockCustomOtel); + } + + @Test + public void testGetOpenTelemetry_withCustomSdkAndFlags_returnsCustom() { + OpenTelemetry mockCustomOtel = mock(OpenTelemetry.class); + + // Custom SDK always takes precedence over individual flags + OpenTelemetry result = BigQueryJdbcOpenTelemetry.getOpenTelemetry(true, true, mockCustomOtel); + + assertThat(result).isSameInstanceAs(mockCustomOtel); + } + + @Test + public void testGetOpenTelemetry_noFlags_returnsNoop() { + OpenTelemetry result = BigQueryJdbcOpenTelemetry.getOpenTelemetry(false, false, null); + + assertThat(result).isSameInstanceAs(OpenTelemetry.noop()); + } + + @Test + public void testGetTracer_respectsScopeName() { + Tracer result = BigQueryJdbcOpenTelemetry.getTracer(OpenTelemetry.noop()); + + assertThat(result).isNotNull(); + } +} diff --git a/java-bigquery/google-cloud-bigquery-jdbc/src/test/java/com/google/cloud/bigquery/jdbc/BigQueryStatementTest.java b/java-bigquery/google-cloud-bigquery-jdbc/src/test/java/com/google/cloud/bigquery/jdbc/BigQueryStatementTest.java index f3971bd71150..cffde5d4daec 100644 --- a/java-bigquery/google-cloud-bigquery-jdbc/src/test/java/com/google/cloud/bigquery/jdbc/BigQueryStatementTest.java +++ b/java-bigquery/google-cloud-bigquery-jdbc/src/test/java/com/google/cloud/bigquery/jdbc/BigQueryStatementTest.java @@ -27,8 +27,10 @@ import static org.mockito.Mockito.verify; import com.google.cloud.ServiceOptions; +import com.google.cloud.Tuple; import com.google.cloud.bigquery.BigQuery; import com.google.cloud.bigquery.BigQuery.QueryResultsOption; +import com.google.cloud.bigquery.BigQuery.TableDataListOption; import com.google.cloud.bigquery.BigQueryOptions; import com.google.cloud.bigquery.Field; import com.google.cloud.bigquery.FieldList; @@ -52,27 +54,47 @@ import com.google.cloud.bigquery.storage.v1.ReadSession; import com.google.common.collect.ImmutableList; import com.google.common.collect.Maps; +import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.StatusCode; +import io.opentelemetry.api.trace.Tracer; +import io.opentelemetry.context.Scope; +import io.opentelemetry.sdk.testing.junit5.OpenTelemetryExtension; +import io.opentelemetry.sdk.trace.data.SpanData; import java.io.IOException; import java.sql.ResultSet; import java.sql.SQLException; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.UUID; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.stream.Stream; import org.apache.arrow.memory.RootAllocator; import org.apache.arrow.vector.BitVector; import org.apache.arrow.vector.FieldVector; import org.apache.arrow.vector.IntVector; import org.apache.arrow.vector.VectorSchemaRoot; +import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInstance; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; import org.mockito.ArgumentCaptor; import org.mockito.Mockito; +@TestInstance(TestInstance.Lifecycle.PER_CLASS) public class BigQueryStatementTest { + @RegisterExtension + public static final OpenTelemetryExtension otelTesting = OpenTelemetryExtension.create(); + private BigQueryConnection bigQueryConnection; private static final String PROJECT = "project"; @@ -124,9 +146,38 @@ private Job getJobMock( return job; } + private TableResult setupMockQueryResults(JobId jobId, StatementType type, Long affectedRows) + throws Exception { + doReturn(true).when(bigQueryConnection).getUseStatelessQueryMode(); + TableResult tableResultMock = mock(TableResult.class); + doReturn(jobId).when(tableResultMock).getJobId(); + doReturn(Schema.of()).when(tableResultMock).getSchema(); + doReturn(tableResultMock) + .when(bigquery) + .queryWithTimeout(any(QueryJobConfiguration.class), any(), any()); + + Job jobMock = getJobMock(tableResultMock, null, type); + if (affectedRows != null) { + JobStatistics.QueryStatistics stats = (JobStatistics.QueryStatistics) jobMock.getStatistics(); + doReturn(affectedRows).when(stats).getNumDmlAffectedRows(); + } + doReturn(jobMock).when(bigquery).getJob(any(JobId.class)); + doReturn(jobMock).when(jobMock).waitFor(); + + Job dryRunJobMock = getJobMock(null, null, type); + doReturn(dryRunJobMock).when(bigquery).create(any(JobInfo.class)); + return tableResultMock; + } + @BeforeEach public void setUp() throws IOException, SQLException { bigQueryConnection = mock(BigQueryConnection.class); + doReturn( + otelTesting + .getOpenTelemetry() + .getTracer(BigQueryJdbcOpenTelemetry.INSTRUMENTATION_SCOPE_NAME)) + .when(bigQueryConnection) + .getTracer(); rpcFactoryMock = mock(BigQueryRpcFactory.class); bigquery = mock(BigQuery.class); bigQueryConnection.bigQuery = bigquery; @@ -449,21 +500,11 @@ public void testCloseCancelsJob() throws SQLException, InterruptedException { } @Test - public void testCancelWithJoblessQuery() throws SQLException, InterruptedException { - doReturn(true).when(bigQueryConnection).getUseStatelessQueryMode(); + public void testCancelWithJoblessQuery() throws Exception { + TableResult tableResultMock = setupMockQueryResults(null, StatementType.SELECT, null); BigQueryStatement joblessStatement = new BigQueryStatement(bigQueryConnection); BigQueryStatement joblessStatementSpy = Mockito.spy(joblessStatement); - TableResult tableResultMock = mock(TableResult.class); - doReturn(null).when(tableResultMock).getJobId(); - - doReturn(tableResultMock) - .when(bigquery) - .queryWithTimeout(any(QueryJobConfiguration.class), any(), any()); - - Job dryRunJobMock = getJobMock(null, null, StatementType.SELECT); - doReturn(dryRunJobMock).when(bigquery).create(any(JobInfo.class)); - BigQueryJsonResultSet resultSetMock = mock(BigQueryJsonResultSet.class); doReturn(resultSetMock).when(joblessStatementSpy).processJsonResultSet(tableResultMock); @@ -480,4 +521,115 @@ public void testCancelWithJoblessQuery() throws SQLException, InterruptedExcepti // And no backend cancellation was attempted verify(bigquery, Mockito.never()).cancel(any(JobId.class)); } + + @Test + public void testFetchNextPages_addsLinkToParent() throws Exception { + Tracer testTracer = otelTesting.getOpenTelemetry().getTracer("test"); + Span parentSpan = testTracer.spanBuilder("parent-span").startSpan(); + + try (Scope scope = parentSpan.makeCurrent()) { + + BlockingQueue> rpcResponseQueue = new LinkedBlockingDeque<>(); + BlockingQueue bigQueryFieldValueListWrapperBlockingQueue = + new LinkedBlockingDeque<>(); + TableResult mockResult = mock(TableResult.class); + JobId mockJobId = JobId.of("job"); + + Job mockJob = mock(Job.class); + QueryJobConfiguration realConfig = + QueryJobConfiguration.newBuilder("SELECT 1") + .setDestinationTable(TableId.of("project", "dataset", "table")) + .build(); + doReturn(mockJob).when(bigquery).getJob(any(JobId.class)); + doReturn(realConfig).when(mockJob).getConfiguration(); + + TableResult mockNextResult = mock(TableResult.class); + doReturn(mockNextResult) + .when(bigquery) + .listTableData( + any(TableId.class), any(TableDataListOption.class), any(TableDataListOption.class)); + doReturn(null).when(mockNextResult).getNextPageToken(); + + Thread workerThread = + bigQueryStatement.runNextPageTaskAsync( + mockResult, + "token", + mockJobId, + rpcResponseQueue, + bigQueryFieldValueListWrapperBlockingQueue); + + Assertions.assertNotNull(workerThread, "Worker thread should not be null"); + workerThread.join(); + + OpenTelemetryTestUtility.assertSpanLinkedToParent( + otelTesting.getSpans(), "BigQueryStatement.pagination", parentSpan); + } finally { + parentSpan.end(); + } + } + + @ParameterizedTest + @MethodSource("statementOperationProvider") + public void testExecuteOperation_generatesSpan( + StatementOperation operation, + String expectedSpanName, + StatementType type, + Map, Object> expectedAttributes) + throws Exception { + setupMockQueryResults(JobId.of("job"), type, 1L); + operation.run(); + + SpanData span = + OpenTelemetryTestUtility.findSpanByName(otelTesting.getSpans(), expectedSpanName); + OpenTelemetryTestUtility.assertSpanStatus(span, StatusCode.UNSET); + + if (expectedAttributes != null) { + for (Map.Entry, Object> entry : expectedAttributes.entrySet()) { + OpenTelemetryTestUtility.assertSpanHasAttribute( + span, (AttributeKey) entry.getKey(), entry.getValue()); + } + } + } + + Stream statementOperationProvider() { + return Stream.of( + Arguments.of( + (StatementOperation) () -> bigQueryStatement.executeQuery("SELECT 1"), + "BigQueryStatement.executeQuery", + StatementType.SELECT, + Collections.singletonMap(AttributeKey.stringKey("db.statement"), "SELECT 1")), + Arguments.of( + (StatementOperation) () -> bigQueryStatement.execute("SELECT 1"), + "BigQueryStatement.execute", + StatementType.SELECT, + Collections.singletonMap(AttributeKey.stringKey("db.statement"), "SELECT 1")), + Arguments.of( + (StatementOperation) + () -> bigQueryStatement.executeLargeUpdate("UPDATE table SET col = 1"), + "BigQueryStatement.executeLargeUpdate", + StatementType.UPDATE, + Collections.singletonMap( + AttributeKey.stringKey("db.statement"), "UPDATE table SET col = 1")), + Arguments.of( + (StatementOperation) + () -> { + bigQueryStatement.addBatch("UPDATE table SET col = 1"); + bigQueryStatement.executeBatch(); + }, + "BigQueryStatement.executeBatch", + StatementType.UPDATE, + new HashMap, Object>() { + { + put(AttributeKey.longKey("db.statement.count"), 1L); + put( + AttributeKey.stringArrayKey("db.batch.statements"), + Collections.singletonList("UPDATE table SET col = 1; ")); + } + })); + } + + @FunctionalInterface + interface StatementOperation { + void run() throws Exception; + } } diff --git a/java-bigquery/google-cloud-bigquery-jdbc/src/test/java/com/google/cloud/bigquery/jdbc/OpenTelemetryTestUtility.java b/java-bigquery/google-cloud-bigquery-jdbc/src/test/java/com/google/cloud/bigquery/jdbc/OpenTelemetryTestUtility.java new file mode 100644 index 000000000000..df8f025a0f80 --- /dev/null +++ b/java-bigquery/google-cloud-bigquery-jdbc/src/test/java/com/google/cloud/bigquery/jdbc/OpenTelemetryTestUtility.java @@ -0,0 +1,141 @@ +/* + * Copyright 2026 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.bigquery.jdbc; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.StatusCode; +import io.opentelemetry.sdk.trace.data.SpanData; +import java.util.List; +import java.util.Optional; + +public class OpenTelemetryTestUtility { + + /** + * Asserts that a span with the given name exists in the provided list and returns it. + * + * @param spans The list of exported spans. + * @param spanName The name of the span to find. + * @return The found SpanData. + * @throws AssertionError if the span is not found. + */ + public static SpanData findSpanByName(List spans, String spanName) { + Optional span = spans.stream().filter(s -> s.getName().equals(spanName)).findFirst(); + assertTrue(span.isPresent(), "Span with name '" + spanName + "' not found"); + return span.get(); + } + + /** + * Asserts that a span exists in the list. + * + * @param spans The list of exported spans. + * @param spanName The name of the span to find. + */ + public static void assertSpanExists(List spans, String spanName) { + findSpanByName(spans, spanName); + } + + /** + * Asserts that a span has a specific attribute key and value. + * + * @param span The span to check. + * @param key The attribute key. + * @param expectedValue The expected value of the attribute. + * @param The type of the attribute value. + */ + public static void assertSpanHasAttribute( + SpanData span, AttributeKey key, T expectedValue) { + T actualValue = span.getAttributes().get(key); + assertNotNull( + actualValue, "Attribute '" + key.getKey() + "' not found on span '" + span.getName() + "'"); + assertEquals( + expectedValue, + actualValue, + "Attribute '" + key.getKey() + "' value mismatch on span '" + span.getName() + "'"); + } + + /** + * Asserts the status of a span. + * + * @param span The span to check. + * @param expectedStatus The expected StatusCode. + */ + public static void assertSpanStatus(SpanData span, StatusCode expectedStatus) { + assertEquals( + expectedStatus, + span.getStatus().getStatusCode(), + "Status code mismatch on span '" + span.getName() + "'"); + } + + /** + * Asserts that an exception of a specific type was recorded on the span. + * + * @param span The span to check. + * @param exceptionClass The class of the expected exception. + */ + public static void assertSpanHasException( + SpanData span, Class exceptionClass) { + boolean found = + span.getEvents().stream() + .anyMatch( + event -> + event.getName().equals("exception") + && exceptionClass + .getName() + .equals( + event + .getAttributes() + .get(AttributeKey.stringKey("exception.type")))); + assertTrue( + found, + "Exception of type " + + exceptionClass.getName() + + " not found in events of span '" + + span.getName() + + "'"); + } + + /** + * Asserts that a span is linked to a parent span. + * + * @param spans The list of exported spans. + * @param spanName The name of the span that should have the link. + * @param parentSpan The parent span it should be linked to. + */ + public static void assertSpanLinkedToParent( + List spans, String spanName, Span parentSpan) { + boolean found = + spans.stream() + .anyMatch( + span -> + span.getName().equals(spanName) + && span.getLinks().stream() + .anyMatch( + link -> + link.getSpanContext() + .getTraceId() + .equals(parentSpan.getSpanContext().getTraceId()) + && link.getSpanContext() + .getSpanId() + .equals(parentSpan.getSpanContext().getSpanId()))); + assertTrue(found, "Span " + spanName + " not found or not linked to parent"); + } +}