diff --git a/main/plugins/org.talend.core.runtime/src/main/java/org/talend/core/model/process/BigDataNode.java b/main/plugins/org.talend.core.runtime/src/main/java/org/talend/core/model/process/BigDataNode.java index bfb9f36d30..92b705800f 100644 --- a/main/plugins/org.talend.core.runtime/src/main/java/org/talend/core/model/process/BigDataNode.java +++ b/main/plugins/org.talend.core.runtime/src/main/java/org/talend/core/model/process/BigDataNode.java @@ -224,7 +224,10 @@ public class BigDataNode extends AbstractNode implements IBigDataNode { colName = nodeElemForList.getListItemsDisplayName()[index]; } } - columnList.add(bigDataNode.getIncomingConnections().get(0).getMetadataTable().getColumn(colName)); + if (bigDataNode.getIncomingConnections().size() > 0) { + columnList.add(bigDataNode.getIncomingConnections().get(0).getMetadataTable() + .getColumn(colName)); + } } else { throw new RuntimeException( "The parameter " + partitionKey[0] + "." + partitionKey[1] + " does not exist in the component " + this.getComponentName()); //$NON-NLS-1$ //$NON-NLS-2$ //$NON-NLS-3$ @@ -308,7 +311,7 @@ public class BigDataNode extends AbstractNode implements IBigDataNode { /* * (non-Javadoc) - * + * * @see org.talend.core.model.process.IBigDataNode#setKeyList(java.lang.String, java.util.List) */ @Override diff --git a/main/plugins/org.talend.librariesmanager/resources/java/routines/system/SparkStreamingRunStat.java b/main/plugins/org.talend.librariesmanager/resources/java/routines/system/SparkStreamingRunStat.java new file mode 100644 index 0000000000..378d0d9827 --- /dev/null +++ b/main/plugins/org.talend.librariesmanager/resources/java/routines/system/SparkStreamingRunStat.java @@ -0,0 +1,130 @@ +// ============================================================================ +// +// Copyright (C) 2006-2015 Talend Inc. - www.talend.com +// +// This source code is available under agreement available at +// %InstallDIR%\features\org.talend.rcp.branding.%PRODUCTNAME%\%PRODUCTNAME%license.txt +// +// You should have received a copy of the agreement +// along with this program; if not, write to Talend SA +// 9 rue Pages 92150 Suresnes, France +// +// ============================================================================ +package routines.system; + +import java.util.ArrayList; +import java.util.List; + +/** + * created by bchen on Jul 24, 2013 Detailled comment + * + */ +public class SparkStreamingRunStat extends MRRunStat { + + protected List messages = new ArrayList<>(); + + public SparkStreamingRunStat() { + super(); + } + + public class StatBean { + + private String subjobId; + + private int batchCompleted; + + private int batchStarted; + + private String lastProcessingDelay; + + private String lastSchedulingDelay; + + private String lastTotalDelay; + + public String getSubjobId() { + return this.subjobId; + } + + public void setSubjobId(String subjobId) { + this.subjobId = subjobId; + } + + public int getBatchCompleted() { + return this.batchCompleted; + } + + public void setBatchCompleted(int batchCompleted) { + this.batchCompleted = batchCompleted; + } + + public int getBatchStarted() { + return this.batchStarted; + } + + public void setBatchStarted(int batchStarted) { + this.batchStarted = batchStarted; + } + + public String getLastProcessingDelay() { + return this.lastProcessingDelay; + } + + public void setLastProcessingDelay(String lastProcessingDelay) { + this.lastProcessingDelay = lastProcessingDelay; + } + + public String getLastSchedulingDelay() { + return this.lastSchedulingDelay; + } + + public void setLastSchedulingDelay(String lastSchedulingDelay) { + this.lastSchedulingDelay = lastSchedulingDelay; + } + + public String getLastTotalDelay() { + return this.lastTotalDelay; + } + + public void setLastTotalDelay(String lastTotalDelay) { + this.lastTotalDelay = lastTotalDelay; + } + + public String toStatFormat() { + String stats = rootPid + "|" + fatherPid + "|" + pid + "|" + this.subjobId + "|" + this.batchCompleted + "|" + + this.batchStarted + "|" + this.lastProcessingDelay + "|" + this.lastSchedulingDelay + "|" + + this.lastTotalDelay; + + return stats; + } + } + + public StatBean createSparkStreamingStatBean() { + return new StatBean(); + } + + @Override + public void sendMessages() { + for (StatBean message : messages) { + pred.println(message.toStatFormat()); + } + messages.clear(); + } + + public synchronized void updateSparkStreamingData(StatBean message) { + messages.add(message); + } + + // for feature:10589 + private String rootPid = null; + + private String fatherPid = null; + + private String pid = "0"; + + // Notice: this API should be invoked after startThreadStat() closely. + public void setAllPID(String rootPid, String fatherPid, String pid) { + this.rootPid = rootPid; + this.fatherPid = fatherPid; + this.pid = pid; + } +}