895 lines
42 KiB
Plaintext
895 lines
42 KiB
Plaintext
<%@ jet
|
|
imports="
|
|
org.talend.designer.codegen.config.CodeGeneratorArgument
|
|
org.talend.core.model.process.INode
|
|
org.talend.core.model.process.ElementParameterParser
|
|
org.talend.core.model.process.IConnection
|
|
org.talend.core.model.metadata.IMetadataTable
|
|
org.talend.core.model.metadata.types.JavaTypesManager
|
|
org.talend.core.model.metadata.MappingTypeRetriever
|
|
org.talend.core.model.metadata.MetadataTalendType
|
|
org.talend.core.model.process.IConnectionCategory
|
|
java.util.List
|
|
java.util.ArrayList
|
|
java.util.Map
|
|
java.util.HashMap
|
|
"
|
|
skeleton="../templates/db_output_bulk.skeleton"
|
|
%>
|
|
<%@ include file="@{org.talend.designer.components.localprovider}/components/templates/Log4j/DBLogUtil.javajet"%>
|
|
<%
|
|
// ---------------------------------------------------------------------------
// Component parameters
// Reads the settings of the component being generated. Everything below is
// evaluated at code-generation time, not when the generated job runs.
// ---------------------------------------------------------------------------
CodeGeneratorArgument codeGenArgument = (CodeGeneratorArgument) argument;
INode node = (INode)codeGenArgument.getArgument();
// cid is appended to every generated identifier (pstmt_<cid>, nb_line_<cid>, ...).
String cid = node.getUniqueName();

// NOTE(review): dbLog is assigned but not declared in this template; presumably it is
// declared by the included DBLogUtil template -- confirm.
dbLog = new DBLogUtil(node);

String dataAction = ElementParameterParser.getValue(node,"__DATA_ACTION__");
String dieOnError = ElementParameterParser.getValue(node, "__DIE_ON_ERROR__");
String tableName = ElementParameterParser.getValue(node,"__TABLE__");
String dbmsId = ElementParameterParser.getValue(node,"__MAPPING__");
List<Map<String, String>> addCols =
        (List<Map<String,String>>)ElementParameterParser.getObjectValue(node,"__ADD_COLS__");

boolean useFieldOptions = ("true").equals(ElementParameterParser.getValue(node, "__USE_FIELD_OPTIONS__"));

List<Map<String, String>> fieldOptions = (List<Map<String,String>>)ElementParameterParser.getObjectValue(node, "__FIELD_OPTIONS__");

String commitEvery = ElementParameterParser.getValue(node, "__COMMIT_EVERY__");

String useExistingConn = ElementParameterParser.getValue(node,"__USE_EXISTING_CONNECTION__");
boolean isEnableDebug = ("true").equals(ElementParameterParser.getValue(node,"__ENABLE_DEBUG_MODE__"));

//feature:2880
Manager manager = getManager(dbmsId, cid, node);
// Constant-first comparison, like every other flag read here: it cannot throw a
// NullPointerException if the parser yields null for this parameter.
boolean whereSupportNull = "true".equals(ElementParameterParser.getValue(node, "__SUPPORT_NULL_WHERE__"));

// Requested batch mode; it is narrowed further once the REJECT connection and data action are known.
boolean useBatch = "true".equals(ElementParameterParser.getValue(node, "__USE_BATCH__"));
|
|
|
|
// Columns of the component schema, as resolved by the getColumnList helper.
List<IMetadataColumn> columnList = getColumnList(node);

// Name of the first incoming connection, or null when the component has no input.
List< ? extends IConnection> conns = node.getIncomingConnections();
String incomingConnName = (conns != null && !conns.isEmpty()) ? conns.get(0).getName() : null;

// Name of the first REJECT connection, or null when no reject flow is attached.
List<? extends IConnection> rejectConns = node.getOutgoingConnections("REJECT");
String rejectConnName = (rejectConns != null && !rejectConns.isEmpty()) ? rejectConns.get(0).getName() : null;

// Batch mode is kept only for plain INSERT / UPDATE / DELETE and only without a reject flow.
boolean batchableAction = ("INSERT").equals(dataAction)
        || ("UPDATE").equals(dataAction)
        || ("DELETE").equals(dataAction);
useBatch = useBatch && batchableAction && (rejectConnName == null);
|
|
|
|
// Columns of the REJECT connector schema; stays null when the connector has no metadata.
IMetadataTable metadataTable = node.getMetadataFromConnector("REJECT");
List<IMetadataColumn> rejectColumnList = (metadataTable == null) ? null : metadataTable.getListColumns();
|
|
|
|
// Emit "<connection> = null;" for every outgoing DATA connection so that each row
// starts with no output row assigned.
List<? extends IConnection> outgoingConns = node.getOutgoingSortedConnections();
// Null guard: the propagation step further down checks outgoingConns for null before
// using it, so this loop applies the same precaution instead of risking an NPE.
if(outgoingConns != null) {
    for(IConnection conn : outgoingConns) {
        if (conn.getLineStyle().hasConnectionCategory(IConnectionCategory.DATA)) {
%>
<%=conn.getName() %> = null;
<%
        }
    }
}
|
|
|
|
// Full statement structure built by the manager from the schema, the field options and
// the additional columns.
List<Column> stmtStructure = manager.createColumnList(columnList, useFieldOptions, fieldOptions, addCols);

////////////////////////////////////////////////////////////
// colStruct keeps only the columns that are neither replaced nor additional columns;
// these are the ones bound as statement parameters below.
// Typed list instead of the raw ArrayList, which produced an unchecked assignment.
List<Column> colStruct = new ArrayList<Column>();
for(Column colStmt : stmtStructure) {
    if(!colStmt.isReplaced() && !colStmt.isAddCol()) {
        colStruct.add(colStmt);
    }
}
|
|
|
|
// Debug mode: emit a reset of the generated query_<cid> buffer before any parameter is bound.
if(isEnableDebug) {
%>
query_<%=cid%> = new StringBuffer("");
<%
}

