|
|
|
|
@@ -24,7 +24,7 @@
|
|
|
|
|
String dataset = ElementParameterParser.getValue(node, "__DATASET__");
|
|
|
|
|
String table = ElementParameterParser.getValue(node, "__TABLE__");
|
|
|
|
|
String gsFile = ElementParameterParser.getValue(node, "__GS_FILE__");
|
|
|
|
|
|
|
|
|
|
String fieldDelimiter = ElementParameterParser.getValue(node, "__FIELD_DELIMITER__");
|
|
|
|
|
String actionOnData = ElementParameterParser.getValue(node, "__ACTION_ON_DATA__");
|
|
|
|
|
boolean dieOnError = "true".equals(ElementParameterParser.getValue(node, "__DIE_ON_ERROR__"));
|
|
|
|
|
|
|
|
|
|
@@ -258,56 +258,80 @@
|
|
|
|
|
|
|
|
|
|
com.google.api.services.bigquery.model.JobConfiguration config_<%=cid%> = new com.google.api.services.bigquery.model.JobConfiguration();
|
|
|
|
|
com.google.api.services.bigquery.model.JobConfigurationLoad queryLoad_<%=cid%> = new com.google.api.services.bigquery.model.JobConfigurationLoad();
|
|
|
|
|
com.google.api.services.bigquery.model.TableSchema schema_<%=cid%> = new com.google.api.services.bigquery.model.TableSchema();
|
|
|
|
|
|
|
|
|
|
<%
|
|
|
|
|
if(isLog4jEnabled){
|
|
|
|
|
%>
|
|
|
|
|
log.info("<%=cid%> - Table field schema:");
|
|
|
|
|
<%
|
|
|
|
|
}
|
|
|
|
|
%>
|
|
|
|
|
java.util.List<com.google.api.services.bigquery.model.TableFieldSchema> fields_<%=cid%> = new java.util.ArrayList<com.google.api.services.bigquery.model.TableFieldSchema>();
|
|
|
|
|
<%
|
|
|
|
|
List<IMetadataTable> metadatas = node.getMetadataList();
|
|
|
|
|
if ((metadatas!=null) && (metadatas.size() > 0)) {
|
|
|
|
|
IMetadataTable metadata = metadatas.get(0);
|
|
|
|
|
if (metadata != null) {
|
|
|
|
|
List<IMetadataColumn> columns = metadata.getListColumns();
|
|
|
|
|
int nbColumns = columns.size();
|
|
|
|
|
for (int i = 0; i < nbColumns; i++ ) {
|
|
|
|
|
IMetadataColumn column = columns.get(i);
|
|
|
|
|
String columnName = column.getLabel();
|
|
|
|
|
String typeToGenerate = "string";
|
|
|
|
|
if("id_Float".equals(column.getTalendType())) {
|
|
|
|
|
typeToGenerate = "float";
|
|
|
|
|
} else if("id_Integer".equals(column.getTalendType())) {
|
|
|
|
|
typeToGenerate = "integer";
|
|
|
|
|
} else if("id_Boolean".equals(column.getTalendType())) {
|
|
|
|
|
typeToGenerate = "boolean";
|
|
|
|
|
} else if("id_Date".equals(column.getTalendType())) {
|
|
|
|
|
typeToGenerate = "timestamp";
|
|
|
|
|
}
|
|
|
|
|
%>
|
|
|
|
|
com.google.api.services.bigquery.model.TableFieldSchema <%=columnName%>_<%=cid%> = new com.google.api.services.bigquery.model.TableFieldSchema();
|
|
|
|
|
<%=columnName%>_<%=cid%>.setName("<%=columnName%>");
|
|
|
|
|
<%=columnName%>_<%=cid%>.setType("<%=typeToGenerate%>");
|
|
|
|
|
fields_<%=cid%>.add(<%=columnName%>_<%=cid%>);
|
|
|
|
|
<%
|
|
|
|
|
if(isLog4jEnabled){
|
|
|
|
|
%>
|
|
|
|
|
log.debug("<%=cid%> - Field index[<%=i%>] {\"name\":\"<%=columnName%>\",\"type\":\"<%=typeToGenerate%>\"}");
|
|
|
|
|
<%
|
|
|
|
|
|
|
|
|
|
if (<%=ElementParameterParser.getBooleanValue(node, "__CREATE_TABLE_IF_NOT_EXIST__")%>) {
|
|
|
|
|
com.google.api.services.bigquery.model.TableSchema schema_<%=cid%> = new com.google.api.services.bigquery.model.TableSchema();
|
|
|
|
|
|
|
|
|
|
<%
|
|
|
|
|
if(isLog4jEnabled){
|
|
|
|
|
%>
|
|
|
|
|
log.info("<%=cid%> - Table field schema:");
|
|
|
|
|
<%
|
|
|
|
|
}
|
|
|
|
|
%>
|
|
|
|
|
java.util.List<com.google.api.services.bigquery.model.TableFieldSchema> fields_<%=cid%> = new java.util.ArrayList<com.google.api.services.bigquery.model.TableFieldSchema>();
|
|
|
|
|
<%
|
|
|
|
|
List<IMetadataTable> metadatas = node.getMetadataList();
|
|
|
|
|
if ((metadatas!=null) && (metadatas.size() > 0)) {
|
|
|
|
|
IMetadataTable metadata = metadatas.get(0);
|
|
|
|
|
if (metadata != null) {
|
|
|
|
|
List<IMetadataColumn> columns = metadata.getListColumns();
|
|
|
|
|
int nbColumns = columns.size();
|
|
|
|
|
for (int i = 0; i < nbColumns; i++ ) {
|
|
|
|
|
IMetadataColumn column = columns.get(i);
|
|
|
|
|
String columnName = column.getLabel();
|
|
|
|
|
String typeToGenerate = "string";
|
|
|
|
|
if("id_Float".equals(column.getTalendType()) || "id_Double".equals(column.getTalendType())) {
|
|
|
|
|
typeToGenerate = "float";
|
|
|
|
|
}else if("id_Integer".equals(column.getTalendType()) || "id_Long".equals(column.getTalendType()) || "id_Short".equals(column.getTalendType())) {
|
|
|
|
|
typeToGenerate = "integer";
|
|
|
|
|
} else if("id_Character".equals(column.getTalendType())) {
|
|
|
|
|
typeToGenerate = "string";
|
|
|
|
|
} else if("id_BigDecimal".equals(column.getTalendType())) {
|
|
|
|
|
typeToGenerate = "numeric";
|
|
|
|
|
} else if("id_Boolean".equals(column.getTalendType())) {
|
|
|
|
|
typeToGenerate = "boolean";
|
|
|
|
|
} else if("id_Date".equals(column.getTalendType())) {
|
|
|
|
|
String pattern = column.getPattern();
|
|
|
|
|
if(pattern.length() == 12 || pattern.isEmpty() || "\"\"".equals(pattern)) {
|
|
|
|
|
typeToGenerate = "date";
|
|
|
|
|
}else if(pattern.length() > 12){
|
|
|
|
|
typeToGenerate = "timestamp";
|
|
|
|
|
}else{
|
|
|
|
|
typeToGenerate = "string";
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
%>
|
|
|
|
|
<%
|
|
|
|
|
String modeType = null;
|
|
|
|
|
if (!column.isNullable()) {
|
|
|
|
|
modeType = "REQUIRED";
|
|
|
|
|
} else {
|
|
|
|
|
modeType = "NULLABLE";
|
|
|
|
|
}
|
|
|
|
|
%>
|
|
|
|
|
com.google.api.services.bigquery.model.TableFieldSchema <%=columnName%>_<%=cid%> = new com.google.api.services.bigquery.model.TableFieldSchema();
|
|
|
|
|
<%=columnName%>_<%=cid%>.setName("<%=columnName%>");
|
|
|
|
|
<%=columnName%>_<%=cid%>.setType("<%=typeToGenerate%>");
|
|
|
|
|
<%=columnName%>_<%=cid%>.setMode("<%=modeType%>");
|
|
|
|
|
fields_<%=cid%>.add(<%=columnName%>_<%=cid%>);
|
|
|
|
|
<%
|
|
|
|
|
if(isLog4jEnabled){
|
|
|
|
|
%>
|
|
|
|
|
log.debug("<%=cid%> - Field index[<%=i%>] {\"name\":\"<%=columnName%>\",\"type\":\"<%=typeToGenerate%>\",\"mode\":\"<%=modeType%>\"}");
|
|
|
|
|
<%
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
%>
|
|
|
|
|
%>
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
schema_<%=cid%>.setFields(fields_<%=cid%>);
|
|
|
|
|
|
|
|
|
|
queryLoad_<%=cid%>.setSchema(schema_<%=cid%>);
|
|
|
|
|
schema_<%=cid%>.setFields(fields_<%=cid%>);
|
|
|
|
|
|
|
|
|
|
queryLoad_<%=cid%>.setSchema(schema_<%=cid%>);
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|
<%
|
|
|
|
|
if("true".equals(ElementParameterParser.getValue(node, "__CREATE_TABLE_IF_NOT_EXIST__"))) {
|
|
|
|
|
%>
|
|
|
|
|
@@ -321,7 +345,7 @@
|
|
|
|
|
|
|
|
|
|
if("true".equals(ElementParameterParser.getValue(node, "__SET_FIELD_DELIMITER__"))) {
|
|
|
|
|
%>
|
|
|
|
|
queryLoad_<%=cid%>.setFieldDelimiter(<%=ElementParameterParser.getValue(node, "__FIELD_DELIMITER__")%>);
|
|
|
|
|
queryLoad_<%=cid%>.setFieldDelimiter(<%=fieldDelimiter%>);
|
|
|
|
|
<%
|
|
|
|
|
}
|
|
|
|
|
%>
|
|
|
|
|
@@ -336,7 +360,7 @@
|
|
|
|
|
queryLoad_<%=cid%>.setDestinationTable(destinationTable_<%=cid%>);
|
|
|
|
|
queryLoad_<%=cid%>.setSourceUris(java.util.Arrays.asList(<%=ElementParameterParser.getValue(node, "__GS_FILE__")%>));
|
|
|
|
|
queryLoad_<%=cid%>.setSkipLeadingRows(<%=ElementParameterParser.getValue(node, "__GS_FILE_HEADER__")%>);
|
|
|
|
|
|
|
|
|
|
queryLoad_<%=cid%>.setNullMarker("\\N");
|
|
|
|
|
config_<%=cid%>.setLoad(queryLoad_<%=cid%>);
|
|
|
|
|
|
|
|
|
|
job_<%=cid%>.setConfiguration(config_<%=cid%>);
|
|
|
|
|
@@ -375,10 +399,10 @@
|
|
|
|
|
}
|
|
|
|
|
if (jobExec_<%=cid%>.getStatus().getState().equals("RUNNING")
|
|
|
|
|
|| jobExec_<%=cid%>.getStatus().getState().equals("PENDING")) {
|
|
|
|
|
com.google.api.services.bigquery.model.Job pollJob_<%=cid%> = bigqueryclient_<%=cid%>.jobs().get(PROJECT_ID_<%=cid%>,jobExec_<%=cid%>.getJobReference().getJobId()).execute();
|
|
|
|
|
com.google.api.services.bigquery.model.Job pollJob_<%=cid%> = bigqueryclient_<%=cid%>.jobs().get(PROJECT_ID_<%=cid%>,jobExec_<%=cid%>.getJobReference().getJobId()).setLocation(jobExec_<%=cid%>.getJobReference().getLocation()).execute();
|
|
|
|
|
while (pollJob_<%=cid%>.getStatus().getState().equals("RUNNING") || pollJob_<%=cid%>.getStatus().getState().equals("PENDING")) {
|
|
|
|
|
Thread.sleep(1000);
|
|
|
|
|
pollJob_<%=cid%> = bigqueryclient_<%=cid%>.jobs().get(PROJECT_ID_<%=cid%>,jobExec_<%=cid%>.getJobReference().getJobId()).execute();
|
|
|
|
|
pollJob_<%=cid%> = bigqueryclient_<%=cid%>.jobs().get(PROJECT_ID_<%=cid%>,jobExec_<%=cid%>.getJobReference().getJobId()).setLocation(jobExec_<%=cid%>.getJobReference().getLocation()).execute();
|
|
|
|
|
System.out.println(String.format(
|
|
|
|
|
"Waiting on job %s ... Current status: %s", jobExec_<%=cid%>
|
|
|
|
|
.getJobReference().getJobId(), pollJob_<%=cid%>
|
|
|
|
|
@@ -440,6 +464,7 @@
|
|
|
|
|
/* ----START-CREATING-JOB (Cloud API)---- */
|
|
|
|
|
com.google.cloud.bigquery.TableId tableId_<%=cid%> = com.google.cloud.bigquery.TableId.of(<%=projectId%>, <%=dataset%>, <%=table%>);
|
|
|
|
|
com.google.cloud.bigquery.Table table_<%=cid%> = bigquery_<%=cid%>.getTable(tableId_<%=cid%>);
|
|
|
|
|
com.google.cloud.bigquery.LoadJobConfiguration.Builder loadJobBuilder_<%=cid%> = com.google.cloud.bigquery.LoadJobConfiguration.newBuilder(tableId_<%=cid%>, <%=gsFile%>);
|
|
|
|
|
if (<%=ElementParameterParser.getBooleanValue(node, "__DROP__")%> && table_<%=cid%> != null) {
|
|
|
|
|
boolean deleted = bigquery_<%=cid%>.delete(tableId_<%=cid%>);
|
|
|
|
|
if (deleted) {
|
|
|
|
|
@@ -454,9 +479,8 @@
|
|
|
|
|
throw new RuntimeException("Unable to delete table " + tableId_<%=cid%>);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
boolean tableNotExist = table_<%=cid%> == null;
|
|
|
|
|
if (<%=ElementParameterParser.getBooleanValue(node, "__DROP__")%> || tableNotExist) {
|
|
|
|
|
boolean typeSupported = true;
|
|
|
|
|
boolean tableNotExist_<%=cid%> = table_<%=cid%> == null;
|
|
|
|
|
if (<%=ElementParameterParser.getBooleanValue(node, "__DROP__")%> || tableNotExist_<%=cid%>) {
|
|
|
|
|
java.util.List<com.google.cloud.bigquery.Field> fields_<%=cid%> = new java.util.ArrayList<>();
|
|
|
|
|
<%
|
|
|
|
|
List<IMetadataTable> metadatas = node.getMetadataList();
|
|
|
|
|
@@ -468,27 +492,37 @@
|
|
|
|
|
for (int i = 0; i < nbColumns; i++ ) {
|
|
|
|
|
IMetadataColumn column = columns.get(i);
|
|
|
|
|
String columnName = column.getLabel();
|
|
|
|
|
String typeToGenerate = null;
|
|
|
|
|
if("id_String".equals(column.getTalendType())) {
|
|
|
|
|
String typeToGenerate = "com.google.cloud.bigquery.LegacySQLTypeName.STRING";
|
|
|
|
|
if("id_String".equals(column.getTalendType()) || "id_Character".equals(column.getTalendType())) {
|
|
|
|
|
typeToGenerate = "com.google.cloud.bigquery.LegacySQLTypeName.STRING";
|
|
|
|
|
} else if ("id_Float".equals(column.getTalendType()) || "id_Double".equals(column.getTalendType())) {
|
|
|
|
|
typeToGenerate = "com.google.cloud.bigquery.LegacySQLTypeName.FLOAT";
|
|
|
|
|
} else if ("id_Short".equals(column.getTalendType()) || "id_Integer".equals(column.getTalendType()) || "id_Long".equals(column.getTalendType())) {
|
|
|
|
|
typeToGenerate = "com.google.cloud.bigquery.LegacySQLTypeName.INTEGER";
|
|
|
|
|
} else if ("id_BigDecimal".equals(column.getTalendType())) {
|
|
|
|
|
typeToGenerate = "com.google.cloud.bigquery.LegacySQLTypeName.NUMERIC";
|
|
|
|
|
} else if ("id_Boolean".equals(column.getTalendType())) {
|
|
|
|
|
typeToGenerate = "com.google.cloud.bigquery.LegacySQLTypeName.BOOLEAN";
|
|
|
|
|
} else if ("id_Date".equals(column.getTalendType())) {
|
|
|
|
|
typeToGenerate = "com.google.cloud.bigquery.LegacySQLTypeName.TIMESTAMP";
|
|
|
|
|
} else {
|
|
|
|
|
%>
|
|
|
|
|
typeSupported = false;
|
|
|
|
|
<%
|
|
|
|
|
} else if ("id_Date".equals(column.getTalendType())) {
|
|
|
|
|
String pattern = column.getPattern();
|
|
|
|
|
if(pattern.length() == 12 || pattern.isEmpty() || "\"\"".equals(pattern)) {
|
|
|
|
|
typeToGenerate = "com.google.cloud.bigquery.LegacySQLTypeName.DATE";
|
|
|
|
|
}else if(pattern.length() > 12){
|
|
|
|
|
typeToGenerate = "com.google.cloud.bigquery.LegacySQLTypeName.TIMESTAMP";
|
|
|
|
|
}else{
|
|
|
|
|
typeToGenerate = "com.google.cloud.bigquery.LegacySQLTypeName.STRING";
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
String modeType = "NULLABLE";
|
|
|
|
|
if (!column.isNullable()) {
|
|
|
|
|
modeType = "REQUIRED";
|
|
|
|
|
}
|
|
|
|
|
%>
|
|
|
|
|
if (!typeSupported) {
|
|
|
|
|
throw new IllegalArgumentException("unsupported type for column [<%=columnName%>]: " + "<%=column.getTalendType()%>");
|
|
|
|
|
}
|
|
|
|
|
com.google.cloud.bigquery.Field field_<%=i%> = com.google.cloud.bigquery.Field.of("<%=columnName%>", <%=typeToGenerate%>);
|
|
|
|
|
|
|
|
|
|
com.google.cloud.bigquery.Field field_<%=i%> = com.google.cloud.bigquery.Field.newBuilder("<%=columnName%>", <%=typeToGenerate%>)
|
|
|
|
|
.setMode(com.google.cloud.bigquery.Field.Mode.valueOf("<%=modeType%>"))
|
|
|
|
|
.build();
|
|
|
|
|
fields_<%=cid%>.add(field_<%=i%>);
|
|
|
|
|
<%
|
|
|
|
|
if(isLog4jEnabled){
|
|
|
|
|
@@ -500,11 +534,24 @@
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
%>
|
|
|
|
|
|
|
|
|
|
com.google.cloud.bigquery.Schema schema_<%=cid%> = com.google.cloud.bigquery.Schema.of(fields_<%=cid%>);
|
|
|
|
|
com.google.cloud.bigquery.TableInfo tableInfo_<%=cid%> = com.google.cloud.bigquery.TableInfo.newBuilder(tableId_<%=cid%>, com.google.cloud.bigquery.StandardTableDefinition.of(schema_<%=cid%>)).build();
|
|
|
|
|
table_<%=cid%> = bigquery_<%=cid%>.create(tableInfo_<%=cid%>);
|
|
|
|
|
loadJobBuilder_<%=cid%>.setSchema(schema_<%=cid%>);
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|
com.google.cloud.bigquery.Job job_<%=cid%> = table_<%=cid%>.load(com.google.cloud.bigquery.FormatOptions.csv(), <%=gsFile%>);
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
<%if("true".equals(ElementParameterParser.getValue(node, "__SET_FIELD_DELIMITER__"))) {
|
|
|
|
|
%>
|
|
|
|
|
loadJobBuilder_<%=cid%>.setFormatOptions(com.google.cloud.bigquery.CsvOptions.newBuilder().setFieldDelimiter(<%=fieldDelimiter%>).build());
|
|
|
|
|
<%
|
|
|
|
|
}
|
|
|
|
|
%>
|
|
|
|
|
|
|
|
|
|
loadJobBuilder_<%=cid%>.setNullMarker("\\N");
|
|
|
|
|
com.google.cloud.bigquery.Job job_<%=cid%> = bigquery_<%=cid%>.create(com.google.cloud.bigquery.JobInfo.of(loadJobBuilder_<%=cid%>.build()));
|
|
|
|
|
job_<%=cid%> = job_<%=cid%>.waitFor(com.google.cloud.RetryOption.initialRetryDelay(org.threeten.bp.Duration.ofSeconds(1)), com.google.cloud.RetryOption.totalTimeout(org.threeten.bp.Duration.ofSeconds(30)));
|
|
|
|
|
if (job_<%=cid%> != null && job_<%=cid%>.getStatus().getError() == null) {
|
|
|
|
|
<%
|
|
|
|
|
@@ -523,4 +570,4 @@
|
|
|
|
|
} else {
|
|
|
|
|
throw new IllegalArgumentException("authentication mode should be either \"SERVICEACCOUNT\" or \"OAUTH\", but it is " + authMode);
|
|
|
|
|
}
|
|
|
|
|
%>
|
|
|
|
|
%>
|
|
|
|
|
|