Compare commits

...

3 Commits

Author SHA1 Message Date
zzliu
514c21b191 TDI-31315: MDM transaction handling is wrong in multithreaded environment.
Note:Add '_<%=cid%>' for variable 'useClientTransactionId'
            https://jira.talendforge.org/browse/TDI-31315
2015-07-10 15:25:49 +08:00
zzliu
b8f2ab5e04 TDI-31315: MDM transaction handling is wrong in multithreaded environment.
Note:Refactor code and make stub,tid have better performance
            https://jira.talendforge.org/browse/TDI-31315
2015-07-08 18:38:25 +08:00
zzliu
44502a1a71 TDI-31315: MDM transaction handling is wrong in multithreaded environment.
Use ThreadLocal to handle client transaction id and serviceWS/stub.
            https://jira.talendforge.org/browse/TDI-31315
2015-07-06 16:16:28 +08:00
8 changed files with 182 additions and 55 deletions

View File

@@ -147,9 +147,9 @@
private boolean execStat = true;
private ThreadLocal<java.util.Map<String, String>> threadLocal = new ThreadLocal<java.util.Map<String, String>>() {
protected java.util.Map<String, String> initialValue() {
java.util.Map<String,String> threadRunResultMap = new java.util.HashMap<String, String>();
private ThreadLocal<java.util.Map<String, Object>> threadLocal = new ThreadLocal<java.util.Map<String, Object>>() {
protected java.util.Map<String, Object> initialValue() {
java.util.Map<String,Object> threadRunResultMap = new java.util.HashMap<String, Object>();
threadRunResultMap.put("errorCode", null);
threadRunResultMap.put("status", "");
return threadRunResultMap;

View File

@@ -3,9 +3,10 @@ imports="
org.talend.core.model.process.INode
org.talend.core.model.process.ElementParameterParser
org.talend.designer.codegen.config.CodeGeneratorArgument
org.talend.core.model.process.IConnection
org.talend.core.model.utils.NodeUtil
"
%>
<%
CodeGeneratorArgument codeGenArgument = (CodeGeneratorArgument) argument;
INode node = (INode)codeGenArgument.getArgument();
@@ -20,17 +21,41 @@ imports="
String conn = "";
%>
Boolean useClientTransactionId_<%=cid%> = false;
<%conn = "TMDMService_" + connection;%>
org.talend.mdm.webservice.TMDMService_PortType conn_<%=cid%> = (org.talend.mdm.webservice.TMDMService_PortType)globalMap.get("<%=conn%>");
if(conn_<%=cid%> != null)
{
com.talend.mdm.transaction.client.MDMTransaction mdmTransaction_<%=cid%> = (com.talend.mdm.transaction.client.MDMTransaction)globalMap.get("<%=mdmTransaction%>");
if(mdmTransaction_<%=cid%>!=null) {
mdmTransaction_<%=cid%>.commit();
}
<% if(close){%>
org.talend.mdm.webservice.TMDMService_ServiceLocator tmdmService_<%=cid %> = new org.talend.mdm.webservice.TMDMService_ServiceLocator();
org.talend.mdm.webservice.TMDMService_PortType conn_<%=cid%> = (org.talend.mdm.webservice.TMDMService_PortType)globalMap.get("<%=conn%>");
String globalMdmURL_<%=cid%> = (String) globalMap.get("mdmURL_<%=connection%>");
String globalMdmUsername_<%=cid%> = (String) globalMap.get("mdmUsername_<%=connection%>");
String globalMdmPassword_<%=cid%> = (String) globalMap.get("mdmPassword_<%=connection%>");
if(null != globalMdmURL_<%=cid%> && null != globalMdmUsername_<%=cid%> && null != globalMdmPassword_<%=cid%>){
tmdmService_<%=cid %>.setTMDMPortEndpointAddress(globalMdmURL_<%=cid%>);
conn_<%=cid%> = tmdmService_<%=cid %>.getTMDMPort();
((org.talend.mdm.webservice.TMDMServiceSoapBindingStub)conn_<%=cid%>).setUsername(globalMdmUsername_<%=cid%>);
((org.talend.mdm.webservice.TMDMServiceSoapBindingStub)conn_<%=cid%>).setPassword(globalMdmPassword_<%=cid%>);
}
if(conn_<%=cid%> != null){
useClientTransactionId_<%=cid%> = (Boolean)globalMap.get("useClientTransactionId_<%=connection%>");
if(null != useClientTransactionId_<%=cid%> && useClientTransactionId_<%=cid%>){
com.talend.mdm.transaction.client.MDMTransaction mdmTransaction_<%=cid%> = null;
java.util.Map threadRunResultMap = threadLocal.get();
if(threadRunResultMap.get("mdmTransaction")!=null){
mdmTransaction_<%=cid%> = (com.talend.mdm.transaction.client.MDMTransaction)threadRunResultMap.get("mdmTransaction");
}
if(mdmTransaction_<%=cid%> != null) {
mdmTransaction_<%=cid%>.commit();
threadRunResultMap.remove("mdmTransaction");
threadLocal.set(threadRunResultMap);
}
}else{
com.talend.mdm.transaction.client.MDMTransaction mdmTransaction_<%=cid%> = (com.talend.mdm.transaction.client.MDMTransaction)globalMap.get("<%=mdmTransaction%>");
if(mdmTransaction_<%=cid%> != null) {
mdmTransaction_<%=cid%>.commit();
}
}
<%if(close){%>
conn_<%=cid%>.logout(new org.talend.mdm.webservice.WSLogout());
<% }%>
}
<%}%>
}

View File

@@ -5,7 +5,6 @@ imports="
org.talend.designer.codegen.config.CodeGeneratorArgument
"
%>
<%
CodeGeneratorArgument codeGenArgument = (CodeGeneratorArgument) argument;
INode node = (INode)codeGenArgument.getArgument();
@@ -37,27 +36,20 @@ stub_<%=cid %>.setPassword(decryptedPassword_<%=cid%>);
<%if(useTransaction) {%>
String turl_<%=cid%> = com.talend.mdm.transaction.client.MDMTransactionClient.getMDMTransactionURL(<%=mdmUrl %>, true);
com.talend.mdm.transaction.client.MDMTransaction mdmTransaction_<%=cid%> = null;
com.talend.mdm.transaction.client.MDMTransaction mdmTransaction_<%=cid%> = null;
<%
if(useClientTranId){
if(!useClientTranId){
%>
mdmTransaction_<%=cid%> = new com.talend.mdm.transaction.client.MDMTransaction();
mdmTransaction_<%=cid%>.setUrl(turl_<%=cid%>);
String clientTranId_<%=cid%> = "<%=cid%>"+java.util.UUID.randomUUID()+Thread.currentThread().getId();
mdmTransaction_<%=cid%>.setId(clientTranId_<%=cid%>);
mdmTransaction_<%=cid%>.setUsername(stub_<%=cid %>.getUsername());
mdmTransaction_<%=cid%>.setPassword(stub_<%=cid %>.getPassword());
mdmTransaction_<%=cid%> = com.talend.mdm.transaction.client.MDMTransactionClient.newTransaction(turl_<%=cid%>,stub_<%=cid %>.getUsername(),stub_<%=cid %>.getPassword());
stub_<%=cid %>.setHeader(new org.apache.axis.message.SOAPHeaderElement("http://www.talend.com/mdm", "transaction-id", mdmTransaction_<%=cid%>.getId()));
globalMap.put("mdmTransaction_<%=cid %>", mdmTransaction_<%=cid%>);
<%
}else{
%>
mdmTransaction_<%=cid%> = com.talend.mdm.transaction.client.MDMTransactionClient.newTransaction(turl_<%=cid%>,stub_<%=cid %>.getUsername(),stub_<%=cid %>.getPassword());
<%
}
%>
stub_<%=cid %>.setHeader(new org.apache.axis.message.SOAPHeaderElement("http://www.talend.com/mdm", "transaction-id", mdmTransaction_<%=cid%>.getId()));
globalMap.put("mdmTransaction_<%=cid %>", mdmTransaction_<%=cid%>);
<%}%>
}%>
globalMap.put("mdmURL_<%=cid %>", <%=mdmUrl %>);
globalMap.put("mdmUsername_<%=cid %>", stub_<%=cid %>.getUsername());
globalMap.put("mdmPassword_<%=cid %>", stub_<%=cid %>.getPassword());
globalMap.put("useClientTransactionId_<%=cid %>", <%=useTransaction %>);
<%}else{%>
stub_<%=cid %>.ping(new org.talend.mdm.webservice.WSPing());<%//For TDI-26109%>
globalMap.put("TMDMService_<%=cid %>", tmdmWS_<%=cid %>);
globalMap.put("TMDMService_<%=cid %>", tmdmWS_<%=cid %>);
<%}%>

View File

@@ -3,6 +3,7 @@ imports="
org.talend.core.model.process.INode
org.talend.core.model.process.ElementParameterParser
org.talend.designer.codegen.config.CodeGeneratorArgument
org.talend.core.model.utils.NodeUtil
"
%>
<%
@@ -29,10 +30,12 @@ imports="
boolean addTaskID = ("true").equals(ElementParameterParser.getValue(node,"__ADD_TASKID__"));
boolean usePartialUpdate = ("true").equals(ElementParameterParser.getValue(node,"__USE_PARTIAL_UPDATE__"));
%>
int nb_line_<%=cid %> = 0;
int nb_line_rejected_<%=cid%> = 0;
Boolean useClientTransactionId_<%=cid%> = false;
com.talend.mdm.transaction.client.MDMTransaction mdmTransaction_<%=cid %> = null;
<%if(!usePartialUpdate){%>
<%if(addTaskID){%>
class Util_<%=cid%>{
@@ -68,7 +71,7 @@ int nb_line_rejected_<%=cid%> = 0;
}
}
}%>
<%if(!useExistingConn){%>
org.talend.mdm.webservice.TMDMService_ServiceLocator tmdmService_<%=cid %> = new org.talend.mdm.webservice.TMDMService_ServiceLocator();
tmdmService_<%=cid %>.setTMDMPortEndpointAddress(<%=mdmUrl %>);
@@ -86,7 +89,37 @@ int nb_line_rejected_<%=cid%> = 0;
stub_<%=cid %>.setPassword(decryptedPassword_<%=cid%>);
<%}else{%>
org.talend.mdm.webservice.TMDMService_PortType tmdmWS_<%=cid %> = (org.talend.mdm.webservice.TMDMService_PortType)globalMap.get("<%=conn%>");
org.talend.mdm.webservice.TMDMService_ServiceLocator tmdmService_<%=cid %> = new org.talend.mdm.webservice.TMDMService_ServiceLocator();
org.talend.mdm.webservice.TMDMService_PortType tmdmWS_<%=cid %> = (org.talend.mdm.webservice.TMDMService_PortType)globalMap.get("<%=conn%>");
useClientTransactionId_<%=cid%> = (Boolean)globalMap.get("useClientTransactionId_<%=connection%>");
String mdmConnection_username_<%=cid %>=null,mdmConnection_password_<%=cid %>=null,mdmConnection_url_<%=cid %>=null;
if(null != useClientTransactionId_<%=cid%> && useClientTransactionId_<%=cid%>){
java.util.Map threadRunResultMap = threadLocal.get();
if(threadRunResultMap.get("mdmTransaction")!=null){
mdmTransaction_<%=cid %> = (com.talend.mdm.transaction.client.MDMTransaction)threadRunResultMap.get("mdmTransaction");
}else{
mdmTransaction_<%=cid %> = new com.talend.mdm.transaction.client.MDMTransaction();
mdmConnection_username_<%=cid %> = (String) globalMap.get("mdmUsername_<%=connection%>");
mdmConnection_password_<%=cid %> = (String) globalMap.get("mdmPassword_<%=connection%>");
mdmConnection_url_<%=cid %> = (String) globalMap.get("mdmURL_<%=connection%>");
String finalURL = com.talend.mdm.transaction.client.MDMTransactionClient.getMDMTransactionURL(mdmConnection_url_<%=cid %>,true);
mdmTransaction_<%=cid %>.setUrl(finalURL);
String clientTranId = jobName + "_"+ java.util.UUID.randomUUID().toString() + Thread.currentThread().getId();
mdmTransaction_<%=cid %>.setId(clientTranId);
mdmTransaction_<%=cid %>.setUsername(mdmConnection_username_<%=cid %>);
mdmTransaction_<%=cid %>.setPassword(mdmConnection_password_<%=cid %>);
threadRunResultMap.put("mdmTransaction",mdmTransaction_<%=cid %>);
threadLocal.set(threadRunResultMap);
}
if(null != mdmConnection_url_<%=cid %> && null != mdmConnection_username_<%=cid %> && null != mdmConnection_password_<%=cid %>){
tmdmService_<%=cid %>.setTMDMPortEndpointAddress(mdmConnection_url_<%=cid %>);
tmdmWS_<%=cid %> = tmdmService_<%=cid %>.getTMDMPort();
((org.talend.mdm.webservice.TMDMServiceSoapBindingStub)tmdmWS_<%=cid %>).setUsername(mdmConnection_username_<%=cid %>);
((org.talend.mdm.webservice.TMDMServiceSoapBindingStub)tmdmWS_<%=cid %>).setPassword(mdmConnection_password_<%=cid %>);
}
}
<%}%>
org.talend.mdm.webservice.WSDataClusterPK dataCluster_<%=cid %> = new org.talend.mdm.webservice.WSDataClusterPK(<%=dataCluster %> + "<%=isStaging?"#STAGING":""%>");

View File

@@ -8,6 +8,7 @@
org.talend.core.model.process.IConnection
org.talend.core.model.process.IConnectionCategory
org.talend.core.model.process.EConnectionType
org.talend.core.model.utils.NodeUtil
java.util.List
java.util.Map
"
@@ -19,6 +20,9 @@ String cid = node.getUniqueName();
boolean isStaging = "STAGING".equalsIgnoreCase(ElementParameterParser.getValue(node, "__CONTAINER_TYPE__"));
boolean useExistingConn = ("true").equals(ElementParameterParser.getValue(node,"__USE_EXISTING_CONNECTION__"));
String connection = ElementParameterParser.getValue(node,"__CONNECTION__");
String destination = ElementParameterParser.getValue(node, "__DESTINATION__");
boolean withReport = ("true").equals(ElementParameterParser.getValue(node,"__WITHREPORT__"));
String needCheck = ElementParameterParser.getValue(node,"__ISINVOKE__");
@@ -49,7 +53,16 @@ List<IMetadataTable> metadatas = node.getMetadataList();
if (destination != null && !"".equals(destination)) {
cid = destination;
}
%>
if(null != useClientTransactionId_<%=cid%> && useClientTransactionId_<%=cid%>){
if(mdmTransaction_<%=cid%> != null){
((org.talend.mdm.webservice.TMDMServiceSoapBindingStub)tmdmWS_<%=cid %>).
setHeader(new org.apache.axis.message.SOAPHeaderElement("http://www.talend.com/mdm", "transaction-id", mdmTransaction_<%=cid%>.getId()));
}
}
<%
if (metadatas != null && metadatas.size()>0) {
IMetadataTable metadata = metadatas.get(0);
if (metadata != null) {

View File

@@ -3,6 +3,7 @@ imports="
org.talend.core.model.process.INode
org.talend.core.model.process.ElementParameterParser
org.talend.designer.codegen.config.CodeGeneratorArgument
org.talend.core.model.utils.NodeUtil
"
%>
@@ -20,18 +21,41 @@ imports="
String conn = "";
%>
Boolean useClientTransactionId_<%=cid%> = false;
<%conn = "TMDMService_" + connection;%>
org.talend.mdm.webservice.TMDMService_PortType conn_<%=cid%> = (org.talend.mdm.webservice.TMDMService_PortType)globalMap.get("<%=conn%>");
if(conn_<%=cid%> != null)
{
com.talend.mdm.transaction.client.MDMTransaction mdmTransaction_<%=cid%> = (com.talend.mdm.transaction.client.MDMTransaction)globalMap.get("<%=mdmTransaction%>");
if(mdmTransaction_<%=cid%>!=null) {
mdmTransaction_<%=cid%>.rollback();
}
<% if(close){%>
conn_<%=cid%>.logout(new org.talend.mdm.webservice.WSLogout());
<% }%>
org.talend.mdm.webservice.TMDMService_ServiceLocator tmdmService_<%=cid %> = new org.talend.mdm.webservice.TMDMService_ServiceLocator();
org.talend.mdm.webservice.TMDMService_PortType conn_<%=cid %> = (org.talend.mdm.webservice.TMDMService_PortType)globalMap.get("<%=conn%>");
String globalMdmURL_<%=cid%> = (String) globalMap.get("mdmURL_<%=connection%>");
String globalMdmUsername_<%=cid%> = (String) globalMap.get("mdmUsername_<%=connection%>");
String globalMdmPassword_<%=cid%> = (String) globalMap.get("mdmPassword_<%=connection%>");
if(null != globalMdmURL_<%=cid%> && null != globalMdmUsername_<%=cid%> && null != globalMdmPassword_<%=cid%>){
tmdmService_<%=cid %>.setTMDMPortEndpointAddress(globalMdmURL_<%=cid%>);
conn_<%=cid %> = tmdmService_<%=cid %>.getTMDMPort();
((org.talend.mdm.webservice.TMDMServiceSoapBindingStub)conn_<%=cid %>).setUsername(globalMdmUsername_<%=cid%>);
((org.talend.mdm.webservice.TMDMServiceSoapBindingStub)conn_<%=cid %>).setPassword(globalMdmPassword_<%=cid%>);
}
if(conn_<%=cid%> != null){
useClientTransactionId_<%=cid%> = (Boolean)globalMap.get("useClientTransactionId_<%=connection%>");
if(null != useClientTransactionId_<%=cid%> && useClientTransactionId_<%=cid%>){
com.talend.mdm.transaction.client.MDMTransaction mdmTransaction_<%=cid%> = null;
java.util.Map threadRunResultMap = threadLocal.get();
if(threadRunResultMap.get("mdmTransaction")!=null){
mdmTransaction_<%=cid%> = (com.talend.mdm.transaction.client.MDMTransaction)threadRunResultMap.get("mdmTransaction");
}
if(mdmTransaction_<%=cid%> != null) {
mdmTransaction_<%=cid%>.rollback();
threadRunResultMap.remove("mdmTransaction");
threadLocal.set(threadRunResultMap);
}
}else{
com.talend.mdm.transaction.client.MDMTransaction mdmTransaction_<%=cid%> = (com.talend.mdm.transaction.client.MDMTransaction)globalMap.get("<%=mdmTransaction%>");
if(mdmTransaction_<%=cid%> != null) {
mdmTransaction_<%=cid%>.rollback();
}
}
<%if(close){%>
conn_<%=cid%>.logout(new org.talend.mdm.webservice.WSLogout());
<%}%>
}

View File

@@ -39,6 +39,8 @@ imports="
%>
int nb_line_<%=cid %> = 0;
int nb_line_rejected_<%=cid%> = 0;
Boolean useClientTransactionId_<%=cid%> = false;
com.talend.mdm.transaction.client.MDMTransaction mdmTransaction_<%=cid %> = null;
<%if(!usePartialUpdate){%>
<%if(addTaskID){%>
class Util_<%=cid%>{
@@ -74,7 +76,6 @@ int nb_line_rejected_<%=cid%> = 0;
}
}
}%>
<%if(!useExistingConn){%>
org.talend.mdm.webservice.TMDMService_ServiceLocator tmdmService_<%=cid %> = new org.talend.mdm.webservice.TMDMService_ServiceLocator();
tmdmService_<%=cid %>.setTMDMPortEndpointAddress(<%=mdmUrl %>);
@@ -92,7 +93,37 @@ int nb_line_rejected_<%=cid%> = 0;
stub_<%=cid %>.setPassword(decryptedPassword_<%=cid%>);
<%}else{%>
org.talend.mdm.webservice.TMDMService_ServiceLocator tmdmService_<%=cid %> = new org.talend.mdm.webservice.TMDMService_ServiceLocator();
org.talend.mdm.webservice.TMDMService_PortType tmdmWS_<%=cid %> = (org.talend.mdm.webservice.TMDMService_PortType)globalMap.get("<%=conn%>");
useClientTransactionId_<%=cid%> = (Boolean)globalMap.get("useClientTransactionId_<%=connection%>");
String mdmConnection_username_<%=cid %>=null,mdmConnection_password_<%=cid %>=null,mdmConnection_url_<%=cid %>=null;
if(null != useClientTransactionId_<%=cid%> && useClientTransactionId_<%=cid%>){
java.util.Map threadRunResultMap = threadLocal.get();
if(threadRunResultMap.get("mdmTransaction")!=null){
mdmTransaction_<%=cid %> = (com.talend.mdm.transaction.client.MDMTransaction)threadRunResultMap.get("mdmTransaction");
}else{
mdmTransaction_<%=cid %> = new com.talend.mdm.transaction.client.MDMTransaction();
mdmConnection_username_<%=cid %> = (String) globalMap.get("mdmUsername_<%=connection%>");
mdmConnection_password_<%=cid %> = (String) globalMap.get("mdmPassword_<%=connection%>");
mdmConnection_url_<%=cid %> = (String) globalMap.get("mdmURL_<%=connection%>");
String finalURL = com.talend.mdm.transaction.client.MDMTransactionClient.getMDMTransactionURL(mdmConnection_url_<%=cid %>,true);
mdmTransaction_<%=cid %>.setUrl(finalURL);
String clientTranId = jobName + "_"+ java.util.UUID.randomUUID().toString() + Thread.currentThread().getId();
mdmTransaction_<%=cid %>.setId(clientTranId);
mdmTransaction_<%=cid %>.setUsername(mdmConnection_username_<%=cid %>);
mdmTransaction_<%=cid %>.setPassword(mdmConnection_password_<%=cid %>);
threadRunResultMap.put("mdmTransaction",mdmTransaction_<%=cid %>);
threadLocal.set(threadRunResultMap);
}
if(null != mdmConnection_url_<%=cid %> && null != mdmConnection_username_<%=cid %> && null != mdmConnection_password_<%=cid %>){
tmdmService_<%=cid %>.setTMDMPortEndpointAddress(mdmConnection_url_<%=cid %>);
tmdmWS_<%=cid %> = tmdmService_<%=cid %>.getTMDMPort();
((org.talend.mdm.webservice.TMDMServiceSoapBindingStub)tmdmWS_<%=cid %>).setUsername(mdmConnection_username_<%=cid %>);
((org.talend.mdm.webservice.TMDMServiceSoapBindingStub)tmdmWS_<%=cid %>).setPassword(mdmConnection_password_<%=cid %>);
}
}
<%}%>
org.talend.mdm.webservice.WSDataClusterPK dataCluster_<%=cid %> = new org.talend.mdm.webservice.WSDataClusterPK(<%=dataCluster %> + "<%=isStaging?"#STAGING":""%>");
@@ -114,6 +145,7 @@ if(startNode != null){
IConnection nextMergeConn = NodeUtil.getNextMergeConnection(node);
if(nextMergeConn != null && nextMergeConn.getInputId()>1 && startNodeCid != null){
%>
java.util.Queue<String> queue_<%=cid%> = new java.util.concurrent.ConcurrentLinkedQueue<String>();
<%
if(storeFlow){

View File

@@ -50,9 +50,17 @@ List<IMetadataTable> metadatas = node.getMetadataList();
if (destination != null && !"".equals(destination)) {
cid = destination;
}
%>
if(null != useClientTransactionId_<%=cid%> && useClientTransactionId_<%=cid%>){
if(mdmTransaction_<%=cid%> != null){
((org.talend.mdm.webservice.TMDMServiceSoapBindingStub)tmdmWS_<%=cid %>).
setHeader(new org.apache.axis.message.SOAPHeaderElement("http://www.talend.com/mdm", "transaction-id", mdmTransaction_<%=cid%>.getId()));
}
}
if (metadatas != null && metadatas.size()>0) {
<%if (metadatas != null && metadatas.size()>0) {
IMetadataTable metadata = metadatas.get(0);
if (metadata != null) {