Compare commits

...

1 Commits

Author SHA1 Message Date
lwang-talend
3573db27e1 TDI-33377: Issue with tsalesforceOutputbulkexec when rejecting row on
update 
Read the batch file, and findout the IDs.
2015-07-31 11:58:23 +08:00
2 changed files with 76 additions and 0 deletions

View File

@@ -5,6 +5,7 @@ import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.FileReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
@@ -81,6 +82,43 @@ public class SalesforceBulkAPI {
public SalesforceBulkAPI(SforceBulkConnection connection) {
this.connection = connection;
}
private int idColumnIndex = -1;
public void setIdColumnIndex(int index){
idColumnIndex = index;
}
private ArrayList<String> idValues = new ArrayList<String>();
private void fillIdValues() throws IOException {
if(idColumnIndex == -1){
return;
}
BufferedReader br = new BufferedReader(new FileReader(new File(bulkFileName)));
// ingore the header
if(br.readLine() == null){
br.close();
return;
}
String line = "";
while((line = br.readLine()) != null){
idValues.add(line.split(",")[idColumnIndex]);
}
br.close();
}
private ArrayList<Integer> batchNumArray = new ArrayList<Integer>();
private int getRecordsNumber(int batchNum){
int recordsNumber = 0;
for (int i = 0; i < batchNum; i++){
recordsNumber += batchNumArray.get(i);
}
return recordsNumber;
}
private JobInfo job;
@@ -98,6 +136,8 @@ public class SalesforceBulkAPI {
closeJob();
awaitCompletion();
prepareLog();
fillIdValues();
}
private void prepareLog() throws IOException {
@@ -205,6 +245,7 @@ public class SalesforceBulkAPI {
// Create a final batch for any remaining data
rdr.close();
if (currentLines > 1) {
batchNumArray.add(currentLines);
createBatch(tmpOut, tmpFile, batchInfos);
}
} finally {
@@ -283,12 +324,21 @@ public class SalesforceBulkAPI {
List<String> resultHeader = rdr.nextRecord();
int resultCols = resultHeader.size();
List<String> row;
int lineCounter = getRecordsNumber(batchNum);
while ((row = rdr.nextRecord()) != null) {
resultInfo = new HashMap<String, String>();
resultInfo.putAll(getBaseFileRow());
for (int i = 0; i < resultCols; i++) {
resultInfo.put(resultHeader.get(i), row.get(i));
if(resultHeader.get(i).equalsIgnoreCase("id") && row.get(i) == null){
resultInfo.put(resultHeader.get(i), idValues.get(lineCounter));
} else {
resultInfo.put(resultHeader.get(i), row.get(i));
}
}
lineCounter++;
resultInfoList.add(resultInfo);
// boolean success = Boolean.valueOf(resultInfo.get("Success"));
// boolean created = Boolean.valueOf(resultInfo.get("Created"));

View File

@@ -146,6 +146,32 @@ imports="
org.talend.salesforceBulk.SalesforceBulkAPI sforceBulk_<%=cid%> = new org.talend.salesforceBulk.SalesforceBulkAPI(sforceConn_<%=cid%>);
sforceBulk_<%=cid%>.setConcurrencyMode("<%=concurrencyMode%>");
sforceBulk_<%=cid%>.setAwaitTime(<%=awaitTime%>);
<%
List<IMetadataTable> ms = node.getMetadataList();
if ((ms!=null)&&(ms.size()>0)) {//1
IMetadataTable m = ms.get(0);
if (m!=null) {//2
int idIndex = -1;
int tmp = 0;
for (IMetadataColumn column: m.getListColumns()) {
if(column.getLabel().equalsIgnoreCase("id")){
idIndex = tmp;
break;
}
tmp++;
}
if(idIndex != -1){
%>
sforceBulk_<%=cid%>.setIdColumnIndex(<%=idIndex%>);
<%
}
}
}
%>
<%log.info(log.str("Start to " + action + " records in bulk."));%>
sforceBulk_<%=cid%>.executeBulk(<%=sObject%>,"<%=action%>",<%=externalId%>,"<%=contentType%>",<%=bulkFileName%>,<%=maxBytes%>,<%=maxRows%>);