This repository was archived by the owner on Nov 11, 2022. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 323
BigQuery: fix an issue with option propagation and refactor to future-proof #540
Merged
dhalperi
merged 4 commits into
GoogleCloudPlatform:master
from
dhalperi:bigquery-direct-standard-sql
Feb 16, 2017
Merged
Changes from all commits
Commits
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -46,11 +46,9 @@ | |
import com.google.common.base.MoreObjects; | ||
import com.google.common.collect.ImmutableList; | ||
import com.google.common.util.concurrent.Uninterruptibles; | ||
|
||
import org.joda.time.Duration; | ||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
||
import java.io.IOException; | ||
import java.util.Collection; | ||
import java.util.Collections; | ||
|
@@ -61,7 +59,6 @@ | |
import java.util.Objects; | ||
import java.util.Random; | ||
import java.util.concurrent.TimeUnit; | ||
|
||
import javax.annotation.Nullable; | ||
|
||
/** | ||
|
@@ -73,6 +70,7 @@ public class BigQueryTableRowIterator implements AutoCloseable { | |
@Nullable private TableReference ref; | ||
@Nullable private final String projectId; | ||
@Nullable private TableSchema schema; | ||
@Nullable private final JobConfigurationQuery queryConfig; | ||
private final Bigquery client; | ||
private String pageToken; | ||
private Iterator<TableRow> iteratorOverCurrentBatch; | ||
|
@@ -89,25 +87,18 @@ public class BigQueryTableRowIterator implements AutoCloseable { | |
// following interval to check the status of query execution job | ||
private static final Duration QUERY_COMPLETION_POLL_TIME = Duration.standardSeconds(1); | ||
|
||
private final String query; | ||
// Whether to flatten query results. | ||
private final boolean flattenResults; | ||
// Whether to use the BigQuery legacy SQL dialect.. | ||
private final boolean useLegacySql; | ||
// Temporary dataset used to store query results. | ||
private String temporaryDatasetId = null; | ||
// Temporary table used to store query results. | ||
private String temporaryTableId = null; | ||
|
||
private BigQueryTableRowIterator( | ||
@Nullable TableReference ref, @Nullable String query, @Nullable String projectId, | ||
Bigquery client, boolean flattenResults, boolean useLegacySql) { | ||
@Nullable TableReference ref, @Nullable JobConfigurationQuery queryConfig, | ||
@Nullable String projectId, Bigquery client) { | ||
this.ref = ref; | ||
this.query = query; | ||
this.queryConfig = queryConfig; | ||
this.projectId = projectId; | ||
this.client = checkNotNull(client, "client"); | ||
this.flattenResults = flattenResults; | ||
this.useLegacySql = useLegacySql; | ||
} | ||
|
||
/** | ||
|
@@ -116,7 +107,7 @@ private BigQueryTableRowIterator( | |
public static BigQueryTableRowIterator fromTable(TableReference ref, Bigquery client) { | ||
checkNotNull(ref, "ref"); | ||
checkNotNull(client, "client"); | ||
return new BigQueryTableRowIterator(ref, null, ref.getProjectId(), client, true, true); | ||
return new BigQueryTableRowIterator(ref, null, ref.getProjectId(), client); | ||
} | ||
|
||
/** | ||
|
@@ -135,23 +126,39 @@ public static BigQueryTableRowIterator fromQuery( | |
* Constructs a {@code BigQueryTableRowIterator} that reads from the results of executing the | ||
* specified query in the specified project. | ||
*/ | ||
@Deprecated | ||
public static BigQueryTableRowIterator fromQuery( | ||
String query, String projectId, Bigquery client, @Nullable Boolean flattenResults, | ||
@Nullable Boolean useLegacySql) { | ||
checkNotNull(query, "query"); | ||
checkNotNull(projectId, "projectId"); | ||
checkNotNull(client, "client"); | ||
return new BigQueryTableRowIterator(null, query, projectId, client, | ||
MoreObjects.firstNonNull(flattenResults, Boolean.TRUE), | ||
MoreObjects.firstNonNull(useLegacySql, Boolean.TRUE)); | ||
JobConfigurationQuery queryConfig = new JobConfigurationQuery() | ||
.setFlattenResults(MoreObjects.firstNonNull(flattenResults, Boolean.TRUE)) | ||
.setPriority("BATCH") | ||
.setQuery(query) | ||
.setUseLegacySql(MoreObjects.firstNonNull(useLegacySql, Boolean.TRUE)); | ||
return new BigQueryTableRowIterator(null, queryConfig, projectId, client); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. not refactoring in dataflow 1.x, this is a minimal fix only. |
||
} | ||
|
||
/** | ||
* Constructs a {@code BigQueryTableRowIterator} that reads from the results of executing the | ||
* specified query in the specified project. | ||
*/ | ||
public static BigQueryTableRowIterator fromQuery( | ||
JobConfigurationQuery queryConfig, String projectId, Bigquery client) { | ||
checkNotNull(queryConfig, "queryConfig"); | ||
checkNotNull(projectId, "projectId"); | ||
checkNotNull(client, "client"); | ||
return new BigQueryTableRowIterator(null, queryConfig, projectId, client); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. not refactoring in dataflow 1.x, this is a minimal fix only. |
||
} | ||
|
||
/** | ||
* Opens the table for read. | ||
* @throws IOException on failure | ||
*/ | ||
public void open() throws IOException, InterruptedException { | ||
if (query != null) { | ||
if (queryConfig != null) { | ||
ref = executeQueryAndWaitForCompletion(); | ||
} | ||
// Get table schema. | ||
|
@@ -401,15 +408,17 @@ private void deleteDataset(String datasetId) throws IOException, InterruptedExce | |
*/ | ||
private TableReference executeQueryAndWaitForCompletion() | ||
throws IOException, InterruptedException { | ||
checkState(projectId != null, "Unable to execute a query without a configured project id"); | ||
checkState(queryConfig != null, "Unable to execute a query without a configured query"); | ||
// Dry run query to get source table location | ||
Job dryRunJob = new Job() | ||
.setConfiguration(new JobConfiguration() | ||
.setQuery(new JobConfigurationQuery() | ||
.setQuery(query)) | ||
.setQuery(queryConfig) | ||
.setDryRun(true)); | ||
JobStatistics jobStats = executeWithBackOff( | ||
client.jobs().insert(projectId, dryRunJob), | ||
String.format("Error when trying to dry run query %s.", query)).getStatistics(); | ||
String.format("Error when trying to dry run query %s.", | ||
queryConfig.toPrettyString())).getStatistics(); | ||
|
||
// Let BigQuery to pick default location if the query does not read any tables. | ||
String location = null; | ||
|
@@ -428,30 +437,27 @@ private TableReference executeQueryAndWaitForCompletion() | |
createDataset(temporaryDatasetId, location); | ||
Job job = new Job(); | ||
JobConfiguration config = new JobConfiguration(); | ||
JobConfigurationQuery queryConfig = new JobConfigurationQuery(); | ||
config.setQuery(queryConfig); | ||
job.setConfiguration(config); | ||
queryConfig.setQuery(query); | ||
queryConfig.setAllowLargeResults(true); | ||
queryConfig.setFlattenResults(flattenResults); | ||
queryConfig.setUseLegacySql(useLegacySql); | ||
|
||
|
||
TableReference destinationTable = new TableReference(); | ||
destinationTable.setProjectId(projectId); | ||
destinationTable.setDatasetId(temporaryDatasetId); | ||
destinationTable.setTableId(temporaryTableId); | ||
queryConfig.setDestinationTable(destinationTable); | ||
queryConfig.setAllowLargeResults(Boolean.TRUE); | ||
|
||
Job queryJob = executeWithBackOff( | ||
client.jobs().insert(projectId, job), | ||
String.format("Error when trying to execute the job for query %s.", query)); | ||
String.format("Error when trying to execute the job for query %s.", | ||
queryConfig.toPrettyString())); | ||
JobReference jobId = queryJob.getJobReference(); | ||
|
||
while (true) { | ||
Job pollJob = executeWithBackOff( | ||
client.jobs().get(projectId, jobId.getJobId()), | ||
String.format("Error when trying to get status of the job for query %s.", query)); | ||
String.format("Error when trying to get status of the job for query %s.", | ||
queryConfig.toPrettyString())); | ||
JobStatus status = pollJob.getStatus(); | ||
if (status.getState().equals("DONE")) { | ||
// Job is DONE, but did not necessarily succeed. | ||
|
@@ -461,7 +467,9 @@ private TableReference executeQueryAndWaitForCompletion() | |
} else { | ||
// There will be no temporary table to delete, so null out the reference. | ||
temporaryTableId = null; | ||
throw new IOException("Executing query " + query + " failed: " + error.getMessage()); | ||
throw new IOException( | ||
String.format("Executing query %s failed: %s", | ||
queryConfig.toPrettyString(), error.getMessage())); | ||
} | ||
} | ||
Uninterruptibles.sleepUninterruptibly( | ||
|
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is that better to have a
public static createBasicQueryConfig(String query, boolean flattenResults, boolean useLegacySql)
and, use it in two places?