390 lines
16 KiB
Plaintext
390 lines
16 KiB
Plaintext
<%@ jet
|
|
imports="
|
|
org.talend.designer.codegen.config.CodeGeneratorArgument
|
|
org.talend.core.model.process.INode
|
|
org.talend.core.model.process.ElementParameterParser
|
|
org.talend.core.model.metadata.IMetadataTable
|
|
org.talend.core.model.metadata.MappingTypeRetriever
|
|
org.talend.core.model.metadata.MetadataTalendType
|
|
org.talend.core.model.process.IConnection
|
|
org.talend.core.model.utils.NodeUtil
|
|
java.util.List
|
|
java.util.ArrayList
|
|
java.util.Map
|
|
java.util.HashMap
|
|
"
|
|
skeleton="../templates/db_output_bulk.skeleton"
|
|
%>
|
|
<%@ include file="@{org.talend.designer.components.localprovider}/components/templates/Log4j/DBLogUtil.javajet"%>
|
|
<%
// Unwrap the generator argument to reach the component node; cid is the node's unique name.
CodeGeneratorArgument codeGenArgument = (CodeGeneratorArgument) argument;
INode node = (INode) codeGenArgument.getArgument();
String cid = node.getUniqueName();

dbLog = new DBLogUtil(node);

// Table-valued parameters: additional columns and per-field options.
List<Map<String, String>> addCols =
        (List<Map<String, String>>) ElementParameterParser.getObjectValue(node, "__ADD_COLS__");
boolean useFieldOptions = "true".equals(ElementParameterParser.getValue(node, "__USE_FIELD_OPTIONS__"));
List<Map<String, String>> fieldOptions =
        (List<Map<String, String>>) ElementParameterParser.getObjectValue(node, "__FIELD_OPTIONS__");

// Connection and target-table settings, kept as the raw parameter expressions.
String dbname = ElementParameterParser.getValue(node, "__DBNAME__");
String dbproperties = ElementParameterParser.getValue(node, "__PROPERTIES__");
String dbhost = ElementParameterParser.getValue(node, "__HOST__");
String dbport = ElementParameterParser.getValue(node, "__PORT__");
String dbschema = ElementParameterParser.getValue(node, "__SCHEMA_DB__");
String dbuser = ElementParameterParser.getValue(node, "__USER__");
String table = ElementParameterParser.getValue(node, "__TABLE__");
String dbmsId = ElementParameterParser.getValue(node, "__MAPPING__");

// Data/table actions and transaction settings.
String dataAction = ElementParameterParser.getValue(node, "__DATA_ACTION__");
String tableAction = ElementParameterParser.getValue(node, "__TABLE_ACTION__");
String commitEvery = ElementParameterParser.getValue(node, "__COMMIT_EVERY__");
String useExistingConn = ElementParameterParser.getValue(node, "__USE_EXISTING_CONNECTION__");
boolean isEnableDebug = "true".equals(ElementParameterParser.getValue(node, "__ENABLE_DEBUG_MODE__"));

// Spatial options.
boolean useSpatialOptions = "true".equals(ElementParameterParser.getValue(node, "__USE_SPATIAL_OPTIONS__"));
boolean createSpatialIndex = "true".equals(ElementParameterParser.getValue(node, "__SPATIAL_INDEX__"));
boolean createGeometryColumns = "true".equals(ElementParameterParser.getValue(node, "__GEOMETRY_COLUMNS__"));

// Batch, savepoint and error-handling options.
boolean useBatchSize = "true".equals(ElementParameterParser.getValue(node, "__USE_BATCH_SIZE__"));
String batchSize = ElementParameterParser.getValue(node, "__BATCH_SIZE__");
boolean savePoint = "true".equals(ElementParameterParser.getValue(node, "__SAVE_POINT__"));
String dieOnError = ElementParameterParser.getValue(node, "__DIE_ON_ERROR__");

