<%@ jet
imports="
    org.talend.core.model.process.INode
    org.talend.core.model.process.ElementParameterParser
    org.talend.designer.codegen.config.CodeGeneratorArgument
    org.talend.core.model.metadata.IMetadataTable
    org.talend.core.model.metadata.MetadataTalendType
    org.talend.core.model.metadata.MappingTypeRetriever
    java.util.List
    java.util.ArrayList
    java.util.Map
    java.util.HashMap
"
skeleton="../templates/db_output_bulk.skeleton"
%>
<%@ include file="../templates/Log4j/Log4jDBConnUtil.javajet"%>
<%
/*
 * Code-generation template for a PostgreSQL bulk load.
 *
 * The emitted Java code:
 *   1. resolves the target table name (optionally schema-qualified);
 *   2. assembles a COPY ... FROM '<file>' WITH ... statement from the component parameters;
 *   3. obtains a JDBC connection (shared one from globalMap, or a new one);
 *   4. runs the optional table action (included template);
 *   5. either executes the COPY directly (INSERT), or loads into a temporary table,
 *      runs the manager-provided update statement and drops the temporary table (UPDATE).
 *
 * Helpers such as getColumnList, getManager, Manager and log4jCodeGenerateUtil are not
 * defined in this file; they presumably come from the skeleton and the included
 * templates (NOTE(review): confirm).
 */
CodeGeneratorArgument codeGenArgument = (CodeGeneratorArgument) argument;
INode node = (INode)codeGenArgument.getArgument();
String cid = node.getUniqueName();

String table = ElementParameterParser.getValue(node, "__TABLE__");
String dbSchema = ElementParameterParser.getValue(node, "__SCHEMA_DB__");
String dataAction = ElementParameterParser.getValue(node,"__DATA_ACTION__");
boolean standardConformingString= ("true").equals(ElementParameterParser.getValue(node,"__STANDARD_CONFORMING_STRING__"));
boolean convertToUppercase = false; // not active this function
boolean useExistingConnection = "true".equalsIgnoreCase(ElementParameterParser.getValue(node,"__USE_EXISTING_CONNECTION__"));
%>
int deletedCount_<%=cid%>=0;
String tableName_<%=cid%> = null;
String dbschema_<%=cid%> = null;
<%
// With a shared connection the schema is read from globalMap at runtime;
// otherwise the component's own schema expression is emitted.
if(useExistingConnection) {
%>
    dbschema_<%=cid%> = (String)globalMap.get("schema_" + "<%=ElementParameterParser.getValue(node,"__CONNECTION__")%>");
<%
} else {
%>
    dbschema_<%=cid%> = <%=dbSchema%>;
<%
}
%>
if(dbschema_<%=cid%> == null || dbschema_<%=cid%>.trim().length() == 0) {
    tableName_<%=cid%> = <%=table%>;
} else {
    tableName_<%=cid%> = dbschema_<%=cid%> + "\".\"" + <%=table%>;
}
<%
if(("UPDATE").equals(dataAction)) {
%>
    String tmpTableName_<%=cid%> = "tmp_<%=cid %>_" + pid + Thread.currentThread().getId();
    //tableName not end. will add pid + Thread.currentThread().getId() when create a temp table for update.
<%
}

String file = ElementParameterParser.getValue(node, "__FILENAME__");

// Column list of the COPY statement: every schema column, double-quoted and comma-separated.
List<IMetadataColumn> columnList = getColumnList(node);
StringBuilder copyColumns = new StringBuilder("");
if(columnList != null && columnList.size() > 0) {
    copyColumns.append(" (");
    int count = 0;
    for(IMetadataColumn column : columnList) {
        copyColumns.append("\\\"" + column.getOriginalDbColumnName() + "\\\"");
        if(count != columnList.size() - 1) {
            copyColumns.append(",");
        }
        count++;
    }
    copyColumns.append(") ");
}

