Compare commits

...

10 Commits

Author SHA1 Message Date
pyzhou-talend
b50fa7cfa2 fix parentheses 2020-01-13 11:59:23 +08:00
pyzhou-talend
33bae99e75 fix(TDI-43470):fix date and other compile issue 2020-01-08 14:50:16 +08:00
wwang-talend
888c8af052 Merge branch 'master' into wwang-talend/TDI-43101 2019-11-26 14:02:36 +08:00
wwang-talend
e5a1ca0ffe add a system property to disable this feature 2019-11-22 14:51:43 +08:00
wwang-talend
dda9b3663d fi something 2019-11-22 13:59:34 +08:00
wwang-talend
f074a40778 Merge branch 'master' into wwang-talend/TDI-43101
Conflicts:
	main/plugins/org.talend.designer.components.libs/libs_src/talend-job-monitoring/pom.xml
2019-11-22 13:40:57 +08:00
wwang-talend
77df7123fe fix(TDI-43101): fix it 2019-11-07 10:38:19 +08:00
wwang-talend
ffa2155f56 fix(TDI-43101): [Research] Generate a sample logging file for runtime
lineage
2019-11-06 18:28:04 +08:00
wwang-talend
fd44235cdd use release version 2019-11-06 14:01:49 +08:00
wwang-talend
3ee94dc088 fix(TDI-33262): Support for Apache Log4j2 2019-11-06 13:57:40 +08:00
7 changed files with 258 additions and 5 deletions

View File

