255 lines
9.2 KiB
Plaintext
255 lines
9.2 KiB
Plaintext
<%@ jet
|
|
package="org.talend.designer.codegen.translators"
|
|
imports="
|
|
org.talend.core.model.process.INode
|
|
org.talend.core.model.temp.ECodePart
|
|
org.talend.core.model.process.ElementParameterParser
|
|
org.talend.core.model.process.IConnection
|
|
org.talend.core.model.metadata.IMetadataTable
|
|
org.talend.core.model.metadata.IMetadataColumn
|
|
org.talend.core.model.process.EConnectionType
|
|
org.talend.core.model.process.IExternalNode
|
|
org.talend.designer.codegen.config.CodeGeneratorArgument
|
|
org.talend.designer.codegen.config.NodesSubTree
|
|
org.talend.core.model.process.IProcess
|
|
org.talend.core.model.process.ProcessUtils
|
|
org.talend.core.model.utils.NodeUtil
|
|
org.talend.core.model.process.IContextParameter
|
|
java.util.List
|
|
java.util.Set
|
|
java.util.HashSet
|
|
java.util.ArrayList
|
|
"
|
|
class="IterateSubProcessHeader"
|
|
skeleton="subprocess_header_java.skeleton"
|
|
%>
|
|
<%
|
|
CodeGeneratorArgument codeGenArgument = (CodeGeneratorArgument) argument;
|
|
INode node = (INode)codeGenArgument.getArgument();
|
|
boolean containsTPartitioner = node.getProcess().getNodesOfType("tPartitioner").size() > 0 ? true : false;
|
|
boolean isRunJob = "tRunJob".equals(node.getComponent().getName());
|
|
IProcess process = node.getProcess();
|
|
|
|
boolean isTestContainer=ProcessUtils.isTestContainer(process);
|
|
String className = process.getName();
|
|
if (isTestContainer) {
|
|
className = className + "Test";
|
|
}
|
|
|
|
NodesSubTree subTree = (NodesSubTree) codeGenArgument.getSubTree();
|
|
ECodePart codePart = codeGenArgument.getCodePart();
|
|
//boolean trace = codeGenArgument.isTrace();
|
|
boolean stat = codeGenArgument.isStatistics();
|
|
|
|
boolean isRunInMultiThread = codeGenArgument.getIsRunInMultiThread();
|
|
|
|
Set<IConnection> connSet = new HashSet<IConnection>();
|
|
connSet.addAll(node.getOutgoingConnections(EConnectionType.FLOW_MAIN));
|
|
connSet.addAll(node.getOutgoingConnections(EConnectionType.FLOW_MERGE));
|
|
|
|
Set<IConnection> iterateConnSet = new HashSet<IConnection>();
|
|
iterateConnSet.addAll(node.getIncomingConnections(EConnectionType.ITERATE));
|
|
|
|
String iterateNodeName = node.getUniqueName();
|
|
|
|
List<IContextParameter> params = new ArrayList<IContextParameter>();
|
|
params = process.getContextManager().getDefaultContext().getContextParameterList();
|
|
|
|
List<IConnection> allSubProcessConnection = codeGenArgument.getAllMainSubTreeConnections();
|
|
boolean parallelIterate = false;
|
|
for (IConnection iterateConn : iterateConnSet) { //1
|
|
%>
|
|
NB_ITERATE_<%=iterateNodeName %>++;
|
|
<%if(containsTPartitioner){%>
|
|
iterateLoop++;
|
|
<%}%>
|
|
<%
|
|
parallelIterate = "true".equals(ElementParameterParser.getValue(iterateConn, "__ENABLE_PARALLEL__"));
|
|
if (parallelIterate) {//2
|
|
if (codePart.equals(ECodePart.BEGIN)) {//3
|
|
String rowList=",";
|
|
for (IConnection conn : allSubProcessConnection) {
|
|
rowList += conn.getUniqueName()+"Struct "+conn.getUniqueName()+",";
|
|
}
|
|
rowList = rowList.substring(0, rowList.length()-1);
|
|
%>
|
|
class <%=node.getUniqueName() %>Thread extends TalendThread {//implements routines.system.TalendThreadPool.PropertySettable
|
|
class ThreadedMap extends java.util.HashMap<String, Object> {
|
|
|
|
private static final long serialVersionUID = 0L;
|
|
|
|
public ThreadedMap(java.util.Map<String, Object> globalMap) {
|
|
super(globalMap);
|
|
}
|
|
|
|
@Override
|
|
public Object put(String key, Object value) {
|
|
<%if(!isRunInMultiThread){%>
|
|
synchronized (<%=className%>.this.obj) {
|
|
<%}%>
|
|
super.put(key, value);
|
|
return <%=className%>.this.globalMap.put(key, value);
|
|
<%if(!isRunInMultiThread){%>
|
|
}
|
|
<%}%>
|
|
}
|
|
}
|
|
<% if(isRunJob) {%>
|
|
private final ContextProperties localContext = new ContextProperties();
|
|
<% } %>
|
|
private java.util.Map<String, Object> globalMap = null;
|
|
boolean isRunning = false;
|
|
String iterateId = "";
|
|
<%if(containsTPartitioner){%>
|
|
int iterateLoop = 0;
|
|
<%}%>
|
|
<%
|
|
if(!subTree.isMergeSubTree()) {
|
|
List< ? extends IConnection> rootConns = subTree.getRootNode().getOutgoingConnections();
|
|
if ((rootConns!=null)&&(rootConns.size()>0)) {
|
|
%>
|
|
<%=createPrivateClassInstance(subTree.getRootNode(), rootConns.get(0).getName(), false)%>
|
|
<%
|
|
}
|
|
} else {
|
|
List<INode> sortedMergeBranchStarts = subTree.getSortedMergeBranchStarts();
|
|
for (INode startNode : sortedMergeBranchStarts) {
|
|
List< ? extends IConnection> rootConns = startNode.getOutgoingConnections();
|
|
if ((rootConns!=null)&&(rootConns.size()>0)) {
|
|
%>
|
|
<%=createPrivateClassInstance(startNode, rootConns.get(0).getName(), false)%>
|
|
<%
|
|
}
|
|
}
|
|
|
|
List<INode> mergeNodes =subTree.getMergeNodes();
|
|
for(INode mNode:mergeNodes){
|
|
List< ? extends IConnection> rootConns = mNode.getOutgoingConnections();
|
|
if ((rootConns!=null)&&(rootConns.size()>0)) {
|
|
%>
|
|
<%=createPrivateClassInstance(mNode, rootConns.get(0).getName(), false)%>
|
|
<%
|
|
}
|
|
}
|
|
}
|
|
|
|
String schemaInstanceDeclaration = createPriveClassMethodDeclaration(subTree.getRootNode(), subTree.getRootNode().getOutgoingConnections().get(0).getName(), false, new java.util.HashSet<String>());
|
|
if (schemaInstanceDeclaration.length()>0) {
|
|
schemaInstanceDeclaration = "," + schemaInstanceDeclaration.substring(0,schemaInstanceDeclaration.length()-1);
|
|
}
|
|
%>
|
|
|
|
public <%=node.getUniqueName() %>Thread(java.util.Map<String, Object> globalMap<%=schemaInstanceDeclaration %>, int threadID) {
|
|
super();
|
|
<%
|
|
for (IConnection connection : allSubProcessConnection) {
|
|
IMetadataTable table = connection.getMetadataTable();
|
|
%>
|
|
if(<%= connection.getName() %> != null){
|
|
<%
|
|
List<IMetadataColumn> listColumns = table.getListColumns();
|
|
for (IMetadataColumn column : listColumns) {
|
|
%>
|
|
this.<%= connection.getName() %>.<%= column.getLabel() %> = <%= connection.getName() %>.<%= column.getLabel() %>;
|
|
<%
|
|
}
|
|
%>
|
|
}
|
|
<%
|
|
}
|
|
%>
|
|
<%
|
|
// if codeGenArgument.getIsRunInMultiThread() is true, the job.this.globalMap will wrapped with synchronizedMap, use synchronized(job.this.globalMap) when use globalMap.keySet().iterator()
|
|
// when codeGenArgument.getIsRunInMultiThread() is false, the job.this.globalMap is HashMap, use synchronized(job.this.object) when do the job.this.globalMap.put() operation(tMap,tIterateToFlow).
|
|
if(isRunInMultiThread){%>
|
|
synchronized (globalMap) {
|
|
this.globalMap = java.util.Collections.synchronizedMap(new ThreadedMap(globalMap));
|
|
<%}else{%>
|
|
synchronized (<%=className%>.this.obj) {
|
|
this.globalMap = new ThreadedMap(globalMap);
|
|
<%}%>
|
|
}
|
|
iterateId = "." + threadID;
|
|
<%if(containsTPartitioner){%>
|
|
iterateLoop++;
|
|
<%}%>
|
|
<%if(isRunJob) {%>
|
|
//bug21906 copy context to local for mutilthread
|
|
context.synchronizeContext();
|
|
java.util.Enumeration<?> propertyNames = context.propertyNames();
|
|
while(propertyNames.hasMoreElements()) {
|
|
String propertyName = (String)propertyNames.nextElement();
|
|
String propertyValue = context.getProperty(propertyName);
|
|
localContext.setProperty(propertyName, propertyValue);
|
|
}
|
|
<% for (IContextParameter ctxParam : params){%>
|
|
localContext.<%=ctxParam.getName()%> = context.<%=ctxParam.getName()%>;
|
|
<%
|
|
}
|
|
}
|
|
%>
|
|
}
|
|
|
|
|
|
public void run() {
|
|
|
|
java.util.Map threadRunResultMap = new java.util.HashMap();
|
|
threadRunResultMap.put("errorCode", null);
|
|
threadRunResultMap.put("status", "");
|
|
threadLocal.set(threadRunResultMap);
|
|
|
|
this.isRunning = true;
|
|
String currentComponent = "";
|
|
String cLabel = null;
|
|
java.util.Map<String, Object> resourceMap = new java.util.HashMap<String, Object>();
|
|
<%
|
|
if(NodeUtil.hasVirtualComponent(subTree.getNodes())){
|
|
%>
|
|
String currentVirtualComponent = null;
|
|
<%
|
|
}
|
|
%>
|
|
try {
|
|
<%
|
|
if(stat){
|
|
%>
|
|
if(execStat){
|
|
runStat.updateStatOnConnection("<%=iterateConn.getUniqueName() %>",0,"exec"+iterateId);
|
|
}
|
|
<%
|
|
}
|
|
}//3
|
|
continue;
|
|
}else {//2
|
|
if(stat){
|
|
//iterateConn.getSource().getVirtualLinkTo() only works on the first component of a virtual component, if it works for the second one, the code should change
|
|
boolean iterateInVComp = iterateConn.getSource().getVirtualLinkTo() != null && iterateConn.getSource().getVirtualLinkTo() == EConnectionType.ITERATE;
|
|
if(!iterateInVComp){
|
|
Set<? extends IConnection> allInLineJobConns = NodeUtil.getAllInLineJobConnections(iterateConn.getTarget());
|
|
for (IConnection inLineConn : allInLineJobConns) {
|
|
%>
|
|
if(execStat){
|
|
runStat.updateStatOnConnection("<%=inLineConn.getUniqueName() %>", 3, 0);
|
|
}
|
|
<%
|
|
}
|
|
}
|
|
if(node.getDesignSubjobStartNode().getComponent().getName().equals("tCollector")){
|
|
%>
|
|
if(execStat){
|
|
runStat.updateStatOnIterate("<%=iterateConn.getUniqueName() %>", RunStat.RUNNING);
|
|
}
|
|
<%
|
|
}else{
|
|
%>
|
|
if(execStat){
|
|
runStat.updateStatOnConnection("<%=iterateConn.getUniqueName() %>", 1, "exec" + NB_ITERATE_<%=iterateNodeName %>);
|
|
//Thread.sleep(1000);
|
|
}
|
|
<%
|
|
}
|
|
}
|
|
}//2
|
|
}//1
|
|
%>
|