469 lines
17 KiB
Plaintext
469 lines
17 KiB
Plaintext
<%@ jet
|
|
package="org.talend.designer.codegen.translators"
|
|
imports="
|
|
org.talend.core.model.process.INode
|
|
org.talend.core.model.temp.ECodePart
|
|
org.talend.core.model.process.ElementParameterParser
|
|
org.talend.core.model.process.IConnection
|
|
org.talend.core.model.metadata.IMetadataTable
|
|
org.talend.core.model.metadata.IMetadataColumn
|
|
org.talend.core.model.process.EConnectionType
|
|
org.talend.designer.codegen.config.CodeGeneratorArgument
|
|
org.talend.designer.codegen.config.NodesSubTree
|
|
org.talend.core.model.process.IProcess
|
|
org.talend.core.model.utils.NodeUtil
|
|
org.talend.core.model.metadata.types.JavaTypesManager
|
|
org.talend.core.model.metadata.types.JavaType
|
|
java.util.Map
|
|
java.util.List
|
|
java.util.ArrayList
|
|
java.util.Set
|
|
java.util.HashSet
|
|
"
|
|
class="ComponentPartFooter"
|
|
skeleton="subprocess_header_java.skeleton"
|
|
%>
|
|
<%@ include file="@{org.talend.designer.components.localprovider}/components/templates/Log4j/LogUtil.javajet"%>
|
|
<%
|
|
CodeGeneratorArgument codeGenArgument = (CodeGeneratorArgument) argument;
|
|
INode node = (INode)codeGenArgument.getArgument();
|
|
log = new LogUtil(node);
|
|
IProcess process = node.getProcess();
|
|
String startNodeCid=node.getDesignSubjobStartNode().getUniqueName();
|
|
|
|
NodesSubTree subTree = (NodesSubTree) codeGenArgument.getSubTree();
|
|
ECodePart codePart = codeGenArgument.getCodePart();
|
|
boolean trace = codeGenArgument.isTrace();
|
|
boolean stat = codeGenArgument.isStatistics();
|
|
|
|
boolean isRunInMultiThread = codeGenArgument.getIsRunInMultiThread();
|
|
|
|
Set<IConnection> connSet = new HashSet<IConnection>();
|
|
connSet.addAll(node.getIncomingConnections(EConnectionType.FLOW_MAIN));
|
|
connSet.addAll(node.getIncomingConnections(EConnectionType.FLOW_MERGE));
|
|
|
|
Set<IConnection> iterateConnSet = new HashSet<IConnection>();
|
|
iterateConnSet.addAll(node.getOutgoingConnections(EConnectionType.ITERATE));
|
|
|
|
List<IConnection> allSubProcessConnection = codeGenArgument.getAllMainSubTreeConnections();
|
|
|
|
String cid = node.getUniqueName();
|
|
|
|
List<? extends INode> jobCatcherNodes = process.getNodesOfType("tJobStructureCatcher");
|
|
boolean enableLogStash = jobCatcherNodes != null && !jobCatcherNodes.isEmpty();
|
|
boolean logstashCurrent = !cid.startsWith("tJobStructureCatcher") && !cid.startsWith("talend") && enableLogStash;
|
|
|
|
if((codePart.equals(ECodePart.END))&&(stat || logstashCurrent)){
|
|
boolean iterateInVFComp = (node.getVirtualLinkTo() != null && node.getVirtualLinkTo() == EConnectionType.ITERATE);
|
|
if(iterateInVFComp){
|
|
List<String> needToEndConnNames = new ArrayList<String>();
|
|
INode nextNode = node.getOutgoingConnections(EConnectionType.ITERATE).get(0).getTarget();
|
|
NodeUtil.fillConnectionsForStat(needToEndConnNames, nextNode);
|
|
if(!needToEndConnNames.isEmpty()) {
|
|
if(stat && logstashCurrent) {
|
|
%>
|
|
runStat.updateStatAndLog(execStat,enableLogStash,iterateId,2,0<%for(String connName : needToEndConnNames){%>,"<%=connName%>"<%}%>);
|
|
<%
|
|
} else {
|
|
if(stat) {%>
|
|
if(execStat){
|
|
runStat.updateStatOnConnection(iterateId,2,0<%for(String connName : needToEndConnNames){%>,"<%=connName%>"<%}%>);
|
|
}
|
|
<%}%>
|
|
|
|
<%if(logstashCurrent) {//now only finish the log, not send, TODO%>
|
|
if(enableLogStash){
|
|
runStat.log(iterateId,2,0<%for(String connName : needToEndConnNames){%>,"<%=connName%>"<%}%>);
|
|
}
|
|
<%
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
if(connSet.size()>0) {
|
|
if(stat && logstashCurrent && (connSet.size()==1)) {//the most common case (exactly one incoming connection, stats and log both on): emit a single combined runStat.updateStatAndLog(...) call instead of the separate updateStat/log calls generated in the else branch; written this way for the "65535 issue" (presumably the JVM 64 KB method-size limit on generated code - TODO confirm)
|
|
for(IConnection con:connSet){
|
|
INode source = con.getSource();
|
|
String sourceNodeId = source.getUniqueName();
|
|
String sourceNodeComponent = source.getComponent().getName();
|
|
for (INode jobStructureCatcher : jobCatcherNodes) {
|
|
%>
|
|
if(runStat.updateStatAndLog(execStat,enableLogStash,resourceMap,iterateId,"<%=con.getUniqueName()%>",2,0,
|
|
<%=jobStructureCatcher.getUniqueName()%>,"<%=sourceNodeId%>","<%=sourceNodeComponent%>","<%=node.getUniqueName()%>","<%=node.getComponent().getName()%>","<%="REJECT".equals(con.getConnectorName()) ? "reject" : "output"%>")) {
|
|
<%=jobStructureCatcher.getDesignSubjobStartNode().getUniqueName() %>Process(globalMap);
|
|
}
|
|
<%
|
|
break;
|
|
}
|
|
}
|
|
} else {
|
|
if(stat){
|
|
%>
|
|
if(execStat){
|
|
runStat.updateStat(resourceMap,iterateId,2,0<%for(IConnection con : connSet){%>,"<%=con.getUniqueName()%>"<%}%>);
|
|
}
|
|
<%
|
|
}
|
|
|
|
if(logstashCurrent){
|
|
%>
|
|
if(enableLogStash) {
|
|
<%
|
|
for(IConnection con:connSet){
|
|
INode source = con.getSource();
|
|
String sourceNodeId = source.getUniqueName();
|
|
String sourceNodeComponent = source.getComponent().getName();
|
|
for (INode jobStructureCatcher : jobCatcherNodes) {
|
|
%>
|
|
if(runStat.log(resourceMap,iterateId,"<%=con.getUniqueName()%>",2,0,
|
|
<%=jobStructureCatcher.getUniqueName()%>,"<%=sourceNodeId%>","<%=sourceNodeComponent%>","<%=node.getUniqueName()%>","<%=node.getComponent().getName()%>","<%="REJECT".equals(con.getConnectorName()) ? "reject" : "output"%>")) {
|
|
<%=jobStructureCatcher.getDesignSubjobStartNode().getUniqueName() %>Process(globalMap);
|
|
}
|
|
<%
|
|
break;
|
|
}
|
|
}
|
|
%>
|
|
}
|
|
<%
|
|
}
|
|
}
|
|
}
|
|
}
|
|
%>
|
|
|
|
<%
|
|
|
|
if (codePart.equals(ECodePart.MAIN)) {
|
|
//================================TDI-17183 start========================================
|
|
/**
|
|
if the node auto-propagates its schema, then the output row must be assigned from the input row
|
|
before the track code part(TDI-17183)
|
|
*/
|
|
String inputColName = null;
|
|
if (node.getIncomingConnections()!=null) {
|
|
for (IConnection incomingConn : node.getIncomingConnections()) {
|
|
if (incomingConn.getLineStyle().equals(EConnectionType.FLOW_MAIN)) {
|
|
inputColName = incomingConn.getName();
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
List<IMetadataTable> metadatas = node.getMetadataList();
|
|
if ((!node.isSubProcessStart())&&(NodeUtil.isDataAutoPropagated(node))) {
|
|
if (inputColName!=null) {
|
|
if ((metadatas != null) && (metadatas.size() > 0)) {
|
|
IMetadataTable outputMetadata = metadatas.get(0);
|
|
if (outputMetadata != null) {
|
|
for (IConnection outputConnection : node.getOutgoingConnections()) {
|
|
if (outputConnection.getLineStyle().equals(EConnectionType.FLOW_MAIN) || outputConnection.getLineStyle().equals(EConnectionType.FLOW_MERGE)) {
|
|
|
|
//avoid the case: row1.username=row1.username
|
|
if(!outputConnection.getName().equals(inputColName)){//111
|
|
//avoid the case: row1Struct row1 = new row1Struct(); row1Struct row3 = row1;
|
|
//for (IMetadataColumn column : outputMetadata.getListColumns()) {
|
|
%>
|
|
<%=outputConnection.getName() %> = <%=inputColName %>;
|
|
<%
|
|
//}
|
|
}//111
|
|
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
//======================================TDI-17183 end=====================================
|
|
boolean traceCodeGenerated = false;
|
|
for (IConnection conn : node.getOutgoingConnections()) {
|
|
boolean use_reaceenble = ("true").equals(ElementParameterParser.getValue(conn, "__TRACES_CONNECTION_ENABLE__"));
|
|
if(node.isSubProcessStart() && node.isSubProcessContainTraceBreakpoint() && !traceCodeGenerated) {
|
|
traceCodeGenerated = true;
|
|
%>
|
|
globalMap.put("USE_CONDITION",Boolean.TRUE);
|
|
globalMap.put("TRACE_CONDITION", Boolean.FALSE);
|
|
<%
|
|
}
|
|
IMetadataTable metadata = conn.getMetadataTable();
|
|
if (metadata!=null) {
|
|
if (conn.getLineStyle().equals(EConnectionType.FLOW_MAIN) ||
|
|
conn.getLineStyle().equals(EConnectionType.FLOW_MERGE)) {
|
|
if (trace && use_reaceenble) {
|
|
%>
|
|
|
|
if (<%=conn.getName() %> !=null) {
|
|
globalMap.put("ENABLE_TRACES_CONNECTION_<%=startNodeCid%>",Boolean.TRUE);
|
|
if (runTrace.isPause()) {
|
|
while (runTrace.isPause()) {
|
|
Thread.sleep(100);
|
|
}
|
|
} else {
|
|
|
|
// here we dump the line content for trace purpose
|
|
java.util.LinkedHashMap<String, String> runTraceData = new java.util.LinkedHashMap<String,String>();
|
|
<%
|
|
for(IMetadataColumn column : metadata.getListColumns()){
|
|
%>
|
|
runTraceData.put("<%=column.getLabel()%>", String.valueOf(<%=conn.getName()%>.<%=column.getLabel()%>));
|
|
<%
|
|
}
|
|
%>
|
|
runTrace.sendTrace("<%=conn.getName()%>","<%=startNodeCid%>", runTraceData);
|
|
}
|
|
<%
|
|
|
|
boolean use_breakout = ("true").equals(ElementParameterParser.getValue(conn, "__ACTIVEBREAKPOINT__"));
|
|
boolean use_advanced = ("true").equals(ElementParameterParser.getValue(conn, "__USE_ADVANCED__"));
|
|
List<Map<String, String>> keyColumns = (List<Map<String,String>>)ElementParameterParser.getObjectValue( conn, "__CONDITIONS__");
|
|
String advancedCondition = ElementParameterParser.getValue(conn, "__ADVANCED_COND__");
|
|
String logical = ElementParameterParser.getValue(conn,"__LOGICAL_OP__");
|
|
if (metadata != null) {
|
|
if(use_breakout && use_reaceenble){%>
|
|
if (
|
|
<%
|
|
if ((keyColumns!=null &&keyColumns.size() > 0)|| use_advanced) { //111
|
|
int ii = 0;
|
|
|
|
for (Map<String, String> keyColumn : keyColumns){ // 222 ->
|
|
ii++;
|
|
%>
|
|
<%= ii == 1 ? "" : logical %>(
|
|
<%
|
|
if (!("").equals(keyColumn.get("FUNCTION"))) {
|
|
%>
|
|
<%=keyColumn.get("FUNCTION").replace("$source", conn.getName() + "." + keyColumn.get("INPUT_COLUMN")).replace("$target", keyColumn.get("RVALUE")).replace("$operator", keyColumn.get("OPERATOR")) %>
|
|
<%
|
|
}else {
|
|
IMetadataColumn rightColumn = metadata.getColumn(keyColumn.get("INPUT_COLUMN"));
|
|
JavaType javaType = JavaTypesManager.getJavaTypeFromId(rightColumn.getTalendType());
|
|
|
|
if (!javaType.isPrimitive()) {
|
|
//this is only for bug:8133, when "Object" type, and "Empty" function, and compare with "null"
|
|
if (keyColumn.get("RVALUE") != null && keyColumn.get("RVALUE").equals("null")){
|
|
%>
|
|
<%="$source $operator $target".replace("$source", conn.getName() + "." + keyColumn.get("INPUT_COLUMN")).replace("$target", keyColumn.get("RVALUE")).replace("$operator", keyColumn.get("OPERATOR")) %>
|
|
<%}else{%>
|
|
<%="$source == null? false : $source.compareTo($target) $operator 0".replace("$source", conn.getName() + "." + keyColumn.get("INPUT_COLUMN")).replace("$target", keyColumn.get("RVALUE")).replace("$operator", keyColumn.get("OPERATOR")) %>
|
|
<%
|
|
}
|
|
} else {
|
|
%>
|
|
<%="$source $operator $target".replace("$source", conn.getName() + "." + keyColumn.get("INPUT_COLUMN")).replace("$target", keyColumn.get("RVALUE")).replace("$operator", keyColumn.get("OPERATOR")) %>
|
|
<%
|
|
}
|
|
}
|
|
|
|
%>
|
|
)
|
|
<%
|
|
} // <- 222
|
|
%>
|
|
<%=use_advanced? (keyColumns.size() < 1 ? "" : logical) + advancedCondition.replace("input_row", conn.getName()) : ""%>
|
|
<%
|
|
} else { //111
|
|
%>
|
|
true
|
|
<%
|
|
} //111
|
|
%>
|
|
){
|
|
globalMap.put("TRACE_CONDITION", Boolean.TRUE);
|
|
}
|
|
<%
|
|
}
|
|
%>
|
|
}
|
|
<%
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
}
|
|
if (codePart.equals(ECodePart.END)) {
|
|
log.endWork();
|
|
%>
|
|
|
|
ok_Hash.put("<%=node.getUniqueName() %>", true);
|
|
end_Hash.put("<%=node.getUniqueName() %>", System.currentTimeMillis());
|
|
|
|
<%
|
|
String statCatcher = ElementParameterParser.getValue(node,"__TSTATCATCHER_STATS__");
|
|
// Emit tStatCatcher "end" messages only when the job contains a tStatCatcher and this node has
// __TSTATCATCHER_STATS__ enabled. Uses short-circuit && and the constant-first equals form
// (as the other parameter checks in this template do) so a missing parameter value is treated
// as "disabled" instead of failing generation with a NullPointerException.
if ((node.getProcess().getNodesOfType("tStatCatcher").size() > 0) && ("true").equals(statCatcher)) {
|
|
for (INode statCatcherNode : node.getProcess().getNodesOfType("tStatCatcher")) {
|
|
%>
|
|
<%=statCatcherNode.getUniqueName() %>.addMessage("end","<%=node.getUniqueName() %>", end_Hash.get("<%=node.getUniqueName() %>")-start_Hash.get("<%=node.getUniqueName() %>"));
|
|
<%=statCatcherNode.getDesignSubjobStartNode().getUniqueName() %>Process(globalMap);
|
|
<%
|
|
}
|
|
}
|
|
|
|
for (IConnection outgoingConn : node.getOutgoingConnections()) {
|
|
if (outgoingConn.getTarget().isActivate()) {
|
|
|
|
|
|
if (outgoingConn.getLineStyle().equals(EConnectionType.ON_COMPONENT_OK)) {
|
|
//send the ok status to socket
|
|
if(stat){
|
|
%>
|
|
if(execStat){
|
|
runStat.updateStatOnConnection("<%=outgoingConn.getUniqueName() %>", 0, "ok");
|
|
}
|
|
<%
|
|
}
|
|
|
|
|
|
/*
|
|
This boolean is created for the ON_COMPONENT_OK virtual components.
|
|
This technique is used by tWriteXMLField, for example. Since the sub-method is called in the thread, we do not have to generate the call again in the component footer.
|
|
*/
|
|
boolean generateMethodCall = false;
|
|
if((!node.isVirtualGenerateNode())) {
|
|
generateMethodCall = true;
|
|
} else {
|
|
if(node.getVirtualLinkTo()==null || node.getVirtualLinkTo()!=EConnectionType.ON_COMPONENT_OK) {
|
|
generateMethodCall = true;
|
|
}
|
|
// Else, that means the component is a ON_COMPONENT_OK virtual component typed.
|
|
}
|
|
|
|
if(generateMethodCall) {
|
|
%>
|
|
<%=outgoingConn.getTarget().getUniqueName() %>Process(globalMap);
|
|
<%
|
|
}
|
|
}
|
|
|
|
|
|
if (outgoingConn.getLineStyle().equals(EConnectionType.RUN_IF)) {
|
|
%>
|
|
if (<%=outgoingConn.getCondition() %>) {
|
|
<%//send the true status to socket
|
|
if(stat){
|
|
%>
|
|
if(execStat){
|
|
runStat.updateStatOnConnection("<%=outgoingConn.getUniqueName() %>", 0, "true");
|
|
}
|
|
<%}%>
|
|
<%=outgoingConn.getTarget().getUniqueName() %>Process(globalMap);
|
|
}
|
|
|
|
<%
|
|
//send the false status to socket
|
|
if(stat){
|
|
%>
|
|
else{
|
|
if(execStat){
|
|
runStat.updateStatOnConnection("<%=outgoingConn.getUniqueName() %>", 0, "false");
|
|
}
|
|
}
|
|
<%
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
%>
|
|
|
|
<%
|
|
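//This part works together with iterate_subprocess_header.javajet and iterate_subprocess_footer.javajet: for each parallel ITERATE link it waits for the thread pool queue to finish and collects the error state of its threads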
|
|
|
|
boolean parallelIterate = false;
|
|
for (IConnection iterateConn : iterateConnSet) {
|
|
parallelIterate = "true".equals(ElementParameterParser.getValue(iterateConn, "__ENABLE_PARALLEL__"));
|
|
if (parallelIterate) {
|
|
if (codePart.equals(ECodePart.END)) {
|
|
String iterateTargetNodeName = iterateConn.getTarget().getUniqueName();
|
|
%>
|
|
mtp_<%=iterateTargetNodeName %>.waitForEndOfQueue();
|
|
|
|
TalendThread errorThread_<%=iterateTargetNodeName %> = mtp_<%=iterateTargetNodeName %>.getErrorThread();
|
|
|
|
<%if(!isRunInMultiThread){%>
|
|
if(errorThread_<%=iterateTargetNodeName %> != null) {
|
|
if (errorThread_<%=iterateTargetNodeName %>.errorCode != null) {
|
|
if (errorCode == null
|
|
|| errorThread_<%=iterateTargetNodeName %>.errorCode.compareTo(errorCode) > 0) {
|
|
errorCode = errorThread_<%=iterateTargetNodeName %>.errorCode;
|
|
}
|
|
}
|
|
if (!status.equals("failure")) {
|
|
status = errorThread_<%=iterateTargetNodeName %>.status;
|
|
}
|
|
if(errorThread_<%=iterateTargetNodeName %>.exception!=null){
|
|
throw errorThread_<%=iterateTargetNodeName %>.exception;
|
|
}
|
|
if(errorThread_<%=iterateTargetNodeName %>.error!=null){
|
|
throw errorThread_<%=iterateTargetNodeName %>.error;
|
|
}
|
|
}else{
|
|
Integer threadErrorCode = mtp_<%=iterateTargetNodeName %>.getTalendThreadResult().getErrorCode();
|
|
String threadStatus = mtp_<%=iterateTargetNodeName %>.getTalendThreadResult().getStatus();
|
|
|
|
if (threadErrorCode != null) {
|
|
if (errorCode == null
|
|
|| threadErrorCode.compareTo(errorCode) > 0) {
|
|
errorCode = threadErrorCode;
|
|
}
|
|
}
|
|
if (!status.equals("failure")) {
|
|
status = threadStatus;
|
|
}
|
|
}
|
|
<%}else{%>
|
|
if(errorThread_<%=iterateTargetNodeName %> != null) {
|
|
Integer localErrorCode = (Integer) (((java.util.Map) threadLocal.get()).get("errorCode"));
|
|
String localStatus = (String) (((java.util.Map) threadLocal.get()).get("status"));
|
|
|
|
if (errorThread_<%=iterateTargetNodeName %>.errorCode != null) {
|
|
if (localErrorCode == null || errorThread_<%=iterateTargetNodeName %>.errorCode.compareTo(localErrorCode) > 0) {
|
|
((java.util.Map) threadLocal.get()).put("errorCode", errorThread_<%=iterateTargetNodeName %>.errorCode);
|
|
}
|
|
}
|
|
if (!localStatus.equals("failure")) {
|
|
((java.util.Map) threadLocal.get()).put("status", errorThread_<%=iterateTargetNodeName %>.status);
|
|
}
|
|
if(errorThread_<%=iterateTargetNodeName %>.exception!=null){
|
|
throw errorThread_<%=iterateTargetNodeName %>.exception;
|
|
}
|
|
}else{
|
|
Integer threadErrorCode = mtp_<%=iterateTargetNodeName %>.getTalendThreadResult().getErrorCode();
|
|
String threadStatus = mtp_<%=iterateTargetNodeName %>.getTalendThreadResult().getStatus();
|
|
|
|
Integer localErrorCode = (Integer) (((java.util.Map) threadLocal
|
|
.get()).get("errorCode"));
|
|
String localStatus = (String) (((java.util.Map) threadLocal
|
|
.get()).get("status"));
|
|
|
|
if (threadErrorCode != null) {
|
|
if (localErrorCode == null
|
|
|| threadErrorCode.compareTo(localErrorCode) > 0) {
|
|
((java.util.Map) threadLocal.get()).put("errorCode",
|
|
threadErrorCode);
|
|
}
|
|
}
|
|
if (!localStatus.equals("failure")) {
|
|
((java.util.Map) threadLocal.get()).put("status",
|
|
threadStatus);
|
|
}
|
|
}
|
|
<%}%>
|
|
|
|
|
|
<%
|
|
}
|
|
continue;
|
|
}
|
|
}
|
|
%>
|
|
|
|
<%if(codePart.equals(ECodePart.MAIN)){ %>
|
|
tos_count_<%=node.getUniqueName() %>++;
|
|
<%}%>
|
|
|
|
/**
|
|
* [<%=node.getUniqueName() %> <%=codePart %> ] stop
|
|
*/ |