Compare commits
1 Commits
release/7.
...
wwang-tale
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
0b8c605661 |
@@ -96,6 +96,26 @@ if(columnList != null && columnList.size() > 0) {
|
||||
stmtStructure = getManager(dbmsId, cid).createColumnList(columnList, useFieldOptions, fieldOptions, addCols);
|
||||
isDynamic = isDynamic && !getManager(dbmsId, cid).isDynamicColumnReplaced();
|
||||
}
|
||||
|
||||
boolean batch_update = "BULK_UPDATE".equals(dataAction);
|
||||
if(batch_update) {
|
||||
%>
|
||||
|
||||
<%if(isDynamic) {%>
|
||||
boolean flag_<%=cid%> = true;
|
||||
if(flag_<%=cid%>) {
|
||||
throw new java.lang.RuntimeException("batch update can't work with dynamic together, please change it");
|
||||
}
|
||||
<%
|
||||
} else if(rejectConnName!=null) {%>
|
||||
boolean flag_<%=cid%> = true;
|
||||
if(flag_<%=cid%>) {
|
||||
throw new java.lang.RuntimeException("batch update can't work with reject line together, please change it");
|
||||
}
|
||||
<%
|
||||
}
|
||||
|
||||
}
|
||||
%>
|
||||
|
||||
String dbschema_<%=cid%> = null;
|
||||
@@ -140,7 +160,7 @@ if(tableNameWithSchema){
|
||||
%>
|
||||
|
||||
<%
|
||||
if(("UPDATE").equals(dataAction) || ("INSERT_OR_UPDATE").equals(dataAction) || ("UPDATE_OR_INSERT").equals(dataAction)) {
|
||||
if(("UPDATE").equals(dataAction) || ("INSERT_OR_UPDATE").equals(dataAction) || ("UPDATE_OR_INSERT").equals(dataAction) || batch_update) {
|
||||
int updateKeyCount = 0;
|
||||
if(stmtStructure != null) {
|
||||
for(Column column : stmtStructure) {
|
||||
@@ -255,7 +275,7 @@ if(!("true").equals(useExistingConn)) {
|
||||
}
|
||||
}
|
||||
|
||||
if("INSERT".equals(dataAction) && extendedInsert) {
|
||||
if(("INSERT".equals(dataAction) && extendedInsert) || batch_update) {
|
||||
%>
|
||||
class Util_<%=cid%> {
|
||||
|
||||
@@ -310,20 +330,39 @@ if(columnList != null && columnList.size()>0) {
|
||||
StringBuilder updateSetStmt = actionSQLMap.get(UPDATE_SET_STMT);
|
||||
StringBuilder updateWhereStmt = actionSQLMap.get(UPDATE_WHERE_STMT);
|
||||
StringBuilder deleteWhereStmt = actionSQLMap.get(DELETE_WHERE_STMT);
|
||||
|
||||
if(("INSERT").equals(dataAction)) {
|
||||
if(!extendedInsert) {
|
||||
|
||||
StringBuilder insert_column_name_batch_update = actionSQLMap.get(INSERT_COLUMN_NAME_BULK_UPDATE);
|
||||
StringBuilder insert_value_stmt_batch_update = actionSQLMap.get(INSERT_VALUE_STMT_BULK_UPDATE);
|
||||
|
||||
if(("INSERT").equals(dataAction) || batch_update) {
|
||||
if(!extendedInsert && !batch_update) {
|
||||
%>
|
||||
java.sql.PreparedStatement pstmt_<%=cid %> = conn_<%=cid%>.prepareStatement("INSERT INTO \"" + tableName_<%=cid%> + "\" (<%=insertColName.toString()%>) VALUES (<%=insertValueStmt.toString()%>)");
|
||||
<%
|
||||
} else { //entended insert
|
||||
} else { //entended insert or batch update
|
||||
if(batch_update) {
|
||||
%>
|
||||
String tmpTableName_<%=cid%> = "tmp_<%=cid%>_" + pid + Thread.currentThread().getId();
|
||||
java.sql.Statement stat_<%=cid%> = conn_<%=cid%>.createStatement();
|
||||
//redshift primary key allow duplicated, so ok now
|
||||
stat_<%=cid%>.execute("CREATE TEMP TABLE \"" + tmpTableName_<%=cid%> + "\" (like \"" + tableName_<%=cid%> + "\")");
|
||||
stat_<%=cid%>.close();
|
||||
|
||||
final String realTableName_<%=cid%> = tableName_<%=cid%>;
|
||||
tableName_<%=cid%> = tmpTableName_<%=cid%>;
|
||||
<%
|
||||
}
|
||||
%>
|
||||
int rowCount<%=cid%> = 0;
|
||||
class BufferLine_<%=cid%> {
|
||||
<%
|
||||
int count = 0;
|
||||
for(Column column : stmtStructure) {
|
||||
if(!column.isReplaced() && !column.isAddCol() && column.isInsertable()) {
|
||||
boolean insertable = column.isInsertable();
|
||||
if(batch_update) {
|
||||
insertable = column.isUpdateKey() || column.isUpdatable();
|
||||
}
|
||||
if(!column.isReplaced() && !column.isAddCol() && insertable) {
|
||||
String typeToGenerate = JavaTypesManager.getTypeToGenerate(column.getColumn().getTalendType(), column.getColumn().isNullable());
|
||||
%>
|
||||
<%=typeToGenerate%> <%=column.getName()%>;
|
||||
@@ -342,7 +381,18 @@ if(columnList != null && columnList.size()>0) {
|
||||
java.util.List<BufferLine_<%=cid%>> exInsertColValueList<%=cid%> = new java.util.ArrayList();
|
||||
BufferLine_<%=cid%> exInsertColValue<%=cid%> = null;
|
||||
|
||||
String valueList_<%=cid%> = "(<%=insertValueStmt.toString()%>)";
|
||||
<%
|
||||
StringBuilder column_list_in_sql = null;
|
||||
StringBuilder value_list_in_sql = null;
|
||||
if(batch_update){
|
||||
column_list_in_sql = insert_column_name_batch_update;
|
||||
value_list_in_sql = insert_value_stmt_batch_update;
|
||||
} else {
|
||||
column_list_in_sql = insertColName;
|
||||
value_list_in_sql = insertValueStmt;
|
||||
}
|
||||
%>
|
||||
String valueList_<%=cid%> = "(<%=value_list_in_sql.toString()%>)";
|
||||
numPerInsert_<%=cid%> = util_<%=cid%>.correctNumPerInsert(valueList_<%=cid%>, numPerInsert_<%=cid%>);
|
||||
|
||||
StringBuilder extendInsertValueStmt_<%=cid%> = new StringBuilder();
|
||||
@@ -351,7 +401,7 @@ if(columnList != null && columnList.size()>0) {
|
||||
if (i_<%=cid%>!=numPerInsert_<%=cid%>-1) extendInsertValueStmt_<%=cid%>.append(",");
|
||||
}
|
||||
|
||||
String insert_<%=cid%> = "INSERT INTO \"" + tableName_<%=cid%> + "\" (<%=insertColName.toString()%>) VALUES " + extendInsertValueStmt_<%=cid%>.toString();
|
||||
String insert_<%=cid%> = "INSERT INTO \"" + tableName_<%=cid%> + "\" (<%=column_list_in_sql.toString()%>) VALUES " + extendInsertValueStmt_<%=cid%>.toString();
|
||||
java.sql.PreparedStatement pstmt_<%=cid %> = conn_<%=cid%>.prepareStatement(insert_<%=cid%>);
|
||||
<%
|
||||
}
|
||||
|
||||
@@ -94,12 +94,14 @@ skeleton="../templates/db_output_bulk.skeleton"
|
||||
StringBuilder updateSetStmt = actionSQLMap.get(UPDATE_SET_STMT);
|
||||
StringBuilder updateWhereStmt = actionSQLMap.get(UPDATE_WHERE_STMT);
|
||||
StringBuilder deleteWhereStmt = actionSQLMap.get(DELETE_WHERE_STMT);
|
||||
|
||||
|
||||
|
||||
|
||||
StringBuilder insert_column_name_batch_update = actionSQLMap.get(INSERT_COLUMN_NAME_BULK_UPDATE);
|
||||
StringBuilder insert_value_stmt_batch_update = actionSQLMap.get(INSERT_VALUE_STMT_BULK_UPDATE);
|
||||
|
||||
String numPerInsert = ElementParameterParser.getValue(node, "__NB_ROWS_PER_INSERT__");
|
||||
|
||||
if(extendedInsert){
|
||||
boolean batch_update = "BULK_UPDATE".equals(dataAction);
|
||||
if(extendedInsert || batch_update){
|
||||
class ExtendInsertOperation{
|
||||
public String generateType(String typeToGenerate){
|
||||
if(("byte[]").equals(typeToGenerate)){
|
||||
@@ -243,10 +245,20 @@ skeleton="../templates/db_output_bulk.skeleton"
|
||||
if(!("true").equals(useExistingConn)) {
|
||||
if(!("").equals(commitEvery) && !("0").equals(commitEvery)) {
|
||||
%>
|
||||
commitCounter_<%=cid%> = rowCount<%=cid%>;//correct the commit counter as commit can happen before executing the sql for extend insert
|
||||
commitCounter_<%=cid%> += rowCount<%=cid%>;//correct the commit counter as commit can happen before executing the sql for extend insert
|
||||
<%
|
||||
}
|
||||
}
|
||||
|
||||
StringBuilder column_list_in_sql = null;
|
||||
StringBuilder value_list_in_sql = null;
|
||||
if(batch_update){
|
||||
column_list_in_sql = insert_column_name_batch_update;
|
||||
value_list_in_sql = insert_value_stmt_batch_update;
|
||||
} else {
|
||||
column_list_in_sql = insertColName;
|
||||
value_list_in_sql = insertValueStmt;
|
||||
}
|
||||
%>
|
||||
|
||||
if(rowCount<%=cid%>!=0){
|
||||
@@ -269,8 +281,9 @@ skeleton="../templates/db_output_bulk.skeleton"
|
||||
}
|
||||
}
|
||||
} else {
|
||||
|
||||
%>
|
||||
extendInsertValue_<%=cid%>.append("(<%=insertValueStmt.toString()%>)");
|
||||
extendInsertValue_<%=cid%>.append("(<%=value_list_in_sql.toString()%>)");
|
||||
<%
|
||||
}
|
||||
%>
|
||||
@@ -293,7 +306,7 @@ skeleton="../templates/db_output_bulk.skeleton"
|
||||
}
|
||||
} else {
|
||||
%>
|
||||
insert_<%=cid%> = "INSERT INTO \"" + tableName_<%=cid%> + "\" (<%=insertColName.toString()%>) VALUES "+extendInsertValue_<%=cid%>.toString();
|
||||
insert_<%=cid%> = "INSERT INTO \"" + tableName_<%=cid%> + "\" (<%=column_list_in_sql.toString()%>) VALUES "+extendInsertValue_<%=cid%>.toString();
|
||||
<%
|
||||
}
|
||||
%>
|
||||
@@ -301,7 +314,8 @@ skeleton="../templates/db_output_bulk.skeleton"
|
||||
<%
|
||||
int insertableCount = 0;
|
||||
for(Column column : colStruct) {
|
||||
if(column.isInsertable()) {
|
||||
boolean insertable = (batch_update ? (column.isUpdateKey() || column.isUpdatable()) : column.isInsertable());
|
||||
if(insertable) {
|
||||
insertableCount++;
|
||||
}
|
||||
}
|
||||
@@ -322,7 +336,8 @@ skeleton="../templates/db_output_bulk.skeleton"
|
||||
+ 1;
|
||||
<%
|
||||
for(Column column : colStruct) {
|
||||
if(column.isInsertable()) {
|
||||
boolean insertable = (batch_update ? (column.isUpdateKey() || column.isUpdatable()) : column.isInsertable());
|
||||
if(insertable) {
|
||||
String typeToGenerate = JavaTypesManager.getTypeToGenerate(column.getColumn().getTalendType(), column.getColumn().isNullable());
|
||||
eiOperation.generateSetStmt(typeToGenerate, column, cid);
|
||||
%>
|
||||
@@ -348,6 +363,25 @@ skeleton="../templates/db_output_bulk.skeleton"
|
||||
}
|
||||
<%
|
||||
} // end extended insert case
|
||||
|
||||
if(batch_update) {
|
||||
%>
|
||||
//make it back
|
||||
tableName_<%=cid%> = realTableName_<%=cid%>;
|
||||
|
||||
java.sql.Statement stmtUpdateBulk_<%=cid%> = conn_<%=cid%>.createStatement();
|
||||
updatedCount_<%=cid%> = stmtUpdateBulk_<%=cid%>.executeUpdate("<%=getManager(dbmsId, cid).getUpdateBulkSQL4Output(stmtStructure)%>");
|
||||
stmtUpdateBulk_<%=cid%>.close();
|
||||
|
||||
if(!conn_<%=cid%>.getAutoCommit()) {
|
||||
conn_<%=cid%>.commit();
|
||||
}
|
||||
|
||||
java.sql.Statement stmtTmpDrop_<%=cid%> = conn_<%=cid%>.createStatement();
|
||||
stmtTmpDrop_<%=cid%>.execute("DROP TABLE \"" + tmpTableName_<%=cid%> + "\"");
|
||||
stmtTmpDrop_<%=cid%>.close();
|
||||
<%
|
||||
}
|
||||
|
||||
|
||||
if(("INSERT_OR_UPDATE").equals(dataAction)) {
|
||||
|
||||
@@ -30,7 +30,7 @@
|
||||
<CONNECTORS>
|
||||
<CONNECTOR CTYPE="FLOW" MAX_INPUT="1" MAX_OUTPUT="1"/>
|
||||
<CONNECTOR NAME="REJECT" CTYPE="FLOW" MAX_INPUT="0" MAX_OUTPUT="1" LINE_STYLE="2" COLOR="FF0000" BASE_SCHEMA="FLOW"
|
||||
NOT_SHOW_IF="((DATA_ACTION == 'INSERT' AND EXTENDINSERT == 'true') OR (DIE_ON_ERROR == 'true')) OR (isShow[BATCH_SIZE])"/>
|
||||
NOT_SHOW_IF="((DATA_ACTION == 'INSERT' AND EXTENDINSERT == 'true') OR (DIE_ON_ERROR == 'true')) OR (isShow[BATCH_SIZE]) OR (DATA_ACTION == 'BULK_UPDATE')"/>
|
||||
<CONNECTOR CTYPE="ITERATE" MAX_OUTPUT="0" MAX_INPUT="0"/>
|
||||
<CONNECTOR CTYPE="SUBJOB_OK" MAX_INPUT="1" />
|
||||
<CONNECTOR CTYPE="SUBJOB_ERROR" MAX_INPUT="1" />
|
||||
@@ -185,6 +185,7 @@
|
||||
<ITEM NAME="INSERT_OR_UPDATE" VALUE="INSERT_OR_UPDATE"/>
|
||||
<ITEM NAME="UPDATE_OR_INSERT" VALUE="UPDATE_OR_INSERT"/>
|
||||
<ITEM NAME="DELETE" VALUE="DELETE"/>
|
||||
<ITEM NAME="BULK_UPDATE" VALUE="BULK_UPDATE"/>
|
||||
</ITEMS>
|
||||
</PARAMETER>
|
||||
|
||||
@@ -250,7 +251,7 @@
|
||||
FIELD="TEXT"
|
||||
NUM_ROW="8"
|
||||
REQUIRED="true"
|
||||
SHOW_IF="(EXTENDINSERT == 'true' AND DATA_ACTION == 'INSERT')"
|
||||
SHOW_IF="(EXTENDINSERT == 'true' AND DATA_ACTION == 'INSERT') OR (DATA_ACTION == 'BULK_UPDATE')"
|
||||
>
|
||||
<DEFAULT>100</DEFAULT>
|
||||
</PARAMETER>
|
||||
|
||||
@@ -297,8 +297,9 @@ skeleton="../templates/db_output_bulk.skeleton"
|
||||
%>
|
||||
whetherReject_<%=cid%> = false;
|
||||
<%
|
||||
if(("INSERT").equals(dataAction)) {
|
||||
if(!extendedInsert) {
|
||||
boolean batch_update = "BULK_UPDATE".equals(dataAction);
|
||||
if("INSERT".equals(dataAction) || batch_update) {
|
||||
if(!extendedInsert && !batch_update) {
|
||||
int counter = 1;
|
||||
for(Column column : colStruct) {
|
||||
if(column.isInsertable()) {
|
||||
@@ -369,7 +370,7 @@ skeleton="../templates/db_output_bulk.skeleton"
|
||||
}
|
||||
%>
|
||||
<%
|
||||
} else { //extended insert
|
||||
} else { //extended insert or batch update
|
||||
class ExtendInsertOperation{
|
||||
public String generateType(String typeToGenerate){
|
||||
if(("byte[]").equals(typeToGenerate)){
|
||||
@@ -511,9 +512,10 @@ skeleton="../templates/db_output_bulk.skeleton"
|
||||
ExtendInsertOperation eiOperation = new ExtendInsertOperation();
|
||||
int insertableCount = 0;
|
||||
for(Column column : colStruct) {
|
||||
if(column.isInsertable()) {
|
||||
insertableCount++;
|
||||
}
|
||||
boolean insertable = (batch_update ? (column.isUpdateKey() || column.isUpdatable()) : column.isInsertable());
|
||||
if(insertable) {
|
||||
insertableCount++;
|
||||
}
|
||||
}
|
||||
%>
|
||||
int counter<%=cid%> = rowCount<%=cid%> *
|
||||
@@ -530,7 +532,8 @@ skeleton="../templates/db_output_bulk.skeleton"
|
||||
+ 1;
|
||||
<%
|
||||
for(Column column : colStruct) {
|
||||
if(column.isInsertable()) {
|
||||
boolean insertable = (batch_update ? (column.isUpdateKey() || column.isUpdatable()) : column.isInsertable());
|
||||
if(insertable) {
|
||||
String typeToGenerate = JavaTypesManager.getTypeToGenerate(column.getColumn().getTalendType(), column.getColumn().isNullable());
|
||||
eiOperation.generateSetStmt(typeToGenerate,column,incomingConnName,cid);
|
||||
%>
|
||||
@@ -553,7 +556,8 @@ skeleton="../templates/db_output_bulk.skeleton"
|
||||
<%
|
||||
int count = 0;
|
||||
for(Column column : colStruct) {
|
||||
if(column.isInsertable()) {
|
||||
boolean insertable = (batch_update ? (column.isUpdateKey() || column.isUpdatable()) : column.isInsertable());
|
||||
if(insertable) {
|
||||
if(count==0) {
|
||||
%>
|
||||
exInsertColValue<%=cid%>
|
||||
|
||||
@@ -20,12 +20,15 @@ ADD_COLS.ITEM.SQL=SQL expression
|
||||
ADD_COLS.NAME=Additional columns
|
||||
CLEAR_TABLE.NAME=Clear data in table
|
||||
COMMIT_EVERY.NAME=Commit every
|
||||
|
||||
DATA_ACTION.ITEM.DELETE=Delete
|
||||
DATA_ACTION.ITEM.INSERT=Insert
|
||||
DATA_ACTION.ITEM.INSERT_OR_UPDATE=Insert or update
|
||||
DATA_ACTION.ITEM.UPDATE=Update
|
||||
DATA_ACTION.ITEM.UPDATE_OR_INSERT=Update or insert
|
||||
DATA_ACTION.ITEM.BULK_UPDATE=Bulk update
|
||||
DATA_ACTION.NAME=Action on data
|
||||
|
||||
DBD-ODBC.INFO=Required for ODBC-like connection
|
||||
DBD-Oracle.INFO=Required for Oracle
|
||||
DBD-Pg.INFO=Required for Redshift
|
||||
|
||||
@@ -10,6 +10,11 @@ public class CLASS {
|
||||
public static final String FIRST_DELETE_KEY = "firstDeleteKeyColumn";
|
||||
public static final String FIRST_INSERT_COLUMN = "firstInsertColumn";
|
||||
public static final String FIRST_UPDATE_COLUMN = "firstUpdateColumn";
|
||||
|
||||
public static final String FIRST_BULK_UPDATE_COLUMN = "first_batch_update_column";
|
||||
public static final String INSERT_COLUMN_NAME_BULK_UPDATE = "insert_column_name_batch_update";
|
||||
public static final String INSERT_VALUE_STMT_BULK_UPDATE = "insert_value_stmt_batch_update";
|
||||
|
||||
public static final int NORMAL_TYPE = 0;
|
||||
public static final int INSERT_TYPE = 1;
|
||||
public static final int UPDATE_TYPE = 2;
|
||||
@@ -558,6 +563,10 @@ public class CLASS {
|
||||
return updateBulkSQL.toString() + updateSetStmt.toString() + updateWhereStmt.toString();
|
||||
}
|
||||
|
||||
public String getUpdateBulkSQL4Output(List<Column> columnList) {
|
||||
return null;
|
||||
}
|
||||
|
||||
public List<Column> createColumnList(List<IMetadataColumn> columnList, boolean useFieldOptions, List<Map<String, String>> fieldOptions, List<Map<String, String>> addCols, boolean isSpecifyIdentityKey, String identityKey, int startValue, int step) {
|
||||
List<Column> stmtStructure = createColumnList(columnList, useFieldOptions, fieldOptions, addCols);
|
||||
if(isSpecifyIdentityKey) {
|
||||
@@ -661,6 +670,10 @@ public class CLASS {
|
||||
actionSQLMap.put(FIRST_DELETE_KEY, new StringBuilder());
|
||||
actionSQLMap.put(FIRST_INSERT_COLUMN, new StringBuilder());
|
||||
actionSQLMap.put(FIRST_UPDATE_COLUMN, new StringBuilder());
|
||||
|
||||
actionSQLMap.put(FIRST_BULK_UPDATE_COLUMN, new StringBuilder());
|
||||
actionSQLMap.put(INSERT_COLUMN_NAME_BULK_UPDATE, new StringBuilder());
|
||||
actionSQLMap.put(INSERT_VALUE_STMT_BULK_UPDATE, new StringBuilder());
|
||||
} else {
|
||||
for(Column column : stmtStructure) {
|
||||
if(column.isReplaced()) {
|
||||
@@ -719,8 +732,23 @@ public class CLASS {
|
||||
if(firstUpdateColumn == null) {
|
||||
firstUpdateColumn = new StringBuilder("true");
|
||||
}
|
||||
|
||||
StringBuilder firstBatchUpdateColumn = actionSQLMap.get(FIRST_BULK_UPDATE_COLUMN);
|
||||
if(firstBatchUpdateColumn == null) {
|
||||
firstBatchUpdateColumn = new StringBuilder("true");
|
||||
}
|
||||
StringBuilder insertColNameBatchUpdate = actionSQLMap.get(INSERT_COLUMN_NAME_BULK_UPDATE);
|
||||
if(insertColNameBatchUpdate == null) {
|
||||
insertColNameBatchUpdate = new StringBuilder();
|
||||
}
|
||||
StringBuilder insertValueStmtBatchUpdate = actionSQLMap.get(INSERT_VALUE_STMT_BULK_UPDATE);
|
||||
if(insertValueStmtBatchUpdate == null) {
|
||||
insertValueStmtBatchUpdate = new StringBuilder();
|
||||
}
|
||||
|
||||
String suffix = null;
|
||||
String separate = null;
|
||||
|
||||
String identityKey = ElementParameterParser.getValue(node, "__IDENTITY_FIELD__");
|
||||
if(!(isSpecifyIdentityKey && (column.getName().equals(identityKey))) && column.isInsertable() && !column.isDynamic()) {
|
||||
if(firstInsertColumn.toString().equals("true")) {
|
||||
@@ -734,6 +762,20 @@ public class CLASS {
|
||||
insertValueStmt.append(suffix);
|
||||
insertValueStmt.append(column.getSqlStmt());
|
||||
}
|
||||
|
||||
if((column.isUpdateKey() || column.isUpdatable()) && !column.isDynamic()) {
|
||||
if(firstBatchUpdateColumn.toString().equals("true")) {
|
||||
suffix = "";
|
||||
firstBatchUpdateColumn = new StringBuilder("false");
|
||||
} else {
|
||||
suffix = ",";
|
||||
}
|
||||
insertColNameBatchUpdate.append(suffix);
|
||||
insertColNameBatchUpdate.append(getLProtectedChar(column.getColumnName()) + column.getColumnName() + getRProtectedChar(column.getColumnName()));
|
||||
insertValueStmtBatchUpdate.append(suffix);
|
||||
insertValueStmtBatchUpdate.append(column.getSqlStmt());
|
||||
}
|
||||
|
||||
if(column.isUpdatable() && !column.isDynamic()) {
|
||||
if(firstUpdateColumn.toString().equals("true")) {
|
||||
suffix = "";
|
||||
@@ -778,6 +820,11 @@ public class CLASS {
|
||||
actionSQLMap.put(FIRST_DELETE_KEY, firstDeleteKeyColumn);
|
||||
actionSQLMap.put(FIRST_INSERT_COLUMN, firstInsertColumn);
|
||||
actionSQLMap.put(FIRST_UPDATE_COLUMN, firstUpdateColumn);
|
||||
|
||||
actionSQLMap.put(FIRST_BULK_UPDATE_COLUMN, firstBatchUpdateColumn);
|
||||
actionSQLMap.put(INSERT_COLUMN_NAME_BULK_UPDATE, insertColNameBatchUpdate);
|
||||
actionSQLMap.put(INSERT_VALUE_STMT_BULK_UPDATE, insertValueStmtBatchUpdate);
|
||||
|
||||
return actionSQLMap;
|
||||
}
|
||||
public String getGenerateType(String typeToGenerate) {
|
||||
@@ -3029,6 +3076,77 @@ public class CLASS {
|
||||
}
|
||||
return createSQL.toString();
|
||||
}
|
||||
|
||||
public String getUpdateBulkSQL4Output(List<Column> columnList) {
|
||||
StringBuilder updateBulkSQL = new StringBuilder();
|
||||
StringBuilder updateSetStmt = new StringBuilder();
|
||||
StringBuilder updateWhereStmt = new StringBuilder();
|
||||
updateBulkSQL.append("UPDATE " + getLProtectedChar() + "\" + tableName_" + cid + " + \"" + getRProtectedChar());
|
||||
boolean firstKeyColumn = true;
|
||||
boolean firstUpdateColumn = true;
|
||||
String keySeparator = null;
|
||||
String updateSeparator = null;
|
||||
|
||||
List<Column> fullColumnList = new java.util.ArrayList<Column>();
|
||||
for(Column column : columnList) {
|
||||
if(column.isReplaced()) {
|
||||
List<Column> replacedColumns = column.getReplacement();
|
||||
for(Column replacedColumn : replacedColumns) {
|
||||
fullColumnList.add(replacedColumn);
|
||||
}
|
||||
} else {
|
||||
fullColumnList.add(column);
|
||||
}
|
||||
}
|
||||
|
||||
for(Column column : fullColumnList) {
|
||||
if(column.isDynamic()) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if(column.isUpdateKey()) {
|
||||
if(firstKeyColumn) {
|
||||
keySeparator = "";
|
||||
firstKeyColumn = false;
|
||||
updateWhereStmt.append(" FROM " + getLProtectedChar() + "\" + tmpTableName_" + cid + " + \"" + getRProtectedChar() + " WHERE \" + \"");
|
||||
} else {
|
||||
keySeparator = " AND ";
|
||||
}
|
||||
updateWhereStmt.append(keySeparator);
|
||||
updateWhereStmt.append(getLProtectedChar() + "\" + tableName_" + cid + " + \"" + getRProtectedChar() + "." + getLProtectedChar() + "\" + \"" + column.getColumnName() + "\" + \"" + getRProtectedChar() + " = " + getLProtectedChar() + "\" + tmpTableName_" + cid + " + \"" + getRProtectedChar() + "." + getLProtectedChar() + "\" + \"" + column.getColumnName() + "\" + \"" + getRProtectedChar());
|
||||
} else if(column.isUpdatable()) {
|
||||
if(firstUpdateColumn) {
|
||||
updateSeparator = "";
|
||||
firstUpdateColumn = false;
|
||||
updateSetStmt.append(" SET \" + \"");
|
||||
} else {
|
||||
updateSeparator = ", ";
|
||||
}
|
||||
updateSetStmt.append(updateSeparator);
|
||||
updateSetStmt.append(getLProtectedChar() + "\" + \"" + column.getColumnName() + "\" + \"" + getRProtectedChar() + " = " + getLProtectedChar() + "\" + tmpTableName_" + cid + " + \"" + getRProtectedChar() + "." + getLProtectedChar() + "\" + \"" + column.getColumnName() + "\" + \"" + getRProtectedChar());
|
||||
}
|
||||
}
|
||||
|
||||
if(!firstUpdateColumn) {
|
||||
updateWhereStmt.append(" AND (");
|
||||
boolean first = true;
|
||||
for(Column column : fullColumnList) {
|
||||
if(!column.isUpdatable() || column.isDynamic()) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if(!first) {
|
||||
updateWhereStmt.append(" OR ");
|
||||
} else {
|
||||
first = false;
|
||||
}
|
||||
updateWhereStmt.append(getLProtectedChar() + "\" + tableName_" + cid + " + \"" + getRProtectedChar() + "." + getLProtectedChar() + "\" + \"" + column.getColumnName() + "\" + \"" + getRProtectedChar() + " != " + getLProtectedChar() + "\" + tmpTableName_" + cid + " + \"" + getRProtectedChar() + "." + getLProtectedChar() + "\" + \"" + column.getColumnName() + "\" + \"" + getRProtectedChar());
|
||||
}
|
||||
updateWhereStmt.append(")");
|
||||
}
|
||||
|
||||
return updateBulkSQL.toString() + updateSetStmt.toString() + updateWhereStmt.toString();
|
||||
}
|
||||
}
|
||||
|
||||
public class GreenplumManager extends PostgrePlusManager {
|
||||
|
||||
@@ -2987,6 +2987,14 @@
|
||||
name="RepositoryProjectDate"
|
||||
version="7.0.0">
|
||||
</projecttask>
|
||||
<projecttask
|
||||
beforeLogon="false"
|
||||
breaks="7.0.0"
|
||||
class="org.talend.repository.model.migration.ChangeUpdate2BulkUpdate4tRedshiftOutput"
|
||||
id="org.talend.repository.model.migration.ChangeUpdate2BulkUpdate4tRedshiftOutput"
|
||||
name="Change the update mode to bulk update mode"
|
||||
version="7.0.0">
|
||||
</projecttask>
|
||||
</extension>
|
||||
|
||||
<extension
|
||||
|
||||
@@ -0,0 +1,142 @@
|
||||
// ============================================================================
|
||||
//
|
||||
// Copyright (C) 2006-2017 Talend Inc. - www.talend.com
|
||||
//
|
||||
// This source code is available under agreement available at
|
||||
// %InstallDIR%\features\org.talend.rcp.branding.%PRODUCTNAME%\%PRODUCTNAME%license.txt
|
||||
//
|
||||
// You should have received a copy of the agreement
|
||||
// along with this program; if not, write to Talend SA
|
||||
// 9 rue Pages 92150 Suresnes, France
|
||||
//
|
||||
// ============================================================================
|
||||
package org.talend.repository.model.migration;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.Date;
|
||||
import java.util.GregorianCalendar;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
|
||||
import org.eclipse.emf.common.util.EList;
|
||||
import org.eclipse.emf.ecore.EObject;
|
||||
import org.talend.commons.exception.ExceptionHandler;
|
||||
import org.talend.commons.exception.PersistenceException;
|
||||
import org.talend.core.model.components.ComponentUtilities;
|
||||
import org.talend.core.model.components.ModifyComponentsAction;
|
||||
import org.talend.core.model.components.conversions.IComponentConversion;
|
||||
import org.talend.core.model.components.filters.IComponentFilter;
|
||||
import org.talend.core.model.components.filters.NameComponentFilter;
|
||||
import org.talend.core.model.migration.AbstractJobMigrationTask;
|
||||
import org.talend.core.model.process.EConnectionType;
|
||||
import org.talend.core.model.properties.Item;
|
||||
import org.talend.designer.core.model.utils.emf.talendfile.ColumnType;
|
||||
import org.talend.designer.core.model.utils.emf.talendfile.ConnectionType;
|
||||
import org.talend.designer.core.model.utils.emf.talendfile.ElementParameterType;
|
||||
import org.talend.designer.core.model.utils.emf.talendfile.MetadataType;
|
||||
import org.talend.designer.core.model.utils.emf.talendfile.NodeType;
|
||||
import org.talend.designer.core.model.utils.emf.talendfile.ProcessType;
|
||||
|
||||
/**
|
||||
* DOC Administrator class global comment. Detailled comment
|
||||
*/
|
||||
public class ChangeUpdate2BulkUpdate4tRedshiftOutput extends AbstractJobMigrationTask {
|
||||
|
||||
@Override
|
||||
public ExecutionResult execute(Item item) {
|
||||
final ProcessType processType = getProcessType(item);
|
||||
String[] compNames = { "tRedshiftOutput" }; //$NON-NLS-1$
|
||||
|
||||
IComponentConversion doMigration = new IComponentConversion() {
|
||||
|
||||
public void transform(NodeType node) {
|
||||
if (node == null) {
|
||||
return;
|
||||
}
|
||||
|
||||
ElementParameterType data_action = ComponentUtilities.getNodeProperty(node, "DATA_ACTION"); //$NON-NLS-1$
|
||||
|
||||
if (data_action == null) {
|
||||
return;
|
||||
}
|
||||
|
||||
String action = data_action.getValue();
|
||||
|
||||
if (action == null || !"UPDATE".equals(action)) {//$NON-NLS-1$
|
||||
return;
|
||||
}
|
||||
|
||||
if (isDynamicSchema(node)) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (isRejectLineExists(node)) {
|
||||
return;
|
||||
}
|
||||
|
||||
data_action.setValue("BULK_UPDATE");//$NON-NLS-1$
|
||||
|
||||
ElementParameterType nb_rows_per_insert = ComponentUtilities.getNodeProperty(node, "NB_ROWS_PER_INSERT"); //$NON-NLS-1$
|
||||
if (nb_rows_per_insert == null) {
|
||||
return;
|
||||
}
|
||||
|
||||
nb_rows_per_insert.setValue("1000");
|
||||
}
|
||||
|
||||
private boolean isRejectLineExists(NodeType node) {
|
||||
List<ConnectionType> list = ComponentUtilities.getNodeOutputConnections(node);
|
||||
for (ConnectionType connType : list) {
|
||||
EConnectionType eConnType = EConnectionType.getTypeFromId(connType.getLineStyle());
|
||||
if (eConnType == EConnectionType.FLOW_MAIN && connType.getConnectorName().equals("REJECT")) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
private boolean isDynamicSchema(NodeType node) {
|
||||
EList<EObject> list = node.eContents();
|
||||
for (EObject object : list) {
|
||||
if (object instanceof MetadataType) {
|
||||
MetadataType flow = (MetadataType) object;
|
||||
if ("FLOW".equalsIgnoreCase(flow.getConnector())) {
|
||||
Iterator<?> columns = flow.getColumn().iterator();
|
||||
while (columns.hasNext()) {
|
||||
Object outColumn = columns.next();
|
||||
if (outColumn instanceof ColumnType) {
|
||||
ColumnType column = ((ColumnType) outColumn);
|
||||
if ("id_Dynamic".equals(column.getType())) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
};
|
||||
|
||||
for (String name : compNames) {
|
||||
IComponentFilter filter = new NameComponentFilter(name);
|
||||
|
||||
try {
|
||||
ModifyComponentsAction.searchAndModify(item, processType, filter, Arrays.<IComponentConversion> asList(doMigration));
|
||||
} catch (PersistenceException e) {
|
||||
ExceptionHandler.process(e);
|
||||
return ExecutionResult.FAILURE;
|
||||
}
|
||||
}
|
||||
|
||||
return ExecutionResult.SUCCESS_NO_ALERT;
|
||||
|
||||
}
|
||||
|
||||
public Date getOrder() {
|
||||
GregorianCalendar gc = new GregorianCalendar(2018, 0, 26, 16, 0, 0);
|
||||
return gc.getTime();
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user