327 lines
12 KiB
Plaintext
327 lines
12 KiB
Plaintext
<%@ jet
|
|
package="org.talend.designer.codegen.translators"
|
|
imports="
|
|
org.talend.core.model.process.INode
|
|
org.talend.core.model.temp.ECodePart
|
|
org.talend.core.model.process.IConnection
|
|
org.talend.core.model.metadata.IMetadataTable
|
|
org.talend.core.model.metadata.IMetadataColumn
|
|
org.talend.core.model.process.EConnectionType
|
|
org.talend.core.model.process.ElementParameterParser
|
|
org.talend.designer.codegen.config.CodeGeneratorArgument
|
|
org.talend.core.model.utils.NodeUtil
|
|
org.talend.core.model.process.IConnectionCategory
|
|
java.util.List
|
|
java.util.ArrayList
|
|
java.util.Set
|
|
java.util.HashSet
|
|
java.util.Iterator
|
|
"
|
|
class="ComponentPartHeader"
|
|
%>
|
|
<%@ include file="@{org.talend.designer.components.localprovider}/components/templates/Log4j/LogUtil.javajet"%>
|
|
<%
|
|
CodeGeneratorArgument codeGenArgument = (CodeGeneratorArgument) argument;
|
|
INode node = (INode)codeGenArgument.getArgument();
|
|
log = new LogUtil(node);
|
|
boolean isLog4jEnabled = ("true").equals(ElementParameterParser.getValue(node.getProcess(), "__LOG4J_ACTIVATE__"));
|
|
boolean containsTPartitioner = node.getProcess().getNodesOfType("tPartitioner").size() > 0 ? true : false;
|
|
String startNodeCid=node.getDesignSubjobStartNode().getUniqueName();
|
|
ECodePart codePart = codeGenArgument.getCodePart();
|
|
boolean trace = codeGenArgument.isTrace();
|
|
boolean stat = codeGenArgument.isStatistics();
|
|
Set<IConnection> connSet = new HashSet<IConnection>();
|
|
connSet.addAll(node.getOutgoingConnections(EConnectionType.FLOW_MAIN));
|
|
connSet.addAll(node.getOutgoingConnections(EConnectionType.FLOW_MERGE));
|
|
String incomingName = codeGenArgument.getIncomingName();
|
|
|
|
Set<IConnection> iterateConnSet = new HashSet<IConnection>();
|
|
iterateConnSet.addAll(node.getOutgoingConnections(EConnectionType.ITERATE));
|
|
|
|
List<IConnection> allSubProcessConnection = codeGenArgument.getAllMainSubTreeConnections();
|
|
%>
|
|
|
|
/**
|
|
* [<%=node.getUniqueName() %> <%=codePart %> ] start
|
|
*/
|
|
|
|
<%
|
|
//This part in order to feedback with the iterate_subprocess_header.javajet and iterate_subprocess_footer.javajet
|
|
|
|
if (codePart.equals(ECodePart.BEGIN)) {
|
|
boolean parallelIterate = false;
|
|
boolean hasParallelIterate = false;
|
|
for (IConnection iterateConn : iterateConnSet) {
|
|
parallelIterate = "true".equals(ElementParameterParser.getValue(iterateConn, "__ENABLE_PARALLEL__"));
|
|
String iterateNodeName = iterateConn.getTarget().getUniqueName();
|
|
if (parallelIterate) {
|
|
%>
|
|
TalendThreadPool mtp_<%=iterateConn.getTarget().getUniqueName() %> = new TalendThreadPool(<%=ElementParameterParser.getValue(iterateConn,"__NUMBER_PARALLEL__") %>);
|
|
|
|
globalMap.put("lockWrite_<%=iterateNodeName%>", new Object[0]);
|
|
int threadIdCounter_<%=iterateNodeName%> =0;
|
|
<%
|
|
}
|
|
%>
|
|
int NB_ITERATE_<%=iterateNodeName %> = 0; //for statistics
|
|
<%
|
|
continue;
|
|
}
|
|
}
|
|
%>
|
|
|
|
<%
|
|
if (codePart.equals(ECodePart.BEGIN)) {
|
|
%>
|
|
<%
|
|
if(trace){
|
|
%>
|
|
globalMap.put("ENABLE_TRACES_CONNECTION_<%=startNodeCid%>",Boolean.FALSE);
|
|
<%
|
|
}
|
|
%>
|
|
ok_Hash.put("<%=node.getUniqueName() %>", false);
|
|
start_Hash.put("<%=node.getUniqueName() %>", System.currentTimeMillis());
|
|
<%
|
|
String statCatcher = ElementParameterParser.getValue(node,"__TSTATCATCHER_STATS__");
|
|
if ((node.getProcess().getNodesOfType("tStatCatcher").size() > 0) && (statCatcher.equals("true"))) {
|
|
for (INode statCatcherNode : node.getProcess().getNodesOfType("tStatCatcher")) {
|
|
%>
|
|
<%=statCatcherNode.getUniqueName() %>.addMessage("begin","<%=node.getUniqueName() %>");
|
|
<%=statCatcherNode.getDesignSubjobStartNode().getUniqueName() %>Process(globalMap);
|
|
<%
|
|
}
|
|
}
|
|
}
|
|
if(codePart.equals(ECodePart.MAIN)) {
|
|
List<INode> meterCatchers = (List<INode>)node.getProcess().getNodesOfType("tFlowMeterCatcher");
|
|
if ((node.getProcess().getNodesOfType("tFlowMeter").size() > 0)){
|
|
for(IConnection temp_conn : node.getIncomingConnections(EConnectionType.FLOW_MAIN)){
|
|
String name_conn = temp_conn.getUniqueName();
|
|
if(temp_conn.isUseByMetter()){
|
|
%>
|
|
<%
|
|
if (meterCatchers != null) {
|
|
for (INode meterCatcher : meterCatchers) {
|
|
%>
|
|
<%=meterCatcher.getUniqueName() %>.addLineToRow("<%=name_conn%>_count");
|
|
<%
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
for(IConnection temp_conn : node.getIncomingConnections(EConnectionType.FLOW_MERGE)){
|
|
String name_conn = temp_conn.getUniqueName();
|
|
if(name_conn == incomingName && temp_conn.isUseByMetter()){
|
|
if (meterCatchers != null) {
|
|
for (INode meterCatcher : meterCatchers) {
|
|
%>
|
|
<%=meterCatcher.getUniqueName() %>.addLineToRow("<%=name_conn%>_count");
|
|
<%
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
%>
|
|
<%
|
|
if(node.isVirtualGenerateNode()){
|
|
%>
|
|
currentVirtualComponent = "<%=NodeUtil.getVirtualUniqueName(node)%>";
|
|
<%
|
|
}
|
|
%>
|
|
currentComponent="<%=node.getUniqueName() %>";
|
|
|
|
<%
|
|
connSet = new HashSet<IConnection>();
|
|
connSet.addAll(node.getIncomingConnections(EConnectionType.FLOW_MAIN));
|
|
connSet.addAll(node.getIncomingConnections(EConnectionType.FLOW_MERGE));
|
|
|
|
List<? extends INode> jobCatcherNodes = node.getProcess().getNodesOfType("tJobStructureCatcher");
|
|
boolean enableLogStash = jobCatcherNodes != null && !jobCatcherNodes.isEmpty();
|
|
String cid = node.getUniqueName();
|
|
boolean logstashCurrent = !cid.startsWith("tJobStructureCatcher") && !cid.startsWith("talend") && enableLogStash;
|
|
|
|
if ((codePart.equals(ECodePart.BEGIN))&&(stat || logstashCurrent)&&connSet.size()>0) {
|
|
if(containsTPartitioner) {
|
|
%>
|
|
if(<%if(stat){%>execStat<%}%><%if(stat && logstashCurrent){%> || <%}%><%if(logstashCurrent){%>enableLogStash<%}%>) {
|
|
runStat.updateStatOnConnectionAndLog(resourceMap,globalMap,iterateLoop,iterateId,<%if(stat){%>execStat<%} else {%>false<%}%>,enableLogStash,0<%for(IConnection con : connSet){%>,"<%=con.getUniqueName()%>"<%}%>);
|
|
}
|
|
<%
|
|
} else {
|
|
if(stat && logstashCurrent) {
|
|
%>
|
|
runStat.updateStatAndLog(execStat,enableLogStash,resourceMap,iterateId,0,0<%for(IConnection con : connSet){%>,"<%=con.getUniqueName()%>"<%}%>);
|
|
<%
|
|
} else {
|
|
if(stat) {
|
|
%>
|
|
if(execStat) {
|
|
runStat.updateStatOnConnection(resourceMap,iterateId,0,0<%for(IConnection con : connSet){%>,"<%=con.getUniqueName()%>"<%}%>);
|
|
}
|
|
<%
|
|
}
|
|
|
|
if(logstashCurrent) {
|
|
%>
|
|
if(enableLogStash) {
|
|
runStat.log(resourceMap,iterateId,0,0<%for(IConnection con : connSet){%>,"<%=con.getUniqueName()%>"<%}%>);
|
|
}
|
|
<%
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
if((codePart.equals(ECodePart.MAIN))&&(stat || logstashCurrent)&&connSet.size()>0){
|
|
if(!node.getComponent().useMerge()) {
|
|
if(stat && logstashCurrent) {
|
|
%>
|
|
runStat.updateStatAndLog(execStat,enableLogStash,iterateId,1,1<%for(IConnection con : connSet){%>,"<%=con.getUniqueName()%>"<%}%>);
|
|
<%
|
|
} else {
|
|
if(stat) {
|
|
%>
|
|
if(execStat){
|
|
runStat.updateStatOnConnection(iterateId,1,1<%for(IConnection con : connSet){%>,"<%=con.getUniqueName()%>"<%}%>);
|
|
}
|
|
<%
|
|
}
|
|
|
|
if(logstashCurrent) {
|
|
%>
|
|
if(enableLogStash) {
|
|
runStat.log(iterateId,1,1<%for(IConnection con : connSet){%>,"<%=con.getUniqueName()%>"<%}%>);
|
|
}
|
|
<%
|
|
}
|
|
}
|
|
} else {
|
|
for(IConnection connection:connSet){
|
|
if(connection.getUniqueName().equals((String)codeGenArgument.getIncomingName())){
|
|
if(stat && logstashCurrent) {
|
|
%>
|
|
runStat.updateStatAndLog(execStat,enableLogStash,iterateId,1,1<%for(IConnection con : connSet){if(con.getUniqueName().equals((String)codeGenArgument.getIncomingName())){%>,"<%=con.getUniqueName()%>"<%}}%>);
|
|
<%
|
|
} else {
|
|
if(stat) {%>
|
|
if(execStat){
|
|
runStat.updateStatOnConnection(iterateId,1,1<%for(IConnection con : connSet){if(con.getUniqueName().equals((String)codeGenArgument.getIncomingName())){%>,"<%=con.getUniqueName()%>"<%}}%>);
|
|
}
|
|
<%}
|
|
|
|
if(logstashCurrent) {%>
|
|
if(enableLogStash) {
|
|
runStat.log(iterateId,1,1<%for(IConnection con : connSet){if(con.getUniqueName().equals((String)codeGenArgument.getIncomingName())){%>,"<%=con.getUniqueName()%>"<%}}%>);
|
|
}
|
|
<%}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
Set<IConnection> connSetForLog = new HashSet<IConnection>();
|
|
connSetForLog.addAll(node.getIncomingConnections(EConnectionType.FLOW_MAIN));
|
|
connSetForLog.addAll(node.getIncomingConnections(EConnectionType.FLOW_MERGE));
|
|
if((codePart.equals(ECodePart.MAIN)) && isLog4jEnabled && connSetForLog.size()>0){
|
|
if(!(node.isVirtualGenerateNode() && node.getVirtualLinkTo()==null)){//ignore the connector in virtual component
|
|
for(IConnection con:connSetForLog){
|
|
%>
|
|
if(log.isTraceEnabled()){
|
|
log.trace("<%=con.getUniqueName()%> - " + (<%=con.getName()%>==null? "": <%=con.getName()%>.toLogString()));
|
|
}
|
|
<%
|
|
}
|
|
}
|
|
}
|
|
if(codePart.equals(ECodePart.BEGIN)){
|
|
%>
|
|
int tos_count_<%=node.getUniqueName() %> = 0;
|
|
<%
|
|
log.startWork();
|
|
log.logCompSetting();
|
|
|
|
if(logstashCurrent) {
|
|
for (INode jobStructureCatcher : jobCatcherNodes) {
|
|
%>
|
|
if(enableLogStash) {
|
|
<%=jobStructureCatcher.getUniqueName() %>.addCM("<%=node.getUniqueName()%>", "<%=node.getComponent().getName()%>");
|
|
<%=jobStructureCatcher.getDesignSubjobStartNode().getUniqueName() %>Process(globalMap);
|
|
}
|
|
<%
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
if(stat || logstashCurrent){
|
|
boolean iterateInVFComp = (node.getVirtualLinkTo() != null && node.getVirtualLinkTo() == EConnectionType.ITERATE);
|
|
if(iterateInVFComp){
|
|
if(codePart.equals(ECodePart.BEGIN)){
|
|
List<String> needToStartConnNames = new ArrayList<String>();
|
|
INode nextNode = node.getOutgoingConnections(EConnectionType.ITERATE).get(0).getTarget();
|
|
NodeUtil.fillConnectionsForStat(needToStartConnNames, nextNode);
|
|
if(needToStartConnNames.isEmpty()) {
|
|
//do nothing
|
|
} else if(containsTPartitioner){
|
|
%>
|
|
if(<%if(stat){%>execStat<%}%><%if(stat && logstashCurrent){%> || <%}%><%if(logstashCurrent){%>enableLogStash<%}%>){
|
|
runStat.updateStatOnConnectionAndLog(globalMap,iterateLoop,iterateId,<%if(stat){%>execStat<%} else {%>false<%}%>,enableLogStash,0<%for(String connName : needToStartConnNames){%>,"<%=connName%>"<%}%>);
|
|
}
|
|
<%
|
|
} else {
|
|
if(stat && logstashCurrent) {
|
|
%>
|
|
runStat.updateStatAndLog(execStat,enableLogStash,iterateId,0,0<%for(String connName : needToStartConnNames){%>,"<%=connName%>"<%}%>);
|
|
<%
|
|
} else {
|
|
if(stat){
|
|
%>
|
|
if(execStat){
|
|
runStat.updateStatOnConnection(iterateId,0,0<%for(String connName : needToStartConnNames){%>,"<%=connName%>"<%}%>);
|
|
}
|
|
<%
|
|
}
|
|
%>
|
|
|
|
<%if(logstashCurrent) {%>
|
|
if(enableLogStash){
|
|
runStat.log(iterateId,0,0<%for(String connName : needToStartConnNames){%>,"<%=connName%>"<%}%>);
|
|
}
|
|
<%
|
|
}
|
|
}
|
|
}
|
|
}else if(codePart.equals(ECodePart.MAIN)){
|
|
%>
|
|
resourceMap.put("inIterateVComp", true);
|
|
<%
|
|
}else if(codePart.equals(ECodePart.END)){
|
|
%>
|
|
resourceMap.remove("inIterateVComp");
|
|
<%
|
|
}
|
|
}else{
|
|
IConnection preIterate = (node.getIncomingConnections(EConnectionType.ITERATE) != null && node.getIncomingConnections(EConnectionType.ITERATE).size() == 1) ? node.getIncomingConnections(EConnectionType.ITERATE).get(0) : null;
|
|
if(preIterate != null){
|
|
boolean iterateInVSComp = (preIterate.getSource().getVirtualLinkTo() != null && preIterate.getSource().getVirtualLinkTo() == EConnectionType.ITERATE);
|
|
if(iterateInVSComp){
|
|
if(codePart.equals(ECodePart.BEGIN)){
|
|
%>
|
|
resourceMap.remove("inIterateVComp");
|
|
<%
|
|
}else if(codePart.equals(ECodePart.END)){
|
|
%>
|
|
resourceMap.put("inIterateVComp", true);
|
|
<%
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
%>
|