// FORCE NOT NULL clause: only rows of the __FORCENOTNULL__ table whose flag is "true".
// Each appended column ends with a comma; the final character is dropped where the
// clause is emitted further down.
StringBuilder notNullColumns = new StringBuilder(" ");
List<Map<String, String>> notNullCols = (List<Map<String, String>>)ElementParameterParser.getObjectValue( node,"__FORCENOTNULL__" );
if(notNullCols != null && notNullCols.size() > 0) {
    notNullColumns.append("FORCE NOT NULL ");
    int notNullCount = 0;
    for(Map<String, String> notNullCol : notNullCols) {
        if(("true").equals(notNullCol.get("FORCENOTNULL"))) {
            notNullColumns.append("\\\"" + notNullCol.get("SCHEMA_COLUMN") + "\\\",");
            notNullCount++;
        }
    }
    if(notNullCount == 0) {
        // Nothing selected: fall back to a single blank so no clause is emitted.
        notNullColumns = new StringBuilder(" ");
    }
}
%>
String bulkSQL_<%=cid%> = "COPY \"" + <%if(("UPDATE").equals(dataAction)) {%>tmpTableName_<%=cid%><% } else {%>tableName_<%=cid%> <%}%> + "\"<%=copyColumns.toString()%>FROM '" + <%=file%> + "' WITH ";
<%
String fileType = ElementParameterParser.getValue(node, "__FILETYPE__");
String oids = (("true").equals(ElementParameterParser.getValue(node, "__OIDS__")) ? "OIDS" : "");
if(("BINARYFILE").equals(fileType)) {
%>
    bulkSQL_<%=cid%> += "BINARY <%=oids%>";
<%
} else {
%>
    Character field_separator_<%=cid %> = (<%=ElementParameterParser.getValue(node, "__FIELDS_TERMINATED_BY__")%>).charAt(0);
    bulkSQL_<%=cid%> += "<%=oids%> " + "DELIMITER AS '" + field_separator_<%=cid %> + "' ";
    bulkSQL_<%=cid%> += "NULL AS '" + <%=ElementParameterParser.getValue(node, "__NULLSTRING__")%> + "' ";
<%
    if(("CSVFILE").equals(fileType)) {
        String header = (("true").equals(ElementParameterParser.getValue(node, "__HEADER__")) ? "HEADER" : "");
        String escapeChar = ElementParameterParser.getValue(node, "__ESCAPE_CHAR__");
        String textEnclosure = ElementParameterParser.getValue(node, "__TEXT_ENCLOSURE__");
%>
        Character text_enclosure_<%=cid%> = (<%=textEnclosure %>).charAt(0);
        bulkSQL_<%=cid%> += "CSV <%=header%> QUOTE AS '" + text_enclosure_<%=cid%>.toString().replaceAll("\\\\", "\\\\\\\\") + "' ";
        <%if(standardConformingString) { %>
        bulkSQL_<%=cid%> += "ESCAPE AS '" + <%=escapeChar %> + "' ";
        <%}else {%>
        bulkSQL_<%=cid%> += "ESCAPE AS '" + <%=escapeChar %>.replaceAll("\\\\", "\\\\\\\\") + "' ";
        <%}%>
        bulkSQL_<%=cid%> += "<%=notNullColumns.toString().substring(0, notNullColumns.toString().length() - 1)%>";
<%
    }
}