// Naming options.
boolean convertToLowercase = "true".equals(ElementParameterParser.getValue(node, "__CONVERT_COLUMN_TABLE_TO_LOWERCASE__"));
boolean useAlternateSchema = "true".equals(ElementParameterParser.getValue(node, "__USE_ALTERNATE_SCHEMA__"));
String alternateSchema = ElementParameterParser.getValue(node, "__ALTERNATE_SCHEMA__");

// Name of the first outgoing REJECT connection, or null when the component has none.
String rejectConnName = null;
List<? extends IConnection> rejectConns = node.getOutgoingConnections("REJECT");
if (rejectConns != null && !rejectConns.isEmpty()) {
    rejectConnName = rejectConns.get(0).getName();
}

// Batching requires the batch option, no reject flow, and one of the four single-statement actions.
boolean useBatch = useBatchSize
        && rejectConnName == null
        && ("INSERT".equals(dataAction)
                || "UPDATE".equals(dataAction)
                || "UPSERT".equals(dataAction)
                || "DELETE".equals(dataAction));
%>
|
|
|
|
<%
// First manager call for this component uses the three-argument overload (it also receives the node);
// the remaining calls in this template use the two-argument form.
getManager(dbmsId, cid, node);

// The dynamic-schema flag is read from the first metadata table, when there is one.
List<IMetadataTable> metadatas = node.getMetadataList();
boolean isDynamic = false;
if (metadatas != null && !metadatas.isEmpty()) {
    isDynamic = metadatas.get(0).isDynamicSchema();
}

List<IMetadataColumn> columnList = getColumnList(node);
List<Column> stmtStructure = null;
if (columnList != null && !columnList.isEmpty()) {
    stmtStructure = getManager(dbmsId, cid).createColumnList(columnList, useFieldOptions, fieldOptions, addCols);

    // Remain on the dynamic path only if the manager did not replace the dynamic column.
    if (isDynamic) {
        isDynamic = !getManager(dbmsId, cid).isDynamicColumnReplaced();
    }

    if (convertToLowercase) {
        for (Column column : stmtStructure) {
            if (column.isReplaced()) {
                // A replaced column is renamed through each of its replacement columns.
                for (Column replacedColumn : column.getReplacement()) {
                    replacedColumn.setColumnName(replacedColumn.getColumnName().toLowerCase());
                }
                continue;
            }
            // Dynamic columns keep their name untouched.
            if (!column.isDynamic()) {
                column.setColumnName(column.getColumnName().toLowerCase());
            }
        }
    }
}
%>
|
|
|
|
String dbschema_<%=cid%> = null;
<%
// Schema source, in order: the component's own schema setting when it opens its own connection,
// otherwise the alternate schema if enabled, otherwise the value stored in globalMap under
// "schema_" + the name of the referenced connection component.
if (!"true".equals(useExistingConn)) {
%>
    dbschema_<%=cid%> = <%=dbschema%>;
<%
} else if (useAlternateSchema) {
%>
    dbschema_<%=cid%> = <%=alternateSchema%>;
<%
} else {
%>
    dbschema_<%=cid%> = (String)globalMap.get("schema_" + "<%=ElementParameterParser.getValue(node,"__CONNECTION__")%>");
<%
}
%>
|
|
|
|
<%
// The generated code starts from the (optionally lower-cased) table expression and prefixes it
// with the schema and a quoted dot separator only when the schema is neither null nor blank.
%>
String tableName_<%=cid%> = (<%=table%>)<%=convertToLowercase ? ".toLowerCase()" : ""%>;
if(dbschema_<%=cid%> != null && dbschema_<%=cid%>.trim().length() != 0) {
    tableName_<%=cid%> = dbschema_<%=cid%> + "\".\"" + tableName_<%=cid%>;
}
|
|
|
|
<%@ include file="@{org.talend.designer.components.localprovider}/components/templates/DB/Output/CheckKeysForUpdateAndDelete.javajet"%>
|
|
|
|
<%
// Generated per-component state, every identifier suffixed with the component id:
// two families of row counters, a reject flag, the connection handle and the user name.
%>
int nb_line_<%=cid%> = 0;
int nb_line_update_<%=cid%> = 0;
int nb_line_inserted_<%=cid%> = 0;
int nb_line_deleted_<%=cid%> = 0;
int nb_line_rejected_<%=cid%> = 0;

