diff --git a/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryArrowResultSet.java b/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryArrowResultSet.java index af041dc2a649..84e36e28da4a 100644 --- a/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryArrowResultSet.java +++ b/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryArrowResultSet.java @@ -212,7 +212,13 @@ public void close() { @Override public boolean next() throws SQLException { + BigQueryJdbcMdc.setContextPersistent(this.connectionId); + LOG.finest("++enter++"); checkClosed(); + return nextImpl(); + } + + private boolean nextImpl() throws SQLException { if (this.isNested) { if (this.currentNestedBatch == null || this.currentNestedBatch.getNestedRecords() == null) { IllegalStateException ex = @@ -319,10 +325,14 @@ private Object getObjectInternal(int columnIndex) throws SQLException { @Override public Object getObject(int columnIndex) throws SQLException { - + BigQueryJdbcMdc.setContextPersistent(this.connectionId); // columnIndex is SQL index starting at 1 LOG.finest("++enter++"); checkClosed(); + return getObjectImpl(columnIndex); + } + + private Object getObjectImpl(int columnIndex) throws SQLException { Object value = getObjectInternal(columnIndex); if (value == null) { return null; @@ -451,14 +461,22 @@ private String formatRangeElement(Object element, StandardSQLTypeName elementTyp } @Override - public void close() { - LOG.fine("Closing BigqueryArrowResultSet %s.", this); - this.isClosed = true; - if (ownedThread != null && !ownedThread.isInterrupted()) { - // interrupt the producer thread when result set is closed - ownedThread.interrupt(); + public void close() throws SQLException { + if (isClosed()) { + return; + } + + try (BigQueryJdbcMdc.MdcCloseable mdc = + BigQueryJdbcMdc.registerInstance(this.connection, this.connectionId)) { + LOG.finest("++enter++"); + LOG.fine("Closing BigqueryArrowResultSet %s.", this); + this.isClosed = true; + if (ownedThread != null && !ownedThread.isInterrupted()) { + // interrupt the producer thread when result set is closed + ownedThread.interrupt(); + } + super.close(); } - super.close(); } @Override diff --git a/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryBaseResultSet.java b/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryBaseResultSet.java index d63b72bb6c93..c1207d4a4167 100644 --- a/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryBaseResultSet.java +++ b/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryBaseResultSet.java @@ -58,6 +58,8 @@ public abstract class BigQueryBaseResultSet extends BigQueryNoOpsResultSet protected boolean isClosed = false; protected boolean wasNull = false; protected final BigQueryTypeCoercer bigQueryTypeCoercer = BigQueryTypeCoercionUtility.INSTANCE; + protected BigQueryConnection connection = null; + protected String connectionId = null; protected BigQueryBaseResultSet( BigQuery bigQuery, BigQueryStatement statement, Schema schema, boolean isNested) { @@ -66,6 +68,8 @@ protected BigQueryBaseResultSet( this.schema = schema; this.schemaFieldList = schema != null ? schema.getFields() : null; this.isNested = isNested; + this.connection = statement != null ? (BigQueryConnection) statement.getConnection() : null; + this.connectionId = this.connection != null ? this.connection.getConnectionId() : null; } public QueryStatistics getQueryStatistics() { @@ -97,7 +101,7 @@ public String getQueryId() { } @Override - public void close() { + public void close() throws SQLException { try { if (statement != null && statement.isCloseOnCompletion() && !statement.hasMoreResults()) { statement.close(); diff --git a/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryConnection.java b/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryConnection.java index 822b4e282f74..3a01f7aa475b 100644 --- a/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryConnection.java +++ b/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryConnection.java @@ -291,6 +291,7 @@ String getLibraryVersion(Class libraryClass) { } HeaderProvider createHeaderProvider() { + LOG.finest("++enter++"); String headerToken = DEFAULT_JDBC_TOKEN_VALUE + "/" + getLibraryVersion(this.getClass()); if (this.partnerToken != null && !this.partnerToken.isEmpty()) { headerToken += this.partnerToken; @@ -304,11 +305,13 @@ HeaderProvider createHeaderProvider() { } protected void addOpenStatements(Statement statement) { + LOG.finest("++enter++"); LOG.finest("Statement %s added to Connection %s.", statement, this); this.openStatements.add(statement); } BigQueryReadClient getBigQueryReadClient() { + LOG.finest("++enter++"); try { if (this.bigQueryReadClient == null) { this.bigQueryReadClient = getBigQueryReadClientConnection(); @@ -320,6 +323,7 @@ BigQueryReadClient getBigQueryReadClient() { } BigQueryWriteClient getBigQueryWriteClient() { + LOG.finest("++enter++"); try { if (this.bigQueryWriteClient == null) { this.bigQueryWriteClient = getBigQueryWriteClientConnection(); @@ -331,14 +335,17 @@ BigQueryWriteClient getBigQueryWriteClient() { } BigQuery getBigQuery() { + LOG.finest("++enter++"); return this.bigQuery; } String getConnectionUrl() { + LOG.finest("++enter++"); return connectionUrl; } String getConnectionId() { + LOG.finest("++enter++"); return this.connectionId; } @@ -447,6 +454,7 @@ public PreparedStatement prepareStatement(String sql, int autoGeneratedKeys) thr @Override public PreparedStatement prepareStatement(String sql, int[] columnIndexes) throws SQLException { + LOG.finest("++enter++"); throw new BigQueryJdbcSqlFeatureNotSupportedException("autoGeneratedKeys is not supported"); } @@ -483,77 +491,95 @@ public PreparedStatement prepareStatement(String sql, int resultSetType, int res } public DatasetId getDefaultDataset() { + LOG.finest("++enter++"); checkClosed(); return this.defaultDataset; } String getDestinationDataset() { + LOG.finest("++enter++"); return this.destinationDataset; } String getDestinationTable() { + LOG.finest("++enter++"); return this.destinationTable; } long getDestinationDatasetExpirationTime() { + LOG.finest("++enter++"); return this.destinationDatasetExpirationTime; } String getKmsKeyName() { + LOG.finest("++enter++"); return this.kmsKeyName; } List getQueryProperties() { + LOG.finest("++enter++"); return this.queryProperties; } public String getLocation() { + LOG.finest("++enter++"); checkClosed(); return this.location; } public Map getAuthProperties() { + LOG.finest("++enter++"); checkClosed(); return this.authProperties; } long getMaxResults() { + LOG.finest("++enter++"); return maxResults; } long getRetryTimeoutInSeconds() { + LOG.finest("++enter++"); return this.retryTimeoutInSeconds; } Duration getRetryTimeoutDuration() { + LOG.finest("++enter++"); return this.retryTimeoutDuration; } long getRetryInitialDelayInSeconds() { + LOG.finest("++enter++"); return this.retryInitialDelayInSeconds; } Duration getRetryInitialDelayDuration() { + LOG.finest("++enter++"); return this.retryInitialDelayDuration; } long getRetryMaxDelayInSeconds() { + LOG.finest("++enter++"); return this.retryMaxDelayInSeconds; } Duration getRetryMaxDelayDuration() { + LOG.finest("++enter++"); return this.retryMaxDelayDuration; } long getJobTimeoutInSeconds() { + LOG.finest("++enter++"); return this.jobTimeoutInSeconds; } long getMaxBytesBilled() { + LOG.finest("++enter++"); return this.maxBytesBilled; } Map getLabels() { + LOG.finest("++enter++"); return this.labels; } @@ -566,6 +592,7 @@ Map getLabels() { */ private void beginTransaction() { LOG.finest("++enter++"); + LOG.info("Beginning transaction"); QueryJobConfiguration.Builder transactionBeginJobConfig = QueryJobConfiguration.newBuilder("BEGIN TRANSACTION;"); try { @@ -592,110 +619,135 @@ private void beginTransaction() { } public boolean isTransactionStarted() { + LOG.finest("++enter++"); return this.transactionStarted; } boolean isSessionEnabled() { + LOG.finest("++enter++"); return this.enableSession; } boolean isUnsupportedHTAPIFallback() { + LOG.finest("++enter++"); return this.unsupportedHTAPIFallback; } ConnectionProperty getSessionInfoConnectionProperty() { + LOG.finest("++enter++"); return this.sessionInfoConnectionProperty; } boolean isEnableHighThroughputAPI() { + LOG.finest("++enter++"); return this.enableHighThroughputAPI; } boolean isUseQueryCache() { + LOG.finest("++enter++"); return useQueryCache; } boolean getUseStatelessQueryMode() { + LOG.finest("++enter++"); return useStatelessQueryMode; } boolean isAllowLargeResults() { + LOG.finest("++enter++"); return allowLargeResults; } String getQueryDialect() { + LOG.finest("++enter++"); return queryDialect; } Integer getNumBufferedRows() { + LOG.finest("++enter++"); return numBufferedRows; } int getHighThroughputMinTableSize() { + LOG.finest("++enter++"); return highThroughputMinTableSize; } String getAdditionalProjects() { + LOG.finest("++enter++"); return this.additionalProjects; } int getHighThroughputActivationRatio() { + LOG.finest("++enter++"); return highThroughputActivationRatio; } boolean isFilterTablesOnDefaultDataset() { + LOG.finest("++enter++"); return this.filterTablesOnDefaultDataset; } int isRequestGoogleDriveScope() { + LOG.finest("++enter++"); return requestGoogleDriveScope; } int getMetadataFetchThreadCount() { + LOG.finest("++enter++"); return this.metadataFetchThreadCount; } boolean isEnableWriteAPI() { + LOG.finest("++enter++"); return enableWriteAPI; } int getWriteAPIActivationRowCount() { + LOG.finest("++enter++"); return writeAPIActivationRowCount; } int getWriteAPIAppendRowCount() { + LOG.finest("++enter++"); return writeAPIAppendRowCount; } String getSSLTrustStorePath() { + LOG.finest("++enter++"); return sslTrustStorePath; } String getSSLTrustStorePassword() { + LOG.finest("++enter++"); return sslTrustStorePassword; } Integer getHttpConnectTimeout() { + LOG.finest("++enter++"); return httpConnectTimeout; } Integer getHttpReadTimeout() { + LOG.finest("++enter++"); return httpReadTimeout; } Long getConnectionPoolSize() { + LOG.finest("++enter++"); return connectionPoolSize; } Long getListenerPoolSize() { + LOG.finest("++enter++"); return listenerPoolSize; } @Override public boolean isValid(int timeout) throws SQLException { - try (BigQueryJdbcMdc.MdcCloseable mdc = - BigQueryJdbcMdc.registerInstance(this, this.connectionId)) { + try (BigQueryJdbcMdc.MdcCloseable mdc = BigQueryJdbcMdc.setContext(this.connectionId)) { LOG.finest("++enter++"); + LOG.info("Validating connection"); if (timeout < 0) { throw new BigQueryJdbcException("timeout must be >= 0"); } @@ -718,8 +770,7 @@ public boolean isValid(int timeout) throws SQLException { @Override public void abort(Executor executor) throws SQLException { - try (BigQueryJdbcMdc.MdcCloseable mdc = - BigQueryJdbcMdc.registerInstance(this, this.connectionId)) { + try (BigQueryJdbcMdc.MdcCloseable mdc = BigQueryJdbcMdc.setContext(this.connectionId)) { LOG.finest("++enter++"); close(); } @@ -727,43 +778,49 @@ public void abort(Executor executor) throws SQLException { @Override public void setClientInfo(String name, String value) { + LOG.finest("++enter++"); // no-op } @Override public String getClientInfo(String name) { + LOG.finest("++enter++"); return null; } @Override public String getCatalog() { + LOG.finest("++enter++"); return this.catalog; } @Override public Properties getClientInfo() { + LOG.finest("++enter++"); return null; } @Override public void setClientInfo(Properties properties) { + LOG.finest("++enter++"); // no-op } @Override public SQLWarning getWarnings() { + LOG.finest("++enter++"); return this.sqlWarnings.isEmpty() ? null : this.sqlWarnings.get(0); } @Override public void clearWarnings() { + LOG.finest("++enter++"); this.sqlWarnings.clear(); } @Override public boolean getAutoCommit() { - try (BigQueryJdbcMdc.MdcCloseable mdc = - BigQueryJdbcMdc.registerInstance(this, this.connectionId)) { + try (BigQueryJdbcMdc.MdcCloseable mdc = BigQueryJdbcMdc.setContext(this.connectionId)) { LOG.finest("++enter++"); checkClosed(); return this.autoCommit; @@ -772,8 +829,7 @@ public boolean getAutoCommit() { @Override public void setAutoCommit(boolean autoCommit) throws SQLException { - try (BigQueryJdbcMdc.MdcCloseable mdc = - BigQueryJdbcMdc.registerInstance(this, this.connectionId)) { + try (BigQueryJdbcMdc.MdcCloseable mdc = BigQueryJdbcMdc.setContext(this.connectionId)) { LOG.finest("++enter++"); checkClosed(); checkIfEnabledSession("setAutoCommit"); @@ -834,6 +890,8 @@ public void rollback() throws SQLException { } private void rollbackImpl() throws SQLException { + LOG.finest("++enter++"); + LOG.info("Rolling back transaction"); try { QueryJobConfiguration transactionRollbackJobConfig = QueryJobConfiguration.newBuilder("ROLLBACK TRANSACTION;") @@ -864,13 +922,13 @@ public DatabaseMetaData getMetaData() throws SQLException { @Override public int getTransactionIsolation() { + LOG.finest("++enter++"); return Connection.TRANSACTION_SERIALIZABLE; } @Override public void setTransactionIsolation(int level) throws SQLException { - try (BigQueryJdbcMdc.MdcCloseable mdc = - BigQueryJdbcMdc.registerInstance(this, this.connectionId)) { + try (BigQueryJdbcMdc.MdcCloseable mdc = BigQueryJdbcMdc.setContext(this.connectionId)) { LOG.finest("++enter++"); if (level != Connection.TRANSACTION_SERIALIZABLE) { throw new BigQueryJdbcSqlFeatureNotSupportedException( @@ -882,13 +940,13 @@ public void setTransactionIsolation(int level) throws SQLException { @Override public int getHoldability() { + LOG.finest("++enter++"); return this.holdability; } @Override public void setHoldability(int holdability) throws SQLException { - try (BigQueryJdbcMdc.MdcCloseable mdc = - BigQueryJdbcMdc.registerInstance(this, this.connectionId)) { + try (BigQueryJdbcMdc.MdcCloseable mdc = BigQueryJdbcMdc.setContext(this.connectionId)) { if (holdability != ResultSet.CLOSE_CURSORS_AT_COMMIT) { throw new BigQueryJdbcSqlFeatureNotSupportedException( "CLOSE_CURSORS_AT_COMMIT not supported"); @@ -906,6 +964,7 @@ public void setHoldability(int holdability) throws SQLException { */ @Override public void close() throws SQLException { + LOG.finest("++enter++"); if (isClosed()) { return; } @@ -919,6 +978,8 @@ public void close() throws SQLException { } private void closeImpl() throws SQLException { + LOG.finest("++enter++"); + LOG.info("Closing connection resources"); try { if (this.bigQueryReadClient != null) { this.bigQueryReadClient.shutdown(); @@ -949,10 +1010,12 @@ private void closeImpl() throws SQLException { @Override public boolean isClosed() { + LOG.finest("++enter++"); return this.isClosed; } private void checkClosed() { + LOG.finest("++enter++"); if (isClosed()) { IllegalStateException ex = new IllegalStateException("This " + getClass().getName() + " has been closed"); @@ -962,6 +1025,7 @@ private void checkClosed() { } private void checkIfEnabledSession(String methodName) { + LOG.finest("++enter++"); if (!this.enableSession) { IllegalStateException ex = new IllegalStateException( @@ -1002,10 +1066,12 @@ private List convertMapToConnectionPropertiesList( } void removeStatement(Statement statement) { + LOG.finest("++enter++"); this.openStatements.remove(statement); } private BigQuery getBigQueryConnection() { + LOG.finest("++enter++"); BigQueryOptions.Builder bigQueryOptions = BigQueryOptions.newBuilder(); if (this.retryTimeoutInSeconds > 0L || (this.retryInitialDelayInSeconds > 0L && this.retryMaxDelayInSeconds > 0L)) { @@ -1052,6 +1118,7 @@ private BigQuery getBigQueryConnection() { } private BigQueryReadClient getBigQueryReadClientConnection() throws IOException { + LOG.finest("++enter++"); BigQueryReadSettings.Builder bigQueryReadSettings = BigQueryReadSettings.newBuilder().setHeaderProvider(this.headerProvider); if (getRetrySettings() != null) { @@ -1094,6 +1161,7 @@ private BigQueryReadClient getBigQueryReadClientConnection() throws IOException } private BigQueryWriteClient getBigQueryWriteClientConnection() throws IOException { + LOG.finest("++enter++"); BigQueryWriteSettings.Builder bigQueryWriteSettings = BigQueryWriteSettings.newBuilder().setHeaderProvider(this.headerProvider); if (getRetrySettings() != null) { @@ -1124,6 +1192,7 @@ private BigQueryWriteClient getBigQueryWriteClientConnection() throws IOExceptio } RetrySettings getRetrySettings() { + LOG.finest("++enter++"); RetrySettings.Builder retrySettingsBuilder = null; if (this.retryTimeoutInSeconds > 0L @@ -1141,6 +1210,8 @@ RetrySettings getRetrySettings() { } private void commitTransaction() { + LOG.finest("++enter++"); + LOG.info("Committing transaction"); try { QueryJobConfiguration transactionCommitJobConfig = QueryJobConfiguration.newBuilder("COMMIT TRANSACTION;") diff --git a/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryDatabaseMetaData.java b/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryDatabaseMetaData.java index e81601bd77ab..717689258687 100644 --- a/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryDatabaseMetaData.java +++ b/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryDatabaseMetaData.java @@ -84,9 +84,9 @@ * * @see BigQueryStatement */ -// TODO(neenu): test and verify after post MVP implementation. class BigQueryDatabaseMetaData implements DatabaseMetaData { - final BigQueryJdbcCustomLogger LOG = new BigQueryJdbcCustomLogger(this.toString()); + static final BigQueryJdbcCustomLogger LOG = + new BigQueryJdbcCustomLogger(BigQueryDatabaseMetaData.class.getName()); private static final String DATABASE_PRODUCT_NAME = "Google BigQuery"; private static final String DATABASE_PRODUCT_VERSION = "2.0"; private static final String DRIVER_NAME = "GoogleJDBCDriverForGoogleBigQuery"; @@ -140,6 +140,7 @@ class BigQueryDatabaseMetaData implements DatabaseMetaData { String URL; BigQueryConnection connection; Statement statement = null; + private final String connectionId; private final BigQuery bigquery; private final int metadataFetchThreadCount; private static final AtomicReference parsedDriverVersion = new AtomicReference<>(null); @@ -149,8 +150,10 @@ class BigQueryDatabaseMetaData implements DatabaseMetaData { new AtomicReference<>(null); BigQueryDatabaseMetaData(BigQueryConnection connection) { + LOG.finest("++enter++"); this.URL = connection.getConnectionUrl(); this.connection = connection; + this.connectionId = connection != null ? connection.getConnectionId() : null; this.bigquery = connection.getBigQuery(); this.metadataFetchThreadCount = connection.getMetadataFetchThreadCount(); loadDriverVersionProperties(); @@ -158,6 +161,7 @@ class BigQueryDatabaseMetaData implements DatabaseMetaData { @Override public boolean allProceduresAreCallable() { + LOG.finest("++enter++"); // Returns false because BigQuery's IAM permissions can allow a user // to discover a procedure's existence without having rights to execute it. return false; @@ -165,6 +169,7 @@ public boolean allProceduresAreCallable() { @Override public boolean allTablesAreSelectable() { + LOG.finest("++enter++"); // Returns true to ensure maximum compatibility with client applications // that expect a positive response to discover and list all available tables. return true; @@ -172,136 +177,163 @@ public boolean allTablesAreSelectable() { @Override public String getURL() { + LOG.finest("++enter++"); return this.URL; } @Override public String getUserName() { + LOG.finest("++enter++"); return null; } @Override public boolean isReadOnly() { + LOG.finest("++enter++"); return false; } @Override public boolean nullsAreSortedHigh() { + LOG.finest("++enter++"); return false; } @Override public boolean nullsAreSortedLow() { + LOG.finest("++enter++"); return false; } @Override public boolean nullsAreSortedAtStart() { + LOG.finest("++enter++"); return false; } @Override public boolean nullsAreSortedAtEnd() { + LOG.finest("++enter++"); return false; } @Override public String getDatabaseProductName() { + LOG.finest("++enter++"); return DATABASE_PRODUCT_NAME; } @Override public String getDatabaseProductVersion() { + LOG.finest("++enter++"); return DATABASE_PRODUCT_VERSION; } @Override public String getDriverName() { + LOG.finest("++enter++"); return DRIVER_NAME; } @Override public String getDriverVersion() { + LOG.finest("++enter++"); return parsedDriverVersion.get() != null ? parsedDriverVersion.get() : DRIVER_DEFAULT_VERSION; } @Override public int getDriverMajorVersion() { + LOG.finest("++enter++"); return parsedDriverMajorVersion.get() != null ? parsedDriverMajorVersion.get() : 0; } @Override public int getDriverMinorVersion() { + LOG.finest("++enter++"); return parsedDriverMinorVersion.get() != null ? parsedDriverMinorVersion.get() : 0; } @Override public boolean usesLocalFiles() { + LOG.finest("++enter++"); return false; } @Override public boolean usesLocalFilePerTable() { + LOG.finest("++enter++"); return false; } @Override public boolean supportsMixedCaseIdentifiers() { + LOG.finest("++enter++"); return false; } @Override public boolean storesUpperCaseIdentifiers() { + LOG.finest("++enter++"); return false; } @Override public boolean storesLowerCaseIdentifiers() { + LOG.finest("++enter++"); return false; } @Override public boolean storesMixedCaseIdentifiers() { + LOG.finest("++enter++"); return false; } @Override public boolean supportsMixedCaseQuotedIdentifiers() { + LOG.finest("++enter++"); return false; } @Override public boolean storesUpperCaseQuotedIdentifiers() { + LOG.finest("++enter++"); return false; } @Override public boolean storesLowerCaseQuotedIdentifiers() { + LOG.finest("++enter++"); return false; } @Override public boolean storesMixedCaseQuotedIdentifiers() { + LOG.finest("++enter++"); return false; } @Override public String getIdentifierQuoteString() { + LOG.finest("++enter++"); return GOOGLE_SQL_QUOTED_IDENTIFIER; } @Override public String getSQLKeywords() { + LOG.finest("++enter++"); return GOOGLE_SQL_RESERVED_KEYWORDS; } @Override public String getNumericFunctions() { + LOG.finest("++enter++"); return GOOGLE_SQL_NUMERIC_FNS; } @Override public String getStringFunctions() { + LOG.finest("++enter++"); return GOOGLE_SQL_STRING_FNS; } @@ -309,319 +341,382 @@ public String getStringFunctions() { // GoogleSQL has UDF (user defined functions). // System functions like DATABASE(), USER() are not supported. public String getSystemFunctions() { + LOG.finest("++enter++"); return null; } @Override public String getTimeDateFunctions() { + LOG.finest("++enter++"); return GOOGLE_SQL_TIME_DATE_FNS; } @Override public String getSearchStringEscape() { + LOG.finest("++enter++"); return GOOGLE_SQL_ESCAPE; } @Override // No extra characters beyond a-z, A-Z, 0-9 and _ public String getExtraNameCharacters() { + LOG.finest("++enter++"); return null; } @Override public boolean supportsAlterTableWithAddColumn() { + LOG.finest("++enter++"); return true; } @Override public boolean supportsAlterTableWithDropColumn() { + LOG.finest("++enter++"); return true; } @Override public boolean supportsColumnAliasing() { + LOG.finest("++enter++"); return true; } @Override public boolean nullPlusNonNullIsNull() { + LOG.finest("++enter++"); return true; } @Override public boolean supportsConvert() { + LOG.finest("++enter++"); return false; } @Override public boolean supportsConvert(int fromType, int toType) { + LOG.finest("++enter++"); return false; } @Override public boolean supportsTableCorrelationNames() { + LOG.finest("++enter++"); return true; } @Override public boolean supportsDifferentTableCorrelationNames() { + LOG.finest("++enter++"); return false; } @Override public boolean supportsExpressionsInOrderBy() { + LOG.finest("++enter++"); return true; } @Override public boolean supportsOrderByUnrelated() { + LOG.finest("++enter++"); return true; } @Override public boolean supportsGroupBy() { + LOG.finest("++enter++"); return true; } @Override public boolean supportsGroupByUnrelated() { + LOG.finest("++enter++"); return true; } @Override public boolean supportsGroupByBeyondSelect() { + LOG.finest("++enter++"); return true; } @Override public boolean supportsLikeEscapeClause() { + LOG.finest("++enter++"); return false; } @Override public boolean supportsMultipleResultSets() { + LOG.finest("++enter++"); return false; } @Override public boolean supportsMultipleTransactions() { + LOG.finest("++enter++"); return false; } @Override public boolean supportsNonNullableColumns() { + LOG.finest("++enter++"); return false; } @Override public boolean supportsMinimumSQLGrammar() { + LOG.finest("++enter++"); return false; } @Override public boolean supportsCoreSQLGrammar() { + LOG.finest("++enter++"); return false; } @Override public boolean supportsExtendedSQLGrammar() { + LOG.finest("++enter++"); return false; } @Override public boolean supportsANSI92EntryLevelSQL() { + LOG.finest("++enter++"); return false; } @Override public boolean supportsANSI92IntermediateSQL() { + LOG.finest("++enter++"); return false; } @Override public boolean supportsANSI92FullSQL() { + LOG.finest("++enter++"); return false; } @Override public boolean supportsIntegrityEnhancementFacility() { + LOG.finest("++enter++"); return false; } @Override public boolean supportsOuterJoins() { + LOG.finest("++enter++"); return false; } @Override public boolean supportsFullOuterJoins() { + LOG.finest("++enter++"); return false; } @Override public boolean supportsLimitedOuterJoins() { + LOG.finest("++enter++"); return false; } @Override public String getSchemaTerm() { + LOG.finest("++enter++"); return SCHEMA_TERM; } @Override public String getProcedureTerm() { + LOG.finest("++enter++"); return PROCEDURE_TERM; } @Override public String getCatalogTerm() { + LOG.finest("++enter++"); return CATALOG_TERM; } @Override public boolean isCatalogAtStart() { + LOG.finest("++enter++"); return true; } @Override public String getCatalogSeparator() { + LOG.finest("++enter++"); return GOOGLE_SQL_CATALOG_SEPARATOR; } @Override public boolean supportsSchemasInDataManipulation() { + LOG.finest("++enter++"); return false; } @Override public boolean supportsSchemasInProcedureCalls() { + LOG.finest("++enter++"); return false; } @Override public boolean supportsSchemasInTableDefinitions() { + LOG.finest("++enter++"); return false; } @Override public boolean supportsSchemasInIndexDefinitions() { + LOG.finest("++enter++"); return false; } @Override public boolean supportsSchemasInPrivilegeDefinitions() { + LOG.finest("++enter++"); return false; } @Override public boolean supportsCatalogsInDataManipulation() { + LOG.finest("++enter++"); return false; } @Override public boolean supportsCatalogsInProcedureCalls() { + LOG.finest("++enter++"); return false; } @Override public boolean supportsCatalogsInTableDefinitions() { + LOG.finest("++enter++"); return false; } @Override public boolean supportsCatalogsInIndexDefinitions() { + LOG.finest("++enter++"); return false; } @Override public boolean supportsCatalogsInPrivilegeDefinitions() { + LOG.finest("++enter++"); return false; } @Override public boolean supportsPositionedDelete() { + LOG.finest("++enter++"); return false; } @Override public boolean supportsPositionedUpdate() { + LOG.finest("++enter++"); return false; } @Override public boolean supportsSelectForUpdate() { + LOG.finest("++enter++"); return false; } @Override public boolean supportsStoredProcedures() { + LOG.finest("++enter++"); return false; } @Override public boolean supportsSubqueriesInComparisons() { + LOG.finest("++enter++"); return false; } @Override public boolean supportsSubqueriesInExists() { + LOG.finest("++enter++"); return false; } @Override public boolean supportsSubqueriesInIns() { + LOG.finest("++enter++"); return false; } @Override public boolean supportsSubqueriesInQuantifieds() { + LOG.finest("++enter++"); return false; } @Override public boolean supportsCorrelatedSubqueries() { + LOG.finest("++enter++"); return false; } @Override public boolean supportsUnion() { + LOG.finest("++enter++"); return true; } @Override public boolean supportsUnionAll() { + LOG.finest("++enter++"); return true; } @Override public boolean supportsOpenCursorsAcrossCommit() { + LOG.finest("++enter++"); return false; } @Override public boolean supportsOpenCursorsAcrossRollback() { + LOG.finest("++enter++"); return false; } @Override public boolean supportsOpenStatementsAcrossCommit() { + LOG.finest("++enter++"); return false; } @Override public boolean supportsOpenStatementsAcrossRollback() { + LOG.finest("++enter++"); return false; } @Override // No limit public int getMaxBinaryLiteralLength() { + LOG.finest("++enter++"); return 0; } @Override // No Limit public int getMaxCharLiteralLength() { + LOG.finest("++enter++"); return 0; } @@ -629,58 +724,68 @@ public int getMaxCharLiteralLength() { // GoogleSQL documentation says 300. // https://cloud.google.com/bigquery/quotas#all_tables public int getMaxColumnNameLength() { + LOG.finest("++enter++"); return GOOGLE_SQL_MAX_COL_NAME_LEN; } @Override // No specific limits for group by. public int getMaxColumnsInGroupBy() { + LOG.finest("++enter++"); return 0; } @Override // No specific limits for index. public int getMaxColumnsInIndex() { + LOG.finest("++enter++"); return 0; } @Override // No specific limit for Order By. public int getMaxColumnsInOrderBy() { + LOG.finest("++enter++"); return 0; } @Override // All columns can be selected. No specific limits. public int getMaxColumnsInSelect() { + LOG.finest("++enter++"); return 0; } @Override public int getMaxColumnsInTable() { + LOG.finest("++enter++"); return GOOGLE_SQL_MAX_COLS_PER_TABLE; } @Override public int getMaxConnections() { + LOG.finest("++enter++"); // Per JDBC spec, returns 0 as there is no connection limit or is unknown. return 0; } @Override public int getMaxCursorNameLength() { + LOG.finest("++enter++"); // BigQuery does not support named cursors or positioned updates/deletes. return 0; } @Override public int getMaxIndexLength() { + LOG.finest("++enter++"); // Per the JDBC spec, 0 indicates this feature is not supported. return 0; } @Override public int getMaxSchemaNameLength() { + LOG.finest("++enter++"); // Dataset IDs can be up to 1024 characters long. // See: https://cloud.google.com/bigquery/docs/datasets#dataset-naming return 1024; @@ -688,6 +793,7 @@ public int getMaxSchemaNameLength() { @Override public int getMaxProcedureNameLength() { + LOG.finest("++enter++"); // Routine IDs can be up to 256 characters long. // See: // https://cloud.google.com/bigquery/docs/reference/rest/v2/routines#RoutineReference.FIELDS.routine_id @@ -696,6 +802,7 @@ public int getMaxProcedureNameLength() { @Override public int getMaxCatalogNameLength() { + LOG.finest("++enter++"); // Corresponds to the BigQuery Project ID, which can be a maximum of 30 characters. // See: // https://cloud.google.com/resource-manager/docs/creating-managing-projects#before_you_begin @@ -704,17 +811,20 @@ public int getMaxCatalogNameLength() { @Override public int getMaxRowSize() { + LOG.finest("++enter++"); // Per JDBC spec, returns 0 as there is no fixed limit or is unknown. return 0; } @Override public boolean doesMaxRowSizeIncludeBlobs() { + LOG.finest("++enter++"); return false; } @Override public int getMaxStatementLength() { + LOG.finest("++enter++"); // Per JDBC spec, returns 0 as there is no fixed limit or is unknown. // See: https://cloud.google.com/bigquery/quotas#query_jobs return 0; @@ -722,12 +832,14 @@ public int getMaxStatementLength() { @Override public int getMaxStatements() { + LOG.finest("++enter++"); // Per JDBC spec, returns 0 as there is no fixed limit or is unknown. return 0; } @Override public int getMaxTableNameLength() { + LOG.finest("++enter++"); // Table IDs can be up to 1024 characters long. // See: https://cloud.google.com/bigquery/docs/tables#table-naming return 1024; @@ -735,6 +847,7 @@ public int getMaxTableNameLength() { @Override public int getMaxTablesInSelect() { + LOG.finest("++enter++"); // BigQuery allows up to 1,000 tables to be referenced per query. // See: https://cloud.google.com/bigquery/quotas#query_jobs return 1000; @@ -742,47 +855,76 @@ public int getMaxTablesInSelect() { @Override public int getMaxUserNameLength() { + LOG.finest("++enter++"); return 0; } @Override public int getDefaultTransactionIsolation() { + LOG.finest("++enter++"); return Connection.TRANSACTION_SERIALIZABLE; } @Override public boolean supportsTransactions() { + LOG.finest("++enter++"); return true; } @Override public boolean supportsTransactionIsolationLevel(int level) { + LOG.finest("++enter++"); return level == Connection.TRANSACTION_SERIALIZABLE; } @Override public boolean supportsDataDefinitionAndDataManipulationTransactions() { + LOG.finest("++enter++"); return false; } @Override public boolean supportsDataManipulationTransactionsOnly() { + LOG.finest("++enter++"); return false; } @Override public boolean dataDefinitionCausesTransactionCommit() { + LOG.finest("++enter++"); return false; } @Override public boolean dataDefinitionIgnoredInTransactions() { + LOG.finest("++enter++"); return false; } @Override public ResultSet getProcedures( String catalog, String schemaPattern, String procedureNamePattern) { + try (BigQueryJdbcMdc.MdcCloseable mdc = BigQueryJdbcMdc.setContext(this.connectionId)) { + LOG.finest( + "++enter++ catalog: " + + catalog + + ", schemaPattern: " + + schemaPattern + + ", procedureNamePattern: " + + procedureNamePattern); + return getProceduresImpl(catalog, schemaPattern, procedureNamePattern); + } + } + + private ResultSet getProceduresImpl( + String catalog, String schemaPattern, String procedureNamePattern) { + LOG.finest( + "++enter++ catalog: " + + catalog + + ", schemaPattern: " + + schemaPattern + + ", procedureNamePattern: " + + procedureNamePattern); if ((catalog == null || catalog.isEmpty()) || (schemaPattern != null && schemaPattern.isEmpty()) || (procedureNamePattern != null && procedureNamePattern.isEmpty())) { @@ -821,8 +963,7 @@ public ResultSet getProcedures( (name) -> bigquery.getDataset(DatasetId.of(catalogParam, name)), (ds) -> ds.getDatasetId().getDataset(), schemaPattern, - schemaRegex, - LOG); + schemaRegex); if (datasetsToScan.isEmpty()) { LOG.info("Fetcher thread found no matching datasets. Finishing."); @@ -832,7 +973,7 @@ public ResultSet getProcedures( apiExecutor = Executors.newFixedThreadPool(API_EXECUTOR_POOL_SIZE); routineProcessorExecutor = Executors.newFixedThreadPool(this.metadataFetchThreadCount); - LOG.fine("Submitting parallel findMatchingRoutines tasks..."); + LOG.info("Submitting parallel findMatchingRoutines tasks..."); for (Dataset dataset : datasetsToScan) { if (Thread.currentThread().isInterrupted()) { LOG.warning("Fetcher interrupted during dataset iteration submission."); @@ -855,15 +996,14 @@ public ResultSet getProcedures( name)), (rt) -> rt.getRoutineId().getRoutine(), procedureNamePattern, - procedureNameRegex, - LOG); + procedureNameRegex); Future> apiFuture = apiExecutor.submit(apiCallable); apiFutures.add(apiFuture); } - LOG.fine("Finished submitting " + apiFutures.size() + " findMatchingRoutines tasks."); + LOG.info("Finished submitting " + apiFutures.size() + " findMatchingRoutines tasks."); apiExecutor.shutdown(); - LOG.fine("Processing results from findMatchingRoutines tasks..."); + LOG.info("Processing results from findMatchingRoutines tasks..."); for (Future> apiFuture : apiFutures) { if (Thread.currentThread().isInterrupted()) { LOG.warning("Fetcher interrupted while processing API futures."); @@ -876,7 +1016,7 @@ public ResultSet getProcedures( if (Thread.currentThread().isInterrupted()) break; if ("PROCEDURE".equalsIgnoreCase(routine.getRoutineType())) { - LOG.fine( + LOG.info( "Submitting processing task for procedure: " + routine.getRoutineId()); final Routine finalRoutine = routine; Future processFuture = @@ -905,7 +1045,7 @@ public ResultSet getProcedures( } } - LOG.fine( + LOG.info( "Finished submitting " + processingTaskFutures.size() + " processProcedureInfo tasks."); @@ -915,15 +1055,15 @@ public ResultSet getProcedures( "Fetcher interrupted before waiting for processing tasks; cancelling remaining."); processingTaskFutures.forEach(f -> f.cancel(true)); } else { - LOG.fine("Waiting for processProcedureInfo tasks to complete..."); + LOG.info("Waiting for processProcedureInfo tasks to complete..."); waitForTasksCompletion(processingTaskFutures); - LOG.fine("All processProcedureInfo tasks completed or handled."); + LOG.info("All processProcedureInfo tasks completed or handled."); } if (!Thread.currentThread().isInterrupted()) { Comparator comparator = defineGetProceduresComparator(localResultSchemaFields); - sortResults(collectedResults, comparator, "getProcedures", LOG); + sortResults(collectedResults, comparator, "getProcedures"); } if (!Thread.currentThread().isInterrupted()) { @@ -952,6 +1092,7 @@ public ResultSet getProcedures( } Schema defineGetProceduresSchema() { + LOG.finest("++enter++"); List fields = new ArrayList<>(9); fields.add( Field.newBuilder("PROCEDURE_CAT", StandardSQLTypeName.STRING) @@ -996,7 +1137,7 @@ void processProcedureInfo( Routine routine, List collectedResults, FieldList resultSchemaFields) { RoutineId routineId = routine.getRoutineId(); - LOG.fine("Processing procedure info for: " + routineId); + LOG.info("Processing procedure info for: " + routineId); try { if (!"PROCEDURE".equalsIgnoreCase(routine.getRoutineType())) { @@ -1030,7 +1171,7 @@ void processProcedureInfo( FieldValueList rowFvl = FieldValueList.of(values, resultSchemaFields); collectedResults.add(rowFvl); - LOG.fine("Processed and added procedure info row for: " + routineId); + LOG.info("Processed and added procedure info row for: " + routineId); } catch (Exception e) { LOG.warning( @@ -1040,6 +1181,7 @@ void processProcedureInfo( } Comparator defineGetProceduresComparator(FieldList resultSchemaFields) { + LOG.finest("++enter++"); final int PROC_CAT_IDX = resultSchemaFields.getIndex("PROCEDURE_CAT"); final int PROC_SCHEM_IDX = resultSchemaFields.getIndex("PROCEDURE_SCHEM"); final int PROC_NAME_IDX = resultSchemaFields.getIndex("PROCEDURE_NAME"); @@ -1061,6 +1203,32 @@ Comparator defineGetProceduresComparator(FieldList resultSchemaF @Override public ResultSet getProcedureColumns( String catalog, String schemaPattern, String procedureNamePattern, String columnNamePattern) { + try (BigQueryJdbcMdc.MdcCloseable mdc = BigQueryJdbcMdc.setContext(this.connectionId)) { + LOG.finest( + "++enter++ catalog: " + + catalog + + ", schemaPattern: " + + schemaPattern + + ", procedureNamePattern: " + + procedureNamePattern + + ", columnNamePattern: " + + columnNamePattern); + return getProcedureColumnsImpl( + catalog, schemaPattern, procedureNamePattern, columnNamePattern); + } + } + + private ResultSet getProcedureColumnsImpl( + String catalog, String schemaPattern, String procedureNamePattern, String columnNamePattern) { + LOG.finest( + "++enter++ catalog: " + + catalog + + ", schemaPattern: " + + schemaPattern + + ", procedureNamePattern: " + + procedureNamePattern + + ", columnNamePattern: " + + columnNamePattern); if (catalog == null || catalog.isEmpty()) { LOG.warning("Returning empty ResultSet because catalog (project) is null or empty."); @@ -1117,8 +1285,7 @@ public ResultSet getProcedureColumns( procedureNamePattern, procedureNameRegex, listRoutinesExecutor, - catalogParam, - LOG); + catalogParam); shutdownExecutor(listRoutinesExecutor); listRoutinesExecutor = null; @@ -1132,7 +1299,7 @@ public ResultSet getProcedureColumns( 100, runnable -> new Thread(runnable, "pcol-get-details" + fetcherThreadNameSuffix)); List fullRoutines = - fetchFullRoutineDetailsForIds(procedureIdsToGet, getRoutineDetailsExecutor, LOG); + fetchFullRoutineDetailsForIds(procedureIdsToGet, getRoutineDetailsExecutor); shutdownExecutor(getRoutineDetailsExecutor); getRoutineDetailsExecutor = null; @@ -1152,8 +1319,7 @@ public ResultSet getProcedureColumns( collectedResults, resultSchema.getFields(), processArgsExecutor, - processingTaskFutures, - LOG); + processingTaskFutures); if (Thread.currentThread().isInterrupted()) { LOG.warning( @@ -1161,13 +1327,13 @@ public ResultSet getProcedureColumns( + catalogParam); processingTaskFutures.forEach(f -> f.cancel(true)); } else { - LOG.fine( + LOG.info( "Fetcher: Waiting for " + processingTaskFutures.size() + " argument processing tasks. Catalog: " + catalogParam); waitForTasksCompletion(processingTaskFutures); - LOG.fine( + LOG.info( "Fetcher: All argument processing tasks completed or handled. Catalog: " + catalogParam); } @@ -1175,7 +1341,7 @@ public ResultSet getProcedureColumns( if (!Thread.currentThread().isInterrupted()) { Comparator comparator = defineGetProcedureColumnsComparator(resultSchema.getFields()); - sortResults(collectedResults, comparator, "getProcedureColumns", LOG); + sortResults(collectedResults, comparator, "getProcedureColumns"); populateQueue(collectedResults, queue, resultSchema.getFields()); } @@ -1215,7 +1381,8 @@ public ResultSet getProcedureColumns( private List fetchMatchingDatasetsForProcedureColumns( String catalogParam, String schemaPattern, Pattern schemaRegex) throws InterruptedException { - LOG.fine( + LOG.finest("++enter++"); + LOG.info( "Fetching matching datasets for catalog '%s', schemaPattern '%s'", catalogParam, schemaPattern); List datasetsToScan = @@ -1226,8 +1393,7 @@ private List fetchMatchingDatasetsForProcedureColumns( (name) -> bigquery.getDataset(DatasetId.of(catalogParam, name)), (ds) -> ds.getDatasetId().getDataset(), schemaPattern, - schemaRegex, - LOG); + schemaRegex); LOG.info( "Found %d datasets to scan for procedures in catalog '%s'.", datasetsToScan.size(), catalogParam); @@ -1239,11 +1405,11 @@ List listMatchingProcedureIdsFromDatasets( String procedureNamePattern, Pattern procedureNameRegex, ExecutorService listRoutinesExecutor, - String catalogParam, - BigQueryJdbcCustomLogger logger) + String catalogParam) throws InterruptedException { + LOG.finest("++enter++"); - logger.fine( + LOG.info( "Listing matching procedure IDs from %d datasets for catalog '%s'.", datasetsToScan.size(), catalogParam); final List>> listRoutineFutures = new ArrayList<>(); @@ -1254,7 +1420,7 @@ List listMatchingProcedureIdsFromDatasets( InterruptedException ex = new InterruptedException( "Interrupted while listing routines for catalog: " + catalogParam); - logger.severe(ex.getMessage(), ex); + LOG.severe(ex.getMessage(), ex); throw ex; } final DatasetId currentDatasetId = dataset.getDatasetId(); @@ -1271,11 +1437,10 @@ List listMatchingProcedureIdsFromDatasets( currentDatasetId.getProject(), currentDatasetId.getDataset(), name)), (rt) -> rt.getRoutineId().getRoutine(), procedureNamePattern, - procedureNameRegex, - logger); + procedureNameRegex); listRoutineFutures.add(listRoutinesExecutor.submit(listCallable)); } - logger.fine( + LOG.info( "Submitted " + listRoutineFutures.size() + " routine list tasks for catalog: " @@ -1287,7 +1452,7 @@ List listMatchingProcedureIdsFromDatasets( InterruptedException ex = new InterruptedException( "Interrupted while collecting routine lists for catalog: " + catalogParam); - logger.severe(ex.getMessage(), ex); + LOG.severe(ex.getMessage(), ex); throw ex; } try { @@ -1299,7 +1464,7 @@ List listMatchingProcedureIdsFromDatasets( if (listedRoutine.getRoutineId() != null) { procedureIdsToGet.add(listedRoutine.getRoutineId()); } else { - logger.warning( + LOG.warning( "Found a procedure type routine with a null ID during listing phase for" + " catalog: " + catalogParam); @@ -1308,24 +1473,23 @@ List listMatchingProcedureIdsFromDatasets( } } } catch (ExecutionException e) { - logger.warning( + LOG.warning( "Error getting routine list result for catalog " + catalogParam + ": " + e.getCause()); } catch (CancellationException e) { - logger.warning("Routine list task cancelled for catalog: " + catalogParam); + LOG.warning("Routine list task cancelled for catalog: " + catalogParam); } } - logger.info( + LOG.info( "Found %d procedure IDs to fetch details for in catalog '%s'.", procedureIdsToGet.size(), catalogParam); return procedureIdsToGet; } List fetchFullRoutineDetailsForIds( - List procedureIdsToGet, - ExecutorService getRoutineDetailsExecutor, - BigQueryJdbcCustomLogger logger) + List procedureIdsToGet, ExecutorService getRoutineDetailsExecutor) throws InterruptedException { - logger.fine("Fetching full details for %d procedure IDs.", procedureIdsToGet.size()); + LOG.finest("++enter++"); + LOG.info("Fetching full details for %d procedure IDs.", procedureIdsToGet.size()); final List> getRoutineFutures = new ArrayList<>(); final List fullRoutines = Collections.synchronizedList(new ArrayList<>()); @@ -1333,7 +1497,7 @@ List fetchFullRoutineDetailsForIds( if (Thread.currentThread().isInterrupted()) { InterruptedException ex = new InterruptedException("Interrupted while submitting getRoutine tasks"); - logger.severe(ex.getMessage(), ex); + LOG.severe(ex.getMessage(), ex); throw ex; } final RoutineId currentProcId = procId; @@ -1342,7 +1506,7 @@ List fetchFullRoutineDetailsForIds( try { return bigquery.getRoutine(currentProcId); } catch (Exception e) { - logger.warning( + LOG.warning( "Failed to get full details for routine " + currentProcId + ": " @@ -1352,14 +1516,14 @@ List fetchFullRoutineDetailsForIds( }; getRoutineFutures.add(getRoutineDetailsExecutor.submit(getCallable)); } - logger.fine("Submitted " + getRoutineFutures.size() + " getRoutine detail tasks."); + LOG.info("Submitted " + getRoutineFutures.size() + " getRoutine detail tasks."); for (Future getFuture : getRoutineFutures) { if (Thread.currentThread().isInterrupted()) { getRoutineFutures.forEach(f -> f.cancel(true)); // Cancel remaining InterruptedException ex = new InterruptedException("Interrupted while collecting Routine details"); - logger.severe(ex.getMessage(), ex); + LOG.severe(ex.getMessage(), ex); throw ex; } try { @@ -1368,12 +1532,12 @@ List fetchFullRoutineDetailsForIds( fullRoutines.add(fullRoutine); } } catch (ExecutionException e) { - logger.warning("Error processing getRoutine future result: " + e.getCause()); + LOG.warning("Error processing getRoutine future result: " + e.getCause()); } catch (CancellationException e) { - logger.warning("getRoutine detail task cancelled."); + LOG.warning("getRoutine detail task cancelled."); } } - logger.info("Successfully fetched full details for %d routines.", fullRoutines.size()); + LOG.info("Successfully fetched full details for %d routines.", fullRoutines.size()); return fullRoutines; } @@ -1383,16 +1547,15 @@ void submitProcedureArgumentProcessingJobs( List collectedResults, FieldList resultSchemaFields, ExecutorService processArgsExecutor, - List> outArgumentProcessingFutures, - BigQueryJdbcCustomLogger logger) + List> outArgumentProcessingFutures) throws InterruptedException { - logger.fine("Submitting argument processing jobs for %d routines.", fullRoutines.size()); + LOG.info("Submitting argument processing jobs for %d routines.", fullRoutines.size()); for (Routine fullRoutine : fullRoutines) { if (Thread.currentThread().isInterrupted()) { InterruptedException ex = new InterruptedException("Interrupted while submitting argument processing jobs"); - logger.severe(ex.getMessage(), ex); + LOG.severe(ex.getMessage(), ex); throw ex; } if (fullRoutine != null) { @@ -1405,7 +1568,7 @@ void submitProcedureArgumentProcessingJobs( finalFullRoutine, columnNameRegex, collectedResults, resultSchemaFields)); outArgumentProcessingFutures.add(processFuture); } else { - logger.warning( + LOG.warning( "Routine " + (fullRoutine.getRoutineId() != null ? fullRoutine.getRoutineId().toString() @@ -1416,13 +1579,16 @@ void submitProcedureArgumentProcessingJobs( } } } - logger.fine( + LOG.info( "Finished submitting " + outArgumentProcessingFutures.size() + " processProcedureArguments tasks."); + LOG.info( + "Submitted %d procedure argument processing jobs.", outArgumentProcessingFutures.size()); } Schema defineGetProcedureColumnsSchema() { + LOG.finest("++enter++"); List fields = new ArrayList<>(20); fields.add( Field.newBuilder("PROCEDURE_CAT", StandardSQLTypeName.STRING) @@ -1500,7 +1666,7 @@ void processProcedureArguments( } if (arguments == null || arguments.isEmpty()) { - LOG.fine("Procedure " + routineId + " has no arguments."); + LOG.info("Procedure " + routineId + " has no arguments."); return; } @@ -1558,6 +1724,7 @@ List createProcedureColumnRow( @Nullable RoutineArgument argument, int ordinalPosition, String columnName) { + LOG.finest("++enter++"); List values = new ArrayList<>(20); ColumnTypeInfo typeInfo; @@ -1650,6 +1817,7 @@ ColumnTypeInfo determineTypeInfoFromDataType( String procedureName, String columnName, int ordinalPosition) { + LOG.finest("++enter++"); ColumnTypeInfo defaultVarcharTypeInfo = new ColumnTypeInfo(Types.VARCHAR, "VARCHAR", null, null, null); @@ -1672,6 +1840,7 @@ ColumnTypeInfo determineTypeInfoFromDataType( } Comparator defineGetProcedureColumnsComparator(FieldList resultSchemaFields) { + LOG.finest("++enter++"); final int PROC_CAT_IDX = resultSchemaFields.getIndex("PROCEDURE_CAT"); final int PROC_SCHEM_IDX = resultSchemaFields.getIndex("PROCEDURE_SCHEM"); final int PROC_NAME_IDX = resultSchemaFields.getIndex("PROCEDURE_NAME"); @@ -1709,6 +1878,17 @@ Comparator defineGetProcedureColumnsComparator(FieldList resultS @Override public ResultSet getTables( String catalog, String schemaPattern, String tableNamePattern, String[] types) { + try (BigQueryJdbcMdc.MdcCloseable mdc = BigQueryJdbcMdc.setContext(this.connectionId)) { + LOG.finest( + "++enter++ getTables(catalog=%s, schemaPattern=%s, tableNamePattern=%s, types=%s)", + catalog, schemaPattern, tableNamePattern, java.util.Arrays.toString(types)); + return getTablesImpl(catalog, schemaPattern, tableNamePattern, types); + } + } + + private ResultSet getTablesImpl( + String catalog, String schemaPattern, String tableNamePattern, String[] types) { + LOG.finest("++enter++"); Tuple effectiveIdentifiers = determineEffectiveCatalogAndSchema(catalog, schemaPattern); @@ -1759,8 +1939,7 @@ public ResultSet getTables( (name) -> bigquery.getDataset(DatasetId.of(catalogParam, name)), (ds) -> ds.getDatasetId().getDataset(), schemaParam, - schemaRegex, - LOG); + schemaRegex); if (datasetsToScan.isEmpty()) { LOG.info("Fetcher thread found no matching datasets. Returning empty resultset."); @@ -1770,7 +1949,7 @@ public ResultSet getTables( apiExecutor = Executors.newFixedThreadPool(API_EXECUTOR_POOL_SIZE); tableProcessorExecutor = Executors.newFixedThreadPool(this.metadataFetchThreadCount); - LOG.fine("Submitting parallel findMatchingTables tasks..."); + LOG.info("Submitting parallel findMatchingTables tasks..."); for (Dataset dataset : datasetsToScan) { if (Thread.currentThread().isInterrupted()) { LOG.warning("Table fetcher interrupted during dataset iteration."); @@ -1793,15 +1972,14 @@ public ResultSet getTables( name)), (tbl) -> tbl.getTableId().getTable(), tableNamePattern, - tableNameRegex, - LOG); + tableNameRegex); Future> apiFuture = apiExecutor.submit(apiCallable); apiFutures.add(apiFuture); } - LOG.fine("Finished submitting " + apiFutures.size() + " findMatchingTables tasks."); + LOG.info("Finished submitting " + apiFutures.size() + " findMatchingTables tasks."); apiExecutor.shutdown(); - LOG.fine("Processing results from findMatchingTables tasks..."); + LOG.info("Processing results from findMatchingTables tasks..."); for (Future> apiFuture : apiFutures) { if (Thread.currentThread().isInterrupted()) { LOG.warning("Table fetcher interrupted while processing API futures."); @@ -1816,12 +1994,13 @@ public ResultSet getTables( final Table currentTable = table; Future processFuture = tableProcessorExecutor.submit( - () -> - processTableInfo( - currentTable, - requestedTypes, - collectedResults, - localResultSchemaFields)); + wrapWithMdc( + () -> + processTableInfo( + currentTable, + requestedTypes, + collectedResults, + localResultSchemaFields))); processingFutures.add(processFuture); } } @@ -1840,7 +2019,7 @@ public ResultSet getTables( } } - LOG.fine( + LOG.info( "Finished submitting " + processingFutures.size() + " processTableInfo tasks."); if (Thread.currentThread().isInterrupted()) { @@ -1848,15 +2027,15 @@ public ResultSet getTables( "Fetcher interrupted before waiting for processing tasks; cancelling remaining."); processingFutures.forEach(f -> f.cancel(true)); } else { - LOG.fine("Waiting for processTableInfo tasks to complete..."); + LOG.info("Waiting for processTableInfo tasks to complete..."); waitForTasksCompletion(processingFutures); - LOG.fine("All processTableInfo tasks completed."); + LOG.info("All processTableInfo tasks completed."); } if (!Thread.currentThread().isInterrupted()) { Comparator comparator = defineGetTablesComparator(localResultSchemaFields); - sortResults(collectedResults, comparator, "getTables", LOG); + sortResults(collectedResults, comparator, "getTables"); } if (!Thread.currentThread().isInterrupted()) { @@ -1885,6 +2064,7 @@ public ResultSet getTables( } Schema defineGetTablesSchema() { + LOG.finest("++enter++"); List fields = new ArrayList<>(10); fields.add( Field.newBuilder("TABLE_CAT", StandardSQLTypeName.STRING) @@ -1936,7 +2116,7 @@ void processTableInfo( FieldList resultSchemaFields) { TableId tableId = table.getTableId(); - LOG.fine("Processing table info for: " + tableId); + LOG.info("Processing table info for: " + tableId); try { String catalogName = tableId.getProject(); @@ -1968,7 +2148,7 @@ void processTableInfo( FieldValueList rowFvl = FieldValueList.of(values, resultSchemaFields); collectedResults.add(rowFvl); - LOG.fine("Processed and added table info row for: " + tableId); + LOG.info("Processed and added table info row for: " + tableId); } catch (Exception e) { LOG.warning( "Error processing table info for %s: %s. Skipping this table.", tableId, e.getMessage()); @@ -1976,6 +2156,7 @@ void processTableInfo( } Comparator defineGetTablesComparator(FieldList resultSchemaFields) { + LOG.finest("++enter++"); final int TABLE_TYPE_IDX = resultSchemaFields.getIndex("TABLE_TYPE"); final int TABLE_CAT_IDX = resultSchemaFields.getIndex("TABLE_CAT"); final int TABLE_SCHEM_IDX = resultSchemaFields.getIndex("TABLE_SCHEM"); @@ -1996,13 +2177,24 @@ Comparator defineGetTablesComparator(FieldList resultSchemaField @Override public ResultSet getSchemas() { - LOG.info("getSchemas() called"); + try (BigQueryJdbcMdc.MdcCloseable mdc = BigQueryJdbcMdc.setContext(this.connectionId)) { + LOG.finest("++enter++"); + LOG.info("getSchemas() called"); - return getSchemas(null, null); + return getSchemas(null, null); + } } @Override public ResultSet getCatalogs() { + try (BigQueryJdbcMdc.MdcCloseable mdc = BigQueryJdbcMdc.setContext(this.connectionId)) { + LOG.finest("++enter++"); + return getCatalogsImpl(); + } + } + + private ResultSet getCatalogsImpl() { + LOG.finest("++enter++"); LOG.info("getCatalogs() called"); final List accessibleCatalogs = getAccessibleCatalogNames(); @@ -2021,12 +2213,14 @@ public ResultSet getCatalogs() { } Schema defineGetCatalogsSchema() { + LOG.finest("++enter++"); return Schema.of( Field.newBuilder("TABLE_CAT", StandardSQLTypeName.STRING).setMode(Mode.REQUIRED).build()); } List prepareGetCatalogsRows( FieldList schemaFields, List accessibleCatalogs) { + LOG.finest("++enter++"); List catalogRows = new ArrayList<>(); for (String catalogName : accessibleCatalogs) { FieldValue fieldValue = FieldValue.of(FieldValue.Attribute.PRIMITIVE, catalogName); @@ -2037,6 +2231,14 @@ List prepareGetCatalogsRows( @Override public ResultSet getTableTypes() { + try (BigQueryJdbcMdc.MdcCloseable mdc = BigQueryJdbcMdc.setContext(this.connectionId)) { + LOG.finest("++enter++"); + return getTableTypesImpl(); + } + } + + private ResultSet getTableTypesImpl() { + LOG.finest("++enter++"); LOG.info("getTableTypes() called"); final Schema tableTypesSchema = defineGetTableTypesSchema(); @@ -2053,6 +2255,7 @@ public ResultSet getTableTypes() { } static Schema defineGetTableTypesSchema() { + LOG.finest("++enter++"); return Schema.of( Field.newBuilder("TABLE_TYPE", StandardSQLTypeName.STRING) .setMode(Field.Mode.REQUIRED) @@ -2060,6 +2263,7 @@ static Schema defineGetTableTypesSchema() { } static List prepareGetTableTypesRows(Schema schema) { + LOG.finest("++enter++"); final String[] tableTypes = {"EXTERNAL", "MATERIALIZED VIEW", "SNAPSHOT", "TABLE", "VIEW"}; List rows = new ArrayList<>(tableTypes.length); FieldList schemaFields = schema.getFields(); @@ -2074,6 +2278,17 @@ static List prepareGetTableTypesRows(Schema schema) { @Override public ResultSet getColumns( String catalog, String schemaPattern, String tableNamePattern, String columnNamePattern) { + try (BigQueryJdbcMdc.MdcCloseable mdc = BigQueryJdbcMdc.setContext(this.connectionId)) { + LOG.finest( + "++enter++ getColumns(catalog=%s, schemaPattern=%s, tableNamePattern=%s, columnNamePattern=%s)", + catalog, schemaPattern, tableNamePattern, columnNamePattern); + return getColumnsImpl(catalog, schemaPattern, tableNamePattern, columnNamePattern); + } + } + + private ResultSet getColumnsImpl( + String catalog, String schemaPattern, String tableNamePattern, String columnNamePattern) { + LOG.finest("++enter++"); Tuple effectiveIdentifiers = determineEffectiveCatalogAndSchema(catalog, schemaPattern); @@ -2122,8 +2337,7 @@ public ResultSet getColumns( (name) -> bigquery.getDataset(DatasetId.of(catalogParam, name)), (ds) -> ds.getDatasetId().getDataset(), schemaParam, - schemaRegex, - LOG); + schemaRegex); if (datasetsToScan.isEmpty()) { LOG.info("Fetcher thread found no matching datasets. Returning empty resultset."); @@ -2152,8 +2366,7 @@ public ResultSet getColumns( TableId.of(datasetId.getProject(), datasetId.getDataset(), name)), (tbl) -> tbl.getTableId().getTable(), tableNamePattern, - tableNameRegex, - LOG); + tableNameRegex); for (Table table : tablesToScan) { if (Thread.currentThread().isInterrupted()) { @@ -2164,16 +2377,17 @@ public ResultSet getColumns( } TableId tableId = table.getTableId(); - LOG.fine("Submitting task for table: " + tableId); + LOG.info("Submitting task for table: " + tableId); final Table finalTable = table; Future future = columnExecutor.submit( - () -> - processTableColumns( - finalTable, - columnNameRegex, - collectedResults, - localResultSchemaFields)); + wrapWithMdc( + () -> + processTableColumns( + finalTable, + columnNameRegex, + collectedResults, + localResultSchemaFields))); taskFutures.add(future); } if (Thread.currentThread().isInterrupted()) break; @@ -2184,7 +2398,7 @@ public ResultSet getColumns( if (!Thread.currentThread().isInterrupted()) { Comparator comparator = defineGetColumnsComparator(localResultSchemaFields); - sortResults(collectedResults, comparator, "getColumns", LOG); + sortResults(collectedResults, comparator, "getColumns"); } if (!Thread.currentThread().isInterrupted()) { @@ -2215,14 +2429,15 @@ private void processTableColumns( Pattern columnNameRegex, List collectedResults, FieldList resultSchemaFields) { + LOG.finest("++enter++"); TableId tableId = table.getTableId(); - LOG.fine("Processing columns for table: " + tableId); + LOG.info("Processing columns for table: " + tableId); TableDefinition definition = table.getDefinition(); Schema tableSchema = (definition != null) ? definition.getSchema() : null; try { if (tableSchema == null) { - LOG.fine( + LOG.info( "Schema not included in table object for " + tableId + ", fetching full table details..."); @@ -2264,7 +2479,7 @@ private void processTableColumns( FieldValueList rowFvl = FieldValueList.of(values, resultSchemaFields); collectedResults.add(rowFvl); } - LOG.fine("Finished processing columns for table: " + tableId); + LOG.info("Finished processing columns for table: " + tableId); } catch (BigQueryException e) { LOG.warning( "BigQueryException processing table %s: %s (Code: %d)", @@ -2275,6 +2490,7 @@ private void processTableColumns( } private Schema defineGetColumnsSchema() { + LOG.finest("++enter++"); List fields = new ArrayList<>(24); fields.add( Field.newBuilder("TABLE_CAT", StandardSQLTypeName.STRING) @@ -2377,6 +2593,7 @@ private Schema defineGetColumnsSchema() { List createColumnRow( String catalog, String schemaName, String tableName, Field field, int ordinalPosition) { + LOG.finest("++enter++"); List values = new ArrayList<>(24); Field.Mode mode = (field.getMode() == null) ? Field.Mode.NULLABLE : field.getMode(); ColumnTypeInfo typeInfo = mapBigQueryTypeToJdbc(field); @@ -2457,6 +2674,7 @@ static class ColumnTypeInfo { } ColumnTypeInfo mapBigQueryTypeToJdbc(Field field) { + LOG.finest("++enter++"); Mode mode = (field.getMode() == null) ? Mode.NULLABLE : field.getMode(); if (mode == Mode.REPEATED) { return new ColumnTypeInfo(Types.ARRAY, "ARRAY", null, null, null); @@ -2470,6 +2688,7 @@ ColumnTypeInfo mapBigQueryTypeToJdbc(Field field) { } private Comparator defineGetColumnsComparator(FieldList resultSchemaFields) { + LOG.finest("++enter++"); final int TABLE_CAT_IDX = resultSchemaFields.getIndex("TABLE_CAT"); final int TABLE_SCHEM_IDX = resultSchemaFields.getIndex("TABLE_SCHEM"); final int TABLE_NAME_IDX = resultSchemaFields.getIndex("TABLE_NAME"); @@ -2491,6 +2710,15 @@ private Comparator defineGetColumnsComparator(FieldList resultSc @Override public ResultSet getColumnPrivileges( String catalog, String schema, String table, String columnNamePattern) { + try (BigQueryJdbcMdc.MdcCloseable mdc = BigQueryJdbcMdc.setContext(this.connectionId)) { + LOG.finest("++enter++"); + return getColumnPrivilegesImpl(catalog, schema, table, columnNamePattern); + } + } + + private ResultSet getColumnPrivilegesImpl( + String catalog, String schema, String table, String columnNamePattern) { + LOG.finest("++enter++"); LOG.info( "getColumnPrivileges called for catalog: %s, schema: %s, table: %s, columnNamePattern:" + " %s. BigQuery IAM model differs from SQL privileges; returning empty ResultSet.", @@ -2505,6 +2733,7 @@ public ResultSet getColumnPrivileges( } Schema defineGetColumnPrivilegesSchema() { + LOG.finest("++enter++"); List fields = defineBasePrivilegeFields(); Field columnNameField = @@ -2519,6 +2748,15 @@ Schema defineGetColumnPrivilegesSchema() { @Override public ResultSet getTablePrivileges( String catalog, String schemaPattern, String tableNamePattern) { + try (BigQueryJdbcMdc.MdcCloseable mdc = BigQueryJdbcMdc.setContext(this.connectionId)) { + LOG.finest("++enter++"); + return getTablePrivilegesImpl(catalog, schemaPattern, tableNamePattern); + } + } + + private ResultSet getTablePrivilegesImpl( + String catalog, String schemaPattern, String tableNamePattern) { + LOG.finest("++enter++"); LOG.info( "getTablePrivileges called for catalog: %s, schemaPattern: %s, tableNamePattern: %s. " + "BigQuery IAM model differs from SQL privileges; returning empty ResultSet.", @@ -2533,6 +2771,7 @@ public ResultSet getTablePrivileges( } Schema defineGetTablePrivilegesSchema() { + LOG.finest("++enter++"); List fields = defineBasePrivilegeFields(); return Schema.of(fields); } @@ -2540,6 +2779,15 @@ Schema defineGetTablePrivilegesSchema() { @Override public ResultSet getBestRowIdentifier( String catalog, String schema, String table, int scope, boolean nullable) { + try (BigQueryJdbcMdc.MdcCloseable mdc = BigQueryJdbcMdc.setContext(this.connectionId)) { + LOG.finest("++enter++"); + return getBestRowIdentifierImpl(catalog, schema, table, scope, nullable); + } + } + + private ResultSet getBestRowIdentifierImpl( + String catalog, String schema, String table, int scope, boolean nullable) { + LOG.finest("++enter++"); LOG.info( "getBestRowIdentifier called for catalog: %s, schema: %s, table: %s, scope: %d," + " nullable: %s. BigQuery does not support best row identifiers; returning empty" @@ -2555,6 +2803,7 @@ public ResultSet getBestRowIdentifier( } Schema defineGetBestRowIdentifierSchema() { + LOG.finest("++enter++"); List fields = new ArrayList<>(8); fields.add( Field.newBuilder("SCOPE", StandardSQLTypeName.INT64).setMode(Field.Mode.REQUIRED).build()); @@ -2591,6 +2840,7 @@ Schema defineGetBestRowIdentifierSchema() { @Override public ResultSet getVersionColumns(String catalog, String schema, String table) { + LOG.finest("++enter++"); LOG.info( "getVersionColumns called for catalog: %s, schema: %s, table: %s. " + "Automatic version columns not supported by BigQuery; returning empty ResultSet.", @@ -2605,6 +2855,7 @@ public ResultSet getVersionColumns(String catalog, String schema, String table) } Schema defineGetVersionColumnsSchema() { + LOG.finest("++enter++"); List fields = new ArrayList<>(8); fields.add( Field.newBuilder("SCOPE", StandardSQLTypeName.INT64).setMode(Field.Mode.NULLABLE).build()); @@ -2641,6 +2892,17 @@ Schema defineGetVersionColumnsSchema() { @Override public ResultSet getPrimaryKeys(String catalog, String schema, String table) throws SQLException { + try (BigQueryJdbcMdc.MdcCloseable mdc = BigQueryJdbcMdc.setContext(this.connectionId)) { + LOG.finest("++enter++ catalog: " + catalog + ", schema: " + schema + ", table: " + table); + return getPrimaryKeysImpl(catalog, schema, table); + } + } + + private ResultSet getPrimaryKeysImpl(String catalog, String schema, String table) + throws SQLException { + LOG.finest("++enter++ catalog: " + catalog + ", schema: " + schema + ", table: " + table); + LOG.info( + "getPrimaryKeys called for catalog: %s, schema: %s, table: %s", catalog, schema, table); String sql = readSqlFromFile(GET_PRIMARY_KEYS_SQL); try { if (this.statement == null) { @@ -2656,6 +2918,17 @@ public ResultSet getPrimaryKeys(String catalog, String schema, String table) thr @Override public ResultSet getImportedKeys(String catalog, String schema, String table) throws SQLException { + try (BigQueryJdbcMdc.MdcCloseable mdc = BigQueryJdbcMdc.setContext(this.connectionId)) { + LOG.finest("++enter++ catalog: " + catalog + ", schema: " + schema + ", table: " + table); + return getImportedKeysImpl(catalog, schema, table); + } + } + + private ResultSet getImportedKeysImpl(String catalog, String schema, String table) + throws SQLException { + LOG.finest("++enter++ catalog: " + catalog + ", schema: " + schema + ", table: " + table); + LOG.info( + "getImportedKeys called for catalog: %s, schema: %s, table: %s", catalog, schema, table); String sql = readSqlFromFile(GET_IMPORTED_KEYS_SQL); try { if (this.statement == null) { @@ -2671,6 +2944,17 @@ public ResultSet getImportedKeys(String catalog, String schema, String table) @Override public ResultSet getExportedKeys(String catalog, String schema, String table) throws SQLException { + try (BigQueryJdbcMdc.MdcCloseable mdc = BigQueryJdbcMdc.setContext(this.connectionId)) { + LOG.finest("++enter++ catalog: " + catalog + ", schema: " + schema + ", table: " + table); + return getExportedKeysImpl(catalog, schema, table); + } + } + + private ResultSet getExportedKeysImpl(String catalog, String schema, String table) + throws SQLException { + LOG.finest("++enter++ catalog: " + catalog + ", schema: " + schema + ", table: " + table); + LOG.info( + "getExportedKeys called for catalog: %s, schema: %s, table: %s", catalog, schema, table); String sql = readSqlFromFile(GET_EXPORTED_KEYS_SQL); try { if (this.statement == null) { @@ -2692,6 +2976,25 @@ public ResultSet getCrossReference( String foreignSchema, String foreignTable) throws SQLException { + try (BigQueryJdbcMdc.MdcCloseable mdc = BigQueryJdbcMdc.setContext(this.connectionId)) { + LOG.finest("++enter++"); + return getCrossReferenceImpl( + parentCatalog, parentSchema, parentTable, foreignCatalog, foreignSchema, foreignTable); + } + } + + private ResultSet getCrossReferenceImpl( + String parentCatalog, + String parentSchema, + String parentTable, + String foreignCatalog, + String foreignSchema, + String foreignTable) + throws SQLException { + LOG.finest("++enter++"); + LOG.info( + "getCrossReference called for parentCatalog: %s, parentSchema: %s, parentTable: %s, foreignCatalog: %s, foreignSchema: %s, foreignTable: %s", + parentCatalog, parentSchema, parentTable, foreignCatalog, foreignSchema, foreignTable); String sql = readSqlFromFile(GET_CROSS_REFERENCE_SQL); try { if (this.statement == null) { @@ -2714,6 +3017,14 @@ public ResultSet getCrossReference( @Override public ResultSet getTypeInfo() { + try (BigQueryJdbcMdc.MdcCloseable mdc = BigQueryJdbcMdc.setContext(this.connectionId)) { + LOG.finest("++enter++"); + return getTypeInfoImpl(); + } + } + + private ResultSet getTypeInfoImpl() { + LOG.finest("++enter++"); LOG.info("getTypeInfo() called"); final Schema typeInfoSchema = defineGetTypeInfoSchema(); @@ -2721,7 +3032,7 @@ public ResultSet getTypeInfo() { final List typeInfoRows = prepareGetTypeInfoRows(schemaFields); final Comparator comparator = defineGetTypeInfoComparator(schemaFields); - sortResults(typeInfoRows, comparator, "getTypeInfo", LOG); + sortResults(typeInfoRows, comparator, "getTypeInfo"); final BlockingQueue queue = new LinkedBlockingQueue<>(typeInfoRows.size() + 1); @@ -2732,6 +3043,7 @@ public ResultSet getTypeInfo() { } Schema defineGetTypeInfoSchema() { + LOG.finest("++enter++"); List fields = new ArrayList<>(18); fields.add( Field.newBuilder("TYPE_NAME", StandardSQLTypeName.STRING) @@ -2809,6 +3121,7 @@ Schema defineGetTypeInfoSchema() { } List prepareGetTypeInfoRows(FieldList schemaFields) { + LOG.finest("++enter++"); List rows = new ArrayList<>(); Function createRow = @@ -3163,6 +3476,7 @@ List prepareGetTypeInfoRows(FieldList schemaFields) { } Comparator defineGetTypeInfoComparator(FieldList schemaFields) { + LOG.finest("++enter++"); final int DATA_TYPE_IDX = schemaFields.getIndex("DATA_TYPE"); if (DATA_TYPE_IDX < 0) { LOG.severe( @@ -3180,6 +3494,15 @@ Comparator defineGetTypeInfoComparator(FieldList schemaFields) { @Override public ResultSet getIndexInfo( String catalog, String schema, String table, boolean unique, boolean approximate) { + try (BigQueryJdbcMdc.MdcCloseable mdc = BigQueryJdbcMdc.setContext(this.connectionId)) { + LOG.finest("++enter++"); + return getIndexInfoImpl(catalog, schema, table, unique, approximate); + } + } + + private ResultSet getIndexInfoImpl( + String catalog, String schema, String table, boolean unique, boolean approximate) { + LOG.finest("++enter++"); LOG.info( "getIndexInfo called for catalog: %s, schema: %s, table: %s, unique: %s, approximate:" + " %s. Traditional indexes not supported by BigQuery; returning empty ResultSet.", @@ -3194,6 +3517,7 @@ public ResultSet getIndexInfo( } Schema defineGetIndexInfoSchema() { + LOG.finest("++enter++"); List fields = new ArrayList<>(13); fields.add( Field.newBuilder("TABLE_CAT", StandardSQLTypeName.STRING) @@ -3248,69 +3572,82 @@ Schema defineGetIndexInfoSchema() { @Override public boolean supportsResultSetType(int type) { + LOG.finest("++enter++"); // BigQuery primarily supports forward-only result sets. return type == ResultSet.TYPE_FORWARD_ONLY; } @Override public boolean supportsResultSetConcurrency(int type, int concurrency) { + LOG.finest("++enter++"); // BigQuery primarily supports forward-only, read-only result sets. return type == ResultSet.TYPE_FORWARD_ONLY && concurrency == ResultSet.CONCUR_READ_ONLY; } @Override public boolean ownUpdatesAreVisible(int type) { + LOG.finest("++enter++"); return false; } @Override public boolean ownDeletesAreVisible(int type) { + LOG.finest("++enter++"); return false; } @Override public boolean ownInsertsAreVisible(int type) { + LOG.finest("++enter++"); return false; } @Override public boolean othersUpdatesAreVisible(int type) { + LOG.finest("++enter++"); return false; } @Override public boolean othersDeletesAreVisible(int type) { + LOG.finest("++enter++"); return false; } @Override public boolean othersInsertsAreVisible(int type) { + LOG.finest("++enter++"); return false; } @Override public boolean updatesAreDetected(int type) { + LOG.finest("++enter++"); return false; } @Override public boolean deletesAreDetected(int type) { + LOG.finest("++enter++"); return false; } @Override public boolean insertsAreDetected(int type) { + LOG.finest("++enter++"); return false; } @Override public boolean supportsBatchUpdates() { + LOG.finest("++enter++"); return false; } @Override public ResultSet getUDTs( String catalog, String schemaPattern, String typeNamePattern, int[] types) { + LOG.finest("++enter++"); LOG.info( "getUDTs called for catalog: %s, schemaPattern: %s, typeNamePattern: %s, types: %s. " + "Feature not supported by BigQuery; returning empty ResultSet.", @@ -3325,6 +3662,7 @@ public ResultSet getUDTs( } Schema defineGetUDTsSchema() { + LOG.finest("++enter++"); List fields = new ArrayList<>(7); fields.add( Field.newBuilder("TYPE_CAT", StandardSQLTypeName.STRING) @@ -3359,31 +3697,37 @@ Schema defineGetUDTsSchema() { @Override public Connection getConnection() { + LOG.finest("++enter++"); return connection; } @Override public boolean supportsSavepoints() { + LOG.finest("++enter++"); return false; } @Override public boolean supportsNamedParameters() { + LOG.finest("++enter++"); return false; } @Override public boolean supportsMultipleOpenResults() { + LOG.finest("++enter++"); return false; } @Override public boolean supportsGetGeneratedKeys() { + LOG.finest("++enter++"); return false; } @Override public ResultSet getSuperTables(String catalog, String schemaPattern, String tableNamePattern) { + LOG.finest("++enter++"); LOG.info( "getSuperTables called for catalog: %s, schemaPattern: %s, tableNamePattern: %s. " + "BigQuery does not support super tables; returning empty ResultSet.", @@ -3399,6 +3743,7 @@ public ResultSet getSuperTables(String catalog, String schemaPattern, String tab } Schema defineGetSuperTablesSchema() { + LOG.finest("++enter++"); List fields = new ArrayList<>(4); fields.add( Field.newBuilder("TABLE_CAT", StandardSQLTypeName.STRING) @@ -3421,6 +3766,7 @@ Schema defineGetSuperTablesSchema() { @Override public ResultSet getSuperTypes(String catalog, String schemaPattern, String typeNamePattern) { + LOG.finest("++enter++"); LOG.info( "getSuperTypes called for catalog: %s, schemaPattern: %s, typeNamePattern: %s. BigQuery" + " does not support user-defined type hierarchies; returning empty ResultSet.", @@ -3436,6 +3782,7 @@ public ResultSet getSuperTypes(String catalog, String schemaPattern, String type } Schema defineGetSuperTypesSchema() { + LOG.finest("++enter++"); List fields = new ArrayList<>(6); fields.add( Field.newBuilder("TYPE_CAT", StandardSQLTypeName.STRING) @@ -3467,6 +3814,7 @@ Schema defineGetSuperTypesSchema() { @Override public ResultSet getAttributes( String catalog, String schemaPattern, String typeNamePattern, String attributeNamePattern) { + LOG.finest("++enter++"); LOG.info( "getAttributes called for catalog: %s, schemaPattern: %s, typeNamePattern: %s," + " attributeNamePattern: %s. Feature not supported by BigQuery; returning empty" @@ -3482,6 +3830,7 @@ public ResultSet getAttributes( } Schema defineGetAttributesSchema() { + LOG.finest("++enter++"); List fields = new ArrayList<>(21); fields.add( Field.newBuilder("TYPE_CAT", StandardSQLTypeName.STRING) @@ -3572,6 +3921,7 @@ Schema defineGetAttributesSchema() { @Override public boolean supportsResultSetHoldability(int holdability) { + LOG.finest("++enter++"); if (holdability == ResultSet.CLOSE_CURSORS_AT_COMMIT) { return true; } @@ -3580,6 +3930,7 @@ public boolean supportsResultSetHoldability(int holdability) { @Override public int getResultSetHoldability() { + LOG.finest("++enter++"); return ResultSet.CLOSE_CURSORS_AT_COMMIT; } @@ -3587,46 +3938,62 @@ public int getResultSetHoldability() { // Obtained from java libraries pom // https://github.com/googleapis/java-bigquery/blob/main/pom.xml public int getDatabaseMajorVersion() { + LOG.finest("++enter++"); return 2; } @Override public int getDatabaseMinorVersion() { + LOG.finest("++enter++"); return 0; } @Override public int getJDBCMajorVersion() { + LOG.finest("++enter++"); return 4; } @Override public int getJDBCMinorVersion() { + LOG.finest("++enter++"); return 2; } @Override public int getSQLStateType() { + LOG.finest("++enter++"); return DatabaseMetaData.sqlStateSQL; } @Override public boolean locatorsUpdateCopy() { + LOG.finest("++enter++"); return false; } @Override public boolean supportsStatementPooling() { + LOG.finest("++enter++"); return false; } @Override public RowIdLifetime getRowIdLifetime() { + LOG.finest("++enter++"); return null; } @Override public ResultSet getSchemas(String catalog, String schemaPattern) { + try (BigQueryJdbcMdc.MdcCloseable mdc = BigQueryJdbcMdc.setContext(this.connectionId)) { + LOG.finest("++enter++ getSchemas(catalog=%s, schemaPattern=%s)", catalog, schemaPattern); + return getSchemasImpl(catalog, schemaPattern); + } + } + + private ResultSet getSchemasImpl(String catalog, String schemaPattern) { + LOG.finest("++enter++"); if ((catalog != null && catalog.isEmpty()) || (schemaPattern != null && schemaPattern.isEmpty())) { LOG.warning("Returning empty ResultSet as catalog or schemaPattern is an empty string."); @@ -3681,8 +4048,7 @@ public ResultSet getSchemas(String catalog, String schemaPattern) { (name) -> bigquery.getDataset(DatasetId.of(currentProjectToScan, name)), (ds) -> ds.getDatasetId().getDataset(), schemaPattern, - schemaRegex, - LOG); + schemaRegex); if (datasetsInProject.isEmpty() || Thread.currentThread().isInterrupted()) { LOG.info( @@ -3691,7 +4057,7 @@ public ResultSet getSchemas(String catalog, String schemaPattern) { continue; } - LOG.fine("Processing found datasets for project: " + currentProjectToScan); + LOG.info("Processing found datasets for project: " + currentProjectToScan); for (Dataset dataset : datasetsInProject) { if (Thread.currentThread().isInterrupted()) { LOG.warning( @@ -3706,7 +4072,7 @@ public ResultSet getSchemas(String catalog, String schemaPattern) { if (!Thread.currentThread().isInterrupted()) { Comparator comparator = defineGetSchemasComparator(localResultSchemaFields); - sortResults(collectedResults, comparator, "getSchemas", LOG); + sortResults(collectedResults, comparator, "getSchemas"); } if (!Thread.currentThread().isInterrupted()) { @@ -3731,6 +4097,7 @@ public ResultSet getSchemas(String catalog, String schemaPattern) { } Schema defineGetSchemasSchema() { + LOG.finest("++enter++"); List fields = new ArrayList<>(2); fields.add( Field.newBuilder("TABLE_SCHEM", StandardSQLTypeName.STRING) @@ -3764,6 +4131,7 @@ void processSchemaInfo( } Comparator defineGetSchemasComparator(FieldList resultSchemaFields) { + LOG.finest("++enter++"); final int TABLE_CATALOG_IDX = resultSchemaFields.getIndex("TABLE_CATALOG"); final int TABLE_SCHEM_IDX = resultSchemaFields.getIndex("TABLE_SCHEM"); return Comparator.comparing( @@ -3776,16 +4144,19 @@ Comparator defineGetSchemasComparator(FieldList resultSchemaFiel @Override public boolean supportsStoredFunctionsUsingCallSyntax() { + LOG.finest("++enter++"); return false; } @Override public boolean autoCommitFailureClosesAllResultSets() { + LOG.finest("++enter++"); return false; } @Override public ResultSet getClientInfoProperties() { + LOG.finest("++enter++"); LOG.info("getClientInfoProperties() called."); final Schema resultSchema = defineGetClientInfoPropertiesSchema(); @@ -3831,7 +4202,7 @@ public ResultSet getClientInfoProperties() { (FieldValueList fvl) -> getStringValueOrNull(fvl, resultSchemaFields.getIndex("NAME")), Comparator.nullsFirst(String::compareToIgnoreCase)); - sortResults(collectedResults, comparator, "getClientInfoProperties", LOG); + sortResults(collectedResults, comparator, "getClientInfoProperties"); populateQueue(collectedResults, queue, resultSchemaFields); } catch (Exception e) { @@ -3846,6 +4217,7 @@ public ResultSet getClientInfoProperties() { } Schema defineGetClientInfoPropertiesSchema() { + LOG.finest("++enter++"); List fields = new ArrayList<>(4); fields.add( Field.newBuilder("NAME", StandardSQLTypeName.STRING) @@ -3868,6 +4240,15 @@ Schema defineGetClientInfoPropertiesSchema() { @Override public ResultSet getFunctions(String catalog, String schemaPattern, String functionNamePattern) { + try (BigQueryJdbcMdc.MdcCloseable mdc = BigQueryJdbcMdc.setContext(this.connectionId)) { + LOG.finest("++enter++"); + return getFunctionsImpl(catalog, schemaPattern, functionNamePattern); + } + } + + private ResultSet getFunctionsImpl( + String catalog, String schemaPattern, String functionNamePattern) { + LOG.finest("++enter++"); if ((catalog == null || catalog.isEmpty()) || (schemaPattern != null && schemaPattern.isEmpty()) || (functionNamePattern != null && functionNamePattern.isEmpty())) { @@ -3908,8 +4289,7 @@ public ResultSet getFunctions(String catalog, String schemaPattern, String funct (name) -> bigquery.getDataset(DatasetId.of(catalogParam, name)), (ds) -> ds.getDatasetId().getDataset(), schemaPattern, - schemaRegex, - LOG); + schemaRegex); if (datasetsToScan.isEmpty()) { LOG.info("Fetcher thread found no matching datasets. Returning empty resultset."); @@ -3929,7 +4309,7 @@ public ResultSet getFunctions(String catalog, String schemaPattern, String funct Callable> apiCallable = () -> { - LOG.fine( + LOG.info( "Fetching all routines for dataset: %s, pattern: %s", currentDatasetId.getDataset(), functionNamePattern); return findMatchingBigQueryObjects( @@ -3945,13 +4325,12 @@ public ResultSet getFunctions(String catalog, String schemaPattern, String funct name)), (rt) -> rt.getRoutineId().getRoutine(), functionNamePattern, - functionNameRegex, - LOG); + functionNameRegex); }; Future> apiFuture = apiExecutor.submit(apiCallable); apiFutures.add(apiFuture); } - LOG.fine( + LOG.info( "Finished submitting " + apiFutures.size() + " findMatchingRoutines (for functions) tasks."); @@ -3972,7 +4351,7 @@ public ResultSet getFunctions(String catalog, String schemaPattern, String funct String routineType = routine.getRoutineType(); if ("SCALAR_FUNCTION".equalsIgnoreCase(routineType) || "TABLE_FUNCTION".equalsIgnoreCase(routineType)) { - LOG.fine( + LOG.info( "Submitting processing task for function: " + routine.getRoutineId() + " of type " @@ -4001,7 +4380,7 @@ public ResultSet getFunctions(String catalog, String schemaPattern, String funct waitForTasksCompletion(processingTaskFutures); Comparator comparator = defineGetFunctionsComparator(localResultSchemaFields); - sortResults(collectedResults, comparator, "getFunctions", LOG); + sortResults(collectedResults, comparator, "getFunctions"); populateQueue(collectedResults, queue, localResultSchemaFields); } catch (Throwable t) { LOG.severe("Unexpected error in function fetcher runnable: " + t.getMessage()); @@ -4025,6 +4404,7 @@ public ResultSet getFunctions(String catalog, String schemaPattern, String funct } Schema defineGetFunctionsSchema() { + LOG.finest("++enter++"); List fields = new ArrayList<>(6); fields.add( Field.newBuilder("FUNCTION_CAT", StandardSQLTypeName.STRING) @@ -4054,7 +4434,7 @@ Schema defineGetFunctionsSchema() { void processFunctionInfo( Routine routine, List collectedResults, FieldList resultSchemaFields) { RoutineId routineId = routine.getRoutineId(); - LOG.fine("Processing function info for: " + routineId); + LOG.info("Processing function info for: " + routineId); try { String catalogName = routineId.getProject(); @@ -4083,7 +4463,7 @@ void processFunctionInfo( FieldValueList rowFvl = FieldValueList.of(values, resultSchemaFields); collectedResults.add(rowFvl); - LOG.fine("Processed and added function info row for: " + routineId); + LOG.info("Processed and added function info row for: " + routineId); } catch (Exception e) { LOG.warning( @@ -4093,6 +4473,7 @@ void processFunctionInfo( } Comparator defineGetFunctionsComparator(FieldList resultSchemaFields) { + LOG.finest("++enter++"); final int FUNC_CAT_IDX = resultSchemaFields.getIndex("FUNCTION_CAT"); final int FUNC_SCHEM_IDX = resultSchemaFields.getIndex("FUNCTION_SCHEM"); final int FUNC_NAME_IDX = resultSchemaFields.getIndex("FUNCTION_NAME"); @@ -4115,6 +4496,15 @@ Comparator defineGetFunctionsComparator(FieldList resultSchemaFi @Override public ResultSet getFunctionColumns( String catalog, String schemaPattern, String functionNamePattern, String columnNamePattern) { + try (BigQueryJdbcMdc.MdcCloseable mdc = BigQueryJdbcMdc.setContext(this.connectionId)) { + LOG.finest("++enter++"); + return getFunctionColumnsImpl(catalog, schemaPattern, functionNamePattern, columnNamePattern); + } + } + + private ResultSet getFunctionColumnsImpl( + String catalog, String schemaPattern, String functionNamePattern, String columnNamePattern) { + LOG.finest("++enter++"); if (catalog == null || catalog.isEmpty()) { LOG.warning("Returning empty ResultSet catalog (project) is null or empty."); return new BigQueryJsonResultSet(); @@ -4161,8 +4551,7 @@ public ResultSet getFunctionColumns( (name) -> bigquery.getDataset(DatasetId.of(catalogParam, name)), (ds) -> ds.getDatasetId().getDataset(), schemaPattern, - schemaRegex, - LOG); + schemaRegex); if (datasetsToScan.isEmpty() || Thread.currentThread().isInterrupted()) { LOG.info( @@ -4180,8 +4569,7 @@ public ResultSet getFunctionColumns( functionNamePattern, functionNameRegex, listRoutinesExecutor, - catalogParam, - LOG); + catalogParam); shutdownExecutor(listRoutinesExecutor); listRoutinesExecutor = null; @@ -4196,7 +4584,7 @@ public ResultSet getFunctionColumns( runnable -> new Thread(runnable, "funcol-get-details" + fetcherThreadNameSuffix)); List fullFunctions = - fetchFullRoutineDetailsForIds(functionIdsToGet, getRoutineDetailsExecutor, LOG); + fetchFullRoutineDetailsForIds(functionIdsToGet, getRoutineDetailsExecutor); shutdownExecutor(getRoutineDetailsExecutor); getRoutineDetailsExecutor = null; @@ -4217,8 +4605,7 @@ public ResultSet getFunctionColumns( collectedResults, resultSchemaFields, processParamsExecutor, - processingTaskFutures, - LOG); + processingTaskFutures); if (Thread.currentThread().isInterrupted()) { LOG.warning( @@ -4226,13 +4613,13 @@ public ResultSet getFunctionColumns( + catalogParam); processingTaskFutures.forEach(f -> f.cancel(true)); } else { - LOG.fine( + LOG.info( "Fetcher: Waiting for " + processingTaskFutures.size() + " parameter processing tasks. Catalog: " + catalogParam); waitForTasksCompletion(processingTaskFutures); - LOG.fine( + LOG.info( "Fetcher: All parameter processing tasks completed or handled. Catalog: " + catalogParam); } @@ -4240,7 +4627,7 @@ public ResultSet getFunctionColumns( if (!Thread.currentThread().isInterrupted()) { Comparator comparator = defineGetFunctionColumnsComparator(resultSchemaFields); - sortResults(collectedResults, comparator, "getFunctionColumns", LOG); + sortResults(collectedResults, comparator, "getFunctionColumns"); populateQueue(collectedResults, queue, resultSchemaFields); } @@ -4279,6 +4666,7 @@ public ResultSet getFunctionColumns( } Schema defineGetFunctionColumnsSchema() { + LOG.finest("++enter++"); List fields = new ArrayList<>(17); fields.add( Field.newBuilder("FUNCTION_CAT", StandardSQLTypeName.STRING) @@ -4350,11 +4738,11 @@ List listMatchingFunctionIdsFromDatasets( String functionNamePattern, Pattern functionNameRegex, ExecutorService listRoutinesExecutor, - String catalogParam, - BigQueryJdbcCustomLogger logger) + String catalogParam) throws InterruptedException { + LOG.finest("++enter++"); - logger.fine( + LOG.info( "Listing matching function IDs from %d datasets for catalog '%s'.", datasetsToScan.size(), catalogParam); final List>> listRoutineFutures = new ArrayList<>(); @@ -4362,7 +4750,7 @@ List listMatchingFunctionIdsFromDatasets( for (Dataset dataset : datasetsToScan) { if (Thread.currentThread().isInterrupted()) { - logger.warning( + LOG.warning( "Interrupted during submission of routine (function) listing tasks for catalog: " + catalogParam); throw new InterruptedException("Interrupted while listing functions"); @@ -4381,11 +4769,10 @@ List listMatchingFunctionIdsFromDatasets( currentDatasetId.getProject(), currentDatasetId.getDataset(), name)), (rt) -> rt.getRoutineId().getRoutine(), functionNamePattern, - functionNameRegex, - logger); + functionNameRegex); listRoutineFutures.add(listRoutinesExecutor.submit(listCallable)); } - logger.fine( + LOG.info( "Submitted " + listRoutineFutures.size() + " routine (function) list tasks for catalog: " @@ -4393,7 +4780,7 @@ List listMatchingFunctionIdsFromDatasets( for (Future> listFuture : listRoutineFutures) { if (Thread.currentThread().isInterrupted()) { - logger.warning( + LOG.warning( "Interrupted while collecting routine (function) list results for catalog: " + catalogParam); listRoutineFutures.forEach(f -> f.cancel(true)); @@ -4409,7 +4796,7 @@ List listMatchingFunctionIdsFromDatasets( if (listedRoutine.getRoutineId() != null) { functionIdsToGet.add(listedRoutine.getRoutineId()); } else { - logger.warning( + LOG.warning( "Found a function type routine with a null ID during listing phase for catalog:" + " " + catalogParam); @@ -4418,16 +4805,16 @@ List listMatchingFunctionIdsFromDatasets( } } } catch (ExecutionException e) { - logger.warning( + LOG.warning( "Error getting routine (function) list result for catalog " + catalogParam + ": " + e.getCause()); } catch (CancellationException e) { - logger.warning("Routine (function) list task cancelled for catalog: " + catalogParam); + LOG.warning("Routine (function) list task cancelled for catalog: " + catalogParam); } } - logger.info( + LOG.info( "Found %d function IDs to fetch details for in catalog '%s'.", functionIdsToGet.size(), catalogParam); return functionIdsToGet; @@ -4439,14 +4826,13 @@ void submitFunctionParameterProcessingJobs( List collectedResults, FieldList resultSchemaFields, ExecutorService processParamsExecutor, - List> outParameterProcessingFutures, - BigQueryJdbcCustomLogger logger) + List> outParameterProcessingFutures) throws InterruptedException { - logger.fine("Submitting parameter processing jobs for %d functions.", fullFunctions.size()); + LOG.info("Submitting parameter processing jobs for %d functions.", fullFunctions.size()); for (Routine fullFunction : fullFunctions) { if (Thread.currentThread().isInterrupted()) { - logger.warning("Interrupted during submission of function parameter processing tasks."); + LOG.warning("Interrupted during submission of function parameter processing tasks."); throw new InterruptedException( "Interrupted while submitting function parameter processing jobs"); } @@ -4465,7 +4851,7 @@ void submitFunctionParameterProcessingJobs( resultSchemaFields)); outParameterProcessingFutures.add(processFuture); } else { - logger.warning( + LOG.warning( "Routine " + (fullFunction.getRoutineId() != null ? fullFunction.getRoutineId().toString() @@ -4476,7 +4862,7 @@ void submitFunctionParameterProcessingJobs( } } } - logger.fine( + LOG.info( "Finished submitting " + outParameterProcessingFutures.size() + " processFunctionParametersAndReturnValue tasks."); @@ -4573,6 +4959,7 @@ List createFunctionColumnRow( int columnType, StandardSQLDataType dataType, int ordinalPosition) { + LOG.finest("++enter++"); List values = new ArrayList<>(17); ColumnTypeInfo typeInfo = @@ -4641,6 +5028,7 @@ List createFunctionColumnRow( } Comparator defineGetFunctionColumnsComparator(FieldList resultSchemaFields) { + LOG.finest("++enter++"); final int FUNC_CAT_IDX = resultSchemaFields.getIndex("FUNCTION_CAT"); final int FUNC_SCHEM_IDX = resultSchemaFields.getIndex("FUNCTION_SCHEM"); final int FUNC_NAME_IDX = resultSchemaFields.getIndex("FUNCTION_NAME"); @@ -4667,6 +5055,7 @@ Comparator defineGetFunctionColumnsComparator(FieldList resultSc @Override public ResultSet getPseudoColumns( String catalog, String schemaPattern, String tableNamePattern, String columnNamePattern) { + LOG.finest("++enter++"); LOG.info( "getPseudoColumns called for catalog: %s, schemaPattern: %s, tableNamePattern: %s," + " columnNamePattern: %s. Pseudo columns not supported by BigQuery; returning" @@ -4682,6 +5071,7 @@ public ResultSet getPseudoColumns( } Schema defineGetPseudoColumnsSchema() { + LOG.finest("++enter++"); List fields = new ArrayList<>(12); fields.add( Field.newBuilder("TABLE_CAT", StandardSQLTypeName.STRING) @@ -4736,16 +5126,19 @@ Schema defineGetPseudoColumnsSchema() { @Override public boolean generatedKeyAlwaysReturned() { + LOG.finest("++enter++"); return false; } @Override public T unwrap(Class iface) { + LOG.finest("++enter++"); return null; } @Override public boolean isWrapperFor(Class iface) { + LOG.finest("++enter++"); return false; } @@ -4768,6 +5161,7 @@ public boolean isWrapperFor(Class iface) { */ private Tuple determineEffectiveCatalogAndSchema( String catalog, String schemaPattern) { + LOG.finest("++enter++"); String effectiveCatalog = catalog; String effectiveSchemaPattern = schemaPattern; @@ -4819,6 +5213,7 @@ private Tuple determineEffectiveCatalogAndSchema( } private ColumnTypeInfo getColumnTypeInfoForSqlType(StandardSQLTypeName bqType) { + LOG.finest("++enter++"); if (bqType == null) { LOG.warning("Null BigQuery type encountered: " + bqType.name() + ". Mapping to VARCHAR."); return new ColumnTypeInfo(Types.VARCHAR, bqType.name(), null, null, null); @@ -4865,8 +5260,7 @@ List findMatchingBigQueryObjects( Function getSpecificOperation, Function nameExtractor, String pattern, - Pattern regex, - BigQueryJdbcCustomLogger logger) { + Pattern regex) { boolean needsList = needsListing(pattern); List resultList = new ArrayList<>(); @@ -4874,30 +5268,29 @@ List findMatchingBigQueryObjects( try { Iterable objects; if (needsList) { - logger.info( + LOG.info( "Listing all %ss (pattern: %s)...", objectTypeName, pattern == null ? "" : pattern); Page firstPage = listAllOperation.get(); objects = firstPage.iterateAll(); - logger.fine( - "Retrieved initial %s list, iterating & filtering if needed...", objectTypeName); + LOG.info("Retrieved initial %s list, iterating & filtering if needed...", objectTypeName); } else { - logger.info("Getting specific %s: '%s'", objectTypeName, pattern); + LOG.info("Getting specific %s: '%s'", objectTypeName, pattern); T specificObject = getSpecificOperation.apply(pattern); objects = (specificObject == null) ? Collections.emptyList() : Collections.singletonList(specificObject); if (specificObject == null) { - logger.info("Specific %s not found: '%s'", objectTypeName, pattern); + LOG.info("Specific %s not found: '%s'", objectTypeName, pattern); } } boolean wasListing = needsList; for (T obj : objects) { if (Thread.currentThread().isInterrupted()) { - logger.warning("Thread interrupted during " + objectTypeName + " processing loop."); + LOG.warning("Thread interrupted during " + objectTypeName + " processing loop."); throw new InterruptedException( "Interrupted during " + objectTypeName + " processing loop"); } @@ -4915,17 +5308,17 @@ List findMatchingBigQueryObjects( } catch (BigQueryException e) { if (!needsList && e.getCode() == 404) { - logger.info("%s '%s' not found (API error 404).", objectTypeName, pattern); + LOG.info("%s '%s' not found (API error 404).", objectTypeName, pattern); } else { - logger.warning( + LOG.warning( "BigQueryException finding %ss for pattern '%s': %s (Code: %d)", objectTypeName, pattern, e.getMessage(), e.getCode()); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); - logger.warning("Interrupted while finding " + objectTypeName + "s."); + LOG.warning("Interrupted while finding " + objectTypeName + "s."); } catch (Exception e) { - logger.severe( + LOG.severe( "Unexpected exception finding %ss for pattern '%s': %s", objectTypeName, pattern, e.getMessage()); } @@ -4989,28 +5382,28 @@ private static class TypeInfoRowData { void sortResults( List collectedResults, Comparator comparator, - String operationName, - BigQueryJdbcCustomLogger logger) { + String operationName) { if (collectedResults == null || collectedResults.isEmpty()) { - logger.info("No results collected for %s, skipping sort.", operationName); + LOG.info("No results collected for %s, skipping sort.", operationName); return; } if (comparator == null) { - logger.info("No comparator provided for %s, skipping sort.", operationName); + LOG.info("No comparator provided for %s, skipping sort.", operationName); return; } - logger.info("Sorting %d collected %s results...", collectedResults.size(), operationName); + LOG.info("Sorting %d collected %s results...", collectedResults.size(), operationName); try { collectedResults.sort(comparator); - logger.info("%s result sorting completed.", operationName); + LOG.info("%s result sorting completed.", operationName); } catch (Exception e) { - logger.severe("Error during sorting %s results: %s", operationName, e.getMessage()); + LOG.severe("Error during sorting %s results: %s", operationName, e.getMessage()); } } private List defineBasePrivilegeFields() { + LOG.finest("++enter++"); List fields = new ArrayList<>(7); fields.add( Field.newBuilder("TABLE_CAT", StandardSQLTypeName.STRING) @@ -5044,6 +5437,8 @@ private List defineBasePrivilegeFields() { } Pattern compileSqlLikePattern(String sqlLikePattern) { + LOG.finest("++enter++"); + LOG.info("Compiling SQL LIKE pattern: %s", sqlLikePattern); if (sqlLikePattern == null) { return null; } @@ -5091,32 +5486,38 @@ boolean needsListing(String pattern) { } FieldValue createStringFieldValue(String value) { + LOG.finest("++enter++"); return FieldValue.of(FieldValue.Attribute.PRIMITIVE, value); } FieldValue createLongFieldValue(Long value) { + LOG.finest("++enter++"); return (value == null) ? FieldValue.of(FieldValue.Attribute.PRIMITIVE, null) : FieldValue.of(FieldValue.Attribute.PRIMITIVE, String.valueOf(value)); } FieldValue createNullFieldValue() { + LOG.finest("++enter++"); return FieldValue.of(FieldValue.Attribute.PRIMITIVE, null); } FieldValue createBooleanFieldValue(Boolean value) { + LOG.finest("++enter++"); return (value == null) ? FieldValue.of(FieldValue.Attribute.PRIMITIVE, null) : FieldValue.of(FieldValue.Attribute.PRIMITIVE, value ? "1" : "0"); } private String getStringValueOrNull(FieldValueList fvl, int index) { + LOG.finest("++enter++"); if (fvl == null || index < 0 || index >= fvl.size()) return null; FieldValue fv = fvl.get(index); return (fv == null || fv.isNull()) ? null : fv.getStringValue(); } private Long getLongValueOrNull(FieldValueList fvl, int index) { + LOG.finest("++enter++"); if (fvl == null || index < 0 || index >= fvl.size()) return null; FieldValue fv = fvl.get(index); try { @@ -5128,6 +5529,7 @@ private Long getLongValueOrNull(FieldValueList fvl, int index) { } private void waitForTasksCompletion(List> taskFutures) { + LOG.finest("++enter++"); LOG.info("Waiting for %d submitted tasks to complete...", taskFutures.size()); for (Future future : taskFutures) { try { @@ -5156,6 +5558,7 @@ private void populateQueue( List collectedResults, BlockingQueue queue, FieldList resultSchemaFields) { + LOG.finest("++enter++"); LOG.info("Populating queue with %d results...", collectedResults.size()); try { for (FieldValueList sortedRow : collectedResults) { @@ -5176,6 +5579,7 @@ private void populateQueue( private void signalEndOfData( BlockingQueue queue, FieldList resultSchemaFields) { + LOG.finest("++enter++"); try { LOG.info("Adding end signal to queue."); queue.put(BigQueryFieldValueListWrapper.of(resultSchemaFields, null, true)); @@ -5188,6 +5592,7 @@ private void signalEndOfData( } private void shutdownExecutor(ExecutorService executor) { + LOG.finest("++enter++"); if (executor == null || executor.isShutdown()) { return; } @@ -5213,10 +5618,12 @@ private void shutdownExecutor(ExecutorService executor) { } private String getCurrentCatalogName() { + LOG.finest("++enter++"); return this.connection.getCatalog(); } private List getAccessibleCatalogNames() { + LOG.finest("++enter++"); Set accessibleCatalogs = new HashSet<>(); String primaryCatalog = getCurrentCatalogName(); if (primaryCatalog != null && !primaryCatalog.isEmpty()) { @@ -5239,10 +5646,12 @@ private List getAccessibleCatalogNames() { List sortedCatalogs = new ArrayList<>(accessibleCatalogs); Collections.sort(sortedCatalogs); + LOG.info("Accessible catalogs: %s", sortedCatalogs); return sortedCatalogs; } static String readSqlFromFile(String filename) { + LOG.finest("++enter++"); InputStream in; in = BigQueryDatabaseMetaData.class.getResourceAsStream(filename); BufferedReader reader = new BufferedReader(new InputStreamReader(in)); @@ -5257,10 +5666,28 @@ static String readSqlFromFile(String filename) { } String replaceSqlParameters(String sql, String... params) throws SQLException { + LOG.finest("++enter++"); return String.format(sql, (Object[]) params); } + private java.util.concurrent.Callable wrapWithMdc(java.util.concurrent.Callable task) { + return () -> { + try (BigQueryJdbcMdc.MdcCloseable mdc = BigQueryJdbcMdc.setContext(this.connectionId)) { + return task.call(); + } + }; + } + + private java.lang.Runnable wrapWithMdc(java.lang.Runnable task) { + return () -> { + try (BigQueryJdbcMdc.MdcCloseable mdc = BigQueryJdbcMdc.setContext(this.connectionId)) { + task.run(); + } + }; + } + private void loadDriverVersionProperties() { + LOG.finest("++enter++"); if (parsedDriverVersion.get() != null) { return; } diff --git a/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryJdbcBulkInsertWriter.java b/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryJdbcBulkInsertWriter.java index d4e9702621dc..cf687674762c 100644 --- a/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryJdbcBulkInsertWriter.java +++ b/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryJdbcBulkInsertWriter.java @@ -47,6 +47,7 @@ class BigQueryJdbcBulkInsertWriter { void initialize(TableName parentTable, BigQueryWriteClient client, RetrySettings retrySettings) throws IOException, DescriptorValidationException, InterruptedException { + LOG.fine("Initializing BulkInsertWriter for table: %s", parentTable); WriteStream stream = WriteStream.newBuilder().setType(WriteStream.Type.PENDING).build(); CreateWriteStreamRequest createWriteStreamRequest = @@ -67,6 +68,7 @@ void initialize(TableName parentTable, BigQueryWriteClient client, RetrySettings } void append(JsonArray data, long offset) throws DescriptorValidationException, IOException { + LOG.fine("Appending %d rows at offset %d", data.size(), offset); synchronized (this.streamLock) { if (this.error != null) { throw this.error; @@ -113,6 +115,7 @@ public void onSuccess(AppendRowsResponse response) { } public void onFailure(Throwable throwable) { + parent.LOG.fine(throwable, "Failed to append rows in background callback"); synchronized (this.parent.streamLock) { if (this.parent.error == null) { StorageException storageException = Exceptions.toStorageException(throwable); diff --git a/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryJdbcCustomLogger.java b/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryJdbcCustomLogger.java index 6932f9b1a2a8..88bf3a76e592 100644 --- a/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryJdbcCustomLogger.java +++ b/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryJdbcCustomLogger.java @@ -79,6 +79,10 @@ void fine(String format, Object... args) { logWithCaller(Level.FINE, () -> String.format(format, args)); } + void fine(Throwable thrown, String msg) { + logWithCaller(Level.FINE, thrown, () -> msg); + } + void config(String format, Object... args) { logWithCaller(Level.CONFIG, () -> String.format(format, args)); } diff --git a/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryJdbcMdc.java b/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryJdbcMdc.java index c27ec67e4560..919709db0efd 100644 --- a/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryJdbcMdc.java +++ b/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryJdbcMdc.java @@ -36,6 +36,7 @@ class BigQueryJdbcMdc { new InheritableThreadLocal<>(); static MdcCloseable registerInstance(BigQueryConnection connection, String id) { + String prevId = currentConnectionId.get(); if (connection != null) { String cleanId = instanceIds.computeIfAbsent( @@ -51,7 +52,37 @@ static MdcCloseable registerInstance(BigQueryConnection connection, String id) { instanceLocals.computeIfAbsent(connection, k -> new InheritableThreadLocal<>()); threadLocal.set(cleanId); } - return () -> clear(); + return () -> { + if (prevId == null) { + clear(); + } else { + currentConnectionId.set(prevId); + } + }; + } + + static MdcCloseable setContext(String connectionId) { + String prevId = currentConnectionId.get(); + if (connectionId != null && !connectionId.isEmpty()) { + currentConnectionId.set("JdbcConnection-" + connectionId); + } + return () -> { + if (prevId == null) { + currentConnectionId.remove(); + } else { + currentConnectionId.set(prevId); + } + }; + } + + static void setContextPersistent(String connectionId) { + if (connectionId != null && !connectionId.isEmpty()) { + currentConnectionId.set("JdbcConnection-" + connectionId); + } + } + + static void clearContext() { + currentConnectionId.remove(); } /** diff --git a/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryJsonResultSet.java b/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryJsonResultSet.java index c59061b25467..f7a594c4ea58 100644 --- a/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryJsonResultSet.java +++ b/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryJsonResultSet.java @@ -132,7 +132,13 @@ static BigQueryJsonResultSet getNestedResultSet( /* Advances the result set to the next row, returning false if no such row exists. Potentially blocking operation */ public boolean next() throws SQLException { + BigQueryJdbcMdc.setContextPersistent(this.connectionId); + LOG.finest("++enter++"); checkClosed(); + return nextImpl(); + } + + private boolean nextImpl() throws SQLException { if (this.isNested) { // We are working with the nested record, the cursor would have been // populated. @@ -186,9 +192,13 @@ public boolean next() throws SQLException { @Override public Object getObject(int columnIndex) throws SQLException { - // columnIndex is SQL index starting at 1 - checkClosed(); + BigQueryJdbcMdc.setContextPersistent(this.connectionId); LOG.finest("++enter++"); + checkClosed(); + return getObjectImpl(columnIndex); + } + + private Object getObjectImpl(int columnIndex) throws SQLException { FieldValue value = getObjectInternal(columnIndex); if (value == null || value.isNull()) { return null; @@ -209,8 +219,8 @@ public Object getObject(int columnIndex) throws SQLException { return this.bigQueryTypeCoercer.coerceTo(targetClass, value); } - int extraIndex = this.isNested ? 2 : 1; - Field fieldSchema = this.schemaFieldList.get(columnIndex - extraIndex); + int fieldIndex = this.isNested ? 2 : 1; + Field fieldSchema = this.schemaFieldList.get(columnIndex - fieldIndex); if (isArray(fieldSchema)) { return new BigQueryJsonArray(fieldSchema, value); } else if (isStruct(fieldSchema)) { @@ -274,17 +284,24 @@ private FieldValue getObjectInternal(int columnIndex) throws SQLException { } @Override - public void close() { - LOG.fine("Closing BigqueryJsonResultSet %s.", this); - this.isClosed = true; - if (ownedThreads != null) { - for (Thread ownedThread : ownedThreads) { - if (!ownedThread.isInterrupted()) { - ownedThread.interrupt(); + public void close() throws SQLException { + LOG.finest("++enter++"); + if (isClosed()) { + return; + } + try (BigQueryJdbcMdc.MdcCloseable mdc = + BigQueryJdbcMdc.registerInstance(this.connection, this.connectionId)) { + LOG.fine("Closing BigqueryJsonResultSet %s.", this); + this.isClosed = true; + if (ownedThreads != null) { + for (Thread ownedThread : ownedThreads) { + if (!ownedThread.isInterrupted()) { + ownedThread.interrupt(); + } } } + super.close(); } - super.close(); } @Override diff --git a/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryParameterHandler.java b/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryParameterHandler.java index 8daaf99b62a8..edf4186d97af 100644 --- a/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryParameterHandler.java +++ b/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryParameterHandler.java @@ -115,27 +115,36 @@ Object getParameter(int index) { // Index is 1-based. Converting to 0 based for java. int arrayIndex = index - 1; if (parametersList.size() <= arrayIndex || parametersList.get(arrayIndex) == null) { + LOG.finest("getParameter(index=%d) returned null", index); return null; } - return parametersList.get(arrayIndex).getValue(); + Object value = parametersList.get(arrayIndex).getValue(); + LOG.finest("getParameter(index=%d) returned %s", index, value); + return value; } Class getType(int index) { // Index is 1-based. Converting to 0 based for java. int arrayIndex = index - 1; if (parametersList.size() <= arrayIndex || parametersList.get(arrayIndex) == null) { + LOG.finest("getType(index=%d) returned null", index); return null; } - return parametersList.get(arrayIndex).getType(); + Class type = parametersList.get(arrayIndex).getType(); + LOG.finest("getType(index=%d) returned %s", index, type); + return type; } StandardSQLTypeName getSqlType(int index) { // Index is 1-based. Converting to 0 based for java. int arrayIndex = index - 1; if (parametersList.size() <= arrayIndex || parametersList.get(arrayIndex) == null) { + LOG.finest("getSqlType(index=%d) returned null", index); return null; } - return parametersList.get(arrayIndex).getSqlType(); + StandardSQLTypeName sqlType = parametersList.get(arrayIndex).getSqlType(); + LOG.finest("getSqlType(index=%d) returned %s", index, sqlType); + return sqlType; } void clearParameters() { diff --git a/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryPreparedStatement.java b/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryPreparedStatement.java index 356578e8dd27..8e383757eeea 100644 --- a/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryPreparedStatement.java +++ b/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryPreparedStatement.java @@ -90,7 +90,13 @@ private int getParameterCount(String query) { @Override public ResultSet executeQuery() throws SQLException { - LOG.finest("++enter++"); + try (BigQueryJdbcMdc.MdcCloseable mdc = BigQueryJdbcMdc.setContext(this.connectionId)) { + LOG.finest("++enter++"); + return executeQueryImpl(); + } + } + + private ResultSet executeQueryImpl() throws SQLException { logQueryExecutionStart(this.currentQuery); try { QueryJobConfiguration.Builder jobConfiguration = getJobConfig(this.currentQuery); @@ -105,7 +111,13 @@ public ResultSet executeQuery() throws SQLException { @Override public long executeLargeUpdate() throws SQLException { - LOG.finest("++enter++"); + try (BigQueryJdbcMdc.MdcCloseable mdc = BigQueryJdbcMdc.setContext(this.connectionId)) { + LOG.finest("++enter++"); + return executeLargeUpdateImpl(); + } + } + + private long executeLargeUpdateImpl() throws SQLException { logQueryExecutionStart(this.currentQuery); try { QueryJobConfiguration.Builder jobConfiguration = getJobConfig(this.currentQuery); @@ -120,13 +132,21 @@ public long executeLargeUpdate() throws SQLException { @Override public int executeUpdate() throws SQLException { - LOG.finest("++enter++"); - return checkUpdateCount(executeLargeUpdate()); + try (BigQueryJdbcMdc.MdcCloseable mdc = BigQueryJdbcMdc.setContext(this.connectionId)) { + LOG.finest("++enter++ currentQuery: " + this.currentQuery); + return checkUpdateCount(executeLargeUpdateImpl()); + } } @Override public boolean execute() throws SQLException { - LOG.finest("++enter++"); + try (BigQueryJdbcMdc.MdcCloseable mdc = BigQueryJdbcMdc.setContext(this.connectionId)) { + LOG.finest("++enter++"); + return executeImpl(); + } + } + + private boolean executeImpl() throws SQLException { logQueryExecutionStart(this.currentQuery); try { QueryJobConfiguration.Builder jobConfiguration = getJobConfig(this.currentQuery); @@ -270,7 +290,13 @@ private ArrayList deepCopyParameterList( @Override public int[] executeBatch() throws SQLException { - LOG.finest("++enter++"); + try (BigQueryJdbcMdc.MdcCloseable mdc = BigQueryJdbcMdc.setContext(this.connectionId)) { + LOG.finest("++enter++ currentQuery: " + this.currentQuery); + return executeBatchImpl(); + } + } + + private int[] executeBatchImpl() throws SQLException { int[] result = new int[this.batchParameters.size()]; if (this.batchParameters.isEmpty()) { return result; diff --git a/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryResultSetMetadata.java b/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryResultSetMetadata.java index d18c689333a4..32171e8fe74f 100644 --- a/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryResultSetMetadata.java +++ b/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryResultSetMetadata.java @@ -28,25 +28,32 @@ /** This class returns ResultSetMetadata for the JSON and the Arrow ResultSets */ class BigQueryResultSetMetadata implements ResultSetMetaData { - private final BigQueryJdbcCustomLogger LOG = new BigQueryJdbcCustomLogger(this.toString()); + static final BigQueryJdbcCustomLogger LOG = + new BigQueryJdbcCustomLogger(BigQueryResultSetMetadata.class.getName()); private final FieldList schemaFieldList; private final Statement statement; + private final BigQueryConnection connection; private final int columnCount; private static final int DEFAULT_DISPLAY_SIZE = 50; - private BigQueryResultSetMetadata(FieldList schemaFieldList, Statement statement) { + private BigQueryResultSetMetadata(FieldList schemaFieldList, Statement statement) + throws SQLException { LOG.finest("++enter++"); this.schemaFieldList = schemaFieldList; this.columnCount = schemaFieldList.size(); this.statement = statement; + this.connection = statement != null ? (BigQueryConnection) statement.getConnection() : null; } - static BigQueryResultSetMetadata of(FieldList schemaFieldList, Statement statement) { + static BigQueryResultSetMetadata of(FieldList schemaFieldList, Statement statement) + throws SQLException { + LOG.finest("++enter++"); return new BigQueryResultSetMetadata(schemaFieldList, statement); } private Field getField(int sqlColumn) { + LOG.finest("++enter++"); return this.schemaFieldList.get(sqlColumn - 1); } @@ -62,13 +69,15 @@ public boolean isAutoIncrement(int column) { } @Override - public boolean isCaseSensitive(int column) { + public boolean isCaseSensitive(int column) throws SQLException { + LOG.finest("++enter++"); int colType = getColumnType(column); return colType == Types.NVARCHAR; } @Override - public boolean isSearchable(int column) { + public boolean isSearchable(int column) throws SQLException { + LOG.finest("++enter++"); int colType = getColumnType(column); return colType != Types.OTHER; } @@ -79,7 +88,8 @@ public boolean isCurrency(int column) { } @Override - public int isNullable(int column) { + public int isNullable(int column) throws SQLException { + LOG.finest("++enter++"); Mode colMode = getField(column).getMode(); if (colMode == null) { return ResultSetMetaData.columnNullableUnknown; @@ -90,7 +100,8 @@ public int isNullable(int column) { } @Override - public boolean isSigned(int column) { + public boolean isSigned(int column) throws SQLException { + LOG.finest("++enter++"); int colType = getColumnType(column); return colType == Types.FLOAT || colType == Types.DOUBLE @@ -99,7 +110,14 @@ public boolean isSigned(int column) { } @Override - public int getColumnDisplaySize(int column) { + public int getColumnDisplaySize(int column) throws SQLException { + LOG.finest("++enter++"); + return getColumnDisplaySizeImpl(column); + } + + private int getColumnDisplaySizeImpl(int column) throws SQLException { + LOG.finest("++enter++"); + LOG.info("getColumnDisplaySize called for column: %d", column); int colType = getColumnType(column); switch (colType) { case Types.BOOLEAN: @@ -119,22 +137,26 @@ public int getColumnDisplaySize(int column) { } @Override - public String getColumnLabel(int column) { + public String getColumnLabel(int column) throws SQLException { + LOG.finest("++enter++"); return getField(column).getName(); } @Override - public String getColumnName(int column) { + public String getColumnName(int column) throws SQLException { + LOG.finest("++enter++"); return getField(column).getName(); } @Override - public int getPrecision(int column) { + public int getPrecision(int column) throws SQLException { + LOG.finest("++enter++"); return (int) (getField(column).getPrecision() != null ? getField(column).getPrecision() : 0); } @Override - public int getScale(int column) { + public int getScale(int column) throws SQLException { + LOG.finest("++enter++"); return (int) (getField(column).getScale() != null ? getField(column).getScale() : 0); } @@ -156,6 +178,7 @@ public String getSchemaName(int column) { } private StandardSQLTypeName getStandardSQLTypeName(int column) { + LOG.finest("++enter++"); Field field = getField(column); if (field.getMode() == Mode.REPEATED) { return StandardSQLTypeName.ARRAY; @@ -164,13 +187,16 @@ private StandardSQLTypeName getStandardSQLTypeName(int column) { } @Override - public int getColumnType(int column) { + public int getColumnType(int column) throws SQLException { + LOG.finest("++enter++"); + LOG.info("getColumnType called for column: %d", column); return BigQueryJdbcTypeMappings.standardSQLToJavaSqlTypesMapping.get( getStandardSQLTypeName(column)); } @Override - public String getColumnTypeName(int column) { + public String getColumnTypeName(int column) throws SQLException { + LOG.finest("++enter++"); return getStandardSQLTypeName(column).name(); } @@ -180,7 +206,8 @@ public boolean isReadOnly(int column) { } @Override - public boolean isWritable(int column) { + public boolean isWritable(int column) throws SQLException { + LOG.finest("++enter++"); return !isReadOnly(column); } @@ -190,7 +217,9 @@ public boolean isDefinitelyWritable(int column) { } @Override - public String getColumnClassName(int column) { + public String getColumnClassName(int column) throws SQLException { + LOG.finest("++enter++"); + LOG.info("getColumnClassName called for column: %d", column); Field field = getField(column); if (field.getMode() == Mode.REPEATED) { return java.sql.Array.class.getName(); diff --git a/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryStatement.java b/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryStatement.java index c2bcdc96a6c1..ed545fb839d6 100644 --- a/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryStatement.java +++ b/java-bigquery/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryStatement.java @@ -91,7 +91,8 @@ public class BigQueryStatement extends BigQueryNoOpsStatement { private static final int MAX_PROCESS_QUERY_THREADS_CNT = 50; protected static ExecutorService queryTaskExecutor = Executors.newFixedThreadPool(MAX_PROCESS_QUERY_THREADS_CNT); - private final BigQueryJdbcCustomLogger LOG = new BigQueryJdbcCustomLogger(this.toString()); + private static final BigQueryJdbcCustomLogger LOG = + new BigQueryJdbcCustomLogger(BigQueryStatement.class.getName()); private static final String DEFAULT_DATASET_NAME = "_google_jdbc"; private static final String DEFAULT_TABLE_NAME = "temp_table_"; private static final String JDBC_JOB_PREFIX = "google-jdbc-"; @@ -149,6 +150,7 @@ public class BigQueryStatement extends BigQueryNoOpsStatement { @VisibleForTesting public BigQueryStatement(BigQueryConnection connection) { + LOG.finest("++enter++"); this.connection = connection; this.connectionId = connection.getConnectionId(); this.bigQuery = connection.getBigQuery(); @@ -156,6 +158,7 @@ public BigQueryStatement(BigQueryConnection connection) { } private void resetStatementFields() { + LOG.finest("++enter++"); this.isCanceled = false; this.scriptQuery = null; this.parentJobId = null; @@ -235,8 +238,7 @@ private BigQuerySettings generateBigQuerySettings() { */ @Override public ResultSet executeQuery(String sql) throws SQLException { - try (BigQueryJdbcMdc.MdcCloseable mdc = - BigQueryJdbcMdc.registerInstance(this.connection, this.connectionId)) { + try (BigQueryJdbcMdc.MdcCloseable mdc = BigQueryJdbcMdc.setContext(this.connectionId)) { LOG.finest("++enter++"); checkClosed(); return executeQueryImpl(sql); @@ -244,6 +246,7 @@ public ResultSet executeQuery(String sql) throws SQLException { } private ResultSet executeQueryImpl(String sql) throws SQLException { + LOG.finest("++enter++"); logQueryExecutionStart(sql); try { QueryJobConfiguration jobConfiguration = @@ -262,8 +265,7 @@ private ResultSet executeQueryImpl(String sql) throws SQLException { @Override public long executeLargeUpdate(String sql) throws SQLException { - try (BigQueryJdbcMdc.MdcCloseable mdc = - BigQueryJdbcMdc.registerInstance(this.connection, this.connectionId)) { + try (BigQueryJdbcMdc.MdcCloseable mdc = BigQueryJdbcMdc.setContext(this.connectionId)) { LOG.finest("++enter++"); checkClosed(); return executeLargeUpdateImpl(sql); @@ -271,6 +273,7 @@ public long executeLargeUpdate(String sql) throws SQLException { } private long executeLargeUpdateImpl(String sql) throws SQLException { + LOG.finest("++enter++"); logQueryExecutionStart(sql); try { QueryJobConfiguration.Builder jobConfiguration = getJobConfig(sql); @@ -287,12 +290,9 @@ private long executeLargeUpdateImpl(String sql) throws SQLException { @Override public int executeUpdate(String sql) throws SQLException { - try { - BigQueryJdbcMdc.registerInstance(this.connection, this.connectionId); - LOG.finest("++enter++"); - return checkUpdateCount(executeLargeUpdate(sql)); - } finally { - BigQueryJdbcMdc.clear(); + try (BigQueryJdbcMdc.MdcCloseable mdc = BigQueryJdbcMdc.setContext(this.connectionId)) { + LOG.finest("++enter++ sql: " + sql); + return checkUpdateCount(executeLargeUpdateImpl(sql)); } } @@ -308,8 +308,7 @@ int checkUpdateCount(long updateCount) { @Override public boolean execute(String sql) throws SQLException { - try (BigQueryJdbcMdc.MdcCloseable mdc = - BigQueryJdbcMdc.registerInstance(this.connection, this.connectionId)) { + try (BigQueryJdbcMdc.MdcCloseable mdc = BigQueryJdbcMdc.setContext(this.connectionId)) { LOG.finest("++enter++"); checkClosed(); return executeImpl(sql); @@ -317,6 +316,7 @@ public boolean execute(String sql) throws SQLException { } private boolean executeImpl(String sql) throws SQLException { + LOG.finest("++enter++"); logQueryExecutionStart(sql); try { QueryJobConfiguration jobConfiguration = getJobConfig(sql).build(); @@ -392,11 +392,11 @@ QueryStatistics getQueryStatistics(QueryJobConfiguration queryJobConfiguration) */ @Override public void close() throws SQLException { + LOG.finest("++enter++"); if (isClosed()) { return; } - try (BigQueryJdbcMdc.MdcCloseable mdc = - BigQueryJdbcMdc.registerInstance(this.connection, this.connectionId)) { + try (BigQueryJdbcMdc.MdcCloseable mdc = BigQueryJdbcMdc.setContext(this.connectionId)) { LOG.fine("Closing Statement %s.", this); boolean cancelSucceeded = false; @@ -417,36 +417,43 @@ public void close() throws SQLException { @Override public int getMaxFieldSize() { + LOG.finest("++enter++"); return this.maxFieldSize; } @Override public void setMaxFieldSize(int max) { + LOG.finest("++enter++"); this.maxFieldSize = max; } @Override public int getMaxRows() { + LOG.finest("++enter++"); return this.maxRows; } @Override public void setMaxRows(int max) { + LOG.finest("++enter++"); this.maxRows = max; } @Override public void setEscapeProcessing(boolean enable) { + LOG.finest("++enter++"); // TODO: verify how to implement this method } @Override public int getQueryTimeout() { + LOG.finest("++enter++"); return this.queryTimeout; } @Override public void setQueryTimeout(int seconds) { + LOG.finest("++enter++"); if (seconds < 0) { IllegalArgumentException ex = new IllegalArgumentException("Query Timeout should be >= 0."); LOG.severe(ex.getMessage(), ex); @@ -463,8 +470,7 @@ public void setQueryTimeout(int seconds) { */ @Override public void cancel() throws SQLException { - try (BigQueryJdbcMdc.MdcCloseable mdc = - BigQueryJdbcMdc.registerInstance(this.connection, this.connectionId)) { + try (BigQueryJdbcMdc.MdcCloseable mdc = BigQueryJdbcMdc.setContext(this.connectionId)) { LOG.finest("Statement %s cancelled", this); synchronized (cancelLock) { this.isCanceled = true; @@ -492,36 +498,43 @@ public void cancel() throws SQLException { @Override public SQLWarning getWarnings() { + LOG.finest("++enter++"); return this.warning; } @Override public void clearWarnings() { + LOG.finest("++enter++"); this.warning = null; } @Override public ResultSet getResultSet() { + LOG.finest("++enter++"); return this.currentResultSet; } @VisibleForTesting void setUpdateCount(long count) { + LOG.finest("++enter++"); this.currentUpdateCount = count; } @Override public int getUpdateCount() { + LOG.finest("++enter++"); return (int) this.currentUpdateCount; } @Override public long getLargeUpdateCount() { + LOG.finest("++enter++"); return this.currentUpdateCount; } @Override public boolean getMoreResults() throws SQLException { + LOG.finest("++enter++"); return getMoreResults(CLOSE_CURRENT_RESULT); } @@ -547,11 +560,13 @@ private void closeStatementResources() throws SQLException { } private boolean isSingularResultSet() { + LOG.finest("++enter++"); return this.currentResultSet != null && (this.parentJobId == null || this.parentJobId.getJobs().size() == 1); } private String generateJobId() { + LOG.finest("++enter++"); return JDBC_JOB_PREFIX + UUID.randomUUID().toString(); } @@ -651,6 +666,7 @@ void runQuery(String query, QueryJobConfiguration jobConfiguration) } private boolean isLargeResultsEnabled() { + LOG.finest("++enter++"); String destinationTable = this.querySettings.getDestinationTable(); String destinationDataset = this.querySettings.getDestinationDataset(); return destinationDataset != null || destinationTable != null; @@ -658,6 +674,7 @@ private boolean isLargeResultsEnabled() { private QueryJobConfiguration setDestinationDatasetAndTableInJobConfig( QueryJobConfiguration jobConfiguration) { + LOG.finest("++enter++"); String destinationTable = this.querySettings.getDestinationTable(); String destinationDataset = this.querySettings.getDestinationDataset(); if (destinationDataset != null || destinationTable != null) { @@ -682,6 +699,7 @@ private QueryJobConfiguration setDestinationDatasetAndTableInJobConfig( } Job getNextJob() { + LOG.finest("++enter++"); if (this.parentJobId == null) { return null; } @@ -756,6 +774,7 @@ void handleQueryResult(String query, TableResult results, SqlType queryType) } private void updateAffectedRowCount(Long count) throws SQLException { + LOG.finest("++enter++"); // TODO(neenu): check if this need to be closed vs removed) if (this.currentResultSet != null) { try { @@ -770,6 +789,7 @@ private void updateAffectedRowCount(Long count) throws SQLException { @InternalApi BigQueryReadClient getBigQueryReadClient() { + LOG.finest("++enter++"); if (this.bigQueryReadClient == null) { this.bigQueryReadClient = this.connection.getBigQueryReadClient(); } @@ -784,6 +804,7 @@ ReadSession getReadSession(CreateReadSessionRequest readSessionRequest) { @InternalApi ArrowSchema getArrowSchema(ReadSession readSession) { + LOG.finest("++enter++"); return readSession.getArrowSchema(); } @@ -796,6 +817,7 @@ ResultSet processArrowResultSet(TableResult results) throws SQLException { long totalRows = (getMaxRows() > 0) ? getMaxRows() : results.getTotalRows(); JobId currentJobId = results.getJobId(); TableId destinationTable = getDestinationTable(currentJobId); + LOG.info("Processing Arrow result set for table: %s", destinationTable); Schema schema = results.getSchema(); try { String parent = String.format("projects/%s", destinationTable.getProject()); @@ -932,6 +954,8 @@ Thread populateArrowBufferedQueue( /** Executes SQL query using either fast query path or read API */ void processQueryResponse(String query, TableResult results) throws SQLException { + LOG.finest("++enter++"); + LOG.info("Processing query response with total rows: %d", results.getTotalRows()); LOG.finest( "API call completed{Query=%s, Parent Job ID=%s, Total rows=%s} ", query, results.getJobId(), results.getTotalRows()); @@ -980,6 +1004,7 @@ private boolean meetsReadRatio(TableResult results) { } BigQueryJsonResultSet processJsonResultSet(TableResult results) { + LOG.finest("++enter++"); String jobIdOrQueryId = results.getJobId() == null ? results.getQueryId() : results.getJobId().getJob(); LOG.info("BigQuery Job %s completed. Fetching results.", jobIdOrQueryId); @@ -1054,6 +1079,7 @@ void populateFirstPage( @Override public void setFetchDirection(int direction) throws SQLException { + LOG.finest("++enter++"); if (direction != ResultSet.FETCH_FORWARD) { throw new BigQueryJdbcSqlFeatureNotSupportedException("Only FETCH_FORWARD is supported."); } @@ -1239,6 +1265,7 @@ else if (numCachedRows < 2000 && numColumns < 15) { @Override public int getFetchDirection() { + LOG.finest("++enter++"); return this.fetchDirection; } @@ -1246,6 +1273,7 @@ public int getFetchDirection() { // getNumBufferedRows in querySettings is always the same withDefaultValues - 20000 buffer size // So, getBufferSize is also 20000. private int getBufferSize() { + LOG.finest("++enter++"); return (this.querySettings == null || this.querySettings.getNumBufferedRows() == null || this.querySettings.getNumBufferedRows() < 10000 @@ -1255,6 +1283,7 @@ private int getBufferSize() { /** Returns the destinationTable from jobId by calling `jobs.get` API */ TableId getDestinationTable(JobId jobId) { + LOG.finest("++enter++"); Job job = this.bigQuery.getJob(jobId); LOG.finest("Destination Table retrieved from %s", job.getJobId()); return ((QueryJobConfiguration) job.getConfiguration()).getDestinationTable(); @@ -1307,6 +1336,7 @@ QueryJobConfiguration.Builder getJobConfig(String query) { } private void checkIfDatasetExistElseCreate(String datasetName) { + LOG.finest("++enter++"); Dataset dataset = bigQuery.getDataset(DatasetId.of(datasetName)); if (dataset == null) { LOG.info("Creating a hidden dataset: %s ", datasetName); @@ -1319,6 +1349,7 @@ private void checkIfDatasetExistElseCreate(String datasetName) { } private String getDefaultDestinationTable() { + LOG.finest("++enter++"); String timeOfCreation = String.valueOf(Instant.now().toEpochMilli()); String randomizedId = String.valueOf(new Random().nextInt(9999)); return DEFAULT_TABLE_NAME + timeOfCreation + randomizedId; @@ -1339,11 +1370,13 @@ JobIdWrapper insertJob(JobConfiguration jobConfiguration) throws SQLException { @Override public void setFetchSize(int rows) { + LOG.finest("++enter++"); this.fetchSize = rows; } @Override public int getFetchSize() { + LOG.finest("++enter++"); return this.fetchSize; } @@ -1353,6 +1386,7 @@ public int getFetchSize() { * @return A map of the extra labels. */ public Map getExtraLabels() { + LOG.finest("++enter++"); return this.extraLabels; } @@ -1362,20 +1396,24 @@ public Map getExtraLabels() { * @param extraLabels A map of the extra labels. */ public void setExtraLabels(Map extraLabels) { + LOG.finest("++enter++"); this.extraLabels = extraLabels; } @Override public int getResultSetConcurrency() { + LOG.finest("++enter++"); return ResultSet.CONCUR_READ_ONLY; } ResultSet getCurrentResultSet() { + LOG.finest("++enter++"); return this.currentResultSet; } @Override public int getResultSetType() { + LOG.finest("++enter++"); return ResultSet.TYPE_FORWARD_ONLY; } @@ -1390,32 +1428,39 @@ static class JobIdWrapper { private ArrayList jobs; public JobIdWrapper(JobId jobId, TableResult firstPage, ArrayList jobs) { + LOG.finest("++enter++"); this.jobId = jobId; this.firstPage = firstPage; this.jobs = jobs; } JobId getJobId() { + LOG.finest("++enter++"); return this.jobId; } void setJobId(JobId jobId) { + LOG.finest("++enter++"); this.jobId = jobId; } TableResult getResults() { + LOG.finest("++enter++"); return this.firstPage; } void setResults(TableResult firstPage) { + LOG.finest("++enter++"); this.firstPage = firstPage; } ArrayList getJobs() { + LOG.finest("++enter++"); return jobs; } void setJobs(ArrayList jobs) { + LOG.finest("++enter++"); this.jobs = jobs; } } @@ -1442,12 +1487,14 @@ public void addBatch(String sql) throws SQLException { @Override public void clearBatch() { + LOG.finest("++enter++"); this.batchQueries.clear(); } @Override public int[] executeBatch() throws SQLException { LOG.finest("++enter++"); + LOG.info("Executing batch of %d queries", this.batchQueries.size()); int[] result = new int[this.batchQueries.size()]; if (this.batchQueries.isEmpty()) { return result; @@ -1475,10 +1522,12 @@ public int[] executeBatch() throws SQLException { @Override public Connection getConnection() { + LOG.finest("++enter++"); return this.connection; } public boolean hasMoreResults() { + LOG.finest("++enter++"); if (this.parentJobId == null) { return false; } @@ -1487,8 +1536,7 @@ public boolean hasMoreResults() { @Override public boolean getMoreResults(int current) throws SQLException { - try (BigQueryJdbcMdc.MdcCloseable mdc = - BigQueryJdbcMdc.registerInstance(this.connection, this.connectionId)) { + try (BigQueryJdbcMdc.MdcCloseable mdc = BigQueryJdbcMdc.setContext(this.connectionId)) { LOG.finest("++enter++"); checkClosed(); return getMoreResultsImpl(current); @@ -1496,6 +1544,7 @@ public boolean getMoreResults(int current) throws SQLException { } private boolean getMoreResultsImpl(int current) throws SQLException { + LOG.finest("++enter++"); if (current != CLOSE_CURRENT_RESULT) { throw new BigQueryJdbcSqlFeatureNotSupportedException( "The JDBC driver only supports Statement.CLOSE_CURRENT_RESULT."); @@ -1530,11 +1579,13 @@ private boolean getMoreResultsImpl(int current) throws SQLException { @Override public boolean isWrapperFor(Class iface) { + LOG.finest("++enter++"); return iface.isInstance(this); } @Override public T unwrap(Class iface) throws SQLException { + LOG.finest("++enter++"); if (!isWrapperFor(iface)) { throw new BigQueryJdbcException( String.format("Unable to cast Statement to %s class.", iface.getName())); @@ -1544,35 +1595,42 @@ public T unwrap(Class iface) throws SQLException { @Override public int getResultSetHoldability() { + LOG.finest("++enter++"); return ResultSet.CLOSE_CURSORS_AT_COMMIT; } @Override public boolean isClosed() { + LOG.finest("++enter++"); return this.isClosed; } @Override public void setPoolable(boolean poolable) { + LOG.finest("++enter++"); this.poolable = poolable; } @Override public boolean isPoolable() { + LOG.finest("++enter++"); return this.poolable; } @Override public void closeOnCompletion() { + LOG.finest("++enter++"); this.closeOnCompletion = true; } @Override public boolean isCloseOnCompletion() { + LOG.finest("++enter++"); return this.closeOnCompletion; } protected void logQueryExecutionStart(String sql) { + LOG.finest("++enter++"); if (sql == null) { return; } @@ -1585,6 +1643,7 @@ protected void logQueryExecutionStart(String sql) { /** Throws a {@link BigQueryJdbcException} if this object is closed */ void checkClosed() throws SQLException { + LOG.finest("++enter++"); if (isClosed()) { throw new BigQueryJdbcException("This " + getClass().getName() + " has been closed"); } @@ -1606,20 +1665,24 @@ enum QueryDialectType { } private void enqueueError(BlockingQueue queue, Exception e) { + LOG.finest("++enter++"); Uninterruptibles.putUninterruptibly( queue, BigQueryArrowBatchWrapper.ofError(new BigQueryJdbcRuntimeException(e))); } private void enqueueEndOfStream(BlockingQueue queue) { + LOG.finest("++enter++"); Uninterruptibles.putUninterruptibly(queue, BigQueryArrowBatchWrapper.of(null, true)); } private void enqueueBufferError(BlockingQueue queue, Exception e) { + LOG.finest("++enter++"); Uninterruptibles.putUninterruptibly( queue, BigQueryFieldValueListWrapper.ofError(new BigQueryJdbcRuntimeException(e))); } private void enqueueBufferEndOfStream(BlockingQueue queue) { + LOG.finest("++enter++"); Uninterruptibles.putUninterruptibly(queue, BigQueryFieldValueListWrapper.of(null, null, true)); } } diff --git a/java-bigquery/google-cloud-bigquery-jdbc/src/test/java/com/google/cloud/bigquery/jdbc/BigQueryDatabaseMetaDataTest.java b/java-bigquery/google-cloud-bigquery-jdbc/src/test/java/com/google/cloud/bigquery/jdbc/BigQueryDatabaseMetaDataTest.java index 4d108ee54f8b..37ed5ff69dca 100644 --- a/java-bigquery/google-cloud-bigquery-jdbc/src/test/java/com/google/cloud/bigquery/jdbc/BigQueryDatabaseMetaDataTest.java +++ b/java-bigquery/google-cloud-bigquery-jdbc/src/test/java/com/google/cloud/bigquery/jdbc/BigQueryDatabaseMetaDataTest.java @@ -610,7 +610,7 @@ public void testSortResults_Tables() { results.add(createTableRow("cat_a", null, "view_0", "VIEW", schemaFields)); Comparator comparator = dbMetadata.defineGetTablesComparator(schemaFields); - dbMetadata.sortResults(results, comparator, "getTables", dbMetadata.LOG); + dbMetadata.sortResults(results, comparator, "getTables"); // Expected order: TABLEs first, then VIEWs. Within type, sort by CAT, SCHEM, NAME assertEquals(6, results.size()); @@ -659,7 +659,7 @@ public void testSortResults_Tables_EmptyList() { List results = new ArrayList<>(); Comparator comparator = dbMetadata.defineGetTablesComparator(schemaFields); - dbMetadata.sortResults(results, comparator, "getTables", dbMetadata.LOG); + dbMetadata.sortResults(results, comparator, "getTables"); assertTrue(results.isEmpty()); } @@ -728,7 +728,7 @@ public void testSortResults_Schemas() { results.add(createSchemaRow("proj_a", "schema_c", schemaFields)); Comparator comparator = dbMetadata.defineGetSchemasComparator(schemaFields); - dbMetadata.sortResults(results, comparator, "getSchemas", dbMetadata.LOG); + dbMetadata.sortResults(results, comparator, "getSchemas"); // Expected order: Sort by TABLE_CATALOG (nulls first), then TABLE_SCHEM assertEquals(5, results.size()); @@ -761,7 +761,7 @@ public void testSortResults_Schemas_EmptyList() { List results = new ArrayList<>(); Comparator comparator = dbMetadata.defineGetSchemasComparator(schemaFields); - dbMetadata.sortResults(results, comparator, "getSchemas", dbMetadata.LOG); + dbMetadata.sortResults(results, comparator, "getSchemas"); assertTrue(results.isEmpty()); } @@ -906,7 +906,7 @@ public void testSortResults_Procedures() { createProcedureRow("cat_a", "sch_z", "proc_alpha", "proc_alpha_spec_older", schemaFields)); Comparator comparator = dbMetadata.defineGetProceduresComparator(schemaFields); - dbMetadata.sortResults(results, comparator, "getProcedures", dbMetadata.LOG); + dbMetadata.sortResults(results, comparator, "getProcedures"); // Expected Order: Null Cat, then Cat A (Null Schem, then sch_z), then Cat B. Within that, Name, // then Spec Name. @@ -961,7 +961,7 @@ public void testSortResults_Procedures_EmptyList() { List results = new ArrayList<>(); Comparator comparator = dbMetadata.defineGetProceduresComparator(schemaFields); - dbMetadata.sortResults(results, comparator, "getProcedures", dbMetadata.LOG); + dbMetadata.sortResults(results, comparator, "getProcedures"); assertTrue(results.isEmpty()); } @@ -994,8 +994,7 @@ public void testFindMatchingBigQueryObjects_Routines_ListWithPattern() { RoutineId.of(datasetId.getProject(), datasetId.getDataset(), name)), (rt) -> rt.getRoutineId().getRoutine(), pattern, - regex, - dbMetadata.LOG); + regex); verify(bigqueryClient, times(1)) .listRoutines(eq(datasetId), any(BigQuery.RoutineListOption.class)); @@ -1036,8 +1035,7 @@ public void testFindMatchingBigQueryObjects_Routines_ListNoPattern() { RoutineId.of(datasetId.getProject(), datasetId.getDataset(), name)), (rt) -> rt.getRoutineId().getRoutine(), pattern, - regex, - dbMetadata.LOG); + regex); verify(bigqueryClient, times(1)) .listRoutines(eq(datasetId), any(BigQuery.RoutineListOption.class)); @@ -1071,8 +1069,7 @@ public void testFindMatchingBigQueryObjects_Routines_GetSpecific() { RoutineId.of(datasetId.getProject(), datasetId.getDataset(), name)), (rt) -> rt.getRoutineId().getRoutine(), procNameExact, - regex, - dbMetadata.LOG); + regex); verify(bigqueryClient, times(1)).getRoutine(eq(routineId)); verify(bigqueryClient, never()) @@ -1464,7 +1461,7 @@ public void testDefineGetProcedureColumnsComparator() { Comparator comparator = dbMetadata.defineGetProcedureColumnsComparator(schemaFields); assertNotNull(comparator); - dbMetadata.sortResults(results, comparator, "getProcedureColumns", dbMetadata.LOG); + dbMetadata.sortResults(results, comparator, "getProcedureColumns"); assertEquals(5, results.size()); @@ -1549,7 +1546,7 @@ public void testListMatchingProcedureIdsFromDatasets() throws Exception { List resultIds = dbMetadata.listMatchingProcedureIdsFromDatasets( - datasetsToScan, null, null, mockExecutor, catalog, dbMetadata.LOG); + datasetsToScan, null, null, mockExecutor, catalog); assertEquals(2, resultIds.size()); assertTrue(resultIds.contains(proc1_ds1.getRoutineId())); @@ -1606,8 +1603,7 @@ public void testSubmitProcedureArgumentProcessingJobs_Basic() throws Interrupted collectedResults, resultSchemaFields, mockExecutor, - processingTaskFutures, - dbMetadata.LOG); + processingTaskFutures); verify(mockExecutor, times(2)).submit(any(Runnable.class)); assertEquals(2, processingTaskFutures.size()); @@ -2491,7 +2487,7 @@ public void testSortResults_Functions() { schemaFields)); Comparator comparator = dbMetadata.defineGetFunctionsComparator(schemaFields); - dbMetadata.sortResults(results, comparator, "getFunctions", dbMetadata.LOG); + dbMetadata.sortResults(results, comparator, "getFunctions"); // Expected Order: Null Cat, then Cat A (Null Schem, then sch_z), then Cat B. Within that, Name, // then Spec Name. diff --git a/java-bigquery/google-cloud-bigquery-jdbc/src/test/java/com/google/cloud/bigquery/jdbc/BigQueryJdbcMdcTest.java b/java-bigquery/google-cloud-bigquery-jdbc/src/test/java/com/google/cloud/bigquery/jdbc/BigQueryJdbcMdcTest.java index 70cb56a53f10..ce1fa90d279a 100644 --- a/java-bigquery/google-cloud-bigquery-jdbc/src/test/java/com/google/cloud/bigquery/jdbc/BigQueryJdbcMdcTest.java +++ b/java-bigquery/google-cloud-bigquery-jdbc/src/test/java/com/google/cloud/bigquery/jdbc/BigQueryJdbcMdcTest.java @@ -158,4 +158,36 @@ public void testMdcCloseableClearsContext() { } assertNull(BigQueryJdbcMdc.getConnectionId()); } + + @Test + public void testMdcCloseableNesting() { + try (BigQueryJdbcMdc.MdcCloseable mdc1 = + BigQueryJdbcMdc.registerInstance(mockConnection1, "outer")) { + assertEquals("JdbcConnection-outer", BigQueryJdbcMdc.getConnectionId()); + + try (BigQueryJdbcMdc.MdcCloseable mdc2 = + BigQueryJdbcMdc.registerInstance(mockConnection1, "inner")) { + assertEquals("JdbcConnection-outer", BigQueryJdbcMdc.getConnectionId()); + } + + assertEquals("JdbcConnection-outer", BigQueryJdbcMdc.getConnectionId()); + } + assertNull(BigQueryJdbcMdc.getConnectionId()); + } + + @Test + public void testMdcCloseableNestingDifferentConnections() { + try (BigQueryJdbcMdc.MdcCloseable mdc1 = + BigQueryJdbcMdc.registerInstance(mockConnection1, "conn1")) { + assertEquals("JdbcConnection-conn1", BigQueryJdbcMdc.getConnectionId()); + + try (BigQueryJdbcMdc.MdcCloseable mdc2 = + BigQueryJdbcMdc.registerInstance(mockConnection2, "conn2")) { + assertEquals("JdbcConnection-conn2", BigQueryJdbcMdc.getConnectionId()); + } + + assertEquals("JdbcConnection-conn1", BigQueryJdbcMdc.getConnectionId()); + } + assertNull(BigQueryJdbcMdc.getConnectionId()); + } } diff --git a/java-bigquery/google-cloud-bigquery-jdbc/src/test/java/com/google/cloud/bigquery/jdbc/BigQueryResultSetMetadataTest.java b/java-bigquery/google-cloud-bigquery-jdbc/src/test/java/com/google/cloud/bigquery/jdbc/BigQueryResultSetMetadataTest.java index 6e7c9147bae8..7965d3933187 100644 --- a/java-bigquery/google-cloud-bigquery-jdbc/src/test/java/com/google/cloud/bigquery/jdbc/BigQueryResultSetMetadataTest.java +++ b/java-bigquery/google-cloud-bigquery-jdbc/src/test/java/com/google/cloud/bigquery/jdbc/BigQueryResultSetMetadataTest.java @@ -18,6 +18,7 @@ import static com.google.common.truth.Truth.assertThat; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; import com.google.cloud.bigquery.Field; import com.google.cloud.bigquery.FieldList; @@ -33,12 +34,14 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; @ExtendWith(MockitoExtension.class) public class BigQueryResultSetMetadataTest { private BigQueryStatement statement; + @Mock private BigQueryConnection mockConnection; private static Field tenthField = Field.newBuilder("tenth", LegacySQLTypeName.NUMERIC) @@ -109,6 +112,7 @@ public class BigQueryResultSetMetadataTest { @BeforeEach public void setUp() throws SQLException { statement = mock(BigQueryStatement.class); + when(statement.getConnection()).thenReturn(mockConnection); Thread[] workerThreads = {new Thread()}; BigQueryJsonResultSet bigQueryJsonResultSet = BigQueryJsonResultSet.of(QUERY_SCHEMA, 1L, null, statement, workerThreads);