176 lines
6.8 KiB
Plaintext
176 lines
6.8 KiB
Plaintext
<%@ jet
|
|
imports="
|
|
java.util.List
|
|
"
|
|
%>
|
|
|
|
/**
 * Submits a BigQuery query job through the supplied client, polls until the job is done and,
 * when large results are requested, creates a temporary dataset/table that is used as the
 * query destination. The class name is suffixed with the template's cid value.
 */
class BigQueryUtil_<%=cid%> {

    /** Project under which jobs are inserted and the temporary dataset is created. */
    String projectId;

    /** BigQuery API client used for every remote call made by this helper. */
    com.google.api.services.bigquery.Bigquery bigqueryclient = null;

    /** Path of the token file that is deleted when job submission fails; may be null. */
    String tokenFile;

    /** Becomes true once a query was run with a temporary destination table; drives cleanup(). */
    boolean useLargeResult = false;

    /** Name of the temporary dataset; only assigned when a large-result query is run. */
    String tempDataset;

    /** Name of the temporary destination table; only assigned when a large-result query is run. */
    String tempTable;

    /**
     * @param projectId      project used for job submission and temporary objects
     * @param bigqueryclient client used for all BigQuery calls
     * @param tokenFile      path of the token file to remove on a failed submission, or null
     */
    public BigQueryUtil_<%=cid%>(String projectId, com.google.api.services.bigquery.Bigquery bigqueryclient, String tokenFile) {
        this.projectId = projectId;
        this.bigqueryclient = bigqueryclient;
        this.tokenFile = tokenFile;
    }

    /**
     * Builds a name of the form "temp_" + prefix + random UUID (dashes removed) + sanitized
     * component id + random hex suffix.
     *
     * @param prefix text inserted right after "temp_"
     * @return the generated name
     */
    private String genTempName(String prefix){
        String uuidPart = java.util.UUID.randomUUID().toString().replaceAll("-", "");
        // Locale.ROOT keeps the lower-casing independent of the JVM default locale.
        String componentPart = "<%=cid%>".toLowerCase(java.util.Locale.ROOT).replaceAll("[^a-z0-9]", "0").replaceAll("^[^a-z]", "a");
        String randomPart = Integer.toHexString(java.util.concurrent.ThreadLocalRandom.current().nextInt());
        return "temp_" + prefix + uuidPart + componentPart + randomPart;
    }

    /**
     * Deletes the temporary table and then the temporary dataset if a large-result query was
     * run through this instance; does nothing otherwise.
     *
     * @throws Exception if one of the delete calls fails
     */
    public void cleanup() throws Exception{
        if(!useLargeResult){
            return;
        }
        bigqueryclient.tables().delete(projectId, tempDataset, tempTable).execute();
        bigqueryclient.datasets().delete(projectId, tempDataset).execute();
    }

    /**
     * Submits the query as a dry-run job and returns the location of the first table the
     * query references.
     *
     * @param queryConfig query configuration to analyse
     * @return the location of the first referenced table, or null when the dry run reports
     *         no referenced table
     * @throws Exception if a BigQuery call fails
     */
    private String getLocation(com.google.api.services.bigquery.model.JobConfigurationQuery queryConfig) throws Exception {
        com.google.api.services.bigquery.model.JobConfiguration config = new com.google.api.services.bigquery.model.JobConfiguration();
        config.setQuery(queryConfig);
        config.setDryRun(true);

        com.google.api.services.bigquery.model.Job job = new com.google.api.services.bigquery.model.Job();
        job.setConfiguration(config);

        com.google.api.services.bigquery.model.JobStatistics statistics = bigqueryclient.jobs().insert(projectId, job).execute().getStatistics();
        if(statistics == null || statistics.getQuery() == null) {
            return null;
        }

        List<com.google.api.services.bigquery.model.TableReference> referencedTables = statistics.getQuery().getReferencedTables();
        if(referencedTables == null || referencedTables.isEmpty()) {
            return null;
        }

        com.google.api.services.bigquery.model.TableReference firstTable = referencedTables.get(0);
        // The referenced table may belong to another project than the one running the job,
        // so look it up under its own project id and only fall back to ours when it is absent.
        String tableProjectId = firstTable.getProjectId() != null ? firstTable.getProjectId() : projectId;
        return bigqueryclient.tables().get(tableProjectId, firstTable.getDatasetId(), firstTable.getTableId()).execute().getLocation();
    }

    /**
     * Creates the temporary dataset named by tempDataset in the current project.
     *
     * @param location dataset location to set, or null to leave it unset
     * @throws Exception if the insert call fails
     */
    private void createDataset(String location) throws Exception {
        com.google.api.services.bigquery.model.DatasetReference datasetReference = new com.google.api.services.bigquery.model.DatasetReference().setProjectId(projectId).setDatasetId(tempDataset);
        com.google.api.services.bigquery.model.Dataset dataset = new com.google.api.services.bigquery.model.Dataset().setDatasetReference(datasetReference);
        if(location != null) {
            dataset.setLocation(location);
        }
        String description = "Dataset for BigQuery query job temporary table";
        dataset.setFriendlyName(description);
        dataset.setDescription(description);
        bigqueryclient.datasets().insert(projectId, dataset).execute();
    }