String host = ElementParameterParser.getValue(node, "__HOST__");
String port = ElementParameterParser.getValue(node, "__PORT__");
String dbName = ElementParameterParser.getValue(node, "__DBNAME__");
String userName = ElementParameterParser.getValue(node, "__USER__");
%>
java.sql.Connection conn_<%=cid%> = null;
<%
if(useExistingConnection) {
    String conn = "conn_" + ElementParameterParser.getValue(node,"__CONNECTION__");
%>
    conn_<%=cid%> = (java.sql.Connection)globalMap.get("<%=conn%>");
<%
    log4jCodeGenerateUtil.useExistConnection(node);
} else {
%>
    String driverClass_<%=cid%> = "org.postgresql.Driver";
    java.lang.Class.forName(driverClass_<%=cid%>).newInstance();
    String url_<%=cid%> = "jdbc:postgresql://" + <%=host%> + ":" + <%=port%> + "/" + <%=dbName%>;
    String dbUser_<%=cid%> = <%=userName%>;

    <%
    String passwordFieldName = "__PASS__";
    %>

    <%@ include file="@{org.talend.designer.components.localprovider}/components/templates/password.javajet"%>

    String dbPwd_<%=cid%> = decryptedPassword_<%=cid%>;
<%
    log4jCodeGenerateUtil.debugConnectionParams(node);
    log4jCodeGenerateUtil.connect(node);
}
%>

<%
String tableAction = ElementParameterParser.getValue(node,"__TABLE_ACTION__");
String dbmsId = ElementParameterParser.getValue(node,"__MAPPING__");
// NOTE(review): the element type of this list is declared outside this file; kept raw.
List stmtStructure = null;
// Resolve the manager up front. It used to start as null and was only (presumably)
// assigned inside the table-action include below; when that include is skipped
// (no columns and no CLEAR/TRUNCATE action) the INSERT branch still dereferences it
// for its log messages. NOTE(review): assumes getManager(dbmsId, cid) has no
// dependency on the column list - confirm against the skeleton.
Manager manager = getManager(dbmsId, cid);
if(columnList != null && columnList.size() > 0) {
    stmtStructure = getManager(dbmsId, cid).createColumnList(columnList, false, null, null);
}
if((columnList != null && columnList.size() > 0) || "CLEAR".equals(tableAction) || "TRUNCATE".equals(tableAction)){
%>
    <%@ include file="../templates/_tableActionForBulk.javajet"%>
<%
}