boolean isParallelize = "true".equalsIgnoreCase(ElementParameterParser.getValue(node, "__PARALLELIZE__"));

if (isParallelize) { // bug0014422
    // Unique name of the component feeding the first incoming connection ("" when there is none).
    String tAsyncIn_cid = (conns != null && !conns.isEmpty()) ? conns.get(0).getSource().getUniqueName() : "";

    // The commit interval is aligned on the upstream buffer size only when this component
    // does not reuse an existing connection and a non-zero commit interval is configured.
    boolean ownsConnection = !"true".equals(useExistingConn);
    boolean commitEveryEnabled = !("").equals(commitEvery) && !("0").equals(commitEvery);
    if(ownsConnection && commitEveryEnabled) {
%>
commitEvery_<%=cid%> = buffersSize_<%=tAsyncIn_cid%>;
<%
    }

    // Same alignment for the batch size when batch mode is active.
    if (useBatch) {
%>
batchSize_<%=cid%> = buffersSize_<%=tAsyncIn_cid%>;
<%
    }
} // end bug0014422
|
|
|
|
if(incomingConnName != null && columnList != null) {
|
|
%>
|
|
whetherReject_<%=cid%> = false;
|
|
<%
|
|
if(("INSERT").equals(dataAction)) {
|
|
int counter = 1;
|
|
for(Column column : colStruct) {
|
|
if(column.isInsertable()) {
|
|
String typeToGenerate = JavaTypesManager.getTypeToGenerate(column.getColumn().getTalendType(), column.getColumn().isNullable());
|
|
%>
|
|
<%=manager.generateSetStmt(typeToGenerate, column, counter, incomingConnName, cid, NORMAL_TYPE)%>
|
|
<%
|
|
if(isEnableDebug) {
|
|
%>
|
|
query_<%=cid%> = <%=manager.retrieveSQL(typeToGenerate, column, incomingConnName, cid, "query_" , counter, "insertSQLSplits_")%>;
|
|
<%
|
|
}
|
|
counter++;
|
|
}
|
|
}
|
|
if(isEnableDebug) {
|
|
%>
|
|
globalMap.put("<%=cid %>_QUERY", query_<%=cid%>.toString().trim());
|
|
<%
|
|
}
|
|
%>
|
|
|
|
<%
|
|
if(useBatch){
|
|
%>
|
|
pstmt_<%=cid %>.addBatch();
|
|
batchSizeCounter_<%=cid%>++;
|
|
nb_line_<%=cid%>++;
|
|
<%if(isEnableDebug){dbLog.data().sqlDebugPrint("globalMap.get(\""+cid+"_QUERY\")");}%>
|
|
<% dbLog.data().addingToBatch(dbLog.var("nb_line"), dbLog.str(dataAction)); %>
|
|
<%
|
|
}else{
|
|
%>
|
|
try {
|
|
<%if(isEnableDebug){dbLog.data().sqlDebugPrint("globalMap.get(\""+cid+"_QUERY\")");}%>
|
|
int processedCount_<%=cid%> = pstmt_<%=cid %>.executeUpdate();
|
|
insertedCount_<%=cid%> += processedCount_<%=cid%>;
|
|
rowsToCommitCount_<%=cid%> += processedCount_<%=cid%>;
|
|
nb_line_<%=cid%>++;
|
|
<%dbLog.data().inserting(dbLog.var("nb_line"));%>
|
|
} catch(java.lang.Exception e) {
|
|
globalMap.put("<%=cid%>_ERROR_MESSAGE",e.getMessage());
|
|
whetherReject_<%=cid%> = true;
|
|
<%
|
|
// Error policy for a failed INSERT: rethrow when "die on error" is set; otherwise route the
// row to the REJECT flow when one is connected, or just log the message.
// NOTE(review): unlike the INSERT_OR_UPDATE / UPDATE_OR_INSERT / DELETE branches, no
// nb_line increment is emitted here for a failed row -- confirm whether that is intentional.
if (("true").equals(dieOnError)) {
%>
    throw(e);
<%
} else {
    if(rejectConnName != null && rejectColumnList != null && rejectColumnList.size() > 0) {
%>
    <%=rejectConnName %> = new <%=rejectConnName %>Struct();
<%
        // Copy every schema column of the input row onto the reject row.
        for(IMetadataColumn column : columnList) {
%>
    <%=rejectConnName%>.<%=column.getLabel()%> = <%=incomingConnName%>.<%=column.getLabel()%>;
<%
        }
        // The generated catch clause receives java.lang.Exception, so the SQLState lookup is
        // guarded: an unconditional cast would raise ClassCastException for a non-SQL failure
        // and hide the original error.
%>
    rejectedCount_<%=cid%> = rejectedCount_<%=cid%> + 1;
    <%=rejectConnName%>.errorCode = (e instanceof java.sql.SQLException) ? ((java.sql.SQLException)e).getSQLState() : null;
    <%=rejectConnName%>.errorMessage = e.getMessage() + " - Line: " + tos_count_<%=node.getUniqueName() %>;
<%
    } else {
%>
    <%dbLog.logPrintedException("e.getMessage()");%>
    System.err.print(e.getMessage());
<%
    }
}
|
|
%>
|
|
}
|
|
<%
|
|
}
|
|
%>
|
|
<%
|
|
} else if(("UPDATE").equals(dataAction)) {
|
|
int counterCol = 1;
|
|
for(Column column : colStruct) {
|
|
if(column.isUpdatable()) {
|
|
String typeToGenerate = JavaTypesManager.getTypeToGenerate(column.getColumn().getTalendType(), column.getColumn().isNullable());
|
|
%>
|
|
<%=manager.generateSetStmt(typeToGenerate, column, counterCol, incomingConnName, cid, NORMAL_TYPE)%>
|
|
<%
|
|
if(isEnableDebug) {
|
|
%>
|
|
query_<%=cid%> = <%=manager.retrieveSQL(typeToGenerate, column, incomingConnName, cid, "query_" , counterCol, "updateSQLSplits_")%>;
|
|
<%
|
|
}
|
|
counterCol++;
|
|
}
|
|
}
|
|
|
|
for(Column column : colStruct) {
|
|
if(column.isUpdateKey()) {
|
|
String typeToGenerate = JavaTypesManager.getTypeToGenerate(column.getColumn().getTalendType(), column.getColumn().isNullable());
|
|
String dbType = column.getColumn().getType();
|
|
%>
|
|
|
|
<%
|
|
//#############for feature:2880 and 6980
|
|
if(whereSupportNull && column.getColumn().isNullable()) { %>
|
|
<%=manager.generateSetBooleanForNullableKeyStmt(column, counterCol, incomingConnName, cid, NORMAL_TYPE)%>
|
|
<%
|
|
if(isEnableDebug) {
|
|
%>
|
|
query_<%=cid%> = <%=manager.retrieveSQL(typeToGenerate, column, incomingConnName, cid, "query_" , counterCol, "updateSQLSplits_", "(("+incomingConnName+"."+column.getName()+"==null)?1:0)")%>;
|
|
<%
|
|
}
|
|
counterCol++;
|
|
}
|
|
//#############
|
|
%>
|
|
|
|
<%=manager.generateSetStmt(typeToGenerate, column, counterCol, incomingConnName, cid, NORMAL_TYPE)%>
|
|
<%
|
|
if(isEnableDebug) {
|
|
%>
|
|
query_<%=cid%> = <%=manager.retrieveSQL(typeToGenerate, column, incomingConnName, cid, "query_" , counterCol, "updateSQLSplits_")%>;
|
|
<%
|
|
}
|
|
counterCol++;
|
|
}
|
|
}
|
|
if(isEnableDebug) {
|
|
%>
|
|
globalMap.put("<%=cid %>_QUERY", query_<%=cid%>.toString().trim());
|
|
<%
|
|
}
|
|
%>
|
|
|
|
<%
|
|
if(useBatch){
|
|
%>
|
|
pstmt_<%=cid %>.addBatch();
|
|
batchSizeCounter_<%=cid%>++;
|
|
nb_line_<%=cid%>++;
|
|
<%if(isEnableDebug){dbLog.data().sqlDebugPrint("globalMap.get(\""+cid+"_QUERY\")");}%>
|
|
<% dbLog.data().addingToBatch(dbLog.var("nb_line"), dbLog.str(dataAction)); %>
|
|
<%
|
|
}else{
|
|
%>
|
|
try {
|
|
<%if(isEnableDebug){dbLog.data().sqlDebugPrint("globalMap.get(\""+cid+"_QUERY\")");}%>
|
|
int processedCount_<%=cid%> = pstmt_<%=cid %>.executeUpdate();
|
|
updatedCount_<%=cid%> += processedCount_<%=cid%>;
|
|
rowsToCommitCount_<%=cid%> += processedCount_<%=cid%>;
|
|
nb_line_<%=cid%>++;
|
|
<%dbLog.data().updating(dbLog.var("nb_line"));%>
|
|
} catch(java.lang.Exception e) {
|
|
globalMap.put("<%=cid%>_ERROR_MESSAGE",e.getMessage());
|
|
whetherReject_<%=cid%> = true;
|
|
<%
|
|
// Error policy for a failed UPDATE: rethrow when "die on error" is set; otherwise route the
// row to the REJECT flow when one is connected, or just log the message.
// NOTE(review): unlike the INSERT_OR_UPDATE / UPDATE_OR_INSERT / DELETE branches, no
// nb_line increment is emitted here for a failed row -- confirm whether that is intentional.
if (("true").equals(dieOnError)) {
%>
    throw(e);
<%
} else {
    if(rejectConnName != null && rejectColumnList != null && rejectColumnList.size() > 0) {
%>
    <%=rejectConnName %> = new <%=rejectConnName %>Struct();
<%
        // Copy every schema column of the input row onto the reject row.
        for(IMetadataColumn column : columnList) {
%>
    <%=rejectConnName%>.<%=column.getLabel()%> = <%=incomingConnName%>.<%=column.getLabel()%>;
<%
        }
        // The generated catch clause receives java.lang.Exception, so the SQLState lookup is
        // guarded: an unconditional cast would raise ClassCastException for a non-SQL failure
        // and hide the original error.
%>
    rejectedCount_<%=cid%> = rejectedCount_<%=cid%> + 1;
    <%=rejectConnName%>.errorCode = (e instanceof java.sql.SQLException) ? ((java.sql.SQLException)e).getSQLState() : null;
    <%=rejectConnName%>.errorMessage = e.getMessage() + " - Line: " + tos_count_<%=node.getUniqueName() %>;
<%
    } else {
%>
    <%dbLog.logPrintedException("e.getMessage()");%>
    System.err.print(e.getMessage());
<%
    }
}
|
|
%>
|
|
}
|
|
<%
|
|
}
|
|
%>
|
|
<%
|
|
} else if (("INSERT_OR_UPDATE").equals(dataAction)) {
|
|
int columnIndex = 1;
|
|
for(Column column : colStruct) {
|
|
if(column.isUpdateKey()) {
|
|
String typeToGenerate = JavaTypesManager.getTypeToGenerate(column.getColumn().getTalendType(), column.getColumn().isNullable());
|
|
String dbType = column.getColumn().getType();
|
|
%>
|
|
<%
|
|
//#############for feature:2880 and 6980
|
|
if(whereSupportNull && column.getColumn().isNullable()) { %>
|
|
<%=manager.generateSetBooleanForNullableKeyStmt(column, columnIndex, incomingConnName, cid, NORMAL_TYPE)%>
|
|
<%
|
|
columnIndex++;
|
|
}
|
|
//#############
|
|
%>
|
|
<%=manager.generateSetStmt(typeToGenerate, column, columnIndex, incomingConnName, cid, NORMAL_TYPE)%>
|
|
<%
|
|
columnIndex++;
|
|
}
|
|
}
|
|
%>
|
|
int checkCount_<%=cid%> = -1;
|
|
try (java.sql.ResultSet rs_<%=cid%> = pstmt_<%=cid %>.executeQuery()) {
|
|
while(rs_<%=cid%>.next()) {
|
|
checkCount_<%=cid%> = rs_<%=cid%>.getInt(1);
|
|
}
|
|
}
|
|
if(checkCount_<%=cid%> > 0) {
|
|
<%
|
|
int counterCol = 1;
|
|
for(Column column : colStruct) {
|
|
if(column.isUpdatable()) {
|
|
String typeToGenerate = JavaTypesManager.getTypeToGenerate(column.getColumn().getTalendType(), column.getColumn().isNullable());
|
|
%>
|
|
<%=manager.generateSetStmt(typeToGenerate, column, counterCol, incomingConnName, cid, UPDATE_TYPE)%>
|
|
<%
|
|
if(isEnableDebug) {
|
|
%>
|
|
query_<%=cid%> = <%=manager.retrieveSQL(typeToGenerate, column, incomingConnName, cid, "query_" , counterCol, "updateSQLSplits_")%>;
|
|
<%
|
|
}
|
|
counterCol++;
|
|
}
|
|
}
|
|
for(Column column : colStruct) {
|
|
if(column.isUpdateKey()) {
|
|
String typeToGenerate = JavaTypesManager.getTypeToGenerate(column.getColumn().getTalendType(), column.getColumn().isNullable());
|
|
String dbType = column.getColumn().getType();
|
|
%>
|
|
<%
|
|
//#############for feature:2880
|
|
if(whereSupportNull && column.getColumn().isNullable()) { %>
|
|
<%=manager.generateSetBooleanForNullableKeyStmt(column, counterCol, incomingConnName, cid, UPDATE_TYPE)%>
|
|
<%
|
|
if(isEnableDebug) {
|
|
%>
|
|
query_<%=cid%> = <%=manager.retrieveSQL(typeToGenerate, column, incomingConnName, cid, "query_" , counterCol, "updateSQLSplits_", "(("+incomingConnName+"."+column.getName()+"==null)?1:0)")%>;
|
|
<%
|
|
}
|
|
counterCol++;
|
|
}
|
|
//#############
|
|
%>
|
|
<%=manager.generateSetStmt(typeToGenerate, column, counterCol, incomingConnName, cid, UPDATE_TYPE)%>
|
|
<%
|
|
if(isEnableDebug) {
|
|
%>
|
|
query_<%=cid%> = <%=manager.retrieveSQL(typeToGenerate, column, incomingConnName, cid, "query_" , counterCol, "updateSQLSplits_")%>;
|
|
<%
|
|
}
|
|
counterCol++;
|
|
}
|
|
}
|
|
if(isEnableDebug) {
|
|
%>
|
|
globalMap.put("<%=cid %>_QUERY", query_<%=cid%>.toString().trim());
|
|
<%
|
|
}
|
|
%>
|
|
try {
|
|
<%if(isEnableDebug){dbLog.data().sqlDebugPrint("globalMap.get(\""+cid+"_QUERY\")");}%>
|
|
int processedCount_<%=cid%> = pstmtUpdate_<%=cid %>.executeUpdate();
|
|
updatedCount_<%=cid%> += processedCount_<%=cid%>;
|
|
rowsToCommitCount_<%=cid%> += processedCount_<%=cid%>;
|
|
nb_line_<%=cid%>++;
|
|
<%dbLog.data().updating(dbLog.var("nb_line"));%>
|
|
} catch(java.lang.Exception e) {
|
|
globalMap.put("<%=cid%>_ERROR_MESSAGE",e.getMessage());
|
|
whetherReject_<%=cid%> = true;
|
|
<%
|
|
// Error policy for the UPDATE leg of INSERT_OR_UPDATE: rethrow when "die on error" is set;
// otherwise count the row, then route it to the REJECT flow when one is connected, or log it.
if (("true").equals(dieOnError)) {
%>
    throw(e);
<%
} else {
%>
    nb_line_<%=cid%>++;
<%
    if(rejectConnName != null && rejectColumnList != null && rejectColumnList.size() > 0) {
%>
    <%=rejectConnName %> = new <%=rejectConnName %>Struct();
<%
        // Copy every schema column of the input row onto the reject row.
        for(IMetadataColumn column : columnList) {
%>
    <%=rejectConnName%>.<%=column.getLabel()%> = <%=incomingConnName%>.<%=column.getLabel()%>;
<%
        }
        // The generated catch clause receives java.lang.Exception, so the SQLState lookup is
        // guarded: an unconditional cast would raise ClassCastException for a non-SQL failure
        // and hide the original error.
%>
    rejectedCount_<%=cid%> = rejectedCount_<%=cid%> + 1;
    <%=rejectConnName%>.errorCode = (e instanceof java.sql.SQLException) ? ((java.sql.SQLException)e).getSQLState() : null;
    <%=rejectConnName%>.errorMessage = e.getMessage() + " - Line: " + tos_count_<%=node.getUniqueName() %>;
<%
    } else {
%>
    <%dbLog.logPrintedException("e.getMessage()");%>
    System.err.print(e.getMessage());
<%
    }
}
|
|
%>
|
|
}
|
|
} else {
|
|
<%
|
|
int counterInsert = 1;
|
|
for(Column columnInsert : colStruct) {
|
|
if(columnInsert.isInsertable()) {
|
|
String typeToGenerate = JavaTypesManager.getTypeToGenerate(columnInsert.getColumn().getTalendType(), columnInsert.getColumn().isNullable());
|
|
%>
|
|
<%=manager.generateSetStmt(typeToGenerate, columnInsert, counterInsert, incomingConnName, cid, INSERT_TYPE)%>
|
|
<%
|
|
if(isEnableDebug) {
|
|
%>
|
|
query_<%=cid%> = <%=manager.retrieveSQL(typeToGenerate, columnInsert, incomingConnName, cid, "query_" , counterInsert, "insertSQLSplits_")%>;
|
|
<%
|
|
}
|
|
counterInsert++;
|
|
}
|
|
}
|
|
if(isEnableDebug) {
|
|
%>
|
|
globalMap.put("<%=cid %>_QUERY", query_<%=cid%>.toString().trim());
|
|
<%
|
|
}
|
|
%>
|
|
try {
|
|
<%if(isEnableDebug){dbLog.data().sqlDebugPrint("globalMap.get(\""+cid+"_QUERY\")");}%>
|
|
int processedCount_<%=cid%> = pstmtInsert_<%=cid %>.executeUpdate();
|
|
insertedCount_<%=cid%> += processedCount_<%=cid%>;
|
|
rowsToCommitCount_<%=cid%> += processedCount_<%=cid%>;
|
|
nb_line_<%=cid%>++;
|
|
<%dbLog.data().inserting(dbLog.var("nb_line"));%>
|
|
} catch(java.lang.Exception e) {
|
|
globalMap.put("<%=cid%>_ERROR_MESSAGE",e.getMessage());
|
|
whetherReject_<%=cid%> = true;
|
|
<%
|
|
// Error policy for the INSERT leg of INSERT_OR_UPDATE: rethrow when "die on error" is set;
// otherwise count the row, then route it to the REJECT flow when one is connected, or log it.
if (("true").equals(dieOnError)) {
%>
    throw(e);
<%
} else {
%>
    nb_line_<%=cid%>++;
<%
    if(rejectConnName != null && rejectColumnList != null && rejectColumnList.size() > 0) {
%>
    <%=rejectConnName %> = new <%=rejectConnName %>Struct();
<%
        // Copy every schema column of the input row onto the reject row.
        for(IMetadataColumn column : columnList) {
%>
    <%=rejectConnName%>.<%=column.getLabel()%> = <%=incomingConnName%>.<%=column.getLabel()%>;
<%
        }
        // The generated catch clause receives java.lang.Exception, so the SQLState lookup is
        // guarded: an unconditional cast would raise ClassCastException for a non-SQL failure
        // and hide the original error.
%>
    rejectedCount_<%=cid%> = rejectedCount_<%=cid%> + 1;
    <%=rejectConnName%>.errorCode = (e instanceof java.sql.SQLException) ? ((java.sql.SQLException)e).getSQLState() : null;
    <%=rejectConnName%>.errorMessage = e.getMessage() + " - Line: " + tos_count_<%=node.getUniqueName() %>;
<%
    } else {
%>
    <%dbLog.logPrintedException("e.getMessage()");%>
    System.err.print(e.getMessage());
<%
    }
}
|
|
%>
|
|
}
|
|
}
|
|
<%
|
|
} else if (("UPDATE_OR_INSERT").equals(dataAction)) {
|
|
%>
|
|
int updateFlag_<%=cid%>=0;
|
|
<%
|
|
int counterColUpdate = 1;
|
|
for(Column columnUpdate : colStruct) {
|
|
if(columnUpdate.isUpdatable()) {
|
|
String typeToGenerate = JavaTypesManager.getTypeToGenerate(columnUpdate.getColumn().getTalendType(), columnUpdate.getColumn().isNullable());
|
|
%>
|
|
<%=manager.generateSetStmt(typeToGenerate, columnUpdate, counterColUpdate, incomingConnName, cid, UPDATE_TYPE)%>
|
|
<%
|
|
if(isEnableDebug) {
|
|
%>
|
|
query_<%=cid%> = <%=manager.retrieveSQL(typeToGenerate, columnUpdate, incomingConnName, cid, "query_" , counterColUpdate, "updateSQLSplits_")%>;
|
|
<%
|
|
}
|
|
counterColUpdate++;
|
|
}
|
|
}
|
|
|
|
for(Column columnUpdate : colStruct) {
|
|
if(columnUpdate.isUpdateKey()) {
|
|
String typeToGenerate = JavaTypesManager.getTypeToGenerate(columnUpdate.getColumn().getTalendType(), columnUpdate.getColumn().isNullable());
|
|
String dbType = columnUpdate.getColumn().getType();
|
|
%>
|
|
<%
|
|
//#############for feature:2880 and 6980
|
|
if(whereSupportNull && columnUpdate.getColumn().isNullable()) { %>
|
|
<%=manager.generateSetBooleanForNullableKeyStmt(columnUpdate, counterColUpdate, incomingConnName, cid, UPDATE_TYPE)%>
|
|
<%
|
|
if(isEnableDebug) {
|
|
%>
|
|
query_<%=cid%> = <%=manager.retrieveSQL(typeToGenerate, columnUpdate, incomingConnName, cid, "query_" , counterColUpdate, "updateSQLSplits_", "(("+incomingConnName+"."+columnUpdate.getName()+"==null)?1:0)")%>;
|
|
<%
|
|
}
|
|
counterColUpdate++;
|
|
}
|
|
//#############
|
|
%>
|
|
<%=manager.generateSetStmt(typeToGenerate, columnUpdate, counterColUpdate, incomingConnName, cid, UPDATE_TYPE)%>
|
|
<%
|
|
if(isEnableDebug) {
|
|
%>
|
|
query_<%=cid%> = <%=manager.retrieveSQL(typeToGenerate, columnUpdate, incomingConnName, cid, "query_" , counterColUpdate, "updateSQLSplits_")%>;
|
|
<%
|
|
}
|
|
counterColUpdate++;
|
|
}
|
|
}
|
|
if(isEnableDebug) {
|
|
%>
|
|
globalMap.put("<%=cid %>_QUERY", query_<%=cid%>.toString().trim());
|
|
<%
|
|
}
|
|
%>
|
|
|
|
try {
|
|
updateFlag_<%=cid%>=pstmtUpdate_<%=cid %>.executeUpdate();
|
|
updatedCount_<%=cid%> = updatedCount_<%=cid%>+updateFlag_<%=cid%>;
|
|
rowsToCommitCount_<%=cid%> += updateFlag_<%=cid%>;
|
|
if(updateFlag_<%=cid%> == 0) {
|
|
<%
|
|
if(isEnableDebug) {
|
|
%>
|
|
query_<%=cid%> = new StringBuffer("");
|
|
<%
|
|
}
|
|
int counter = 1;
|
|
for(Column column : colStruct) {
|
|
if(column.isInsertable()) {
|
|
String typeToGenerate = JavaTypesManager.getTypeToGenerate(column.getColumn().getTalendType(), column.getColumn().isNullable());
|
|
%>
|
|
<%=manager.generateSetStmt(typeToGenerate, column, counter, incomingConnName, cid, INSERT_TYPE)%>
|
|
<%
|
|
if(isEnableDebug) {
|
|
%>
|
|
query_<%=cid%> = <%=manager.retrieveSQL(typeToGenerate, column, incomingConnName, cid, "query_" , counter, "insertSQLSplits_")%>;
|
|
<%
|
|
}
|
|
counter++;
|
|
}
|
|
}
|
|
if(isEnableDebug) {
|
|
%>
|
|
globalMap.put("<%=cid %>_QUERY", query_<%=cid%>.toString().trim());
|
|
<%
|
|
}
|
|
%>
|
|
<%if(isEnableDebug){dbLog.data().sqlDebugPrint("globalMap.get(\""+cid+"_QUERY\")");}%>
|
|
int processedCount_<%=cid%> = pstmtInsert_<%=cid %>.executeUpdate();
|
|
insertedCount_<%=cid%> += processedCount_<%=cid%>;
|
|
rowsToCommitCount_<%=cid%> += processedCount_<%=cid%>;
|
|
nb_line_<%=cid%>++;
|
|
<%dbLog.data().inserting(dbLog.var("nb_line"));%>
|
|
}else{
|
|
nb_line_<%=cid%>++;
|
|
<%if(isEnableDebug){dbLog.data().sqlDebugPrint("globalMap.get(\""+cid+"_QUERY\")");}%>
|
|
<%dbLog.data().updating(dbLog.var("nb_line"));%>
|
|
}
|
|
} catch(java.lang.Exception e) {
|
|
globalMap.put("<%=cid%>_ERROR_MESSAGE",e.getMessage());
|
|
whetherReject_<%=cid%> = true;
|
|
<%
|
|
// Error policy for UPDATE_OR_INSERT: rethrow when "die on error" is set; otherwise count
// the row, then route it to the REJECT flow when one is connected, or log it.
if (("true").equals(dieOnError)) {
%>
    throw(e);
<%
} else {
%>
    nb_line_<%=cid%>++;
<%
    if(rejectConnName != null && rejectColumnList != null && rejectColumnList.size() > 0) {
%>
    <%=rejectConnName %> = new <%=rejectConnName %>Struct();
<%
        // Copy every schema column of the input row onto the reject row.
        for(IMetadataColumn column : columnList) {
%>
    <%=rejectConnName%>.<%=column.getLabel()%> = <%=incomingConnName%>.<%=column.getLabel()%>;
<%
        }
        // The generated catch clause receives java.lang.Exception, so the SQLState lookup is
        // guarded: an unconditional cast would raise ClassCastException for a non-SQL failure
        // and hide the original error.
%>
    rejectedCount_<%=cid%> = rejectedCount_<%=cid%> + 1;
    <%=rejectConnName%>.errorCode = (e instanceof java.sql.SQLException) ? ((java.sql.SQLException)e).getSQLState() : null;
    <%=rejectConnName%>.errorMessage = e.getMessage() + " - Line: " + tos_count_<%=node.getUniqueName() %>;
<%
    } else {
%>
    <%dbLog.logPrintedException("e.getMessage()");%>
    System.err.print(e.getMessage());
<%
    }
}
|
|
%>
|
|
}
|
|
<%
|
|
} else if (("DELETE").equals(dataAction)) {
|
|
int keyCounter = 1;
|
|
for(Column column : colStruct) {
|
|
if(column.isDeleteKey()) {
|
|
String typeToGenerate = JavaTypesManager.getTypeToGenerate(column.getColumn().getTalendType(), column.getColumn().isNullable());
|
|
String dbType = column.getColumn().getType();
|
|
%>
|
|
<%
|
|
//#############for feature:2880 and 6980
|
|
if(whereSupportNull && column.getColumn().isNullable()) { %>
|
|
<%=manager.generateSetBooleanForNullableKeyStmt(column, keyCounter, incomingConnName, cid, NORMAL_TYPE)%>
|
|
<%
|
|
if(isEnableDebug) {
|
|
%>
|
|
query_<%=cid%> = <%=manager.retrieveSQL(typeToGenerate, column, incomingConnName, cid, "query_" , keyCounter, "deleteSQLSplits_", "(("+incomingConnName+"."+column.getName()+"==null)?1:0)")%>;
|
|
<%
|
|
}
|
|
keyCounter++;
|
|
}
|
|
//#############
|
|
%>
|
|
<%=manager.generateSetStmt(typeToGenerate, column, keyCounter, incomingConnName, cid, NORMAL_TYPE)%>
|
|
<%
|
|
if(isEnableDebug) {
|
|
%>
|
|
query_<%=cid%> = <%=manager.retrieveSQL(typeToGenerate, column, incomingConnName, cid, "query_" , keyCounter, "deleteSQLSplits_")%>;
|
|
<%
|
|
}
|
|
keyCounter++;
|
|
}
|
|
}
|
|
if(isEnableDebug) {
|
|
%>
|
|
globalMap.put("<%=cid %>_QUERY", query_<%=cid%>.toString().trim());
|
|
<%
|
|
}
|
|
%>
|
|
<%
|
|
if(useBatch){
|
|
%>
|
|
pstmt_<%=cid %>.addBatch();
|
|
batchSizeCounter_<%=cid%>++;
|
|
nb_line_<%=cid%>++;
|
|
<%if(isEnableDebug){dbLog.data().sqlDebugPrint("globalMap.get(\""+cid+"_QUERY\")");}%>
|
|
<% dbLog.data().addingToBatch(dbLog.var("nb_line"), dbLog.str(dataAction)); %>
|
|
<%
|
|
}else{
|
|
%>
|
|
try {
|
|
<%if(isEnableDebug){dbLog.data().sqlDebugPrint("globalMap.get(\""+cid+"_QUERY\")");}%>
|
|
int processedCount_<%=cid%> = pstmt_<%=cid %>.executeUpdate();
|
|
deletedCount_<%=cid%> += processedCount_<%=cid%>;
|
|
rowsToCommitCount_<%=cid%> += processedCount_<%=cid%>;
|
|
nb_line_<%=cid%>++;
|
|
<%dbLog.data().deleting(dbLog.var("nb_line"));%>
|
|
} catch(java.lang.Exception e) {
|
|
globalMap.put("<%=cid%>_ERROR_MESSAGE",e.getMessage());
|
|
whetherReject_<%=cid%> = true;
|
|
<%
|
|
// Error policy for a failed DELETE: rethrow when "die on error" is set; otherwise count
// the row, then route it to the REJECT flow when one is connected, or log it.
if (("true").equals(dieOnError)) {
%>
    throw(e);
<%
} else {
%>
    nb_line_<%=cid%>++;
<%
    if(rejectConnName != null && rejectColumnList != null && rejectColumnList.size() > 0) {
%>
    <%=rejectConnName %> = new <%=rejectConnName %>Struct();
<%
        // Copy every schema column of the input row onto the reject row.
        for(IMetadataColumn column : columnList) {
%>
    <%=rejectConnName%>.<%=column.getLabel()%> = <%=incomingConnName%>.<%=column.getLabel()%>;
<%
        }
        // The generated catch clause receives java.lang.Exception, so the SQLState lookup is
        // guarded: an unconditional cast would raise ClassCastException for a non-SQL failure
        // and hide the original error.
%>
    rejectedCount_<%=cid%> = rejectedCount_<%=cid%> + 1;
    <%=rejectConnName%>.errorCode = (e instanceof java.sql.SQLException) ? ((java.sql.SQLException)e).getSQLState() : null;
    <%=rejectConnName%>.errorMessage = e.getMessage() + " - Line: " + tos_count_<%=node.getUniqueName() %>;
<%
    } else {
%>
    <%dbLog.logPrintedException("e.getMessage()");%>
    System.err.print(e.getMessage());
<%
    }
}
|
|
%>
|
|
}
|
|
<%
|
|
}
|
|
%>
|
|
<%
|
|
}
|
|
|
|
// When the row was not rejected, copy the input row onto every outgoing DATA connection
// other than the REJECT one.
if(outgoingConns != null && !outgoingConns.isEmpty()) {
%>
if(!whetherReject_<%=cid%>) {
<%
    for(IConnection outgoingConn : outgoingConns) {
        String outName = outgoingConn.getName();
        // The reject flow is filled by the error handling, not here.
        if(rejectConnName != null && outName.equals(rejectConnName)) {
            continue;
        }
        // Only connections carrying data receive a row.
        if(!outgoingConn.getLineStyle().hasConnectionCategory(IConnectionCategory.DATA)) {
            continue;
        }
%>
    <%=outName%> = new <%=outName%>Struct();
<%
        for(IMetadataColumn column : columnList) {
%>
    <%=outName%>.<%=column.getLabel()%> = <%=incomingConnName%>.<%=column.getLabel()%>;
<%
        }
    }
%>
}
<%
}
|
|
|
|
|
|
//////////batch execute by batch size///////
|
|
// Batch mode: once the configured batch size is reached, execute the pending batch and
// add the affected-row count to the counter matching the data action.
if (useBatch) {
    // Prefix of the generated per-action counter that receives the batch row count.
    String batchCounterPrefix = null;
    if (("INSERT").equals(dataAction)) {
        batchCounterPrefix = "insertedCount_";
    } else if (("UPDATE").equals(dataAction)) {
        batchCounterPrefix = "updatedCount_";
    } else if (("DELETE").equals(dataAction)) {
        batchCounterPrefix = "deletedCount_";
    }
    boolean dieOnBatchError = ("true").equals(dieOnError);
%>
if ((batchSize_<%=cid%> > 0) && (batchSize_<%=cid%> <= batchSizeCounter_<%=cid%>)) {
    int[] status_<%=cid%> = null;
    int countSum_<%=cid%> = 0;
    try {
        batchSizeCounter_<%=cid%> = 0;
        <%dbLog.batch().executeTry(dbLog.str(dataAction));%>
        status_<%=cid%> = pstmt_<%=cid %>.executeBatch();
        <%dbLog.batch().executeDone(dbLog.str(dataAction));%>
        for(int countEach_<%=cid%>: status_<%=cid%>) {
            countSum_<%=cid%> += (countEach_<%=cid%> < 0 ? 0 : countEach_<%=cid%>);
        }
    }catch (java.sql.BatchUpdateException e){
        globalMap.put("<%=cid%>_ERROR_MESSAGE",e.getMessage());
<%
    if (dieOnBatchError) {
%>
        throw(e);
<%
    } else {
        // Not dying: still sum the update counts reported by the exception.
%>
        for(int countEach_<%=cid%>: e.getUpdateCounts()) {
            countSum_<%=cid%> += (countEach_<%=cid%> < 0 ? 0 : countEach_<%=cid%>);
        }
        <%dbLog.logPrintedException("e.getMessage()");%>
        System.err.println(e.getMessage());
<%
    }
%>
    }
    try {
        tmp_batchUpdateCount_<%=cid%> = pstmt_<%=cid %>.getUpdateCount();
    }catch (java.sql.SQLException e){
        globalMap.put("<%=cid%>_ERROR_MESSAGE",e.getMessage());
        <%dbLog.logPrintedException("e.getMessage()");%>
        System.err.println(e.getMessage());
    }
    tmp_batchUpdateCount_<%=cid%> = tmp_batchUpdateCount_<%=cid%> > countSum_<%=cid%> ? tmp_batchUpdateCount_<%=cid%> : countSum_<%=cid%>;
    rowsToCommitCount_<%=cid%> += tmp_batchUpdateCount_<%=cid%>;
<%
    if (batchCounterPrefix != null) {
%>
    <%=batchCounterPrefix%><%=cid%> += tmp_batchUpdateCount_<%=cid%>;
<%
    }
%>
}
<%
}
|
|
|
|
////////////commit every////////////
|
|
// Commit handling is generated only when the component owns its connection and a
// non-zero "commit every" value is configured.
if(!("true").equals(useExistingConn) && !("").equals(commitEvery) && !("0").equals(commitEvery)) {
%>
commitCounter_<%=cid%>++;
if(commitEvery_<%=cid%> <= commitCounter_<%=cid%>) {
<%
    if (useBatch) {
        // Batch mode: flush whatever is still pending before committing.
        // Prefix of the generated per-action counter that receives the flushed row count.
        String flushCounterPrefix = null;
        if (("INSERT").equals(dataAction)) {
            flushCounterPrefix = "insertedCount_";
        } else if (("UPDATE").equals(dataAction)) {
            flushCounterPrefix = "updatedCount_";
        } else if (("DELETE").equals(dataAction)) {
            flushCounterPrefix = "deletedCount_";
        }
%>
    try {
        if (batchSizeCounter_<%=cid%> > 0) {
            int countSum_<%=cid%> = 0;
            <%dbLog.batch().executeTry(dbLog.str(dataAction));%>
            for(int countEach_<%=cid%>: pstmt_<%=cid %>.executeBatch()) {
                countSum_<%=cid%> += (countEach_<%=cid%> < 0 ? 0 : countEach_<%=cid%>);
            }
            rowsToCommitCount_<%=cid%> += countSum_<%=cid%>;
            <%dbLog.batch().executeDone(dbLog.str(dataAction));%>
<%
        if (flushCounterPrefix != null) {
%>
            <%=flushCounterPrefix%><%=cid%> += countSum_<%=cid%>;
<%
        }
%>
            batchSizeCounter_<%=cid%> = 0;
        }
    }catch (java.sql.BatchUpdateException e){
        globalMap.put("<%=cid%>_ERROR_MESSAGE",e.getMessage());
<%
        if (("true").equals(dieOnError)) {
%>
        throw(e);
<%
        } else {
            // Not dying: account for the update counts reported by the exception, then log.
%>
        int countSum_<%=cid%> = 0;
        for(int countEach_<%=cid%>: e.getUpdateCounts()) {
            countSum_<%=cid%> += (countEach_<%=cid%> < 0 ? 0 : countEach_<%=cid%>);
        }
        rowsToCommitCount_<%=cid%> += countSum_<%=cid%>;
<%
            if (flushCounterPrefix != null) {
%>
        <%=flushCounterPrefix%><%=cid%> += countSum_<%=cid%>;
<%
            }
            dbLog.logPrintedException("e.getMessage()");
%>
        System.err.println(e.getMessage());
<%
        }
%>
    }
<%
    }
%>
    if(rowsToCommitCount_<%=cid%> != 0){
        <%dbLog.commit().commitTry(null, dbLog.var("rowsToCommitCount"));%>
    }
    conn_<%=cid%>.commit();
    if(rowsToCommitCount_<%=cid%> != 0){
        <%dbLog.commit().commitDone(null);%>
        rowsToCommitCount_<%=cid%> = 0;
    }
    commitCounter_<%=cid%>=0;
}
<%
}
|
|
}
|
|
%>
|