* TDI-44342 : Ignore microseconds in BigQuery datetime. Fixed for BigQuerySQLRow component
224 lines
8.8 KiB
Plaintext
224 lines
8.8 KiB
Plaintext
<%@ jet
|
|
imports="
|
|
org.talend.core.model.process.INode
|
|
org.talend.core.model.process.ElementParameterParser
|
|
org.talend.designer.codegen.config.CodeGeneratorArgument
|
|
org.talend.core.model.metadata.IMetadataTable
|
|
org.talend.core.model.metadata.IMetadataColumn
|
|
org.talend.core.model.process.IConnection
|
|
org.talend.core.model.process.IConnectionCategory
|
|
org.talend.core.model.metadata.types.JavaTypesManager
|
|
org.talend.core.model.metadata.types.JavaType
|
|
java.util.List
|
|
"
|
|
%>
|
|
<%@ include file="@{org.talend.designer.components.localprovider}/components/templates/Log4j/Log4jFileUtil.javajet"%>
|
|
<%
    // Template-time setup: resolve the component node being generated and read
    // the design-time parameters that select which BigQuery client code is
    // emitted below. `argument` is supplied by the JET generator.
    CodeGeneratorArgument codeGenArgument = (CodeGeneratorArgument) argument;
    INode node = (INode) codeGenArgument.getArgument();
    String cid = node.getUniqueName();

    String authMode = ElementParameterParser.getValue(node, "__AUTH_MODE__");
    String query = ElementParameterParser.getValue(node, "__QUERY__");
    boolean useLegacySql = ElementParameterParser.getBooleanValue(node, "__USE_LEGACY_SQL__");

    // The query expression is pasted into a single generated Java statement, so
    // line breaks are flattened to spaces. Plain character replacement is used
    // because no regular expression is involved.
    query = query.replace('\n', ' ').replace('\r', ' ');

    boolean isLog4jEnabled = "true".equals(ElementParameterParser.getValue(node.getProcess(), "__LOG4J_ACTIVATE__"));

    // Constant-first comparisons: an unset auth mode falls through to the
    // explicit IllegalArgumentException below instead of a NullPointerException.
    if ("OAUTH".equals(authMode)) {
        String resultSizeType = ElementParameterParser.getValue(node, "__RESULT_SIZE__");
        boolean allowLargeResults = "LARGE".equals(resultSizeType);
%>

    // Start a Query Job
    String querySql_<%=cid %> = <%=query %>;
    System.out.format("Running Query : %s\n", querySql_<%=cid %>);
<%
        if (isLog4jEnabled) {
%>
    log.debug("<%=cid%> - Running Query: "+querySql_<%=cid %>);
    log.info("<%=cid%> - Try <%=allowLargeResults ? "with" : "without"%> allow large results flag");
<%
        }
%>
    com.google.api.services.bigquery.model.Job insert_<%=cid %> = bigQueryUtil_<%=cid%>.executeQuery(querySql_<%=cid%>, <%=allowLargeResults%>, <%=useLegacySql%>);

<%
        if (isLog4jEnabled) {
%>
    log.info("<%=cid%> - Executing query.");
<%
        }
%>
    String pageToken_<%=cid%> = null;
<%
    } else if ("SERVICEACCOUNT".equals(authMode)) {
%>
    com.google.cloud.bigquery.QueryJobConfiguration queryConfiguration_<%=cid%> = com.google.cloud.bigquery.QueryJobConfiguration.newBuilder(<%=query%>).setUseLegacySql(<%=useLegacySql%>).build();
    com.google.cloud.bigquery.JobId jobId_<%=cid%> = com.google.cloud.bigquery.JobId.of(java.util.UUID.randomUUID().toString());
    com.google.cloud.bigquery.Job job_<%=cid%> = bigquery_<%=cid%>.create(com.google.cloud.bigquery.JobInfo.newBuilder(queryConfiguration_<%=cid%>).setJobId(jobId_<%=cid%>).build());

<%
        if (isLog4jEnabled) {
%>
    log.info("<%=cid%> - Sending job " + jobId_<%=cid%> + " with query: " + <%=query%>);
<%
        }
%>
    job_<%=cid%> = job_<%=cid%>.waitFor();

    if (job_<%=cid%> == null) {
        throw new RuntimeException("Job no longer exists");
    } else if (job_<%=cid%>.getStatus().getError() != null) {
        throw new RuntimeException(job_<%=cid%>.getStatus().getError().toString());
    }

<%
        if (isLog4jEnabled) {
%>
    log.info("<%=cid%> - Job " + jobId_<%=cid%> + " finished successfully.");
<%
        }
%>

<%
    } else {
        // Fails code generation itself (not the generated job) for an unknown mode.
        throw new IllegalArgumentException("authentication mode should be either \"SERVICEACCOUNT\" or \"OAUTH\", but it is " + authMode);
    }
    // NOTE: the scriptlet is intentionally left open here; the template
    // statements that follow this section continue inside it.