    /**
     * Runs the query with legacy SQL enabled.
     *
     * @param query          query text
     * @param useLargeResult whether to write the result to a temporary destination table
     * @return the job returned by the submission
     * @throws Exception if submission or polling fails
     */
    public com.google.api.services.bigquery.model.Job executeQuery(String query, boolean useLargeResult) throws Exception{
        return executeQuery(query, useLargeResult, true);
    }

    /**
     * Submits the query as a job and polls once per second until the job state is DONE.
     * A job that finishes with an error is logged, not thrown. In the AUTO result-size
     * variant of the template, a "responseTooLarge" error triggers one retry with large
     * results enabled, keeping the same SQL dialect.
     *
     * @param query          query text
     * @param useLargeResult whether to write the result to a temporary destination table
     * @param useLegacySql   whether the query uses legacy SQL
     * @return the job object returned by the insert call (not the last polled state), or the
     *         result of the large-result retry
     * @throws Exception if a BigQuery call fails or the polling sleep is interrupted
     */
    public com.google.api.services.bigquery.model.Job executeQuery(String query, boolean useLargeResult, boolean useLegacySql) throws Exception{
        com.google.api.services.bigquery.model.JobConfigurationQuery queryConfig = new com.google.api.services.bigquery.model.JobConfigurationQuery();
        queryConfig.setQuery(query);
        queryConfig.setUseLegacySql(useLegacySql);
        String location = getLocation(queryConfig);

        if(useLargeResult){
            this.useLargeResult = true;
            tempDataset = genTempName("dataset");
            tempTable = genTempName("table");
            // The temporary dataset is created in the location of the queried data.
            createDataset(location);
            queryConfig.setAllowLargeResults(true);
            queryConfig.setDestinationTable(new com.google.api.services.bigquery.model.TableReference()
                    .setProjectId(projectId)
                    .setDatasetId(tempDataset)
                    .setTableId(tempTable));
        }

        com.google.api.services.bigquery.model.JobConfiguration config = new com.google.api.services.bigquery.model.JobConfiguration();
        config.setQuery(queryConfig);

        com.google.api.services.bigquery.model.Job job = new com.google.api.services.bigquery.model.Job();
        job.setConfiguration(config);

        com.google.api.services.bigquery.model.Job insert = null;
        com.google.api.services.bigquery.model.JobReference jobId = null;
        try {
            insert = bigqueryclient.jobs().insert(projectId, job).execute();
            jobId = insert.getJobReference();
        } catch (com.google.api.client.googleapis.json.GoogleJsonResponseException e) {
            if(tokenFile != null){
                // Try to drop the token file so the next run asks for a fresh authorization.
                boolean isRemoved = false;
                try {
                    isRemoved = new java.io.File(tokenFile).delete();
                } catch (java.lang.SecurityException ignored) {
                    // Deletion not permitted: treated like a failed delete, the user is told to remove the file.
                }
                if(isRemoved){
                    <%
                    if(isLog4jEnabled){
                    %>
                    log.error("<%=cid%> - Unable to connect. This might come from the token expiration. Execute again the job with an empty authorization code.");
                    <%
                    }else{
                    %>
                    System.err.println("---> Unable to connect. This might come from the token expiration. Execute again the job with an empty authorization code.");
                    <%
                    }
                    %>
                }else{
                    <%
                    if(isLog4jEnabled){
                    %>
                    log.error("<%=cid%> - Unable to connect. This might come from the token expiration. Remove the file " + tokenFile + " Execute again the job with an empty authorization code.");
                    <%
                    }else{
                    %>
                    System.err.println("---> Unable to connect. This might come from the token expiration. Remove the file " + tokenFile + " Execute again the job with an empty authorization code.");
                    <%
                    }
                    %>
                }
            }
            throw e;
        }

        <%
        if(isLog4jEnabled){
        %>
        log.info("<%=cid%> - Wait for query execution");
        <%
        }
        %>
        // wait for query execution
        while (true) {
            com.google.api.services.bigquery.model.Job pollJob = bigqueryclient.jobs().get(projectId, jobId.getJobId()).setLocation(location).execute();
            com.google.api.services.bigquery.model.JobStatus status = pollJob.getStatus();
            if ("DONE".equals(status.getState())) {
                com.google.api.services.bigquery.model.ErrorProto errorProto = status.getErrorResult();
                if(errorProto != null){// job failed, handle it
                    <%if("AUTO".equals(resultSizeType)){%>
                    if(!useLargeResult && "responseTooLarge".equals(errorProto.getReason())){// try with large result flag
                        <%
                        if(isLog4jEnabled){
                        %>
                        log.info("<%=cid%> - Try with allow large results flag");
                        <%
                        }
                        %>
                        // Retry with the caller's SQL dialect; the two-argument overload would force legacy SQL.
                        return executeQuery(query, true, useLegacySql);
                    }
                    <%}%>
                    // The failure is only logged, not thrown, to keep the historical behavior
                    // (throwing here could surface errors such as "duplicate" that were never raised before).
                    <%
                    if(isLog4jEnabled){
                    %>
                    log.error("<%=cid%> - Reason: " + errorProto.getReason() + "\nMessage: " + errorProto.getMessage());
                    <%
                    }else{
                    %>
                    System.err.println("---> Reason: " + errorProto.getReason() + "\nMessage: " + errorProto.getMessage());
                    <%
                    }
                    %>
                }// else job successful
                break;
            }
            // Pause execution for one second before polling job status again, to
            // reduce unnecessary calls to the BigQuery API and lower overall
            // application bandwidth.
            Thread.sleep(1000);
        }

        return insert;
    }

}