* feat(TDI-45914): implement feature for Input component * feat(TDI-45914): implement feature for SQLRow component * feat(TDI-45914): implement feature for BulkExec component * feat(TDI-45914): remove die on error, put error message * feat(TDI-45914): update bulkexec with gson, reuse version * feat(TDI-45914): fix SQLRow as standalone component statistic return * feat(TDI-45914): improve logging code pieces * feat(TDI-45914): remove extra whitespaces * feat(TDI-45914): fix gson object missing cid * feat(TDI-45914): fix gson object missing cid for BulkExec * feat(TDI-45914): add return values for tBigqueryOutput * feat(TDI-45914): fix BulkExec and Output statistics, remove virtual cid part
281 lines
14 KiB
Plaintext
281 lines
14 KiB
Plaintext
<%@ jet
|
|
imports="
|
|
org.talend.core.model.process.INode
|
|
org.talend.core.model.process.ElementParameterParser
|
|
org.talend.designer.codegen.config.CodeGeneratorArgument
|
|
org.talend.core.model.metadata.IMetadataTable
|
|
org.talend.core.model.metadata.IMetadataColumn
|
|
org.talend.core.model.process.IConnection
|
|
org.talend.core.model.process.IConnectionCategory
|
|
org.talend.core.model.metadata.types.JavaTypesManager
|
|
org.talend.core.model.metadata.types.JavaType
|
|
java.util.List
|
|
"
|
|
%>
|
|
<%@ include file="@{org.talend.designer.components.localprovider}/components/templates/Log4j/Log4jFileUtil.javajet"%>
|
|
<%
|
|
CodeGeneratorArgument codeGenArgument = (CodeGeneratorArgument) argument;
|
|
|
|
INode node = (INode)codeGenArgument.getArgument();
|
|
|
|
String cid = node.getUniqueName();
|
|
|
|
String authMode = ElementParameterParser.getValue(node,"__AUTH_MODE__");
|
|
String query = ElementParameterParser.getValue(node,"__QUERY__");
|
|
boolean useLegacySql = ElementParameterParser.getBooleanValue(node,"__USE_LEGACY_SQL__");
|
|
query = query.replaceAll("\n"," ").replaceAll("\r"," ");
|
|
|
|
boolean isLog4jEnabled = ("true").equals(ElementParameterParser.getValue(node.getProcess(), "__LOG4J_ACTIVATE__"));
|
|
if (authMode.equals("OAUTH") || authMode.equals("TOKEN")) {
|
|
String resultSizeType = ElementParameterParser.getValue(node,"__RESULT_SIZE__");
|
|
%>
|
|
|
|
// Start a Query Job
|
|
String querySql_<%=cid %> = <%=query %>;
|
|
System.out.format("Running Query : %s\n", querySql_<%=cid %>);
|
|
<%
|
|
if(isLog4jEnabled){
|
|
%>
|
|
log.debug("<%=cid%> - Running Query: "+querySql_<%=cid %>);
|
|
<%
|
|
}
|
|
%>
|
|
|
|
<%
|
|
if(isLog4jEnabled){
|
|
%>
|
|
log.info("<%=cid%> - Try <%="LARGE".equals(resultSizeType) ? "with" : "without"%> allow large results flag");
|
|
<%
|
|
}
|
|
if(authMode.equals("TOKEN") && !ElementParameterParser.canEncrypt(node, "__ACCESS_TOKEN__")) { %>
|
|
credential_<%=cid%>.setAccessToken(<%= ElementParameterParser.getValue(node, "__ACCESS_TOKEN__")%>);
|
|
<% } %>
|
|
try {
|
|
com.google.api.services.bigquery.model.Job insert_<%=cid %> = bigQueryUtil_<%=cid%>.executeQuery(querySql_<%=cid%>, <%="LARGE".equals(resultSizeType) ? true : false%>, <%=useLegacySql%>);
|
|
<%
|
|
if(isLog4jEnabled){
|
|
%>
|
|
log.info("<%=cid%> - Executing query.");
|
|
<%
|
|
}
|
|
%>
|
|
String pageToken_<%=cid%> = null;
|
|
if(insert_<%=cid%>.getStatus().getErrorResult() != null ) {
|
|
globalMap.put("<%=cid%>_ERROR_MESSAGE", insert_<%=cid%>.getStatus().getErrorResult().getMessage());
|
|
String message_<%=cid%> = "<%=cid%> - " + insert_<%=cid%>.getStatus().getErrorResult().getMessage();
|
|
<%
|
|
log4jFileUtil.logErrorMessage();
|
|
|
|
// Else job has finished successfully with the results. Start {.
|
|
%>
|
|
} else {
|
|
java.util.List<com.google.api.services.bigquery.model.Job> insert_jobs_<%=cid%>;
|
|
if (insert_<%=cid %>.getStatistics().getNumChildJobs() != null) {
|
|
insert_jobs_<%=cid%> = bigQueryUtil_<%=cid%>.listChildJobs(insert_<%=cid %>.getJobReference().getJobId());
|
|
} else {
|
|
insert_jobs_<%=cid%> = java.util.Collections.singletonList(insert_<%=cid%>);
|
|
}
|
|
<%
|
|
} else if (authMode.equals("SERVICEACCOUNT")) {
|
|
%>
|
|
com.google.cloud.bigquery.QueryJobConfiguration queryConfiguration_<%=cid%> = com.google.cloud.bigquery.QueryJobConfiguration.newBuilder(<%=query%>).setUseLegacySql(<%=useLegacySql%>).build();
|
|
com.google.cloud.bigquery.JobId jobId_<%=cid%> = com.google.cloud.bigquery.JobId.of(java.util.UUID.randomUUID().toString());
|
|
com.google.cloud.bigquery.Job job_<%=cid%> = bigquery_<%=cid%>.create(com.google.cloud.bigquery.JobInfo.newBuilder(queryConfiguration_<%=cid%>).setJobId(jobId_<%=cid%>).build());
|
|
globalMap.put("<%=cid%>_JOBID", jobId_<%=cid%>.getJob());
|
|
<%
|
|
if(isLog4jEnabled){
|
|
%>
|
|
log.info("<%=cid%> - Sending job " + jobId_<%=cid%> + " with query: " + <%=query%>);
|
|
<%
|
|
}
|
|
%>
|
|
try {
|
|
job_<%=cid%> = job_<%=cid%>.waitFor();
|
|
|
|
if (job_<%=cid%> == null) {
|
|
String message_<%=cid%> = "<%=cid%> - Job no longer exists";
|
|
globalMap.put("<%=cid%>_ERROR_MESSAGE", message_<%=cid%>);
|
|
<%
|
|
log4jFileUtil.logErrorMessage();
|
|
%>
|
|
} else if (job_<%=cid%>.getStatus().getError() != null) {
|
|
com.google.gson.Gson gsonObject_<%=cid%> = new com.google.gson.Gson();
|
|
globalMap.put("<%=cid%>_STATISTICS", gsonObject_<%=cid%>.toJson(job_<%=cid%>.getStatistics()));
|
|
String message_<%=cid%> = "<%=cid%> - " + job_<%=cid%>.getStatus().getError().toString();
|
|
globalMap.put("<%=cid%>_ERROR_MESSAGE", job_<%=cid%>.getStatus().getError().toString());
|
|
<%
|
|
log4jFileUtil.logErrorMessage();
|
|
|
|
// Else job has finished successfully with the results. Start {.
|
|
%>
|
|
} else {
|
|
com.google.gson.Gson gsonObject_<%=cid%> = new com.google.gson.Gson();
|
|
globalMap.put("<%=cid%>_STATISTICS", gsonObject_<%=cid%>.toJson(job_<%=cid%>.getStatistics()));
|
|
java.util.List<com.google.cloud.bigquery.Job> childJobs_<%=cid%>;
|
|
if (job_<%=cid%>.getStatistics().getNumChildJobs() != null) {
|
|
childJobs_<%=cid%> = java.util.Optional.ofNullable(bigquery_<%=cid%>.listJobs(com.google.cloud.bigquery.BigQuery.JobListOption.parentJobId(jobId_<%=cid%>.getJob())))
|
|
.map(com.google.api.gax.paging.Page::getValues)
|
|
.flatMap(iterable_<%=cid%> ->
|
|
java.util.Optional.ofNullable(java.util.stream.StreamSupport
|
|
.stream(iterable_<%=cid%>.spliterator(), false)
|
|
.collect(java.util.stream.Collectors.toList())))
|
|
.orElse(java.util.Collections.emptyList());
|
|
java.util.Collections.reverse(childJobs_<%=cid%>);
|
|
} else {
|
|
childJobs_<%=cid%> = java.util.Collections.singletonList(job_<%=cid%>);
|
|
}
|
|
<%
|
|
if(isLog4jEnabled){
|
|
%>
|
|
log.info("<%=cid%> - Job " + jobId_<%=cid%> + " finished successfully.");
|
|
<%
|
|
}
|
|
%>
|
|
<%
|
|
} else {
|
|
throw new IllegalArgumentException("authentication mode should be either \"SERVICEACCOUNT\", \"OAUTH\" or \"TOKEN\", but it is " + authMode);
|
|
}
|
|
|
|
List< ? extends IConnection> conns = node.getOutgoingSortedConnections();
|
|
if (conns != null){
|
|
if (conns.size()>0){
|
|
IConnection conn =conns.get(0);
|
|
String connName = conn.getName();
|
|
if (conn.getLineStyle().hasConnectionCategory(IConnectionCategory.DATA)) {
|
|
|
|
if (authMode.equals("OAUTH") || authMode.equals("TOKEN")) {
|
|
%>
|
|
for (com.google.api.services.bigquery.model.Job insert_elem_<%=cid %> : insert_jobs_<%=cid%>) {
|
|
insert_<%=cid %> = insert_elem_<%=cid %>;
|
|
if (insert_<%=cid %>.getStatus().getErrorResult() != null) {
|
|
globalMap.put("<%=cid%>_ERROR_MESSAGE", insert_<%=cid %>.getStatus().getErrorResult());
|
|
String message_<%=cid%> = "<%=cid%> - " + "Query has error Result, skipping its ResultSet." + insert_<%=cid %>.getStatus().getErrorResult();
|
|
<%
|
|
log4jFileUtil.logErrorMessage();
|
|
%>
|
|
continue;
|
|
}
|
|
while (true) {
|
|
// Fetch Results
|
|
com.google.api.services.bigquery.model.GetQueryResultsResponse queryResultsResponse_<%=cid %> = bigqueryclient_<%=cid%>.jobs()
|
|
.getQueryResults(PROJECT_ID_<%=cid %>, insert_<%=cid %>.getJobReference().getJobId())
|
|
.setPageToken(pageToken_<%=cid%>)
|
|
.execute();
|
|
|
|
List<com.google.api.services.bigquery.model.TableRow> rows_<%=cid %> = queryResultsResponse_<%=cid %>.getRows();
|
|
|
|
if(rows_<%=cid %> == null) {
|
|
// Means there is no record.
|
|
rows_<%=cid %> = new java.util.ArrayList<com.google.api.services.bigquery.model.TableRow>();
|
|
}
|
|
|
|
for (com.google.api.services.bigquery.model.TableRow row_<%=cid %> : rows_<%=cid %>) {
|
|
java.util.List<com.google.api.services.bigquery.model.TableCell> field_<%=cid %> = row_<%=cid %>.getF();
|
|
Object value_<%=cid%> = null;
|
|
nb_line_<%=cid%> ++;
|
|
<%
|
|
} else if (authMode.equals("SERVICEACCOUNT")) {
|
|
%>
|
|
long nb_line_<%=cid%> = 0;
|
|
for (com.google.cloud.bigquery.Job job_iterable_<%=cid%> : childJobs_<%=cid%>) {
|
|
if (job_iterable_<%=cid%>.getStatus().getError() != null) {
|
|
globalMap.put("<%=cid%>_ERROR_MESSAGE", job_iterable_<%=cid%>.getStatus().getError().toString());
|
|
String message_<%=cid%> = "<%=cid%> - " + job_iterable_<%=cid%>.getStatus().getError().toString();
|
|
<%
|
|
log4jFileUtil.logErrorMessage();
|
|
%>
|
|
continue;
|
|
}
|
|
com.google.cloud.bigquery.TableResult result_<%=cid%> = job_iterable_<%=cid%>.getQueryResults();
|
|
for (com.google.cloud.bigquery.FieldValueList field_<%=cid %> : result_<%=cid%>.iterateAll()) {
|
|
Object value_<%=cid%>;
|
|
nb_line_<%=cid%> ++;
|
|
<%
|
|
} else {
|
|
throw new IllegalArgumentException("authentication mode should be either \"SERVICEACCOUNT\", \"OAUTH\" or \"TOKEN\", but it is " + authMode);
|
|
}
|
|
|
|
List<IMetadataTable> metadatas = node.getMetadataList();
|
|
if ((metadatas!=null) && (metadatas.size() > 0)) {
|
|
IMetadataTable metadata = metadatas.get(0);
|
|
if (metadata != null) {
|
|
String encoding = ElementParameterParser.getValue(node,"__ENCODING__");
|
|
String advancedSeparatorStr = ElementParameterParser.getValue(node, "__ADVANCED_SEPARATOR__");
|
|
boolean advancedSeparator = (advancedSeparatorStr!=null&&!("").equals(advancedSeparatorStr))?("true").equals(advancedSeparatorStr):false;
|
|
String thousandsSeparator = ElementParameterParser.getValueWithJavaType(node, "__THOUSANDS_SEPARATOR__", JavaTypesManager.CHARACTER);
|
|
String decimalSeparator = ElementParameterParser.getValueWithJavaType(node, "__DECIMAL_SEPARATOR__", JavaTypesManager.CHARACTER);
|
|
|
|
List<IMetadataColumn> columns = metadata.getListColumns();
|
|
int nbColumns = columns.size();
|
|
for (int i = 0; i < nbColumns; i++ ) {
|
|
IMetadataColumn column = columns.get(i);
|
|
String columnName = column.getLabel();
|
|
|
|
String typeToGenerate = JavaTypesManager.getTypeToGenerate(column.getTalendType(), column.isNullable());
|
|
JavaType javaType = JavaTypesManager.getJavaTypeFromId(column.getTalendType());
|
|
String patternValue = column.getPattern() == null || column.getPattern().trim().length() == 0 ? null : column.getPattern();
|
|
if (authMode.equals("OAUTH") || authMode.equals("TOKEN")) {
|
|
%>
|
|
value_<%=cid%> = field_<%=cid %>.get(<%=i%>).getV();
|
|
<%
|
|
} else if (authMode.equals("SERVICEACCOUNT")) {
|
|
%>
|
|
value_<%=cid%> = field_<%=cid %>.get(<%=i%>).getValue();
|
|
<%
|
|
} else {
|
|
throw new IllegalArgumentException("authentication mode should be either \"SERVICEACCOUNT\", \"OAUTH\" or \"TOKEN\", but it is " + authMode);
|
|
}
|
|
%>
|
|
if(com.google.api.client.util.Data.isNull(value_<%=cid%>)) value_<%=cid%> = null;
|
|
if(value_<%=cid%> != null){
|
|
|
|
<%
|
|
if (javaType == JavaTypesManager.STRING) {
|
|
%>
|
|
<%=connName%>.<%=columnName%> = value_<%=cid%>.toString();
|
|
<%
|
|
} else if (javaType == JavaTypesManager.OBJECT) {
|
|
%>
|
|
<%=connName%>.<%=columnName%> = value_<%=cid%>;
|
|
<%
|
|
} else if(javaType == JavaTypesManager.DATE) {
|
|
%>
|
|
if (value_<%=cid%>.toString().contains("-")) {
|
|
String sValue_<%=cid%> = value_<%=cid%>.toString();
|
|
if (sValue_<%=cid%>.matches(".*\\.\\d{6}")) {
|
|
// microseconds must be ignored
|
|
sValue_<%=cid%> = sValue_<%=cid%>.substring(0, sValue_<%=cid%>.length() - 3);
|
|
}
|
|
|
|
<%=connName%>.<%=columnName%> = ParserUtils.parseTo_Date(sValue_<%=cid%>,<%=patternValue%>);
|
|
} else{
|
|
<%=connName%>.<%=columnName%>=ParserUtils.parseTo_Date(value_<%=cid%>.toString());
|
|
}
|
|
<%
|
|
} else if(advancedSeparator && JavaTypesManager.isNumberType(javaType)) {
|
|
%>
|
|
<%=connName%>.<%=columnName%> = ParserUtils.parseTo_<%= typeToGenerate %>(ParserUtils.parseTo_Number(value_<%=cid%>.toString(), <%= thousandsSeparator %>, <%= decimalSeparator %>));
|
|
<%
|
|
} else if(javaType == JavaTypesManager.BYTE_ARRAY) {
|
|
%>
|
|
<%=connName%>.<%=columnName%> = value_<%=cid%>.toString().getBytes(<%=encoding %>);
|
|
<%
|
|
} else {
|
|
%>
|
|
<%=connName%>.<%=columnName%> = ParserUtils.parseTo_<%= typeToGenerate %>(value_<%=cid%>.toString());
|
|
<%
|
|
}
|
|
%>
|
|
}else{
|
|
<%=connName%>.<%=columnName%> = <%=JavaTypesManager.getDefaultValueFromJavaType(typeToGenerate, column.getDefault())%>;
|
|
}
|
|
<%
|
|
}
|
|
log4jFileUtil.debugRetriveData(node);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
%>
|