Compare commits
4 Commits
release/7.
...
wwang-tale
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
29b3e12354 | ||
|
|
f186c5f223 | ||
|
|
ad3493d43a | ||
|
|
66d821d489 |
@@ -38,6 +38,8 @@ public class JobStructureCatcherUtils {
|
||||
public class JobStructureCatcherMessage {
|
||||
|
||||
public String component_id;
|
||||
|
||||
public String component_label;
|
||||
|
||||
public String component_name;
|
||||
|
||||
@@ -57,7 +59,7 @@ public class JobStructureCatcherUtils {
|
||||
|
||||
public String job_version;
|
||||
|
||||
public Long systemPid;
|
||||
public Long systemPid = JobStructureCatcherUtils.getPid();
|
||||
|
||||
public boolean current_connector_as_input;
|
||||
|
||||
@@ -66,6 +68,13 @@ public class JobStructureCatcherUtils {
|
||||
public String current_connector;
|
||||
|
||||
public String currrent_row_content;
|
||||
|
||||
public String sourceId;
|
||||
public String sourceLabel;
|
||||
public String sourceComponentName;
|
||||
public String targetId;
|
||||
public String targetLabel;
|
||||
public String targetComponentName;
|
||||
|
||||
public long row_count;
|
||||
|
||||
@@ -75,44 +84,25 @@ public class JobStructureCatcherUtils {
|
||||
|
||||
public long end_time;
|
||||
|
||||
public String moment;
|
||||
public String moment = sdf.format(new Date());
|
||||
|
||||
public String status;
|
||||
|
||||
public LogType log_type;
|
||||
|
||||
public JobStructureCatcherMessage(String component_id, String component_name,
|
||||
Map<String, String> component_parameters, List<Map<String, String>> component_schema,
|
||||
String input_connectors, String output_connectors,
|
||||
Map<String, String> connector_name_2_connector_schema, String job_name, String job_id,
|
||||
String job_version, boolean current_connector_as_input, String current_connector_type,
|
||||
String current_connector, String currrent_row_content, long row_count, long total_row_number,
|
||||
long start_time, long end_time, String status) {
|
||||
this.component_id = component_id;
|
||||
this.component_name = component_name;
|
||||
this.component_parameters = component_parameters;
|
||||
this.component_schema = component_schema;
|
||||
this.input_connectors = input_connectors;
|
||||
this.output_connectors = output_connectors;
|
||||
this.connector_name_2_connector_schema = connector_name_2_connector_schema;
|
||||
|
||||
this.job_name = job_name;
|
||||
this.job_version = job_version;
|
||||
this.job_id = job_id;
|
||||
this.systemPid = JobStructureCatcherUtils.getPid();
|
||||
|
||||
this.current_connector_as_input = current_connector_as_input;
|
||||
this.current_connector_type = current_connector_type;
|
||||
this.current_connector = current_connector;
|
||||
this.currrent_row_content = currrent_row_content;
|
||||
this.row_count = row_count;
|
||||
this.total_row_number = total_row_number;
|
||||
this.start_time = start_time;
|
||||
this.end_time = end_time;
|
||||
|
||||
this.moment = sdf.format(new Date());
|
||||
this.status = status;
|
||||
public JobStructureCatcherMessage() {
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
public static enum LogType {
|
||||
JOBSTART,
|
||||
JOBEND,
|
||||
RUNCOMPONENT,
|
||||
FLOWINPUT,
|
||||
FLOWOUTPUT,
|
||||
PERFORMANCE
|
||||
}
|
||||
|
||||
java.util.List<JobStructureCatcherMessage> messages = java.util.Collections
|
||||
.synchronizedList(new java.util.ArrayList<JobStructureCatcherMessage>());
|
||||
@@ -129,38 +119,72 @@ public class JobStructureCatcherUtils {
|
||||
this.job_version = jobVersion;
|
||||
}
|
||||
|
||||
private void addMessage(String component_id, String component_name, Map<String, String> component_parameters,
|
||||
List<Map<String, String>> component_schema, String input_connectors, String output_connectors,
|
||||
Map<String, String> connector_name_2_connector_schema, boolean current_connector_as_input,
|
||||
String current_connector_type, String current_connector, String currrent_row_content, long row_count,
|
||||
long total_row_number, long start_time, long end_time, String status) {
|
||||
JobStructureCatcherMessage scm = new JobStructureCatcherMessage(component_id, component_name,
|
||||
component_parameters, component_schema, input_connectors, output_connectors,
|
||||
connector_name_2_connector_schema, this.job_name, this.job_id, this.job_version,
|
||||
current_connector_as_input, current_connector_type, current_connector, currrent_row_content, row_count,
|
||||
total_row_number, start_time, end_time, status);
|
||||
public void addConnectionMessage(String component_id, String component_label, String component_name, boolean current_connector_as_input,
|
||||
String current_connector_type, String current_connector, long total_row_number, long start_time,
|
||||
long end_time) {
|
||||
JobStructureCatcherMessage scm = new JobStructureCatcherMessage();
|
||||
scm.job_name = this.job_name;
|
||||
scm.job_id = this.job_id;
|
||||
scm.job_version = this.job_version;
|
||||
|
||||
scm.component_id = component_id;
|
||||
scm.component_label = component_label;
|
||||
scm.component_name = component_name;
|
||||
scm.current_connector_as_input = current_connector_as_input;
|
||||
scm.current_connector_type = current_connector_type;
|
||||
scm.current_connector = current_connector;
|
||||
scm.total_row_number = total_row_number;
|
||||
scm.start_time = start_time;
|
||||
scm.end_time = end_time;
|
||||
|
||||
if(current_connector_as_input) {
|
||||
scm.log_type = LogType.FLOWINPUT;
|
||||
} else {
|
||||
scm.log_type = LogType.FLOWOUTPUT;
|
||||
}
|
||||
|
||||
messages.add(scm);
|
||||
}
|
||||
|
||||
public void addConnectionMessage(String component_id, String component_name, boolean current_connector_as_input,
|
||||
String current_connector_type, String current_connector, long total_row_number, long start_time,
|
||||
long end_time) {
|
||||
this.addMessage(component_id, component_name, null, null, null, null, null, current_connector_as_input,
|
||||
current_connector_type, current_connector, null, 0, total_row_number, start_time, end_time, null);
|
||||
}
|
||||
public void addCM(String component_id, String component_label, String component_name) {
|
||||
JobStructureCatcherMessage scm = new JobStructureCatcherMessage();
|
||||
scm.job_name = this.job_name;
|
||||
scm.job_id = this.job_id;
|
||||
scm.job_version = this.job_version;
|
||||
|
||||
public void addCM(String component_id, String component_name) {
|
||||
this.addMessage(component_id, component_name, null, null, null, null, null, false, null, null,
|
||||
null, 0, 0, 0, 0, null);
|
||||
scm.component_id = component_id;
|
||||
scm.component_label = component_label;
|
||||
scm.component_name = component_name;
|
||||
|
||||
scm.log_type = LogType.RUNCOMPONENT;
|
||||
|
||||
messages.add(scm);
|
||||
}
|
||||
|
||||
public void addJobStartMessage() {
|
||||
this.addMessage(null, null, null, null, null, null, null, false, null, null, null, 0, 0, 0, 0, null);
|
||||
JobStructureCatcherMessage scm = new JobStructureCatcherMessage();
|
||||
scm.job_name = this.job_name;
|
||||
scm.job_id = this.job_id;
|
||||
scm.job_version = this.job_version;
|
||||
|
||||
scm.log_type = LogType.JOBSTART;
|
||||
|
||||
messages.add(scm);
|
||||
}
|
||||
|
||||
public void addJobEndMessage(long start_time, long end_time, String status) {
|
||||
this.addMessage(null, null, null, null, null, null, null, false, null, null, null, 0, 0, start_time, end_time,
|
||||
status == "" ? "end" : status);
|
||||
JobStructureCatcherMessage scm = new JobStructureCatcherMessage();
|
||||
scm.job_name = this.job_name;
|
||||
scm.job_id = this.job_id;
|
||||
scm.job_version = this.job_version;
|
||||
|
||||
scm.status = (status == "" ? "end" : status);
|
||||
scm.start_time = start_time;
|
||||
scm.end_time = end_time;
|
||||
|
||||
scm.log_type = LogType.JOBEND;
|
||||
|
||||
messages.add(scm);
|
||||
}
|
||||
|
||||
public java.util.List<JobStructureCatcherMessage> getMessages() {
|
||||
@@ -183,4 +207,32 @@ public class JobStructureCatcherUtils {
|
||||
return Thread.currentThread().getId();
|
||||
}
|
||||
}
|
||||
|
||||
public void addConnectionMessage4PerformanceMonitor(String current_connector, String sourceId, String sourceLabel,
|
||||
String sourceComponentName, String targetId, String targetLabel, String targetComponentName, int row_count,
|
||||
long start_time, long end_time) {
|
||||
JobStructureCatcherMessage scm = new JobStructureCatcherMessage();
|
||||
scm.job_name = this.job_name;
|
||||
scm.job_id = this.job_id;
|
||||
scm.job_version = this.job_version;
|
||||
|
||||
scm.current_connector = current_connector;
|
||||
|
||||
scm.sourceId = sourceId;
|
||||
scm.sourceLabel = sourceLabel;
|
||||
scm.sourceComponentName = sourceComponentName;
|
||||
|
||||
scm.targetId = targetId;
|
||||
scm.targetLabel = targetLabel;
|
||||
scm.targetComponentName = targetComponentName;
|
||||
|
||||
scm.row_count = row_count;
|
||||
scm.start_time = start_time;
|
||||
scm.end_time = end_time;
|
||||
|
||||
scm.log_type = LogType.PERFORMANCE;
|
||||
|
||||
messages.add(scm);
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
@@ -51,6 +51,16 @@ public class RunStat implements Runnable {
|
||||
public static String TYPE0_JOB = "0";
|
||||
|
||||
public static String TYPE1_CONNECTION = "1";
|
||||
|
||||
public RunStat() {
|
||||
jscu = null;
|
||||
}
|
||||
|
||||
private final JobStructureCatcherUtils jscu;
|
||||
|
||||
public RunStat(JobStructureCatcherUtils jscu) {
|
||||
this.jscu = jscu;
|
||||
}
|
||||
|
||||
private class StatBean {
|
||||
|
||||
@@ -332,38 +342,31 @@ public class RunStat implements Runnable {
|
||||
|
||||
private Map<String, StatBean> processStats4Meter = new HashMap<String, StatBean>();
|
||||
|
||||
private List<String> keysList4Meter = new LinkedList<String>();
|
||||
|
||||
private final static long INTERVAL = 500;
|
||||
|
||||
private long lastLogUpdate = 0;
|
||||
|
||||
public synchronized boolean log(String connectionId, int mode, int nbLine,
|
||||
String sourceId, String sourceLabel, String sourceComponentName,
|
||||
String targetId, String targetLabel, String targetComponentName) {
|
||||
boolean emit = false;
|
||||
|
||||
StatBean stateBean = log(connectionId, mode, nbLine);
|
||||
|
||||
long currentLogUpdate = System.currentTimeMillis();
|
||||
if (lastLogUpdate == 0 || lastLogUpdate + INTERVAL < currentLogUpdate) {
|
||||
lastLogUpdate = currentLogUpdate;
|
||||
jscu.addConnectionMessage4PerformanceMonitor(
|
||||
connectionId, sourceId, sourceLabel, sourceComponentName, targetId, targetLabel, targetComponentName, stateBean.nbLine, stateBean.startTime, currentLogUpdate);
|
||||
emit = true;
|
||||
}
|
||||
|
||||
return emit;
|
||||
}
|
||||
|
||||
public synchronized StatBean log(String connectionId, int mode, int nbLine) {
|
||||
StatBean bean;
|
||||
String key = connectionId;
|
||||
if (connectionId.contains(".")) {
|
||||
String firstKey = null;
|
||||
String connectionName = connectionId.split("\\.")[0];
|
||||
int nbKeys = 0;
|
||||
for (String myKey : keysList4Meter) {
|
||||
if (myKey.startsWith(connectionName + ".")) {
|
||||
if (firstKey == null) {
|
||||
firstKey = myKey;
|
||||
}
|
||||
nbKeys++;
|
||||
if (nbKeys == 4) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
if (nbKeys == 4) {
|
||||
keysList4Meter.remove(firstKey);
|
||||
}
|
||||
}
|
||||
|
||||
if (keysList4Meter.contains(key)) {
|
||||
int keyNb = keysList4Meter.indexOf(key);
|
||||
keysList4Meter.remove(key);
|
||||
keysList4Meter.add(keyNb, key);
|
||||
} else {
|
||||
keysList4Meter.add(key);
|
||||
}
|
||||
|
||||
if (processStats4Meter.containsKey(key)) {
|
||||
bean = processStats4Meter.get(key);
|
||||
@@ -371,7 +374,10 @@ public class RunStat implements Runnable {
|
||||
bean = new StatBean(connectionId);
|
||||
}
|
||||
|
||||
bean.setState(mode);
|
||||
bean.setNbLine(bean.getNbLine() + nbLine);
|
||||
//not set it, to avoid too many call as System.currentTimeMillis() is not fast
|
||||
//bean.setEndTime(System.currentTimeMillis());
|
||||
processStats4Meter.put(key, bean);
|
||||
|
||||
if (mode == BEGIN) {
|
||||
@@ -379,25 +385,29 @@ public class RunStat implements Runnable {
|
||||
bean.setStartTime(System.currentTimeMillis());
|
||||
} else if(mode == END) {
|
||||
bean.setEndTime(System.currentTimeMillis());
|
||||
|
||||
processStats4Meter.remove(key);
|
||||
|
||||
keysList4Meter.clear();
|
||||
processStats4Meter.remove(key);
|
||||
}
|
||||
|
||||
return bean;
|
||||
}
|
||||
|
||||
public synchronized boolean log(Map<String, Object> resourceMap, String iterateId, String connectionUniqueName, int mode, int nbLine,
|
||||
JobStructureCatcherUtils jscu, String sourceNodeId, String sourceNodeComponent, String targetNodeId, String targetNodeComponent, String lineType) {
|
||||
String sourceNodeId, String sourceNodeLabel, String sourceNodeComponent, String targetNodeId, String targetNodeLabel, String targetNodeComponent, String lineType) {
|
||||
if(resourceMap.get("inIterateVComp") == null || !((Boolean)resourceMap.get("inIterateVComp"))) {
|
||||
StatBean bean = log(connectionUniqueName, mode, nbLine);
|
||||
StatBean bean = log(connectionUniqueName, mode, nbLine);//TODO use connectionUniqueName + iterateId here?
|
||||
|
||||
String connectionId = connectionUniqueName+iterateId;
|
||||
|
||||
jscu.addConnectionMessage4PerformanceMonitor(
|
||||
connectionId, sourceNodeId, sourceNodeLabel, sourceNodeComponent, targetNodeId, targetNodeLabel, targetNodeComponent, bean.nbLine, bean.startTime, bean.endTime);
|
||||
|
||||
jscu.addConnectionMessage(
|
||||
sourceNodeId,
|
||||
sourceNodeId,
|
||||
sourceNodeLabel,
|
||||
sourceNodeComponent,
|
||||
false,
|
||||
lineType,
|
||||
connectionUniqueName+iterateId,
|
||||
connectionId,
|
||||
bean.getNbLine(),
|
||||
bean.getStartTime(),
|
||||
bean.getEndTime()
|
||||
@@ -405,10 +415,11 @@ public class RunStat implements Runnable {
|
||||
|
||||
jscu.addConnectionMessage(
|
||||
targetNodeId,
|
||||
targetNodeLabel,
|
||||
targetNodeComponent,
|
||||
true,
|
||||
"input",
|
||||
connectionUniqueName+iterateId,
|
||||
connectionId,
|
||||
bean.getNbLine(),
|
||||
bean.getStartTime(),
|
||||
bean.getEndTime()
|
||||
@@ -435,14 +446,14 @@ public class RunStat implements Runnable {
|
||||
* work for avoiding the 65535 issue
|
||||
*/
|
||||
public synchronized boolean updateStatAndLog(boolean execStat, boolean enableLogStash, Map<String, Object> resourceMap, String iterateId, String connectionUniqueName, int mode, int nbLine,
|
||||
JobStructureCatcherUtils jscu, String sourceNodeId, String sourceNodeComponent, String targetNodeId, String targetNodeComponent, String lineType) {
|
||||
String sourceNodeId, String sourceNodeLabel, String sourceNodeComponent, String targetNodeId, String targetNodeLabel, String targetNodeComponent, String lineType) {
|
||||
if(execStat) {
|
||||
updateStat(resourceMap, iterateId, mode, nbLine, connectionUniqueName);
|
||||
}
|
||||
|
||||
if(enableLogStash) {
|
||||
return log(resourceMap, iterateId, connectionUniqueName, mode, nbLine,
|
||||
jscu, sourceNodeId, sourceNodeComponent, targetNodeId, targetNodeComponent, lineType);
|
||||
sourceNodeId, sourceNodeLabel, sourceNodeComponent, targetNodeId, targetNodeLabel, targetNodeComponent, lineType);
|
||||
}
|
||||
|
||||
|
||||
@@ -502,6 +513,23 @@ public class RunStat implements Runnable {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* update stats
|
||||
* @param execStat
|
||||
* @param enableLogStash
|
||||
* @param iterateId
|
||||
* @param mode
|
||||
* @param nbLine
|
||||
* @param connectionUniqueNames
|
||||
*/
|
||||
private synchronized void updateStat(String iterateId, int mode, int nbLine, String... informationGroup) {
|
||||
for(int i=0;i<informationGroup.length;i++) {
|
||||
if((i % 7) == 0) {
|
||||
updateStatOnConnection(informationGroup[i]+iterateId, mode, nbLine);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* work for avoiding the 65535 issue
|
||||
*/
|
||||
@@ -511,6 +539,27 @@ public class RunStat implements Runnable {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* update logs for performance monitor
|
||||
* @param execStat
|
||||
* @param enableLogStash
|
||||
* @param iterateId
|
||||
* @param mode
|
||||
* @param nbLine
|
||||
* @param connectionUniqueNames
|
||||
*/
|
||||
public synchronized boolean updateLog(String iterateId, int mode, int nbLine, String... informationGroup) {
|
||||
boolean emit = false;
|
||||
for(int i=0;i<informationGroup.length;i++) {
|
||||
if((i % 7) == 0) {
|
||||
//informationGroup ==> [connectionid, sourceid, sourcelabel, sourcecomponentname, targetid, targetlabel, targetcomponentname, ...]
|
||||
emit |= log(informationGroup[i]+iterateId, mode, nbLine,
|
||||
informationGroup[i+1], informationGroup[i+2], informationGroup[i+3],informationGroup[i+4], informationGroup[i+5], informationGroup[i+6]);
|
||||
}
|
||||
}
|
||||
return emit;
|
||||
}
|
||||
|
||||
/**
|
||||
* TBD-9420 fix
|
||||
*/
|
||||
@@ -531,6 +580,28 @@ public class RunStat implements Runnable {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* update states, and logs for performance monitor
|
||||
* @param execStat
|
||||
* @param enableLogStash
|
||||
* @param iterateId
|
||||
* @param mode
|
||||
* @param nbLine
|
||||
* @param connectionUniqueNames
|
||||
*/
|
||||
public synchronized boolean update(boolean execStat, boolean enableLogStash, String iterateId, int mode, int nbLine, String... informationGroup) {
|
||||
if(execStat) {
|
||||
updateStat(iterateId, mode, nbLine, informationGroup);
|
||||
}
|
||||
|
||||
if(enableLogStash) {
|
||||
boolean emit = updateLog(iterateId, mode, nbLine, informationGroup);
|
||||
return emit;
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* work for avoiding the 65535 issue
|
||||
*/
|
||||
|
||||
Reference in New Issue
Block a user