Files
tdi-studio-se/main/plugins/org.talend.designer.codegen/jet_stub/iterate_subprocess_header.javajet
Oleksandr Zhelezniak 642d2c5591 fix(TDI-49434): copy MDC to subprocess (#8526)
* copy MDC to parallelize
2023-03-13 09:16:55 +02:00

262 lines
9.6 KiB
Plaintext

<%@ jet
package="org.talend.designer.codegen.translators"
imports="
org.talend.core.model.process.INode
org.talend.core.model.temp.ECodePart
org.talend.core.model.process.ElementParameterParser
org.talend.core.model.process.IConnection
org.talend.core.model.metadata.IMetadataTable
org.talend.core.model.metadata.IMetadataColumn
org.talend.core.model.process.EConnectionType
org.talend.core.model.process.IExternalNode
org.talend.designer.codegen.config.CodeGeneratorArgument
org.talend.designer.codegen.config.NodesSubTree
org.talend.core.model.process.IProcess
org.talend.core.model.process.ProcessUtils
org.talend.core.model.utils.NodeUtil
org.talend.core.model.process.IContextParameter
java.util.List
java.util.Set
java.util.HashSet
java.util.ArrayList
"
class="IterateSubProcessHeader"
skeleton="subprocess_header_java.skeleton"
%>
<%
// ---------------------------------------------------------------------------
// Template setup: unpack the code-generation request and pre-compute the flags
// that drive the rest of this template. `argument` is not declared here; it is
// supplied by the enclosing generated method.
// ---------------------------------------------------------------------------
CodeGeneratorArgument codeGenArgument = (CodeGeneratorArgument) argument;
INode node = (INode)codeGenArgument.getArgument();
IProcess process = node.getProcess();

// True when the job contains at least one tPartitioner node; gates every
// emitted `iterateLoop` statement below.
boolean containsTPartitioner = process.getNodesOfType("tPartitioner").size() > 0;
// True when the node being generated is a tRunJob; gates the emitted localContext handling.
boolean isRunJob = "tRunJob".equals(node.getComponent().getName());

// Logging switches read from the process parameters; both must be "true" for the
// MDC replay to be emitted at the top of the generated run() method.
final boolean isLog4jEnabled = ("true").equals(ElementParameterParser.getValue(process, "__LOG4J_ACTIVATE__"));
final boolean isLog4j2Enabled = ("true").equals(ElementParameterParser.getValue(process, "__LOG4J2_ACTIVATE__"));
final String MDC_CLASS = "org.slf4j.MDC";

// Name used to qualify outer-instance references in the emitted code
// (className.this.obj / className.this.globalMap); test containers get a "Test" suffix.
boolean isTestContainer=ProcessUtils.isTestContainer(process);
String className = process.getName();
if (isTestContainer) {
className = className + "Test";
}

NodesSubTree subTree = (NodesSubTree) codeGenArgument.getSubTree();
ECodePart codePart = codeGenArgument.getCodePart();
boolean stat = codeGenArgument.isStatistics();
boolean isRunInMultiThread = codeGenArgument.getIsRunInMultiThread();

// Outgoing main/merge flows of the node.
// NOTE(review): connSet is not read anywhere in the visible part of this template - confirm before removing.
Set<IConnection> connSet = new HashSet<IConnection>();
connSet.addAll(node.getOutgoingConnections(EConnectionType.FLOW_MAIN));
connSet.addAll(node.getOutgoingConnections(EConnectionType.FLOW_MERGE));

// Incoming ITERATE links: the loop below emits one prologue per link.
Set<IConnection> iterateConnSet = new HashSet<IConnection>();
iterateConnSet.addAll(node.getIncomingConnections(EConnectionType.ITERATE));
String iterateNodeName = node.getUniqueName();

// Parameters of the default context; used to emit the field-by-field copy into
// localContext when the node is a tRunJob.
List<IContextParameter> params = process.getContextManager().getDefaultContext().getContextParameterList();
List<IConnection> allSubProcessConnection = codeGenArgument.getAllMainSubTreeConnections();

// Re-evaluated for each ITERATE connection inside the loop below.
boolean parallelIterate = false;
// For every incoming ITERATE connection, emit the iteration counter increment
// (plus the iterateLoop increment when the job contains a tPartitioner).
for (IConnection iterateConn : iterateConnSet) { //1
%>
NB_ITERATE_<%=iterateNodeName %>++;
<%if(containsTPartitioner){%>
iterateLoop++;
<%}%>
<%
parallelIterate = "true".equals(ElementParameterParser.getValue(iterateConn, "__ENABLE_PARALLEL__"));
if (parallelIterate) {//2
if (codePart.equals(ECodePart.BEGIN)) {//3
// Parallel iterate, BEGIN part: emit the head of the worker class named after
// the node's unique name with a "Thread" suffix, extending TalendThread.
//
// The emitted ThreadedMap is a HashMap initialised from the map handed to its
// constructor. Its put() stores the entry in the thread-local map and also in
// the enclosing job's globalMap, returning the result of the latter. When the
// job is not running in multi-thread mode, both writes are emitted inside a
// block synchronized on the job's `obj` field.
//
// Emitted fields: localContext (tRunJob only), globalMap, isRunning, iterateId,
// and iterateLoop (only when the job contains a tPartitioner).
%>
class <%=node.getUniqueName() %>Thread extends TalendThread {//implements routines.system.TalendThreadPool.PropertySettable
class ThreadedMap extends java.util.HashMap<String, Object> {
private static final long serialVersionUID = 0L;
public ThreadedMap(java.util.Map<String, Object> globalMap) {
super(globalMap);
}
@Override
public Object put(String key, Object value) {
<%if(!isRunInMultiThread){%>
synchronized (<%=className%>.this.obj) {
<%}%>
super.put(key, value);
return <%=className%>.this.globalMap.put(key, value);
<%if(!isRunInMultiThread){%>
}
<%}%>
}
}
<% if(isRunJob) {%>
private final ContextProperties localContext = new ContextProperties();
<% } %>
private java.util.Map<String, Object> globalMap = null;
boolean isRunning = false;
String iterateId = "";
<%if(containsTPartitioner){%>
int iterateLoop = 0;
<%}%>
<%
// Emit the output of createPrivateClassInstance(...) for each entry point of the
// sub-tree, keyed by the name of the entry node's first outgoing connection.
// NOTE(review): createPrivateClassInstance is defined outside this template
// (presumably in the skeleton); it looks like it declares the row-struct fields
// that the emitted constructor later assigns through this.NAME - confirm.
if(!subTree.isMergeSubTree()) {
// Plain sub-tree: the root node is the only entry point.
List< ? extends IConnection> rootConns = subTree.getRootNode().getOutgoingConnections();
if ((rootConns!=null)&&(rootConns.size()>0)) {
%>
<%=createPrivateClassInstance(subTree.getRootNode(), rootConns.get(0).getName(), false)%>
<%
}
} else {
// Merge sub-tree: first every sorted merge-branch start node...
List<INode> sortedMergeBranchStarts = subTree.getSortedMergeBranchStarts();
for (INode startNode : sortedMergeBranchStarts) {
List< ? extends IConnection> rootConns = startNode.getOutgoingConnections();
if ((rootConns!=null)&&(rootConns.size()>0)) {
%>
<%=createPrivateClassInstance(startNode, rootConns.get(0).getName(), false)%>
<%
}
}
// ...then every merge node itself.
List<INode> mergeNodes =subTree.getMergeNodes();
for(INode mNode:mergeNodes){
List< ? extends IConnection> rootConns = mNode.getOutgoingConnections();
if ((rootConns!=null)&&(rootConns.size()>0)) {
%>
<%=createPrivateClassInstance(mNode, rootConns.get(0).getName(), false)%>
<%
}
}
}
// Extra constructor parameters, spliced into the emitted constructor signature
// between `globalMap` and `, int threadID`.
// NOTE(review): unlike the guarded lookups above, this get(0) is not protected
// against a root node without outgoing connections, and the root node is used
// even for merge sub-trees - confirm both are intended.
String schemaInstanceDeclaration = createPriveClassMethodDeclaration(subTree.getRootNode(), subTree.getRootNode().getOutgoingConnections().get(0).getName(), false, new java.util.HashSet<String>());
if (schemaInstanceDeclaration.length()>0) {
// Prefix a separating comma and drop the last character of the helper's output
// (presumably a trailing comma - confirm against the helper).
schemaInstanceDeclaration = "," + schemaInstanceDeclaration.substring(0,schemaInstanceDeclaration.length()-1);
}
%>
public <%=node.getUniqueName() %>Thread(java.util.Map<String, Object> globalMap<%=schemaInstanceDeclaration %>, int threadID) {
super();
<%
// For every connection of the main sub-tree, emit a null-guarded, column-by-column
// copy from the constructor argument named after the connection into the
// thread's own member of the same name.
for (IConnection connection : allSubProcessConnection) {
IMetadataTable table = connection.getMetadataTable();
%>
if(<%= connection.getName() %> != null){
<%
// One emitted assignment per column of the connection's metadata table.
List<IMetadataColumn> listColumns = table.getListColumns();
for (IMetadataColumn column : listColumns) {
%>
this.<%= connection.getName() %>.<%= column.getLabel() %> = <%= connection.getName() %>.<%= column.getLabel() %>;
<%
}
%>
}
<%
}
%>
<%
// Snapshot the job's globalMap into this thread's ThreadedMap under the lock that
// matches the job's threading mode:
// - isRunInMultiThread == true: Job.this.globalMap is wrapped in a synchronizedMap,
//   so synchronize on the map itself (as is required when iterating globalMap.keySet()).
// - isRunInMultiThread == false: Job.this.globalMap is a plain HashMap, and
//   Job.this.globalMap.put() calls (tMap, tIterateToFlow) synchronize on Job.this.obj.
// The closing brace emitted after this if/else is shared by both synchronized blocks.
if(isRunInMultiThread){%>
synchronized (globalMap) {
this.globalMap = java.util.Collections.synchronizedMap(new ThreadedMap(globalMap));
<%}else{%>
synchronized (<%=className%>.this.obj) {
this.globalMap = new ThreadedMap(globalMap);
<%}%>
}
iterateId = "." + threadID;
<%if(containsTPartitioner){%>
iterateLoop++;
<%}%>
<%if(isRunJob) {%>
//bug21906 copy context to local for mutilthread
context.synchronizeContext();
java.util.Enumeration<?> propertyNames = context.propertyNames();
while(propertyNames.hasMoreElements()) {
String propertyName = (String)propertyNames.nextElement();
String propertyValue = context.getProperty(propertyName);
localContext.setProperty(propertyName, propertyValue);
}
<% for (IContextParameter ctxParam : params){%>
localContext.<%=ctxParam.getName()%> = context.<%=ctxParam.getName()%>;
<%
// end of the per-parameter field copy (one emitted assignment per default-context parameter)
}
// end of the tRunJob-only context copy
}
%>
}
public void run() {
<% if(isLog4jEnabled && isLog4j2Enabled) { %>
mdcInfo.forEach(<%=MDC_CLASS%>::put);
<% } %>
java.util.Map threadRunResultMap = new java.util.HashMap();
threadRunResultMap.put("errorCode", null);
threadRunResultMap.put("status", "");
threadLocal.set(threadRunResultMap);
this.isRunning = true;
String currentComponent = "";
String cLabel = null;
java.util.Map<String, Object> resourceMap = new java.util.HashMap<String, Object>();
<%
// The emitted run() prologue above replays mdcInfo into org.slf4j.MDC (only when
// both log4j flags are enabled), registers a fresh result map with "errorCode"
// and "status" entries in threadLocal, and declares the per-run locals.
// NOTE(review): mdcInfo and threadLocal are not declared in this template -
// presumably inherited from TalendThread / the enclosing job class; confirm.
// currentVirtualComponent is declared only when the sub-tree contains a virtual component.
if(NodeUtil.hasVirtualComponent(subTree.getNodes())){
%>
String currentVirtualComponent = null;
<%
}
%>
try {
<%
// The emitted try block (and with it run() and the thread class) is left open
// here. NOTE(review): presumably closed by a companion footer template - confirm.
// With statistics enabled, emit a stat update on the ITERATE connection tagged
// with "exec" followed by this thread's iterateId.
if(stat){
%>
if(execStat){
runStat.updateStatOnConnection("<%=iterateConn.getUniqueName() %>",0,"exec"+iterateId);
}
<%
}
}//3
// A parallel ITERATE connection emits nothing further from this template,
// whatever the code part: move on to the next connection.
continue;
}else {//2
// Sequential (non-parallel) ITERATE connection: only statistics code is emitted.
if(stat){
// getVirtualLinkTo() is only meaningful on the first component of a virtual
// component; if it ever applies to the second one, this check must be revisited.
// A null link simply compares unequal to ITERATE, so no separate null check is needed.
boolean iterateInVComp = iterateConn.getSource().getVirtualLinkTo() == EConnectionType.ITERATE;
if(!iterateInVComp){
// Emit one stat update (arguments 3, 0) for every in-line job connection
// reachable from the iterate target.
Set<? extends IConnection> allInLineJobConns = NodeUtil.getAllInLineJobConnections(iterateConn.getTarget());
for (IConnection inLineConn : allInLineJobConns) {
%>
if(execStat){
runStat.updateStatOnConnection("<%=inLineConn.getUniqueName() %>", 3, 0);
}
<%
}
}
// Subjobs whose design start node is a tCollector report the iterate link as
// RUNNING; every other subjob reports it with the current iteration number.
// Constant-first equals keeps the comparison null-safe, like the tRunJob check
// at the top of this template.
if("tCollector".equals(node.getDesignSubjobStartNode().getComponent().getName())){
%>
if(execStat){
runStat.updateStatOnIterate("<%=iterateConn.getUniqueName() %>", RunStat.RUNNING);
}
<%
}else{
%>
if(execStat){
runStat.updateStatOnConnection("<%=iterateConn.getUniqueName() %>", 1, "exec" + NB_ITERATE_<%=iterateNodeName %>);
//Thread.sleep(1000);
}
<%
}
}
}//2
}//1
%>