Files
tdi-studio-se/main/plugins/org.talend.designer.codegen/jet_stub/component_part_footer.javajet

625 lines
23 KiB
Plaintext

<%@ jet
package="org.talend.designer.codegen.translators"
imports="
org.talend.core.model.process.INode
org.talend.core.model.temp.ECodePart
org.talend.core.model.process.ElementParameterParser
org.talend.core.model.process.IConnection
org.talend.core.model.metadata.IMetadataTable
org.talend.core.model.metadata.IMetadataColumn
org.talend.core.model.process.EConnectionType
org.talend.designer.codegen.config.CodeGeneratorArgument
org.talend.designer.codegen.config.NodesSubTree
org.talend.core.model.process.IProcess
org.talend.core.model.utils.NodeUtil
org.talend.core.model.metadata.types.JavaTypesManager
org.talend.core.model.metadata.types.JavaType
java.util.Map
java.util.List
java.util.ArrayList
java.util.Set
java.util.HashSet
"
class="ComponentPartFooter"
skeleton="subprocess_header_java.skeleton"
%>
<%@ include file="@{org.talend.designer.components.localprovider}/components/templates/Log4j/LogUtil.javajet"%>
<%
CodeGeneratorArgument codeGenArgument = (CodeGeneratorArgument) argument;
INode node = (INode)codeGenArgument.getArgument();
log = new LogUtil(node);
IProcess process = node.getProcess();
String startNodeCid=node.getDesignSubjobStartNode().getUniqueName();
NodesSubTree subTree = (NodesSubTree) codeGenArgument.getSubTree();
ECodePart codePart = codeGenArgument.getCodePart();
boolean trace = codeGenArgument.isTrace();
boolean stat = codeGenArgument.isStatistics();
boolean isRunInMultiThread = codeGenArgument.getIsRunInMultiThread();
Set<IConnection> connSet = new HashSet<IConnection>();
connSet.addAll(node.getIncomingConnections(EConnectionType.FLOW_MAIN));
connSet.addAll(node.getIncomingConnections(EConnectionType.FLOW_MERGE));
Set<IConnection> iterateConnSet = new HashSet<IConnection>();
iterateConnSet.addAll(node.getOutgoingConnections(EConnectionType.ITERATE));
List<IConnection> allSubProcessConnection = codeGenArgument.getAllMainSubTreeConnections();
String cid = node.getUniqueName();
List<? extends INode> jobCatcherNodes = process.getNodesOfType("tJobStructureCatcher");
boolean jobCatcherExists = jobCatcherNodes != null && !jobCatcherNodes.isEmpty();
INode jobCatcherNode = jobCatcherExists ? jobCatcherNodes.get(0) : null;
boolean enableLogStash = !Boolean.getBoolean("deactivate_extended_component_log") && jobCatcherExists;
boolean logstashCurrent = enableLogStash && !cid.startsWith("tJobStructureCatcher") && !cid.startsWith("talend");
if((codePart.equals(ECodePart.END))&&(stat || logstashCurrent)){
boolean iterateInVFComp = (node.getVirtualLinkTo() != null && node.getVirtualLinkTo() == EConnectionType.ITERATE);
if(iterateInVFComp){
List<String> needToEndConnNames = new ArrayList<String>();
INode nextNode = node.getOutgoingConnections(EConnectionType.ITERATE).get(0).getTarget();
NodeUtil.fillConnectionsForStat(needToEndConnNames, nextNode);
if(!needToEndConnNames.isEmpty()) {
if(stat && logstashCurrent) {
%>
runStat.updateStatAndLog(execStat,enableLogStash,iterateId,2,0<%for(String connName : needToEndConnNames){%>,"<%=connName%>"<%}%>);
<%
} else {
if(stat) {%>
if(execStat){
runStat.updateStatOnConnection(iterateId,2,0<%for(String connName : needToEndConnNames){%>,"<%=connName%>"<%}%>);
}
<%}%>
<%if(logstashCurrent) {//now only finish the log, not send, TODO%>
if(enableLogStash){
runStat.log(iterateId,2,0<%for(String connName : needToEndConnNames){%>,"<%=connName%>"<%}%>);
}
<%
}
}
}
}
if(connSet.size()>0) {
if(stat && logstashCurrent && (connSet.size()==1)) {//the most common case, write this ugly logic for 65535 issue
for(IConnection con:connSet){
INode source = con.getSource();
String sourceNodeId = source.getUniqueName();
String sourceLabel = ElementParameterParser.getValue(source, "__LABEL__");
String sourceNodeLabel = ((sourceLabel==null || "__UNIQUE_NAME__".equals(sourceLabel) || sourceLabel.contains("\"")) ? sourceNodeId : sourceLabel.trim());
String targetLabel = ElementParameterParser.getValue(node, "__LABEL__");
String targetNodeLabel = ((targetLabel==null || "__UNIQUE_NAME__".equals(targetLabel) || targetLabel.contains("\"")) ? node.getUniqueName() : targetLabel.trim());
String sourceNodeComponent = source.getComponent().getName();
for (INode jobStructureCatcher : jobCatcherNodes) {
%>
if(runStat.updateStatAndLog(execStat,enableLogStash,resourceMap,iterateId,"<%=con.getUniqueName()%>",2,0,
"<%=sourceNodeId%>","<%=sourceNodeLabel%>","<%=sourceNodeComponent%>","<%=node.getUniqueName()%>","<%=targetNodeLabel%>","<%=node.getComponent().getName()%>","<%="REJECT".equals(con.getConnectorName()) ? "reject" : "output"%>")) {
<%=jobStructureCatcher.getDesignSubjobStartNode().getUniqueName() %>Process(globalMap);
}
<%
break;
}
}
} else {
if(stat){
%>
if(execStat){
runStat.updateStat(resourceMap,iterateId,2,0<%for(IConnection con : connSet){%>,"<%=con.getUniqueName()%>"<%}%>);
}
<%
}
if(logstashCurrent){
%>
if(enableLogStash) {
<%
for(IConnection con:connSet){
INode source = con.getSource();
String sourceNodeId = source.getUniqueName();
String sourceLabel = ElementParameterParser.getValue(source, "__LABEL__");
String sourceNodeLabel = ((sourceLabel==null || "__UNIQUE_NAME__".equals(sourceLabel) || sourceLabel.contains("\"")) ? sourceNodeId : sourceLabel.trim());
String targetLabel = ElementParameterParser.getValue(node, "__LABEL__");
String targetNodeLabel = ((targetLabel==null || "__UNIQUE_NAME__".equals(targetLabel) || targetLabel.contains("\"")) ? node.getUniqueName() : targetLabel.trim());
String sourceNodeComponent = source.getComponent().getName();
for (INode jobStructureCatcher : jobCatcherNodes) {
%>
if(runStat.log(resourceMap,iterateId,"<%=con.getUniqueName()%>",2,0,
"<%=sourceNodeId%>","<%=sourceNodeLabel%>","<%=sourceNodeComponent%>","<%=node.getUniqueName()%>","<%=targetNodeLabel%>","<%=node.getComponent().getName()%>","<%="REJECT".equals(con.getConnectorName()) ? "reject" : "output"%>")) {
<%=jobStructureCatcher.getDesignSubjobStartNode().getUniqueName() %>Process(globalMap);
}
<%
break;
}
}
%>
}
<%
}
}
}
}
%>
<%
if (codePart.equals(ECodePart.MAIN)) {
//================================TDI-17183 start========================================
/**
if the node is autopropagate the schema, then the output row should evaluate the input row
before the track code part(TDI-17183)
*/
String inputColName = null;
if (node.getIncomingConnections()!=null) {
for (IConnection incomingConn : node.getIncomingConnections()) {
if (incomingConn.getLineStyle().equals(EConnectionType.FLOW_MAIN)) {
inputColName = incomingConn.getName();
break;
}
}
}
List<IMetadataTable> metadatas = node.getMetadataList();
if ((!node.isSubProcessStart())&&(NodeUtil.isDataAutoPropagated(node))) {
if (inputColName!=null) {
if ((metadatas != null) && (metadatas.size() > 0)) {
IMetadataTable outputMetadata = metadatas.get(0);
if (outputMetadata != null) {
for (IConnection outputConnection : node.getOutgoingConnections()) {
if (outputConnection.getLineStyle().equals(EConnectionType.FLOW_MAIN) || outputConnection.getLineStyle().equals(EConnectionType.FLOW_MERGE)) {
//void the case: row1.username=row1.username
if(!outputConnection.getName().equals(inputColName)){//111
//void the case: row1Struct row1 = new row1Struct(); row1Struct row3 = row1;
//for (IMetadataColumn column : outputMetadata.getListColumns()) {
%>
<%=outputConnection.getName() %> = <%=inputColName %>;
<%
//}
}//111
}
}
}
}
}
}
//log runtime lineage
boolean enable_runtime_lineage_log = NodeUtil.isJobUsingRuntimeLineage(process) && jobCatcherExists && !cid.startsWith("tJobStructureCatcher") && !cid.startsWith("talend");
if(enable_runtime_lineage_log) {//}
List<? extends IConnection> outConns = node.getOutgoingConnections();
if(!outConns.isEmpty()) {
%>
if(tos_count_<%=node.getUniqueName() %> == 0) {
<%
//}
}
for (IConnection conn : outConns) {
if(!conn.getLineStyle().equals(EConnectionType.FLOW_MAIN) && !conn.getLineStyle().equals(EConnectionType.FLOW_MERGE) && !conn.getLineStyle().equals(EConnectionType.FLOW_REF)) {
continue;
}
IMetadataTable metadata = conn.getMetadataTable();
if (metadata==null) {
continue;
}
List<IMetadataColumn> columns = metadata.getListColumns();
if(columns == null || columns.isEmpty()) {
continue;
}
%>
class SchemaUtil_<%=conn.getUniqueName()%>_<%=conn.getMetadataTable().getTableName()%> {
public java.util.List<java.util.Map<String, String>> getSchema(final <%=NodeUtil.getPrivateConnClassName(conn) %>Struct <%=conn.getName()%>) {
java.util.List<java.util.Map<String, String>> schema = new java.util.ArrayList<>();
if(<%=conn.getName()%> == null) {
return schema;
}
java.util.Map<String, String> field = null;
<%
for(IMetadataColumn column : columns){
if("id_Dynamic".equals(column.getTalendType())) {
%>
routines.system.Dynamic dynamic = <%=conn.getName()%>.<%=column.getLabel()%>;
if(dynamic != null) {
for(routines.system.DynamicMetadata metadata : dynamic.metadatas) {
field = new java.util.HashMap<>();
field.put("name", metadata.getName());
field.put("origin_name", metadata.getDbName());
field.put("iskey", "" + metadata.isKey());
field.put("talend_type", metadata.getType());
field.put("type", metadata.getDbType());
field.put("nullable", "" + metadata.isNullable());
field.put("pattern", metadata.getFormat());
field.put("length", "" + metadata.getLength());
field.put("precision", "" + metadata.getPrecision());
schema.add(field);
}
}
<%
continue;
}
String pattern = column.getPattern();
if(pattern == null || pattern.isEmpty() || pattern.equals("\"\"")) {
pattern = "\"\"";
}
%>
field = new java.util.HashMap<>();
field.put("name", "<%=column.getLabel()%>");
field.put("origin_name", "<%=column.getOriginalDbColumnName()%>");
field.put("iskey", "<%=column.isKey()%>");
field.put("talend_type", "<%=column.getTalendType()%>");
field.put("type", "<%=column.getType()%>");
field.put("nullable", "<%=column.isNullable()%>");
field.put("pattern", <%=pattern%>);
field.put("length", "<%=column.getLength()%>");
field.put("precision", "<%=column.getPrecision()%>");
schema.add(field);
<%
}
%>
return schema;
}
}
java.util.List<java.util.Map<String, String>> schema_<%=conn.getUniqueName()%>_<%=conn.getMetadataTable().getTableName()%> = new SchemaUtil_<%=conn.getUniqueName()%>_<%=conn.getMetadataTable().getTableName()%>().getSchema(<%=conn.getName()%>);
<%
INode target = conn.getTarget();
String targetNodeId = target.getUniqueName();
String targetNodeComponent = target.getComponent().getName();
%>
<%=jobCatcherNode.getUniqueName()%>.addConnectionSchemaMessage("<%=node.getUniqueName()%>","<%=node.getComponent().getName()%>",
"<%=targetNodeId%>","<%=targetNodeComponent%>", "<%=conn.getUniqueName()%>" + iterateId, schema_<%=conn.getUniqueName()%>_<%=conn.getMetadataTable().getTableName()%>);
<%=jobCatcherNode.getDesignSubjobStartNode().getUniqueName() %>Process(globalMap);
<%
}
if(!outConns.isEmpty()) {
//{
%>
}
<%
}
//{
}
//======================================TDI-17183 end=====================================
boolean traceCodeGenerated = false;
for (IConnection conn : node.getOutgoingConnections()) {
boolean use_reaceenble = ("true").equals(ElementParameterParser.getValue(conn, "__TRACES_CONNECTION_ENABLE__"));
if(node.isSubProcessStart() && node.isSubProcessContainTraceBreakpoint() && !traceCodeGenerated) {
traceCodeGenerated = true;
%>
globalMap.put("USE_CONDITION",Boolean.TRUE);
globalMap.put("TRACE_CONDITION", Boolean.FALSE);
<%
}
IMetadataTable metadata = conn.getMetadataTable();
if (metadata!=null) {
if (conn.getLineStyle().equals(EConnectionType.FLOW_MAIN) ||
conn.getLineStyle().equals(EConnectionType.FLOW_MERGE)) {
if (trace && use_reaceenble) {
%>
if (<%=conn.getName() %> !=null) {
globalMap.put("ENABLE_TRACES_CONNECTION_<%=startNodeCid%>",Boolean.TRUE);
if (runTrace.isPause()) {
while (runTrace.isPause()) {
Thread.sleep(100);
}
} else {
// here we dump the line content for trace purpose
java.util.LinkedHashMap<String, String> runTraceData = new java.util.LinkedHashMap<String,String>();
<%
for(IMetadataColumn column : metadata.getListColumns()){
%>
runTraceData.put("<%=column.getLabel()%>", String.valueOf(<%=conn.getName()%>.<%=column.getLabel()%>));
<%
}
%>
runTrace.sendTrace("<%=conn.getName()%>","<%=startNodeCid%>", runTraceData);
}
<%
boolean use_breakout = ("true").equals(ElementParameterParser.getValue(conn, "__ACTIVEBREAKPOINT__"));
boolean use_advanced = ("true").equals(ElementParameterParser.getValue(conn, "__USE_ADVANCED__"));
List<Map<String, String>> keyColumns = (List<Map<String,String>>)ElementParameterParser.getObjectValue( conn, "__CONDITIONS__");
String advancedCondition = ElementParameterParser.getValue(conn, "__ADVANCED_COND__");
String logical = ElementParameterParser.getValue(conn,"__LOGICAL_OP__");
if (metadata != null) {
if(use_breakout && use_reaceenble){%>
if (
<%
if ((keyColumns!=null &&keyColumns.size() > 0)|| use_advanced) { //111
int ii = 0;
for (Map<String, String> keyColumn : keyColumns){ // 222 ->
ii++;
%>
<%= ii == 1 ? "" : logical %>(
<%
if (!("").equals(keyColumn.get("FUNCTION"))) {
%>
<%=keyColumn.get("FUNCTION").replace("$source", conn.getName() + "." + keyColumn.get("INPUT_COLUMN")).replace("$target", keyColumn.get("RVALUE")).replace("$operator", keyColumn.get("OPERATOR")) %>
<%
}else {
IMetadataColumn rightColumn = metadata.getColumn(keyColumn.get("INPUT_COLUMN"));
JavaType javaType = JavaTypesManager.getJavaTypeFromId(rightColumn.getTalendType());
if (!javaType.isPrimitive()) {
//this is only for bug:8133, when "Oject" type, and "Empty" function, and compare with "null"
if (keyColumn.get("RVALUE") != null && keyColumn.get("RVALUE").equals("null")){
%>
<%="$source $operator $target".replace("$source", conn.getName() + "." + keyColumn.get("INPUT_COLUMN")).replace("$target", keyColumn.get("RVALUE")).replace("$operator", keyColumn.get("OPERATOR")) %>
<%}else{%>
<%="$source == null? false : $source.compareTo($target) $operator 0".replace("$source", conn.getName() + "." + keyColumn.get("INPUT_COLUMN")).replace("$target", keyColumn.get("RVALUE")).replace("$operator", keyColumn.get("OPERATOR")) %>
<%
}
} else {
%>
<%="$source $operator $target".replace("$source", conn.getName() + "." + keyColumn.get("INPUT_COLUMN")).replace("$target", keyColumn.get("RVALUE")).replace("$operator", keyColumn.get("OPERATOR")) %>
<%
}
}
%>
)
<%
} // <- 222
%>
<%=use_advanced? (keyColumns.size() < 1 ? "" : logical) + advancedCondition.replace("input_row", conn.getName()) : ""%>
<%
} else { //111
%>
true
<%
} //111
%>
){
globalMap.put("TRACE_CONDITION", Boolean.TRUE);
}
<%
}
%>
}
<%
}
}
}
}
}
}
if (codePart.equals(ECodePart.END)) {
log.endWork();
%>
ok_Hash.put("<%=node.getUniqueName() %>", true);
end_Hash.put("<%=node.getUniqueName() %>", System.currentTimeMillis());
<%
String statCatcher = ElementParameterParser.getValue(node,"__TSTATCATCHER_STATS__");
if ((node.getProcess().getNodesOfType("tStatCatcher").size() > 0) & (statCatcher.equals("true"))) {
for (INode statCatcherNode : node.getProcess().getNodesOfType("tStatCatcher")) {
%>
<%=statCatcherNode.getUniqueName() %>.addMessage("end","<%=node.getUniqueName() %>", end_Hash.get("<%=node.getUniqueName() %>")-start_Hash.get("<%=node.getUniqueName() %>"));
<%=statCatcherNode.getDesignSubjobStartNode().getUniqueName() %>Process(globalMap);
<%
}
}
for (IConnection outgoingConn : node.getOutgoingConnections()) {
if (outgoingConn.getTarget().isActivate()) {
if (outgoingConn.getLineStyle().equals(EConnectionType.ON_COMPONENT_OK)) {
//send the ok status to socket
if(stat){
%>
if(execStat){
runStat.updateStatOnConnection("<%=outgoingConn.getUniqueName() %>", 0, "ok");
}
<%
}
/*
This boolean is created for the ON_COMPONENT_OK virtual components.
This technic is used on the tWriteXMLField for example. Since the sub method is called in the Thread, we do not have to generate again in the component footer.
*/
boolean generateMethodCall = false;
if((!node.isVirtualGenerateNode())) {
generateMethodCall = true;
} else {
if(node.getVirtualLinkTo()==null || node.getVirtualLinkTo()!=EConnectionType.ON_COMPONENT_OK) {
generateMethodCall = true;
}
// Else, that means the component is a ON_COMPONENT_OK virtual component typed.
}
if(generateMethodCall) {
/*check if parralel iterate call not finished from this component */
boolean parallelIterate = false;
for (IConnection iterateConn : iterateConnSet) {
parallelIterate = "true".equals(ElementParameterParser.getValue(iterateConn, "__ENABLE_PARALLEL__"));
if (parallelIterate) {
if (codePart.equals(ECodePart.END)) {
String iterateTargetNodeName = iterateConn.getTarget().getUniqueName();
%>
mtp_<%=iterateTargetNodeName %>.waitForEndOfQueue();
<%
}
}
}
%>
<%=outgoingConn.getTarget().getUniqueName() %>Process(globalMap);
<%
}
}
if (outgoingConn.getLineStyle().equals(EConnectionType.RUN_IF)) {
%>
if (<%=outgoingConn.getCondition() %>) {
<%//send the true status to socket
if(stat){
%>
if(execStat){
runStat.updateStatOnConnection("<%=outgoingConn.getUniqueName() %>", 0, "true");
}
<%
}
/*check if parralel iterate call not finished from this component */
boolean parallelIterate = false;
for (IConnection iterateConn : iterateConnSet) {
parallelIterate = "true".equals(ElementParameterParser.getValue(iterateConn, "__ENABLE_PARALLEL__"));
if (parallelIterate) {
if (codePart.equals(ECodePart.END)) {
String iterateTargetNodeName = iterateConn.getTarget().getUniqueName();
%>
mtp_<%=iterateTargetNodeName %>.waitForEndOfQueue();
<%
}
}
}
%>
<%=outgoingConn.getTarget().getUniqueName() %>Process(globalMap);
}
<%
//send the false status to socket
if(stat){
%>
else{
if(execStat){
runStat.updateStatOnConnection("<%=outgoingConn.getUniqueName() %>", 0, "false");
}
}
<%
}
}
}
}
}
%>
<%
//This part in order to feedback with the iterate_subprocess_header.javajet and iterate_subprocess_footer.javajet
boolean parallelIterate = false;
for (IConnection iterateConn : iterateConnSet) {
parallelIterate = "true".equals(ElementParameterParser.getValue(iterateConn, "__ENABLE_PARALLEL__"));
if (parallelIterate) {
if (codePart.equals(ECodePart.END)) {
String iterateTargetNodeName = iterateConn.getTarget().getUniqueName();
%>
mtp_<%=iterateTargetNodeName %>.waitForEndOfQueue();
TalendThread errorThread_<%=iterateTargetNodeName %> = mtp_<%=iterateTargetNodeName %>.getErrorThread();
<%if(!isRunInMultiThread){%>
if(errorThread_<%=iterateTargetNodeName %> != null) {
if (errorThread_<%=iterateTargetNodeName %>.errorCode != null) {
if (errorCode == null
|| errorThread_<%=iterateTargetNodeName %>.errorCode.compareTo(errorCode) > 0) {
errorCode = errorThread_<%=iterateTargetNodeName %>.errorCode;
}
}
if (!status.equals("failure")) {
status = errorThread_<%=iterateTargetNodeName %>.status;
}
if(errorThread_<%=iterateTargetNodeName %>.exception!=null){
throw errorThread_<%=iterateTargetNodeName %>.exception;
}
if(errorThread_<%=iterateTargetNodeName %>.error!=null){
throw errorThread_<%=iterateTargetNodeName %>.error;
}
}else{
Integer threadErrorCode = mtp_<%=iterateTargetNodeName %>.getTalendThreadResult().getErrorCode();
String threadStatus = mtp_<%=iterateTargetNodeName %>.getTalendThreadResult().getStatus();
if (threadErrorCode != null) {
if (errorCode == null
|| threadErrorCode.compareTo(errorCode) > 0) {
errorCode = threadErrorCode;
}
}
if (!status.equals("failure")) {
status = threadStatus;
}
}
<%}else{%>
if(errorThread_<%=iterateTargetNodeName %> != null) {
Integer localErrorCode = (Integer) (((java.util.Map) threadLocal.get()).get("errorCode"));
String localStatus = (String) (((java.util.Map) threadLocal.get()).get("status"));
if (errorThread_<%=iterateTargetNodeName %>.errorCode != null) {
if (localErrorCode == null || errorThread_<%=iterateTargetNodeName %>.errorCode.compareTo(localErrorCode) > 0) {
((java.util.Map) threadLocal.get()).put("errorCode", errorThread_<%=iterateTargetNodeName %>.errorCode);
}
}
if (!localStatus.equals("failure")) {
((java.util.Map) threadLocal.get()).put("status", errorThread_<%=iterateTargetNodeName %>.status);
}
if(errorThread_<%=iterateTargetNodeName %>.exception!=null){
throw errorThread_<%=iterateTargetNodeName %>.exception;
}
}else{
Integer threadErrorCode = mtp_<%=iterateTargetNodeName %>.getTalendThreadResult().getErrorCode();
String threadStatus = mtp_<%=iterateTargetNodeName %>.getTalendThreadResult().getStatus();
Integer localErrorCode = (Integer) (((java.util.Map) threadLocal
.get()).get("errorCode"));
String localStatus = (String) (((java.util.Map) threadLocal
.get()).get("status"));
if (threadErrorCode != null) {
if (localErrorCode == null
|| threadErrorCode.compareTo(localErrorCode) > 0) {
((java.util.Map) threadLocal.get()).put("errorCode",
threadErrorCode);
}
}
if (!localStatus.equals("failure")) {
((java.util.Map) threadLocal.get()).put("status",
threadStatus);
}
}
<%}%>
<%
}
continue;
}
}
%>
<%if(codePart.equals(ECodePart.MAIN)){ %>
tos_count_<%=node.getUniqueName() %>++;
<%}%>
/**
* [<%=node.getUniqueName() %> <%=codePart %> ] stop
*/