|
|
|
|
// Result retrieval: emitted only when the first outgoing connection carries
// row data. The generated code reads each result row and copies every schema
// column into the output row structure named after that connection.
List< ? extends IConnection> outgoingConns = node.getOutgoingSortedConnections();
if (outgoingConns != null && !outgoingConns.isEmpty()) {
    IConnection firstConn = outgoingConns.get(0);
    String connName = firstConn.getName();
    if (firstConn.getLineStyle().hasConnectionCategory(IConnectionCategory.DATA)) {

        // Method call text used by the generated code to read one cell; it
        // differs between the two client libraries and is fixed per auth mode.
        String cellAccessor;
        if (authMode.equals("OAUTH")) {
            cellAccessor = "getV()";
%>
    while (true) {
        // Fetch Results
        com.google.api.services.bigquery.model.TableDataList dataList_<%=cid %> = bigqueryclient_<%=cid%>.tabledata()
                .list(PROJECT_ID_<%=cid %>,
                        insert_<%=cid %>.getConfiguration().getQuery()
                                .getDestinationTable().getDatasetId(),
                        insert_<%=cid %>.getConfiguration().getQuery()
                                .getDestinationTable().getTableId())
                .setPageToken(pageToken_<%=cid%>).execute();

        List<com.google.api.services.bigquery.model.TableRow> rows_<%=cid %> = dataList_<%=cid %>.getRows();

        if(rows_<%=cid %> == null) {
            // Means there is no record.
            rows_<%=cid %> = new java.util.ArrayList<com.google.api.services.bigquery.model.TableRow>();
        }

        for (com.google.api.services.bigquery.model.TableRow row_<%=cid %> : rows_<%=cid %>) {
            java.util.List<com.google.api.services.bigquery.model.TableCell> field_<%=cid %> = row_<%=cid %>.getF();
            Object value_<%=cid%> = null;
            nb_line_<%=cid%> ++;
<%
        } else if (authMode.equals("SERVICEACCOUNT")) {
            cellAccessor = "getValue()";
%>
    com.google.cloud.bigquery.TableResult result_<%=cid%> = job_<%=cid%>.getQueryResults();

    long nb_line_<%=cid%> = 0;

    for (com.google.cloud.bigquery.FieldValueList field_<%=cid %> : result_<%=cid%>.iterateAll()) {
        Object value_<%=cid%>;
        nb_line_<%=cid%> ++;
<%
        } else {
            throw new IllegalArgumentException("authentication mode should be either \"SERVICEACCOUNT\" or \"OAUTH\", but it is " + authMode);
        }
        // NOTE(review): the loops opened in the generated code above are not
        // closed anywhere in this section; presumably a companion template
        // closes them - confirm.

        List<IMetadataTable> schemaTables = node.getMetadataList();
        if (schemaTables != null && schemaTables.size() > 0) {
            IMetadataTable schema = schemaTables.get(0);
            if (schema != null) {
                String encoding = ElementParameterParser.getValue(node, "__ENCODING__");
                // Enabled only when the parameter is exactly "true".
                boolean advancedSeparator = "true".equals(ElementParameterParser.getValue(node, "__ADVANCED_SEPARATOR__"));
                String thousandsSeparator = ElementParameterParser.getValueWithJavaType(node, "__THOUSANDS_SEPARATOR__", JavaTypesManager.CHARACTER);
                String decimalSeparator = ElementParameterParser.getValueWithJavaType(node, "__DECIMAL_SEPARATOR__", JavaTypesManager.CHARACTER);

                List<IMetadataColumn> schemaColumns = schema.getListColumns();
                int columnCount = schemaColumns.size();
                for (int colIdx = 0; colIdx < columnCount; colIdx++) {
                    IMetadataColumn column = schemaColumns.get(colIdx);
                    String columnName = column.getLabel();

                    String typeToGenerate = JavaTypesManager.getTypeToGenerate(column.getTalendType(), column.isNullable());
                    JavaType javaType = JavaTypesManager.getJavaTypeFromId(column.getTalendType());
                    // A missing or blank pattern is emitted as the literal `null`.
                    String rawPattern = column.getPattern();
                    String patternValue = (rawPattern == null || rawPattern.trim().length() == 0) ? null : rawPattern;
%>
            value_<%=cid%> = field_<%=cid %>.get(<%=colIdx%>).<%=cellAccessor%>;
            if(com.google.api.client.util.Data.isNull(value_<%=cid%>)) value_<%=cid%> = null;
            if(value_<%=cid%> != null){

<%
                    // Pick the conversion emitted for this column from its Talend Java type.
                    if (javaType == JavaTypesManager.STRING) {
%>
                <%=connName%>.<%=columnName%> = value_<%=cid%>.toString();
<%
                    } else if (javaType == JavaTypesManager.OBJECT) {
%>
                <%=connName%>.<%=columnName%> = value_<%=cid%>;
<%
                    } else if (javaType == JavaTypesManager.DATE) {
%>
                if (value_<%=cid%>.toString().contains("-")) {
                    String sValue_<%=cid%> = value_<%=cid%>.toString();
                    if (sValue_<%=cid%>.matches(".*\\.\\d{6}")) {
                        // microseconds must be ignored
                        sValue_<%=cid%> = sValue_<%=cid%>.substring(0, sValue_<%=cid%>.length() - 3);
                    }

                    <%=connName%>.<%=columnName%> = ParserUtils.parseTo_Date(sValue_<%=cid%>,<%=patternValue%>);
                } else{
                    <%=connName%>.<%=columnName%>=ParserUtils.parseTo_Date(value_<%=cid%>.toString());
                }
<%
                    } else if (advancedSeparator && JavaTypesManager.isNumberType(javaType)) {
%>
                <%=connName%>.<%=columnName%> = ParserUtils.parseTo_<%= typeToGenerate %>(ParserUtils.parseTo_Number(value_<%=cid%>.toString(), <%= thousandsSeparator %>, <%= decimalSeparator %>));
<%
                    } else if (javaType == JavaTypesManager.BYTE_ARRAY) {
%>
                <%=connName%>.<%=columnName%> = value_<%=cid%>.toString().getBytes(<%=encoding %>);
<%
                    } else {
%>
                <%=connName%>.<%=columnName%> = ParserUtils.parseTo_<%= typeToGenerate %>(value_<%=cid%>.toString());
<%
                    }
%>
            }else{
                <%=connName%>.<%=columnName%> = <%=JavaTypesManager.getDefaultValueFromJavaType(typeToGenerate, column.getDefault())%>;
            }
<%
                }
                // Presumably provided by the included Log4jFileUtil template - confirm.
                log4jFileUtil.debugRetriveData(node);
            }
        }
    }
}
%>