int deletedCount_<%=cid%> = 0;
int updatedCount_<%=cid%> = 0;
int insertedCount_<%=cid%> = 0;
int rejectedCount_<%=cid%> = 0;

boolean whetherReject_<%=cid%> = false;

java.sql.Connection conn_<%=cid%> = null;
String dbUser_<%=cid%> = null;
|
|
|
|
<%
if ("true".equals(useExistingConn)) {
    // Reuse the connection that the referenced connection component stored in globalMap.
%>
    conn_<%=cid%> = (java.sql.Connection)globalMap.get("conn_<%=ElementParameterParser.getValue(node,"__CONNECTION__")%>");
    <%dbLog.conn().useExistConn("conn_"+cid+".getMetaData().getURL()", "conn_"+cid+".getMetaData().getUserName()");%>
<%
    if (savePoint && !useBatchSize && !"true".equals(dieOnError)) {
%>
    java.sql.Savepoint sp_<%=cid %> = null;
<%
    }
} else {
    boolean specify_alias = "true".equals(ElementParameterParser.getValue(node, "__SPECIFY_DATASOURCE_ALIAS__"));
    if (specify_alias) {
        // With an alias configured, the generated code first looks for a data source map in
        // globalMap and only falls back to a direct JDBC connection when that map is absent.
        String alias = ElementParameterParser.getValue(node, "__DATASOURCE_ALIAS__");
%>
    java.util.Map<String, routines.system.TalendDataSource> dataSources_<%=cid%> = (java.util.Map<String, routines.system.TalendDataSource>) globalMap.get(KEY_DB_DATASOURCES);
    if (null != dataSources_<%=cid%>) {
        String dsAlias_<%=cid%> = <%=(alias == null || "".equals(alias)) ? "\"\"" : alias%>;
        if (dataSources_<%=cid%>.get(dsAlias_<%=cid%>) == null) {
            throw new RuntimeException("No DataSource with alias: " + dsAlias_<%=cid%> + " available!");
        }
        conn_<%=cid%> = dataSources_<%=cid%>.get(dsAlias_<%=cid%>).getConnection();
    } else {
<%
    }
%>
    <%dbLog.conn().logJDBCDriver(dbLog.str("org.postgresql.Driver"));%>
    java.lang.Class.forName("org.postgresql.Driver");

<%
    // The extra JDBC properties are appended as a query string only when the parameter holds
    // something other than null, an empty value, or a pair of quotes.
    if (dbproperties != null && !"\"\"".equals(dbproperties) && !"".equals(dbproperties)) {
%>
    String url_<%=cid %> = "jdbc:postgresql://"+<%=dbhost%>+":"+<%=dbport%>+"/"+<%=dbname%> + "?" + <%=dbproperties%>;
<%
    } else {
%>
    String url_<%=cid %> = "jdbc:postgresql://"+<%=dbhost%>+":"+<%=dbport%>+"/"+<%=dbname%>;
<%
    }
%>
    dbUser_<%=cid %> = <%=dbuser%>;
<%
    // Read by the password include that follows.
    String passwordFieldName = "__PASS__";
%>

<%@ include file="@{org.talend.designer.components.localprovider}/components/templates/password.javajet"%>

    String dbPwd_<%=cid %> = decryptedPassword_<%=cid%>;

    <%dbLog.conn().connTry(dbLog.var("url"), dbLog.var("dbUser"));%>
    conn_<%=cid%> = java.sql.DriverManager.getConnection(url_<%=cid %>,dbUser_<%=cid%>,dbPwd_<%=cid%>);
    <%dbLog.conn().connDone(dbLog.var("url"));%>
<%
    if (specify_alias) {
        // Closes the generated "else" opened in the data source branch above.
%>
    }
<%
    }
%>
    resourceMap.put("conn_<%=cid%>", conn_<%=cid%>);
<%
    if (savePoint && !useBatchSize && !"true".equals(dieOnError) && !"".equals(commitEvery) && !"0".equals(commitEvery)) {
%>
    java.sql.Savepoint sp_<%=cid %> = null;
<%
    }
}
|
|
// Only a connection opened by this component gets auto-commit switched off, and only when
// the commit interval is neither empty nor "0".
if (!"true".equals(useExistingConn) && !"".equals(commitEvery) && !"0".equals(commitEvery)) {
%>
    conn_<%=cid%>.setAutoCommit(false);
    int commitEvery_<%=cid%> = <%=commitEvery%>;
    int commitCounter_<%=cid%> = 0;
<%
}
dbLog.commit().logAutoCommit("conn_"+cid+".getAutoCommit()");
%>
|
|
|
|
<%
// With savepoints on a shared connection, the generated code fails fast if auto-commit is on.
if (savePoint && "true".equals(useExistingConn)) {
%>
    if(conn_<%=cid%>.getAutoCommit()){
        throw new RuntimeException("Connection autocommit must be off to use savePoint");
    }
<%
}
%>
|
|
|
|
<%
// Batch bookkeeping is declared only when batching is enabled for this component.
if (useBatch) {
%>
    int batchSize_<%=cid%> = <%=batchSize%>;
    int batchSizeCounter_<%=cid%> = 0;
<%
}
%>
|
|
|
|
<%
// Non-dynamic schema: the SQL text of every statement is assembled from the known column list.
if(!isDynamic) {
%>
    int count_<%=cid%>=0;
<%
    if(columnList != null && columnList.size()>0) {
        // NOTE(review): not referenced in the visible part of this template; presumably used by
        // the table-action include below - confirm before removing.
        Manager manager = null;
%>
<%@ include file="../templates/_tableActionForOutput.javajet"%>
<%
        // SQL fragments produced by the manager from the column structure.
        Map<String, StringBuilder> actionSQLMap = getManager(dbmsId, cid).createProcessSQL(stmtStructure);
        StringBuilder insertColName = actionSQLMap.get(INSERT_COLUMN_NAME);
        StringBuilder insertValueStmt = actionSQLMap.get(INSERT_VALUE_STMT);
        StringBuilder updateSetStmt = actionSQLMap.get(UPDATE_SET_STMT);
        StringBuilder updateWhereStmt = actionSQLMap.get(UPDATE_WHERE_STMT);
        StringBuilder deleteWhereStmt = actionSQLMap.get(DELETE_WHERE_STMT);

        if(("INSERT").equals(dataAction) || ("UPSERT").equals(dataAction)) {
%>
    java.lang.StringBuilder sb_<%=cid%> = new java.lang.StringBuilder();
    sb_<%=cid%>.append("INSERT INTO \"").append(tableName_<%=cid%>).append("\" (<%=insertColName.toString()%>) VALUES (<%=insertValueStmt.toString()%>)");

<%
            if (("UPSERT").equals(dataAction)) {
                // Key columns form the ON CONFLICT target; every other column is assigned from
                // the EXCLUDED row.
                // NOTE(review): if no column is flagged as key the conflict target is empty and
                // PostgreSQL rejects the statement; whether an earlier include guards against
                // that is not visible here.
                java.lang.StringBuilder primaryKeysSB = new java.lang.StringBuilder();
                java.lang.StringBuilder updateColumnSB = new java.lang.StringBuilder();
                for(Column column : stmtStructure) {
                    String columnName = column.getColumnName();
                    if (column.isReplaced()) {
                        // NOTE(review): only the name of the last replacement column is kept -
                        // confirm this is intended when a column has several replacements.
                        for (Column replacedColumn : column.getReplacement()) {
                            columnName = replacedColumn.getColumnName();
                        }
                    }
                    // The name lands inside a generated string literal, hence the escaped quotes.
                    String upsertQuotedName = "\\\"" + columnName + "\\\"";
                    if (column.isKey()) {
                        if (primaryKeysSB.length() > 0) {
                            primaryKeysSB.append(",");
                        }
                        primaryKeysSB.append(upsertQuotedName);
                    } else {
                        if (updateColumnSB.length() > 0) {
                            updateColumnSB.append(",");
                        }
                        updateColumnSB.append(upsertQuotedName).append(" = EXCLUDED.").append(upsertQuotedName);
                    }
                }
                String primaryKeys = primaryKeysSB.toString();
                String updateColumns = updateColumnSB.toString();
                if (updateColumns.length() > 0) {
%>
    sb_<%=cid%>.append(" ON CONFLICT (").append("<%=primaryKeys%>").append(")").append(" DO UPDATE SET ").append("<%=updateColumns%>");
<%
                } else {
                    // Every column is a key, so there is nothing to assign: "DO UPDATE SET" with
                    // an empty list is a syntax error, so skip the conflicting row instead.
%>
    sb_<%=cid%>.append(" ON CONFLICT (").append("<%=primaryKeys%>").append(")").append(" DO NOTHING");
<%
                }
            }
%>
    String insert_<%=cid%> = sb_<%=cid%>.toString();

    <%dbLog.data().sqlExecuteTry("insert_"+cid);%>

<%
            // The three stats/logs/meter components obtain their statement through
            // SharedDBPreparedStatement instead of preparing it directly on the connection.
            boolean usingStatsLogs = cid.equals("talendLogs_DB") || cid.equals("talendStats_DB") || cid.equals("talendMeter_DB");
            if(!usingStatsLogs) {
%>
    java.sql.PreparedStatement pstmt_<%=cid %> = conn_<%=cid%>.prepareStatement(insert_<%=cid%>);
    resourceMap.put("pstmt_<%=cid %>", pstmt_<%=cid %>);
<%
            } else {
%>
    java.sql.PreparedStatement pstmt_<%=cid %> = null;
    String keyPsmt_<%=cid %> = conn_<%=cid%> + "[psmt]" + "[" + <%=table%> + "]";
    pstmt_<%=cid %> = SharedDBPreparedStatement.getSharedPreparedStatement(conn_<%=cid%>,insert_<%=cid%>,keyPsmt_<%=cid%>);
    resourceMap.put("pstmt_<%=cid %>", pstmt_<%=cid %>);
<%
            }
        } else if (("UPDATE").equals(dataAction)) {
%>
    String update_<%=cid%> = "UPDATE \"" + tableName_<%=cid%> + "\" SET <%=updateSetStmt.toString()%> WHERE <%=updateWhereStmt.toString()%>";
    java.sql.PreparedStatement pstmt_<%=cid %> = conn_<%=cid%>.prepareStatement(update_<%=cid%>);
    resourceMap.put("pstmt_<%=cid %>", pstmt_<%=cid %>);
<%
        } else if (("INSERT_OR_UPDATE").equals(dataAction)) {
            // Three statements: an existence check on the update key, then the insert and the update.
%>
    java.sql.PreparedStatement pstmt_<%=cid %> = conn_<%=cid%>.prepareStatement("SELECT COUNT(1) FROM \"" + tableName_<%=cid%> + "\" WHERE <%=updateWhereStmt.toString()%>");
    resourceMap.put("pstmt_<%=cid %>", pstmt_<%=cid %>);
    String insert_<%=cid%> = "INSERT INTO \"" + tableName_<%=cid%> + "\" (<%=insertColName.toString()%>) VALUES (<%=insertValueStmt.toString()%>)";
    java.sql.PreparedStatement pstmtInsert_<%=cid %> = conn_<%=cid%>.prepareStatement(insert_<%=cid%>);
    resourceMap.put("pstmtInsert_<%=cid %>", pstmtInsert_<%=cid %>);
    String update_<%=cid%> = "UPDATE \"" + tableName_<%=cid%> + "\" SET <%=updateSetStmt.toString()%> WHERE <%=updateWhereStmt.toString()%>";
    java.sql.PreparedStatement pstmtUpdate_<%=cid %> = conn_<%=cid%>.prepareStatement(update_<%=cid%>);
    resourceMap.put("pstmtUpdate_<%=cid %>", pstmtUpdate_<%=cid %>);
<%
        } else if (("UPDATE_OR_INSERT").equals(dataAction)) {
%>
    String update_<%=cid%> = "UPDATE \"" + tableName_<%=cid%> + "\" SET <%=updateSetStmt.toString()%> WHERE <%=updateWhereStmt.toString()%>";
    java.sql.PreparedStatement pstmtUpdate_<%=cid %> = conn_<%=cid%>.prepareStatement(update_<%=cid%>);
    resourceMap.put("pstmtUpdate_<%=cid %>", pstmtUpdate_<%=cid %>);
    String insert_<%=cid%> = "INSERT INTO \"" + tableName_<%=cid%> + "\" (<%=insertColName.toString()%>) VALUES (<%=insertValueStmt.toString()%>)";
    java.sql.PreparedStatement pstmtInsert_<%=cid %> = conn_<%=cid%>.prepareStatement(insert_<%=cid%>);
    resourceMap.put("pstmtInsert_<%=cid %>", pstmtInsert_<%=cid %>);
<%
        } else if (("DELETE").equals(dataAction)) {
%>
    String delete_<%=cid%> = "DELETE FROM \"" + tableName_<%=cid%> + "\" WHERE <%=deleteWhereStmt.toString()%>";
    java.sql.PreparedStatement pstmt_<%=cid %> = conn_<%=cid%>.prepareStatement(delete_<%=cid%>);
    resourceMap.put("pstmt_<%=cid %>", pstmt_<%=cid %>);
<%
        }
        if(isEnableDebug) {
%>
    StringBuffer query_<%=cid%> = null;
<%@ include file="../templates/DB/Output/splitSQLForAllDBInBegin.javajet" %>
<%
        }
    }
} //end isDynamic
|
|
boolean isParallelize = "true".equalsIgnoreCase(ElementParameterParser.getValue(node, "__PARALLELIZE__"));
// True for every table action that creates the table.
boolean createTable = "CREATE".equals(tableAction)
        || "DROP_CREATE".equals(tableAction)
        || "CREATE_IF_NOT_EXISTS".equals(tableAction)
        || "DROP_IF_EXISTS_AND_CREATE".equals(tableAction);

// Geometry bookkeeping is generated only when spatial options are on, the table is being
// created, the component is not parallelized, and at least one of the two spatial features
// (geometry columns, spatial index) is requested.
if (useSpatialOptions && createTable && !isParallelize && (createGeometryColumns || createSpatialIndex)) {
%>
    String geometryColumnName_<%=cid %> = null;
    // Initialize SRID
    int SRID_<%=cid %> = 0;
    int dimension_<%=cid %> = 0;
    boolean firstGeometry_<%=cid %> = true;
<%
}
|
|
|
|
|
|
// Dynamic schema: the statements are only declared here, initialised to null.
if (isDynamic) {
%>
    java.sql.PreparedStatement pstmt_<%=cid %> =null;
    java.sql.PreparedStatement pstmtInsert_<%=cid %> =null;
    java.sql.PreparedStatement pstmtUpdate_<%=cid %> =null;
<%
    if (isEnableDebug) {
%>
    StringBuffer query_<%=cid%> = null;
<%@ include file="../templates/DB/Output/splitSQLForAllDBInDynamicBegin.javajet" %>
<%
    }
}
%>
|