Compare commits

...

1 Commits

Author SHA1 Message Date
wang wei
0b8c605661 fix(TDI-39942): Job when ran from 6.4.1 does not complete but does when
run from original 6.1.1
2018-01-26 15:50:22 +08:00
8 changed files with 388 additions and 28 deletions

View File

@@ -96,6 +96,26 @@ if(columnList != null && columnList.size() > 0) {
stmtStructure = getManager(dbmsId, cid).createColumnList(columnList, useFieldOptions, fieldOptions, addCols);
isDynamic = isDynamic && !getManager(dbmsId, cid).isDynamicColumnReplaced();
}
boolean batch_update = "BULK_UPDATE".equals(dataAction);
if(batch_update) {
%>
<%if(isDynamic) {%>
boolean flag_<%=cid%> = true;
if(flag_<%=cid%>) {
throw new java.lang.RuntimeException("batch update can't work with dynamic together, please change it");
}
<%
} else if(rejectConnName!=null) {%>
boolean flag_<%=cid%> = true;
if(flag_<%=cid%>) {
throw new java.lang.RuntimeException("batch update can't work with reject line together, please change it");
}
<%
}
}
%>
String dbschema_<%=cid%> = null;
@@ -140,7 +160,7 @@ if(tableNameWithSchema){
%>
<%
if(("UPDATE").equals(dataAction) || ("INSERT_OR_UPDATE").equals(dataAction) || ("UPDATE_OR_INSERT").equals(dataAction)) {
if(("UPDATE").equals(dataAction) || ("INSERT_OR_UPDATE").equals(dataAction) || ("UPDATE_OR_INSERT").equals(dataAction) || batch_update) {
int updateKeyCount = 0;
if(stmtStructure != null) {
for(Column column : stmtStructure) {
@@ -255,7 +275,7 @@ if(!("true").equals(useExistingConn)) {
}
}
if("INSERT".equals(dataAction) && extendedInsert) {
if(("INSERT".equals(dataAction) && extendedInsert) || batch_update) {
%>
class Util_<%=cid%> {
@@ -310,20 +330,39 @@ if(columnList != null && columnList.size()>0) {
StringBuilder updateSetStmt = actionSQLMap.get(UPDATE_SET_STMT);
StringBuilder updateWhereStmt = actionSQLMap.get(UPDATE_WHERE_STMT);
StringBuilder deleteWhereStmt = actionSQLMap.get(DELETE_WHERE_STMT);
if(("INSERT").equals(dataAction)) {
if(!extendedInsert) {
StringBuilder insert_column_name_batch_update = actionSQLMap.get(INSERT_COLUMN_NAME_BULK_UPDATE);
StringBuilder insert_value_stmt_batch_update = actionSQLMap.get(INSERT_VALUE_STMT_BULK_UPDATE);
if(("INSERT").equals(dataAction) || batch_update) {
if(!extendedInsert && !batch_update) {
%>
java.sql.PreparedStatement pstmt_<%=cid %> = conn_<%=cid%>.prepareStatement("INSERT INTO \"" + tableName_<%=cid%> + "\" (<%=insertColName.toString()%>) VALUES (<%=insertValueStmt.toString()%>)");
<%
} else { //entended insert
} else { //entended insert or batch update
if(batch_update) {
%>
String tmpTableName_<%=cid%> = "tmp_<%=cid%>_" + pid + Thread.currentThread().getId();
java.sql.Statement stat_<%=cid%> = conn_<%=cid%>.createStatement();
//redshift primary key allow duplicated, so ok now
stat_<%=cid%>.execute("CREATE TEMP TABLE \"" + tmpTableName_<%=cid%> + "\" (like \"" + tableName_<%=cid%> + "\")");
stat_<%=cid%>.close();
final String realTableName_<%=cid%> = tableName_<%=cid%>;
tableName_<%=cid%> = tmpTableName_<%=cid%>;
<%
}
%>
int rowCount<%=cid%> = 0;
class BufferLine_<%=cid%> {
<%
int count = 0;
for(Column column : stmtStructure) {
if(!column.isReplaced() && !column.isAddCol() && column.isInsertable()) {
boolean insertable = column.isInsertable();
if(batch_update) {
insertable = column.isUpdateKey() || column.isUpdatable();
}
if(!column.isReplaced() && !column.isAddCol() && insertable) {
String typeToGenerate = JavaTypesManager.getTypeToGenerate(column.getColumn().getTalendType(), column.getColumn().isNullable());
%>
<%=typeToGenerate%> <%=column.getName()%>;
@@ -342,7 +381,18 @@ if(columnList != null && columnList.size()>0) {
java.util.List<BufferLine_<%=cid%>> exInsertColValueList<%=cid%> = new java.util.ArrayList();
BufferLine_<%=cid%> exInsertColValue<%=cid%> = null;
String valueList_<%=cid%> = "(<%=insertValueStmt.toString()%>)";
<%
StringBuilder column_list_in_sql = null;
StringBuilder value_list_in_sql = null;
if(batch_update){
column_list_in_sql = insert_column_name_batch_update;
value_list_in_sql = insert_value_stmt_batch_update;
} else {
column_list_in_sql = insertColName;
value_list_in_sql = insertValueStmt;
}
%>
String valueList_<%=cid%> = "(<%=value_list_in_sql.toString()%>)";
numPerInsert_<%=cid%> = util_<%=cid%>.correctNumPerInsert(valueList_<%=cid%>, numPerInsert_<%=cid%>);
StringBuilder extendInsertValueStmt_<%=cid%> = new StringBuilder();
@@ -351,7 +401,7 @@ if(columnList != null && columnList.size()>0) {
if (i_<%=cid%>!=numPerInsert_<%=cid%>-1) extendInsertValueStmt_<%=cid%>.append(",");
}
String insert_<%=cid%> = "INSERT INTO \"" + tableName_<%=cid%> + "\" (<%=insertColName.toString()%>) VALUES " + extendInsertValueStmt_<%=cid%>.toString();
String insert_<%=cid%> = "INSERT INTO \"" + tableName_<%=cid%> + "\" (<%=column_list_in_sql.toString()%>) VALUES " + extendInsertValueStmt_<%=cid%>.toString();
java.sql.PreparedStatement pstmt_<%=cid %> = conn_<%=cid%>.prepareStatement(insert_<%=cid%>);
<%
}

View File

@@ -94,12 +94,14 @@ skeleton="../templates/db_output_bulk.skeleton"
StringBuilder updateSetStmt = actionSQLMap.get(UPDATE_SET_STMT);
StringBuilder updateWhereStmt = actionSQLMap.get(UPDATE_WHERE_STMT);
StringBuilder deleteWhereStmt = actionSQLMap.get(DELETE_WHERE_STMT);
StringBuilder insert_column_name_batch_update = actionSQLMap.get(INSERT_COLUMN_NAME_BULK_UPDATE);
StringBuilder insert_value_stmt_batch_update = actionSQLMap.get(INSERT_VALUE_STMT_BULK_UPDATE);
String numPerInsert = ElementParameterParser.getValue(node, "__NB_ROWS_PER_INSERT__");
if(extendedInsert){
boolean batch_update = "BULK_UPDATE".equals(dataAction);
if(extendedInsert || batch_update){
class ExtendInsertOperation{
public String generateType(String typeToGenerate){
if(("byte[]").equals(typeToGenerate)){
@@ -243,10 +245,20 @@ skeleton="../templates/db_output_bulk.skeleton"
if(!("true").equals(useExistingConn)) {
if(!("").equals(commitEvery) && !("0").equals(commitEvery)) {
%>
commitCounter_<%=cid%> = rowCount<%=cid%>;//correct the commit counter as commit can happen before executing the sql for extend insert
commitCounter_<%=cid%> += rowCount<%=cid%>;//correct the commit counter as commit can happen before executing the sql for extend insert
<%
}
}
StringBuilder column_list_in_sql = null;
StringBuilder value_list_in_sql = null;
if(batch_update){
column_list_in_sql = insert_column_name_batch_update;
value_list_in_sql = insert_value_stmt_batch_update;
} else {
column_list_in_sql = insertColName;
value_list_in_sql = insertValueStmt;
}
%>
if(rowCount<%=cid%>!=0){
@@ -269,8 +281,9 @@ skeleton="../templates/db_output_bulk.skeleton"
}
}
} else {
%>
extendInsertValue_<%=cid%>.append("(<%=insertValueStmt.toString()%>)");
extendInsertValue_<%=cid%>.append("(<%=value_list_in_sql.toString()%>)");
<%
}
%>
@@ -293,7 +306,7 @@ skeleton="../templates/db_output_bulk.skeleton"
}
} else {
%>
insert_<%=cid%> = "INSERT INTO \"" + tableName_<%=cid%> + "\" (<%=insertColName.toString()%>) VALUES "+extendInsertValue_<%=cid%>.toString();
insert_<%=cid%> = "INSERT INTO \"" + tableName_<%=cid%> + "\" (<%=column_list_in_sql.toString()%>) VALUES "+extendInsertValue_<%=cid%>.toString();
<%
}
%>
@@ -301,7 +314,8 @@ skeleton="../templates/db_output_bulk.skeleton"
<%
int insertableCount = 0;
for(Column column : colStruct) {
if(column.isInsertable()) {
boolean insertable = (batch_update ? (column.isUpdateKey() || column.isUpdatable()) : column.isInsertable());
if(insertable) {
insertableCount++;
}
}
@@ -322,7 +336,8 @@ skeleton="../templates/db_output_bulk.skeleton"
+ 1;
<%
for(Column column : colStruct) {
if(column.isInsertable()) {
boolean insertable = (batch_update ? (column.isUpdateKey() || column.isUpdatable()) : column.isInsertable());
if(insertable) {
String typeToGenerate = JavaTypesManager.getTypeToGenerate(column.getColumn().getTalendType(), column.getColumn().isNullable());
eiOperation.generateSetStmt(typeToGenerate, column, cid);
%>
@@ -348,6 +363,25 @@ skeleton="../templates/db_output_bulk.skeleton"
}
<%
} // end extended insert case
if(batch_update) {
%>
//make it back
tableName_<%=cid%> = realTableName_<%=cid%>;
java.sql.Statement stmtUpdateBulk_<%=cid%> = conn_<%=cid%>.createStatement();
updatedCount_<%=cid%> = stmtUpdateBulk_<%=cid%>.executeUpdate("<%=getManager(dbmsId, cid).getUpdateBulkSQL4Output(stmtStructure)%>");
stmtUpdateBulk_<%=cid%>.close();
if(!conn_<%=cid%>.getAutoCommit()) {
conn_<%=cid%>.commit();
}
java.sql.Statement stmtTmpDrop_<%=cid%> = conn_<%=cid%>.createStatement();
stmtTmpDrop_<%=cid%>.execute("DROP TABLE \"" + tmpTableName_<%=cid%> + "\"");
stmtTmpDrop_<%=cid%>.close();
<%
}
if(("INSERT_OR_UPDATE").equals(dataAction)) {

View File

@@ -30,7 +30,7 @@
<CONNECTORS>
<CONNECTOR CTYPE="FLOW" MAX_INPUT="1" MAX_OUTPUT="1"/>
<CONNECTOR NAME="REJECT" CTYPE="FLOW" MAX_INPUT="0" MAX_OUTPUT="1" LINE_STYLE="2" COLOR="FF0000" BASE_SCHEMA="FLOW"
NOT_SHOW_IF="((DATA_ACTION == 'INSERT' AND EXTENDINSERT == 'true') OR (DIE_ON_ERROR == 'true')) OR (isShow[BATCH_SIZE])"/>
NOT_SHOW_IF="((DATA_ACTION == 'INSERT' AND EXTENDINSERT == 'true') OR (DIE_ON_ERROR == 'true')) OR (isShow[BATCH_SIZE]) OR (DATA_ACTION == 'BULK_UPDATE')"/>
<CONNECTOR CTYPE="ITERATE" MAX_OUTPUT="0" MAX_INPUT="0"/>
<CONNECTOR CTYPE="SUBJOB_OK" MAX_INPUT="1" />
<CONNECTOR CTYPE="SUBJOB_ERROR" MAX_INPUT="1" />
@@ -185,6 +185,7 @@
<ITEM NAME="INSERT_OR_UPDATE" VALUE="INSERT_OR_UPDATE"/>
<ITEM NAME="UPDATE_OR_INSERT" VALUE="UPDATE_OR_INSERT"/>
<ITEM NAME="DELETE" VALUE="DELETE"/>
<ITEM NAME="BULK_UPDATE" VALUE="BULK_UPDATE"/>
</ITEMS>
</PARAMETER>
@@ -250,7 +251,7 @@
FIELD="TEXT"
NUM_ROW="8"
REQUIRED="true"
SHOW_IF="(EXTENDINSERT == 'true' AND DATA_ACTION == 'INSERT')"
SHOW_IF="(EXTENDINSERT == 'true' AND DATA_ACTION == 'INSERT') OR (DATA_ACTION == 'BULK_UPDATE')"
>
<DEFAULT>100</DEFAULT>
</PARAMETER>

View File

@@ -297,8 +297,9 @@ skeleton="../templates/db_output_bulk.skeleton"
%>
whetherReject_<%=cid%> = false;
<%
if(("INSERT").equals(dataAction)) {
if(!extendedInsert) {
boolean batch_update = "BULK_UPDATE".equals(dataAction);
if("INSERT".equals(dataAction) || batch_update) {
if(!extendedInsert && !batch_update) {
int counter = 1;
for(Column column : colStruct) {
if(column.isInsertable()) {
@@ -369,7 +370,7 @@ skeleton="../templates/db_output_bulk.skeleton"
}
%>
<%
} else { //extended insert
} else { //extended insert or batch update
class ExtendInsertOperation{
public String generateType(String typeToGenerate){
if(("byte[]").equals(typeToGenerate)){
@@ -511,9 +512,10 @@ skeleton="../templates/db_output_bulk.skeleton"
ExtendInsertOperation eiOperation = new ExtendInsertOperation();
int insertableCount = 0;
for(Column column : colStruct) {
if(column.isInsertable()) {
insertableCount++;
}
boolean insertable = (batch_update ? (column.isUpdateKey() || column.isUpdatable()) : column.isInsertable());
if(insertable) {
insertableCount++;
}
}
%>
int counter<%=cid%> = rowCount<%=cid%> *
@@ -530,7 +532,8 @@ skeleton="../templates/db_output_bulk.skeleton"
+ 1;
<%
for(Column column : colStruct) {
if(column.isInsertable()) {
boolean insertable = (batch_update ? (column.isUpdateKey() || column.isUpdatable()) : column.isInsertable());
if(insertable) {
String typeToGenerate = JavaTypesManager.getTypeToGenerate(column.getColumn().getTalendType(), column.getColumn().isNullable());
eiOperation.generateSetStmt(typeToGenerate,column,incomingConnName,cid);
%>
@@ -553,7 +556,8 @@ skeleton="../templates/db_output_bulk.skeleton"
<%
int count = 0;
for(Column column : colStruct) {
if(column.isInsertable()) {
boolean insertable = (batch_update ? (column.isUpdateKey() || column.isUpdatable()) : column.isInsertable());
if(insertable) {
if(count==0) {
%>
exInsertColValue<%=cid%>

View File

@@ -20,12 +20,15 @@ ADD_COLS.ITEM.SQL=SQL expression
ADD_COLS.NAME=Additional columns
CLEAR_TABLE.NAME=Clear data in table
COMMIT_EVERY.NAME=Commit every
DATA_ACTION.ITEM.DELETE=Delete
DATA_ACTION.ITEM.INSERT=Insert
DATA_ACTION.ITEM.INSERT_OR_UPDATE=Insert or update
DATA_ACTION.ITEM.UPDATE=Update
DATA_ACTION.ITEM.UPDATE_OR_INSERT=Update or insert
DATA_ACTION.ITEM.BULK_UPDATE=Bulk update
DATA_ACTION.NAME=Action on data
DBD-ODBC.INFO=Required for ODBC-like connection
DBD-Oracle.INFO=Required for Oracle
DBD-Pg.INFO=Required for Redshift

View File

@@ -10,6 +10,11 @@ public class CLASS {
public static final String FIRST_DELETE_KEY = "firstDeleteKeyColumn";
public static final String FIRST_INSERT_COLUMN = "firstInsertColumn";
public static final String FIRST_UPDATE_COLUMN = "firstUpdateColumn";
public static final String FIRST_BULK_UPDATE_COLUMN = "first_batch_update_column";
public static final String INSERT_COLUMN_NAME_BULK_UPDATE = "insert_column_name_batch_update";
public static final String INSERT_VALUE_STMT_BULK_UPDATE = "insert_value_stmt_batch_update";
public static final int NORMAL_TYPE = 0;
public static final int INSERT_TYPE = 1;
public static final int UPDATE_TYPE = 2;
@@ -558,6 +563,10 @@ public class CLASS {
return updateBulkSQL.toString() + updateSetStmt.toString() + updateWhereStmt.toString();
}
public String getUpdateBulkSQL4Output(List<Column> columnList) {
return null;
}
public List<Column> createColumnList(List<IMetadataColumn> columnList, boolean useFieldOptions, List<Map<String, String>> fieldOptions, List<Map<String, String>> addCols, boolean isSpecifyIdentityKey, String identityKey, int startValue, int step) {
List<Column> stmtStructure = createColumnList(columnList, useFieldOptions, fieldOptions, addCols);
if(isSpecifyIdentityKey) {
@@ -661,6 +670,10 @@ public class CLASS {
actionSQLMap.put(FIRST_DELETE_KEY, new StringBuilder());
actionSQLMap.put(FIRST_INSERT_COLUMN, new StringBuilder());
actionSQLMap.put(FIRST_UPDATE_COLUMN, new StringBuilder());
actionSQLMap.put(FIRST_BULK_UPDATE_COLUMN, new StringBuilder());
actionSQLMap.put(INSERT_COLUMN_NAME_BULK_UPDATE, new StringBuilder());
actionSQLMap.put(INSERT_VALUE_STMT_BULK_UPDATE, new StringBuilder());
} else {
for(Column column : stmtStructure) {
if(column.isReplaced()) {
@@ -719,8 +732,23 @@ public class CLASS {
if(firstUpdateColumn == null) {
firstUpdateColumn = new StringBuilder("true");
}
StringBuilder firstBatchUpdateColumn = actionSQLMap.get(FIRST_BULK_UPDATE_COLUMN);
if(firstBatchUpdateColumn == null) {
firstBatchUpdateColumn = new StringBuilder("true");
}
StringBuilder insertColNameBatchUpdate = actionSQLMap.get(INSERT_COLUMN_NAME_BULK_UPDATE);
if(insertColNameBatchUpdate == null) {
insertColNameBatchUpdate = new StringBuilder();
}
StringBuilder insertValueStmtBatchUpdate = actionSQLMap.get(INSERT_VALUE_STMT_BULK_UPDATE);
if(insertValueStmtBatchUpdate == null) {
insertValueStmtBatchUpdate = new StringBuilder();
}
String suffix = null;
String separate = null;
String identityKey = ElementParameterParser.getValue(node, "__IDENTITY_FIELD__");
if(!(isSpecifyIdentityKey && (column.getName().equals(identityKey))) && column.isInsertable() && !column.isDynamic()) {
if(firstInsertColumn.toString().equals("true")) {
@@ -734,6 +762,20 @@ public class CLASS {
insertValueStmt.append(suffix);
insertValueStmt.append(column.getSqlStmt());
}
if((column.isUpdateKey() || column.isUpdatable()) && !column.isDynamic()) {
if(firstBatchUpdateColumn.toString().equals("true")) {
suffix = "";
firstBatchUpdateColumn = new StringBuilder("false");
} else {
suffix = ",";
}
insertColNameBatchUpdate.append(suffix);
insertColNameBatchUpdate.append(getLProtectedChar(column.getColumnName()) + column.getColumnName() + getRProtectedChar(column.getColumnName()));
insertValueStmtBatchUpdate.append(suffix);
insertValueStmtBatchUpdate.append(column.getSqlStmt());
}
if(column.isUpdatable() && !column.isDynamic()) {
if(firstUpdateColumn.toString().equals("true")) {
suffix = "";
@@ -778,6 +820,11 @@ public class CLASS {
actionSQLMap.put(FIRST_DELETE_KEY, firstDeleteKeyColumn);
actionSQLMap.put(FIRST_INSERT_COLUMN, firstInsertColumn);
actionSQLMap.put(FIRST_UPDATE_COLUMN, firstUpdateColumn);
actionSQLMap.put(FIRST_BULK_UPDATE_COLUMN, firstBatchUpdateColumn);
actionSQLMap.put(INSERT_COLUMN_NAME_BULK_UPDATE, insertColNameBatchUpdate);
actionSQLMap.put(INSERT_VALUE_STMT_BULK_UPDATE, insertValueStmtBatchUpdate);
return actionSQLMap;
}
public String getGenerateType(String typeToGenerate) {
@@ -3029,6 +3076,77 @@ public class CLASS {
}
return createSQL.toString();
}
public String getUpdateBulkSQL4Output(List<Column> columnList) {
StringBuilder updateBulkSQL = new StringBuilder();
StringBuilder updateSetStmt = new StringBuilder();
StringBuilder updateWhereStmt = new StringBuilder();
updateBulkSQL.append("UPDATE " + getLProtectedChar() + "\" + tableName_" + cid + " + \"" + getRProtectedChar());
boolean firstKeyColumn = true;
boolean firstUpdateColumn = true;
String keySeparator = null;
String updateSeparator = null;
List<Column> fullColumnList = new java.util.ArrayList<Column>();
for(Column column : columnList) {
if(column.isReplaced()) {
List<Column> replacedColumns = column.getReplacement();
for(Column replacedColumn : replacedColumns) {
fullColumnList.add(replacedColumn);
}
} else {
fullColumnList.add(column);
}
}
for(Column column : fullColumnList) {
if(column.isDynamic()) {
continue;
}
if(column.isUpdateKey()) {
if(firstKeyColumn) {
keySeparator = "";
firstKeyColumn = false;
updateWhereStmt.append(" FROM " + getLProtectedChar() + "\" + tmpTableName_" + cid + " + \"" + getRProtectedChar() + " WHERE \" + \"");
} else {
keySeparator = " AND ";
}
updateWhereStmt.append(keySeparator);
updateWhereStmt.append(getLProtectedChar() + "\" + tableName_" + cid + " + \"" + getRProtectedChar() + "." + getLProtectedChar() + "\" + \"" + column.getColumnName() + "\" + \"" + getRProtectedChar() + " = " + getLProtectedChar() + "\" + tmpTableName_" + cid + " + \"" + getRProtectedChar() + "." + getLProtectedChar() + "\" + \"" + column.getColumnName() + "\" + \"" + getRProtectedChar());
} else if(column.isUpdatable()) {
if(firstUpdateColumn) {
updateSeparator = "";
firstUpdateColumn = false;
updateSetStmt.append(" SET \" + \"");
} else {
updateSeparator = ", ";
}
updateSetStmt.append(updateSeparator);
updateSetStmt.append(getLProtectedChar() + "\" + \"" + column.getColumnName() + "\" + \"" + getRProtectedChar() + " = " + getLProtectedChar() + "\" + tmpTableName_" + cid + " + \"" + getRProtectedChar() + "." + getLProtectedChar() + "\" + \"" + column.getColumnName() + "\" + \"" + getRProtectedChar());
}
}
if(!firstUpdateColumn) {
updateWhereStmt.append(" AND (");
boolean first = true;
for(Column column : fullColumnList) {
if(!column.isUpdatable() || column.isDynamic()) {
continue;
}
if(!first) {
updateWhereStmt.append(" OR ");
} else {
first = false;
}
updateWhereStmt.append(getLProtectedChar() + "\" + tableName_" + cid + " + \"" + getRProtectedChar() + "." + getLProtectedChar() + "\" + \"" + column.getColumnName() + "\" + \"" + getRProtectedChar() + " != " + getLProtectedChar() + "\" + tmpTableName_" + cid + " + \"" + getRProtectedChar() + "." + getLProtectedChar() + "\" + \"" + column.getColumnName() + "\" + \"" + getRProtectedChar());
}
updateWhereStmt.append(")");
}
return updateBulkSQL.toString() + updateSetStmt.toString() + updateWhereStmt.toString();
}
}
public class GreenplumManager extends PostgrePlusManager {

View File

@@ -2987,6 +2987,14 @@
name="RepositoryProjectDate"
version="7.0.0">
</projecttask>
<projecttask
beforeLogon="false"
breaks="7.0.0"
class="org.talend.repository.model.migration.ChangeUpdate2BulkUpdate4tRedshiftOutput"
id="org.talend.repository.model.migration.ChangeUpdate2BulkUpdate4tRedshiftOutput"
name="Change the update mode to bulk update mode"
version="7.0.0">
</projecttask>
</extension>
<extension

View File

@@ -0,0 +1,142 @@
// ============================================================================
//
// Copyright (C) 2006-2017 Talend Inc. - www.talend.com
//
// This source code is available under agreement available at
// %InstallDIR%\features\org.talend.rcp.branding.%PRODUCTNAME%\%PRODUCTNAME%license.txt
//
// You should have received a copy of the agreement
// along with this program; if not, write to Talend SA
// 9 rue Pages 92150 Suresnes, France
//
// ============================================================================
package org.talend.repository.model.migration;
import java.util.Arrays;
import java.util.Date;
import java.util.GregorianCalendar;
import java.util.Iterator;
import java.util.List;
import org.eclipse.emf.common.util.EList;
import org.eclipse.emf.ecore.EObject;
import org.talend.commons.exception.ExceptionHandler;
import org.talend.commons.exception.PersistenceException;
import org.talend.core.model.components.ComponentUtilities;
import org.talend.core.model.components.ModifyComponentsAction;
import org.talend.core.model.components.conversions.IComponentConversion;
import org.talend.core.model.components.filters.IComponentFilter;
import org.talend.core.model.components.filters.NameComponentFilter;
import org.talend.core.model.migration.AbstractJobMigrationTask;
import org.talend.core.model.process.EConnectionType;
import org.talend.core.model.properties.Item;
import org.talend.designer.core.model.utils.emf.talendfile.ColumnType;
import org.talend.designer.core.model.utils.emf.talendfile.ConnectionType;
import org.talend.designer.core.model.utils.emf.talendfile.ElementParameterType;
import org.talend.designer.core.model.utils.emf.talendfile.MetadataType;
import org.talend.designer.core.model.utils.emf.talendfile.NodeType;
import org.talend.designer.core.model.utils.emf.talendfile.ProcessType;
/**
* DOC Administrator class global comment. Detailled comment
*/
public class ChangeUpdate2BulkUpdate4tRedshiftOutput extends AbstractJobMigrationTask {
@Override
public ExecutionResult execute(Item item) {
final ProcessType processType = getProcessType(item);
String[] compNames = { "tRedshiftOutput" }; //$NON-NLS-1$
IComponentConversion doMigration = new IComponentConversion() {
public void transform(NodeType node) {
if (node == null) {
return;
}
ElementParameterType data_action = ComponentUtilities.getNodeProperty(node, "DATA_ACTION"); //$NON-NLS-1$
if (data_action == null) {
return;
}
String action = data_action.getValue();
if (action == null || !"UPDATE".equals(action)) {//$NON-NLS-1$
return;
}
if (isDynamicSchema(node)) {
return;
}
if (isRejectLineExists(node)) {
return;
}
data_action.setValue("BULK_UPDATE");//$NON-NLS-1$
ElementParameterType nb_rows_per_insert = ComponentUtilities.getNodeProperty(node, "NB_ROWS_PER_INSERT"); //$NON-NLS-1$
if (nb_rows_per_insert == null) {
return;
}
nb_rows_per_insert.setValue("1000");
}
private boolean isRejectLineExists(NodeType node) {
List<ConnectionType> list = ComponentUtilities.getNodeOutputConnections(node);
for (ConnectionType connType : list) {
EConnectionType eConnType = EConnectionType.getTypeFromId(connType.getLineStyle());
if (eConnType == EConnectionType.FLOW_MAIN && connType.getConnectorName().equals("REJECT")) {
return true;
}
}
return false;
}
private boolean isDynamicSchema(NodeType node) {
EList<EObject> list = node.eContents();
for (EObject object : list) {
if (object instanceof MetadataType) {
MetadataType flow = (MetadataType) object;
if ("FLOW".equalsIgnoreCase(flow.getConnector())) {
Iterator<?> columns = flow.getColumn().iterator();
while (columns.hasNext()) {
Object outColumn = columns.next();
if (outColumn instanceof ColumnType) {
ColumnType column = ((ColumnType) outColumn);
if ("id_Dynamic".equals(column.getType())) {
return true;
}
}
}
}
}
}
return false;
}
};
for (String name : compNames) {
IComponentFilter filter = new NameComponentFilter(name);
try {
ModifyComponentsAction.searchAndModify(item, processType, filter, Arrays.<IComponentConversion> asList(doMigration));
} catch (PersistenceException e) {
ExceptionHandler.process(e);
return ExecutionResult.FAILURE;
}
}
return ExecutionResult.SUCCESS_NO_ALERT;
}
public Date getOrder() {
GregorianCalendar gc = new GregorianCalendar(2018, 0, 26, 16, 0, 0);
return gc.getTime();
}
}