@@ -51,6 +51,7 @@ String cid = node.getUniqueName();
List<? extends INode> jobCatcherNodes = process.getNodesOfType("tJobStructureCatcher");
boolean enableLogStash = jobCatcherNodes != null && !jobCatcherNodes.isEmpty();
INode jobCatcherNode = enableLogStash ? jobCatcherNodes.get(0) : null;
boolean logstashCurrent = !cid.startsWith("tJobStructureCatcher") && !cid.startsWith("talend") && enableLogStash;
if((codePart.equals(ECodePart.END))&&(stat || logstashCurrent)){
@@ -150,6 +151,7 @@ if((codePart.equals(ECodePart.END))&&(stat || logstashCurrent)){
}
}
}
List<IMetadataTable> metadatas = node.getMetadataList();
if ((!node.isSubProcessStart())&&(NodeUtil.isDataAutoPropagated(node))) {
if (inputColName!=null) {
@@ -175,6 +177,119 @@ if((codePart.equals(ECodePart.END))&&(stat || logstashCurrent)){
}
}
}
//log runtime lineage
boolean enable_runtime_lineage_log = Boolean.getBoolean("enable_runtime_lineage_log");
if(enable_runtime_lineage_log && jobCatcherNode!=null) {//}
List<? extends IConnection> outConns = node.getOutgoingConnections();
if(!outConns.isEmpty()) {
%>
if(tos_count_<%=node.getUniqueName() %> == 0) {
<%
//}
}
for (IConnection conn : outConns) {
if(!conn.getLineStyle().equals(EConnectionType.FLOW_MAIN) && !conn.getLineStyle().equals(EConnectionType.FLOW_MERGE) && !conn.getLineStyle().equals(EConnectionType.FLOW_REF)) {
continue;
}
IMetadataTable metadata = conn.getMetadataTable();
if (metadata==null) {
continue;
}
List<IMetadataColumn> columns = metadata.getListColumns();
if(columns == null || columns.isEmpty()) {
continue;
}
%>
class SchemaUtil_<%=conn.getUniqueName()%>_<%=conn.getMetadataTable().getTableName()%> {
public java.util.List<java.util.Map<String, String>> getSchema(final <%=NodeUtil.getPrivateConnClassName(conn) %>Struct <%=conn.getName()%>) {
java.util.List<java.util.Map<String, String>> schema = new java.util.ArrayList<>();
if(<%=conn.getName()%> == null) {
return schema;
}
java.util.Map<String, String> field = null;
<%
for(IMetadataColumn column : columns){
if("id_Dynamic".equals(column.getTalendType())) {
%>
routines.system.Dynamic dynamic = <%=conn.getName()%>.<%=column.getLabel()%>;
if(dynamic != null) {
for(routines.system.DynamicMetadata metadata : dynamic.metadatas) {
field = new java.util.HashMap<>();
field.put("name", metadata.getName());
field.put("origin_name", metadata.getDbName());
field.put("iskey", "" + metadata.isKey());
field.put("talend_type", metadata.getType());
field.put("type", metadata.getDbType());
field.put("nullable", "" + metadata.isNullable());
field.put("pattern", metadata.getFormat());
field.put("length", "" + metadata.getLength());
field.put("precision", "" + metadata.getPrecision());
schema.add(field);
}
}
<%
continue;
}
String pattern = column.getPattern();
if(pattern == null || pattern.isEmpty() || pattern.equals("\"\"")) {
pattern = "\"\"";
}
%>
field = new java.util.HashMap<>();
field.put("name", "<%=column.getLabel()%>");
field.put("origin_name", "<%=column.getOriginalDbColumnName()%>");
field.put("iskey", "<%=column.isKey()%>");
field.put("talend_type", "<%=column.getTalendType()%>");
field.put("type", "<%=column.getType()%>");
field.put("nullable", "<%=column.isNullable()%>");
field.put("pattern", <%=pattern%>);
field.put("length", "<%=column.getLength()%>");
field.put("precision", "<%=column.getPrecision()%>");
schema.add(field);
<%
}
%>
return schema;
}
}
java.util.List<java.util.Map<String, String>> schema_<%=conn.getUniqueName()%>_<%=conn.getMetadataTable().getTableName()%> = new SchemaUtil_<%=conn.getUniqueName()%>_<%=conn.getMetadataTable().getTableName()%>().getSchema(<%=conn.getName()%>);
<%
INode target = conn.getTarget();
String targetNodeId = target.getUniqueName();
String targetNodeComponent = target.getComponent().getName();
%>
<%=jobCatcherNode.getUniqueName()%>.addConnectionSchemaMessage("<%=node.getUniqueName()%>","<%=node.getComponent().getName()%>",
"<%=targetNodeId%>","<%=targetNodeComponent%>", "<%=conn.getUniqueName()%>" + iterateId, schema_<%=conn.getUniqueName()%>_<%=conn.getMetadataTable().getTableName()%>);
<%=jobCatcherNode.getDesignSubjobStartNode().getUniqueName() %>Process(globalMap);
<%
}
if(!outConns.isEmpty()) {
//{
%>
}
<%
}
//{
}
//======================================TDI-17183 end=====================================
boolean traceCodeGenerated = false;
for (IConnection conn : node.getOutgoingConnections()) {

View File

@@ -142,6 +142,7 @@
List<? extends INode> jobCatcherNodes = node.getProcess().getNodesOfType("tJobStructureCatcher");
boolean enableLogStash = jobCatcherNodes != null && !jobCatcherNodes.isEmpty();
INode jobCatcherNode = enableLogStash ? jobCatcherNodes.get(0) : null;
String cid = node.getUniqueName();
boolean logstashCurrent = !cid.startsWith("tJobStructureCatcher") && !cid.startsWith("talend") && enableLogStash;
@@ -247,6 +248,69 @@
log.logCompSetting();
if(logstashCurrent) {
boolean enable_runtime_lineage_log = Boolean.getBoolean("enable_runtime_lineage_log");
if(enable_runtime_lineage_log) {
%>
class ParameterUtil_<%=cid%>{
public java.util.Map<String, String> getParameter() throws Exception{
java.util.Map<String, String> component_parameters = new java.util.HashMap<>();
<%
java.util.Set<org.talend.core.model.process.EParameterFieldType> ignoredParamsTypes = new java.util.HashSet<org.talend.core.model.process.EParameterFieldType>();
ignoredParamsTypes.addAll(
java.util.Arrays.asList(
org.talend.core.model.process.EParameterFieldType.SCHEMA_TYPE,
org.talend.core.model.process.EParameterFieldType.LABEL,
org.talend.core.model.process.EParameterFieldType.EXTERNAL,
org.talend.core.model.process.EParameterFieldType.MAPPING_TYPE,
org.talend.core.model.process.EParameterFieldType.IMAGE,
org.talend.core.model.process.EParameterFieldType.TNS_EDITOR,
org.talend.core.model.process.EParameterFieldType.WSDL2JAVA,
org.talend.core.model.process.EParameterFieldType.GENERATEGRAMMARCONTROLLER,
org.talend.core.model.process.EParameterFieldType.GENERATE_SURVIVORSHIP_RULES_CONTROLLER,
org.talend.core.model.process.EParameterFieldType.REFRESH_REPORTS,
org.talend.core.model.process.EParameterFieldType.BROWSE_REPORTS,
org.talend.core.model.process.EParameterFieldType.PALO_DIM_SELECTION,
org.talend.core.model.process.EParameterFieldType.GUESS_SCHEMA,
org.talend.core.model.process.EParameterFieldType.MATCH_RULE_IMEX_CONTROLLER,
org.talend.core.model.process.EParameterFieldType.MEMO_PERL,
org.talend.core.model.process.EParameterFieldType.DBTYPE_LIST,
org.talend.core.model.process.EParameterFieldType.VERSION,
org.talend.core.model.process.EParameterFieldType.TECHNICAL,
org.talend.core.model.process.EParameterFieldType.ICON_SELECTION,
org.talend.core.model.process.EParameterFieldType.JAVA_COMMAND,
org.talend.core.model.process.EParameterFieldType.TREE_TABLE,
org.talend.core.model.process.EParameterFieldType.VALIDATION_RULE_TYPE,
org.talend.core.model.process.EParameterFieldType.DCSCHEMA,
org.talend.core.model.process.EParameterFieldType.SURVIVOR_RELATION,
org.talend.core.model.process.EParameterFieldType.REST_RESPONSE_SCHEMA_TYPE
)
);
for(org.talend.core.model.process.IElementParameter ep : org.talend.core.model.utils.NodeUtil.getDisplayedParameters(node)){
if(!ep.isLog4JEnabled() || ignoredParamsTypes.contains(ep.getFieldType())){
continue;
}
String name = ep.getName();
if(org.talend.core.model.process.EParameterFieldType.PASSWORD.equals(ep.getFieldType())){
//not log password
}else{
String value = org.talend.core.model.utils.NodeUtil.getRuntimeParameterValue(node, ep);
%>
component_parameters.put("<%=name%>", String.valueOf(<%=value%>));
<%
}
}
%>
return component_parameters;
}
}
<%=jobCatcherNode.getUniqueName()%>.addComponentParameterMessage("<%=node.getUniqueName()%>", "<%=node.getComponent().getName()%>",
new ParameterUtil_<%=cid%>().getParameter());
<%=jobCatcherNode.getDesignSubjobStartNode().getUniqueName() %>Process(globalMap);
<%
}
for (INode jobStructureCatcher : jobCatcherNodes) {
%>
if(enableLogStash) {

View File

@@ -501,8 +501,39 @@
break;
}
}
if(jobCatcherNode!=null) {
String location = ElementParameterParser.getValue(jobCatcherNode, "__LOCATION__");
if(jobCatcherNode!=null) {
boolean enable_runtime_lineage_log = Boolean.getBoolean("enable_runtime_lineage_log");
if(enable_runtime_lineage_log) {
%>
java.util.Properties p_<%=jobCatcherNode.getUniqueName()%> = new java.util.Properties();
p_<%=jobCatcherNode.getUniqueName()%>.setProperty("root.logger", "runtime");
p_<%=jobCatcherNode.getUniqueName()%>.setProperty("encoding", "UTF-8");
p_<%=jobCatcherNode.getUniqueName()%>.setProperty("application.name", "Talend Studio");
p_<%=jobCatcherNode.getUniqueName()%>.setProperty("service.name", "Talend Studio Job");
p_<%=jobCatcherNode.getUniqueName()%>.setProperty("instance.name", "Talend Studio Job Instance");
p_<%=jobCatcherNode.getUniqueName()%>.setProperty("propagate.appender.exceptions", "none");
p_<%=jobCatcherNode.getUniqueName()%>.setProperty("log.appender", "file");
p_<%=jobCatcherNode.getUniqueName()%>.setProperty("appender.file.path", "runtime.json");
p_<%=jobCatcherNode.getUniqueName()%>.setProperty("appender.file.maxsize", "52428800");
p_<%=jobCatcherNode.getUniqueName()%>.setProperty("appender.file.maxbackup", "20");
p_<%=jobCatcherNode.getUniqueName()%>.setProperty("host", "false");
<%if(isLog4jEnabled) {%>
<%if(isLog4j1Enabled) {%>
org.apache.log4j.Logger.getLogger("runtime").setLevel(org.apache.log4j.Level.DEBUG);
<%}%>
<%if(isLog4j2Enabled) {%>
org.apache.logging.log4j.core.config.Configurator.setLevel("runtime", org.apache.logging.log4j.Level.DEBUG);
<%}%>
<%} else {%>
org.apache.log4j.Logger.getLogger("runtime").setLevel(org.apache.log4j.Level.DEBUG);
<%}%>
runtime_lineage_logger_<%=jobCatcherNode.getUniqueName()%> = org.talend.job.audit.JobEventAuditLoggerFactory.createJobAuditLogger(p_<%=jobCatcherNode.getUniqueName()%>);
<%
}
String location = ElementParameterParser.getValue(jobCatcherNode, "__LOCATION__");
%>
if(enableLogStash) {
java.util.Properties properties_<%=jobCatcherNode.getUniqueName()%> = new java.util.Properties();
@@ -537,7 +568,7 @@
}
<%
}
%>
%>
if(clientHost == null) {
clientHost = defaultClientHost;

View File

@@ -372,6 +372,8 @@ public <%=JavaTypesManager.getTypeToGenerate(ctxParam.getType(),true)%> get<%=Ch
%>
<%
boolean enable_runtime_lineage_log = Boolean.getBoolean("enable_runtime_lineage_log");
INode jobCatcherNode = null;
for (INode nodeInProcess : processNodes) {
String componentName = nodeInProcess.getComponent().getName();
@@ -427,8 +429,13 @@ private RunTrace runTrace = new RunTrace();
JobStructureCatcherUtils <%=jobCatcherNode.getUniqueName() %> = new JobStructureCatcherUtils(jobName, "<%=process.getId() %>", "<%=process.getVersion() %>");
org.talend.job.audit.JobAuditLogger auditLogger_<%=jobCatcherNode.getUniqueName()%> = null;
<%
if(enable_runtime_lineage_log) {
%>
org.talend.job.audit.JobAuditLogger runtime_lineage_logger_<%=jobCatcherNode.getUniqueName()%> = null;
<%
}
}
for (INode metterCatcher : process.getNodesOfType("tFlowMeterCatcher")) {
%>
MetterCatcherUtils <%=metterCatcher.getUniqueName() %> = new MetterCatcherUtils("<%=process.getId() %>", "<%=process.getVersion() %>");

View File

@@ -6,7 +6,7 @@
<groupId>org.talend.libraries</groupId>
<artifactId>job-audit</artifactId>
<version>1.1</version>
<version>1.2</version>
<properties>
<talend.nexus.url>https://artifacts-oss.talend.com</talend.nexus.url>

View File

@@ -21,5 +21,11 @@ public interface JobAuditLogger extends EventAuditLogger {
@AuditEvent(category = "connection", message = "Component {connectorType} received {rows} rows in {duration} with {connectionName} line", level = LogLevel.INFO)
void flowInput(Context context);
@AuditEvent(category = "connector", message = "Component {connectorId} parameters : {connectorParameters}", level = LogLevel.INFO)
void componentParameters(Context context);
@AuditEvent(category = "connection", message = "{connectionName} : {schema} from {sourceConnectorId} to {targetConnectorId}", level = LogLevel.INFO)
void schema(Context context);
}

View File

@@ -82,6 +82,36 @@ public class JobContextBuilder {
builder.with("status", status);
return this;
}
public JobContextBuilder connectorParameters(String connectorParameters) {
builder.with("connectorParameters", connectorParameters);
return this;
}
public JobContextBuilder schema(String schema) {
builder.with("schema", schema);
return this;
}
public JobContextBuilder sourceConnectorId(String sourceConnectorId) {
builder.with("sourceConnectorId", sourceConnectorId);
return this;
}
public JobContextBuilder targetConnectorId(String targetConnectorId) {
builder.with("targetConnectorId", targetConnectorId);
return this;
}
public JobContextBuilder sourceConnectorType(String sourceConnectorType) {
builder.with("sourceConnectorType", sourceConnectorType);
return this;
}
public JobContextBuilder targetConnectorType(String targetConnectorType) {
builder.with("targetConnectorType", targetConnectorType);
return this;
}
public Context build() {
return builder.build();