Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ public class BigQueryConnection extends BigQueryNoOpsConnection {
String partnerToken;
DatabaseMetaData databaseMetaData;
Boolean reqGoogleDriveScope;
private boolean isReadOnlyTokenUsed = false;

BigQueryConnection(String url) throws IOException {
this(url, DataSource.fromUrl(url));
Expand Down Expand Up @@ -172,6 +173,7 @@ public class BigQueryConnection extends BigQueryNoOpsConnection {
this.jobTimeoutInSeconds = ds.getJobTimeout();
this.authProperties =
BigQueryJdbcOAuthUtility.parseOAuthProperties(ds, this.connectionClassName);
this.isReadOnlyTokenUsed = checkIsReadOnlyTokenUsed(this.authProperties);
this.catalog = ds.getProjectId();
this.universeDomain = ds.getUniverseDomain();

Expand Down Expand Up @@ -1193,4 +1195,18 @@ public CallableStatement prepareCall(
}
return prepareCall(sql);
}

public boolean isReadOnlyTokenUsed() {
return this.isReadOnlyTokenUsed;
}

private boolean checkIsReadOnlyTokenUsed(Map<String, String> authProps) {
String readonlyValue =
authProps.get(BigQueryJdbcUrlUtility.OAUTH_ACCESS_TOKEN_READONLY_PROPERTY_NAME);
if (readonlyValue != null) {
return BigQueryJdbcUrlUtility.convertIntToBoolean(
readonlyValue, BigQueryJdbcUrlUtility.OAUTH_ACCESS_TOKEN_READONLY_PROPERTY_NAME);
}
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,9 @@ static Map<String, String> parseOAuthProperties(DataSource ds, String callerClas
}
oauthProperties.put(
BigQueryJdbcUrlUtility.OAUTH_ACCESS_TOKEN_PROPERTY_NAME, ds.getOAuthAccessToken());
oauthProperties.put(
BigQueryJdbcUrlUtility.OAUTH_ACCESS_TOKEN_READONLY_PROPERTY_NAME,
String.valueOf(ds.getOAuthAccessTokenReadonly()));
LOG.fine("OAuthAccessToken provided.");
break;
case APPLICATION_DEFAULT_CREDENTIALS:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ protected boolean removeEldestEntry(Map.Entry<String, Map<String, String>> eldes
static final String BIGQUERY_ENDPOINT_OVERRIDE_PROPERTY_NAME = "BIGQUERY";
static final String STS_ENDPOINT_OVERRIDE_PROPERTY_NAME = "STS";
static final String OAUTH_ACCESS_TOKEN_PROPERTY_NAME = "OAuthAccessToken";
static final String OAUTH_ACCESS_TOKEN_READONLY_PROPERTY_NAME = "OAuthAccessTokenReadonly";
static final String OAUTH_REFRESH_TOKEN_PROPERTY_NAME = "OAuthRefreshToken";
static final String OAUTH_CLIENT_ID_PROPERTY_NAME = "OAuthClientId";
static final String OAUTH_CLIENT_SECRET_PROPERTY_NAME = "OAuthClientSecret";
Expand Down Expand Up @@ -248,6 +249,11 @@ protected boolean removeEldestEntry(Map.Entry<String, Map<String, String>> eldes
"The pre-generated access token to be used with BigQuery for"
+ " authentication.")
.build(),
BigQueryConnectionProperty.newBuilder()
.setName(OAUTH_ACCESS_TOKEN_READONLY_PROPERTY_NAME)
.setDescription(
"Set to true if the pre-generated access token has a read-only scope.")
.build(),
BigQueryConnectionProperty.newBuilder()
.setName(OAUTH_CLIENT_ID_PROPERTY_NAME)
.setDescription(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,13 @@ private boolean executeImpl(String sql) throws SQLException {

StatementType getStatementType(QueryJobConfiguration queryJobConfiguration) throws SQLException {
LOG.finest("++enter++");
// BQ Read-only tokens are not recommended to use, they have a lot of known flaws.
// We're supporting them in a limited capacity, for pure SELECT statements.
if (this.connection.isReadOnlyTokenUsed()) {
LOG.warning(
"Read-only token detected, skipping dry run and assuming StatementType is SELECT.");
return StatementType.SELECT;
}
QueryJobConfiguration dryRunJobConfiguration =
queryJobConfiguration.toBuilder().setDryRun(true).build();
Job job;
Expand Down Expand Up @@ -574,31 +581,21 @@ ExecuteResult executeJob(QueryJobConfiguration jobConfiguration)
// so we need to explicitly set it;
// Do not set custom JobId here or it will disable jobless queries.
JobId jobId = JobId.newBuilder().setLocation(connection.getLocation()).build();
if (connection.getUseStatelessQueryMode()) {
Object result = bigQuery.queryWithTimeout(jobConfiguration, jobId, null);
if (result instanceof TableResult) {
TableResult tableResult = (TableResult) result;
if (tableResult.getJobId() != null) {
return new ExecuteResult(tableResult, bigQuery.getJob(tableResult.getJobId()));
}
return new ExecuteResult((TableResult) result, null);
Object result = bigQuery.queryWithTimeout(jobConfiguration, jobId, null);
if (result instanceof TableResult) {
TableResult tableResult = (TableResult) result;
if (tableResult.getJobId() != null) {
return new ExecuteResult(tableResult, bigQuery.getJob(tableResult.getJobId()));
}
return new ExecuteResult((TableResult) result, null);
}

if (result instanceof Job) {
job = (Job) result;
} else {
throw new BigQueryJdbcException("Unexpected result type from queryWithTimeout");
}
if (result instanceof Job) {
job = (Job) result;
} else {
// Update jobId with custom JobId if jobless query is disabled.
jobId = jobId.toBuilder().setJob(generateJobId()).build();
JobInfo jobInfo = JobInfo.newBuilder(jobConfiguration).setJobId(jobId).build();
job = bigQuery.create(jobInfo);
throw new BigQueryJdbcException("Unexpected result type from queryWithTimeout");
}
Comment thread
logachev marked this conversation as resolved.

if (job == null) {
throw new BigQueryJdbcException("Failed to create BQ Job.");
}
synchronized (cancelLock) {
if (isCanceled) {
job.cancel();
Expand All @@ -608,12 +605,12 @@ ExecuteResult executeJob(QueryJobConfiguration jobConfiguration)
jobIds.add(jobId);
}
LOG.info("Query submitted with Job ID: " + job.getJobId().getJob());
TableResult result =
TableResult tableResult =
job.getQueryResults(QueryResultsOption.pageSize(querySettings.getMaxResultPerPage()));
synchronized (cancelLock) {
jobIds.remove(jobId);
}
return new ExecuteResult(result, job);
return new ExecuteResult(tableResult, job);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ public class DataSource implements javax.sql.DataSource {
private String oAuthPvtKeyPath;
private String oAuthPvtKey;
private String oAuthAccessToken;
private Boolean oAuthAccessTokenReadonly;
private String oAuthRefreshToken;
private Boolean useQueryCache;
private String queryDialect;
Expand Down Expand Up @@ -175,6 +176,12 @@ public class DataSource implements javax.sql.DataSource {
.put(
BigQueryJdbcUrlUtility.OAUTH_ACCESS_TOKEN_PROPERTY_NAME,
DataSource::setOAuthAccessToken)
.put(
BigQueryJdbcUrlUtility.OAUTH_ACCESS_TOKEN_READONLY_PROPERTY_NAME,
(ds, val) ->
ds.setOAuthAccessTokenReadonly(
BigQueryJdbcUrlUtility.convertIntToBoolean(
val, BigQueryJdbcUrlUtility.OAUTH_ACCESS_TOKEN_READONLY_PROPERTY_NAME)))
.put(
BigQueryJdbcUrlUtility.OAUTH_REFRESH_TOKEN_PROPERTY_NAME,
DataSource::setOAuthRefreshToken)
Expand Down Expand Up @@ -451,6 +458,11 @@ private Properties createProperties() {
connectionProperties.setProperty(
BigQueryJdbcUrlUtility.OAUTH_ACCESS_TOKEN_PROPERTY_NAME, this.oAuthAccessToken);
}
if (this.oAuthAccessTokenReadonly != null) {
connectionProperties.setProperty(
BigQueryJdbcUrlUtility.OAUTH_ACCESS_TOKEN_READONLY_PROPERTY_NAME,
String.valueOf(this.oAuthAccessTokenReadonly));
}
if (this.oAuthRefreshToken != null) {
connectionProperties.setProperty(
BigQueryJdbcUrlUtility.OAUTH_REFRESH_TOKEN_PROPERTY_NAME, this.oAuthRefreshToken);
Expand Down Expand Up @@ -879,6 +891,14 @@ public void setOAuthAccessToken(String oAuthAccessToken) {
this.oAuthAccessToken = oAuthAccessToken;
}

public Boolean getOAuthAccessTokenReadonly() {
return oAuthAccessTokenReadonly != null ? oAuthAccessTokenReadonly : false;
}

public void setOAuthAccessTokenReadonly(Boolean oAuthAccessTokenReadonly) {
this.oAuthAccessTokenReadonly = oAuthAccessTokenReadonly;
}

public String getOAuthRefreshToken() {
return oAuthRefreshToken;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
import java.util.Properties;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;

public class BigQueryConnectionTest {

Expand Down Expand Up @@ -437,4 +439,21 @@ public void testWithDriveScopeDefault() throws Exception {
assertFalse(connection.reqGoogleDriveScope);
}
}

@ParameterizedTest
@CsvSource({"1, true", "0, false", "true, true", "false, false"})
public void testIsReadOnlyTokenProvided(String readonlyProp, boolean expectedIsReadOnly)
throws Exception {
String url =
"jdbc:bigquery://https://www.googleapis.com/bigquery/v2:443;"
+ "OAuthType=2;ProjectId=MyBigQueryProject;"
+ "OAuthAccessToken=redacted;"
+ "OAuthAccessTokenReadonly="
+ readonlyProp
+ ";";

try (BigQueryConnection connection = new BigQueryConnection(url)) {
assertEquals(expectedIsReadOnly, connection.isReadOnlyTokenUsed());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;

Expand Down Expand Up @@ -212,7 +214,7 @@ public void testExecSlowQueryPath() throws SQLException, InterruptedException {
.build();
Job job = getJobMock(tableResult, queryJobConfiguration, StatementType.SELECT);

doReturn(job).when(bigquery).create(any(JobInfo.class));
doReturn(job).when(bigquery).queryWithTimeout(any(), any(), any());

doReturn(jobIdWrapper)
.when(bigQueryStatementSpy)
Expand Down Expand Up @@ -297,14 +299,15 @@ public void setQueryTimeoutTest() throws Exception {
QueryJobConfiguration.newBuilder(query).setJobTimeoutMs(10000L).build();

Job job = getJobMock(result, jobConfiguration, StatementType.SELECT);
doReturn(job).when(bigquery).create(any(JobInfo.class));
doReturn(job).when(bigquery).queryWithTimeout(any(), any(), any());

doReturn(jsonResultSet).when(bigQueryStatementSpy).processJsonResultSet(result);
ArgumentCaptor<JobInfo> captor = ArgumentCaptor.forClass(JobInfo.class);
ArgumentCaptor<QueryJobConfiguration> captor =
ArgumentCaptor.forClass(QueryJobConfiguration.class);

bigQueryStatementSpy.runQuery(query, jobConfiguration);
verify(bigquery).create(captor.capture());
QueryJobConfiguration jobConfig = captor.getValue().getConfiguration();
verify(bigquery).queryWithTimeout(captor.capture(), any(), any());
QueryJobConfiguration jobConfig = captor.getValue();
assertEquals(3000L, jobConfig.getJobTimeoutMs().longValue());
}

Expand Down Expand Up @@ -393,23 +396,16 @@ public void testJoblessQuery() throws SQLException, InterruptedException {
TableResult tableResultJobfulMock = mock(TableResult.class);
QueryJobConfiguration jobConf = QueryJobConfiguration.newBuilder("SELECT 1").build();
Job jobMock = getJobMock(tableResultJobfulMock, jobConf, StatementType.SELECT);
ArgumentCaptor<JobInfo> jobfulCaptor = ArgumentCaptor.forClass(JobInfo.class);
doReturn(jobMock).when(bigquery).create(jobfulCaptor.capture());
doReturn(jobMock)
.when(bigquery)
.queryWithTimeout(any(QueryJobConfiguration.class), any(), any());
doReturn(mock(BigQueryJsonResultSet.class))
.when(jobfulStatementSpy)
.processJsonResultSet(tableResultJobfulMock);

jobfulStatementSpy.executeQuery("SELECT 1");

verify(bigquery).create(any(JobInfo.class));
assertTrue(
jobfulCaptor.getAllValues().stream()
.noneMatch(
jobInfo ->
Boolean.TRUE.equals(
((QueryJobConfiguration) jobInfo.getConfiguration()).dryRun())));
verify(bigquery, Mockito.never())
.queryWithTimeout(any(QueryJobConfiguration.class), any(), any());
verify(bigquery).queryWithTimeout(any(QueryJobConfiguration.class), any(), any());
}

@Test
Expand All @@ -422,7 +418,7 @@ public void testCloseCancelsJob() throws SQLException, InterruptedException {
QueryJobConfiguration.newBuilder(query).setPriority(Priority.BATCH).build();
Job job = getJobMock(tableResult, queryJobConfiguration, StatementType.SELECT);

doReturn(job).when(bigquery).create(any(JobInfo.class));
doReturn(job).when(bigquery).queryWithTimeout(any(), any(), any());
doReturn(false).when(bigQueryStatementSpy).useReadAPI(eq(tableResult));
doReturn(mock(JobId.class)).when(tableResult).getJobId();
Mockito.when(job.getQueryResults(any(QueryResultsOption.class)))
Expand Down Expand Up @@ -480,4 +476,22 @@ public void testCancelWithJoblessQuery() throws SQLException, InterruptedExcepti
// And no backend cancellation was attempted
verify(bigquery, Mockito.never()).cancel(any(JobId.class));
}

@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testGetStatementType(boolean isReadOnlyTokenUsed) throws Exception {
doReturn(isReadOnlyTokenUsed).when(bigQueryConnection).isReadOnlyTokenUsed();

Job dryRunJobMock = getJobMock(null, null, StatementType.SELECT);
doReturn(dryRunJobMock).when(bigquery).create(any(JobInfo.class));

BigQueryStatement statementSpy = Mockito.spy(bigQueryStatement);
QueryJobConfiguration queryJobConfiguration = QueryJobConfiguration.newBuilder(query).build();

StatementType type = statementSpy.getStatementType(queryJobConfiguration);

assertThat(type).isEqualTo(StatementType.SELECT);
verify(bigquery, isReadOnlyTokenUsed ? Mockito.never() : Mockito.times(1))
.create(any(JobInfo.class));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,23 +21,29 @@
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

import com.google.auth.oauth2.GoogleCredentials;
import com.google.cloud.ServiceOptions;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Arrays;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;

public class ITAuthTests extends ITBase {
static final String PROJECT_ID = ServiceOptions.getDefaultProjectId();
Expand Down Expand Up @@ -283,26 +289,31 @@ public void testValidExternalAccountAuthenticationRawJson() throws SQLException
connection.close();
}

// TODO(farhan): figure out how to programmatically generate an access token and test
@Test
@Disabled
public void testValidPreGeneratedAccessTokenAuthentication() throws SQLException {
String connection_uri =
"jdbc:bigquery://https://www.googleapis.com/bigquery/v2:443;PROJECTID="
+ PROJECT_ID
+ ";OAUTHTYPE=2;OAuthAccessToken=access_token;";

Connection connection = DriverManager.getConnection(connection_uri);
assertNotNull(connection);
assertFalse(connection.isClosed());

Statement statement = connection.createStatement();
ResultSet resultSet =
statement.executeQuery(
"SELECT repository_name FROM `bigquery-public-data.samples.github_timeline` LIMIT 50");
@ParameterizedTest
@CsvSource({
"https://www.googleapis.com/auth/bigquery.readonly, true",
"https://www.googleapis.com/auth/bigquery, false"
})
public void testValidPreGeneratedAccessTokenAuthentication(String scope, boolean isReadOnly)
throws Exception {
final JsonObject authJson = getAuthJson();
InputStream stream =
new ByteArrayInputStream(authJson.toString().getBytes(StandardCharsets.UTF_8));
GoogleCredentials credentials =
GoogleCredentials.fromStream(stream).createScoped(Arrays.asList(scope));
credentials.refresh();
String accessToken = credentials.getAccessToken().getTokenValue();

String connectionUri =
"jdbc:bigquery://https://www.googleapis.com/bigquery/v2:443;ProjectId="
+ authJson.get("project_id").getAsString()
+ ";OAuthType=2"
+ ";OAuthAccessToken="
+ accessToken
+ ";OAuthAccessTokenReadonly="
+ isReadOnly;

assertEquals(50, resultSetRowCount(resultSet));
connection.close();
validateConnection(connectionUri);
}

// TODO(obada): figure out how to programmatically generate a refresh token and test
Expand Down
Loading