boolean isLog4jEnabled = ("true").equals(ElementParameterParser.getValue(node.getProcess(), "__LOG4J_ACTIVATE__"));
if(("UPDATE").equals(dataAction)) {
    if(columnList != null && columnList.size() > 0) {
        int keyCount = 0;
        for(IMetadataColumn column : columnList) {
            if(column.isKey()) {
                keyCount++;
            }
        }
        // In the emitted code tableName_ and tmpTableName_ are swapped several times,
        // apparently so that the manager-generated SQL always addresses the table it
        // needs through tableName_ (NOTE(review): confirm against the Manager class).
%>
        int keyCount_<%=cid%> = <%=keyCount%>;
        if(keyCount_<%=cid%> == 0) {
            throw new RuntimeException("For bulk update, Schema must have a key at least.");
        }
        tmpTableName_<%=cid%> = tableName_<%=cid%>;
        tableName_<%=cid%> = "tmp_<%=cid%>" + "_" + pid + Thread.currentThread().getId();
        java.sql.Statement stmtCreateTmp_<%=cid%> = conn_<%=cid%>.createStatement();
        <%if(isLog4jEnabled){%>
            log.info("<%=cid%> - Creating temp table <%=manager.getLProtectedChar()%>" + tableName_<%=cid%> + "<%=manager.getRProtectedChar()%>." );
        <%}%>
        stmtCreateTmp_<%=cid%>.execute("<%=manager.getCreateTableSQL(stmtStructure)%>)");
        stmtCreateTmp_<%=cid%>.close();
        <%if(isLog4jEnabled){%>
            log.info("<%=cid%> - Create temp table <%=manager.getLProtectedChar()%>" + tableName_<%=cid%> + "<%=manager.getRProtectedChar()%> has succeeded.");
        <%}%>
        <%if(standardConformingString) { %>
            java.sql.CallableStatement cs_<%=cid%> =conn_<%=cid %>.prepareCall("set standard_conforming_strings=on");
            cs_<%=cid%>.execute();
            cs_<%=cid%>.close();
        <%}%>
        java.sql.Statement stmtTmpBulk_<%=cid%> = conn_<%=cid%>.createStatement();
        <%if(isLog4jEnabled){%>
            log.debug("<%=cid%> - Bulk SQL:"+bulkSQL_<%=cid%>+".");
            log.info("<%=cid%> - Bulk inserting data into <%=manager.getLProtectedChar()%>" + tableName_<%=cid%> + "<%=manager.getRProtectedChar()%>." );
        <%}%>
        stmtTmpBulk_<%=cid%>.execute(bulkSQL_<%=cid%>);
        stmtTmpBulk_<%=cid%>.close();
        <%if(isLog4jEnabled){%>
            log.info("<%=cid%> - Bulk insert data into <%=manager.getLProtectedChar()%>" + tableName_<%=cid%> + "<%=manager.getRProtectedChar()%> has finished.");
        <%}%>
        tableName_<%=cid%> = tmpTableName_<%=cid%>;
        tmpTableName_<%=cid%> = "tmp_<%=cid%>" + "_" + pid + Thread.currentThread().getId();
        java.sql.Statement stmtUpdateBulk_<%=cid%> = conn_<%=cid%>.createStatement();
        <%if(isLog4jEnabled){%>
            log.info("<%=cid%> - Updating <%=manager.getLProtectedChar()%>" + tableName_<%=cid%> + "<%=manager.getRProtectedChar()%> from <%=manager.getLProtectedChar()%>"+tmpTableName_<%=cid%>+"<%=manager.getRProtectedChar()%>.");
        <%}%>
        stmtUpdateBulk_<%=cid%>.executeUpdate("<%=manager.getUpdateBulkSQL(columnList)%>");
        <%log4jCodeGenerateUtil.logInfo(node,"info",cid+" - Update has finished.");%>
        stmtUpdateBulk_<%=cid%>.close();
        tableName_<%=cid%> = tmpTableName_<%=cid%>;
        java.sql.Statement stmtTmpDrop_<%=cid%> = conn_<%=cid%>.createStatement();
        <%if(isLog4jEnabled){%>
            log.info("<%=cid%> - Droping temp table <%=manager.getLProtectedChar()%>" + tableName_<%=cid%> + "<%=manager.getRProtectedChar()%>.");
        <%}%>
        stmtTmpDrop_<%=cid%>.execute("<%=manager.getDropTableSQL()%>");
        stmtTmpDrop_<%=cid%>.close();
        <%if(isLog4jEnabled){%>
            log.info("<%=cid%> - Drop temp table <%=manager.getLProtectedChar()%>" + tableName_<%=cid%>+ "<%=manager.getRProtectedChar()%> has succeeded.");
        <%}%>
<%
    }
} else if(("INSERT").equals(dataAction)) {
%>
    <%if(standardConformingString) { %>
        java.sql.CallableStatement cs_<%=cid%> =conn_<%=cid %>.prepareCall("set standard_conforming_strings=on");
        cs_<%=cid%>.execute();
        cs_<%=cid%>.close();
    <%}%>
    java.sql.Statement stmtBulk_<%=cid %> = conn_<%=cid %>.createStatement();
    //stmt.execute("SET client_encoding to 'UNICODE'");
    <%if(isLog4jEnabled){%>
        log.debug("<%=cid%> - Bulk SQL:"+bulkSQL_<%=cid%>+".");
        log.info("<%=cid%> - Inserting records in table <%=manager.getLProtectedChar()%>" + tableName_<%=cid%> + "<%=manager.getRProtectedChar()%>.");
    <%}%>
    stmtBulk_<%=cid %>.execute(bulkSQL_<%=cid%>);
    stmtBulk_<%=cid %>.close();
    <%if(isLog4jEnabled){%>
        log.info("<%=cid%> - Insert records in table <%=manager.getLProtectedChar()%>" + tableName_<%=cid%> + "<%=manager.getRProtectedChar()%> has finished.");
<%
    }
}
// A connection opened by this component is closed here; a shared one is left open.
if(!useExistingConnection) {
    log4jCodeGenerateUtil.close(node);
}
%>