Compare commits

...

1 Commits

Author SHA1 Message Date
wliu
754bc9cf8c 1. modify to support mr1 2014-08-31 22:16:34 +08:00
9 changed files with 228 additions and 57 deletions

View File

@@ -54,8 +54,16 @@ public enum EInternalTemplate {
MR_SUBPROCESS_HEADER("mr_subprocess_header", "0.0.1"), //$NON-NLS-1$ //$NON-NLS-2$
MR_SUBPROCESS_RUN("mr_subprocess_run", "0.0.1"), //$NON-NLS-1$ //$NON-NLS-2$
MR_SUBPROCESS_FOOTER("mr_subprocess_footer", "0.0.1"), //$NON-NLS-1$ //$NON-NLS-2$
HEADER_ADDITIONAL("header_additional", "0.0.1"); //$NON-NLS-1$ //$NON-NLS-2$
MR_MAPPER_HEADER("mr_mapper_header", "0.0.1"), //$NON-NLS-1$ //$NON-NLS-2$
MR_MAPPER_FOOTER("mr_mapper_footer", "0.0.1"), //$NON-NLS-1$ //$NON-NLS-2$
MR_LOOP_BEGIN("mr_loop_begin", "0.0.1"), //$NON-NLS-1$ //$NON-NLS-2$
MR_LOOP_END("mr_loop_end", "0.0.1"), //$NON-NLS-1$ //$NON-NLS-2$
MR_SUBTREE_BEGIN("mr_subtree_begin", "0.0.1"), //$NON-NLS-1$ //$NON-NLS-2$
MR_SUBTREE_END("mr_subtree_end", "0.0.1"), //$NON-NLS-1$ //$NON-NLS-2$
MR_OUTCOMPONENT_WRITE("mr_outcomponent_write", "0.0.1"), //$NON-NLS-1$ //$NON-NLS-2$
MR_MAPPER_KEYVALUE("mr_mapper_keyvalue", "0.0.1"), //$NON-NLS-1$ //$NON-NLS-2$
MR_CLOSE_BLOCKS_CODE("mr_close_blocks_code", "0.0.1"), //$NON-NLS-1$ //$NON-NLS-2$
HEADER_ADDITIONAL("header_additional", "0.0.1"); //$NON-NLS-1$ //$NON-NLS-2$mr_loop_part
private String templateName;

View File

@@ -64,17 +64,70 @@ public class NodesSubTree {
boolean subTreeContainsParallelIterate = false;
List<INode> mergeBranchStarts;
List<INode> mergeNodes;
List<INode> mergeNodes;
boolean isRefSubTree = false;// for mr only
List<INode> refNodes; // for mr only
List<INode> outNodes; // for mr only
boolean isContainReduce = false;
/* display size of method code in comment */
boolean methodSizeNeeded = false;
String mapKeyClass = null;
String mapKeyValue = null;
// List<IMetadataColumn> mapKeyMetadata = null;
public void setMapKeyClass(String outKeyClass) {
this.mapKeyClass = outKeyClass;
}
public void setMapValueClass(String outValueClass) {
this.mapValueClass = outValueClass;
}
public void setMapKeyClassValue(String outKeyClass, String outKeyValue) {
this.mapKeyClass = outKeyClass;
this.mapKeyValue = outKeyValue;
}
public String getMapKeyClass() {
if (this.mapKeyClass == null) {
return "Writable";
}
return this.mapKeyClass;
}
public String getMapKeyValue() {
return this.mapKeyValue;
}
String mapValueClass = null;
String mapValueValue = null;
public void setMapValueClassValue(String mapValueClass, String mapValueValue) {
this.mapValueClass = mapValueClass;
this.mapValueValue = mapValueValue;
}
public String getMapValueClass() {
if (this.mapValueClass == null) {
return "Writable";
}
return this.mapValueClass;
}
public String getMapValueValue() {
return this.mapValueValue;
}
public boolean isMethodSizeNeeded() {
return methodSizeNeeded;
}
@@ -129,9 +182,12 @@ public class NodesSubTree {
this.rootNode = node;
this.name = node.getUniqueName();
this.nodes = new ArrayList<INode>();
this.outNodes = new ArrayList<INode>();
afterSubProcesses = new ArrayList<String>();
beforeSubProcesses = new ArrayList<String>();
this.visitedNodesMainCode = new HashMap<INode, Integer>();
this.visitedNodesBeginCode = new HashMap<INode, Integer>();
this.visitedNodesEndCode = new HashMap<INode, Integer>();
allMainSubTreeConnections = new ArrayList<IConnection>();
buildMRSubTree(node);
@@ -170,14 +226,14 @@ public class NodesSubTree {
}
// if the node link with the same merge node
for(INode mNode:mergeNodes){
if (node.isActivate() && node.isSubProcessStart() && node.getLinkedMergeInfo() != null
for (INode mNode : mergeNodes) {
if (node.isActivate() && node.isSubProcessStart() && node.getLinkedMergeInfo() != null
&& node.getLinkedMergeInfo().get(mNode) != null) {
mergeBranchStarts.add(node);
buildSubTree(node, true);
}
}
}
}
@@ -210,6 +266,11 @@ public class NodesSubTree {
}
}
}
if (!this.isContainReduce) {
if (node.isVirtualGenerateNode() || ((AbstractNode) node).isMrContainsReduce()) {
this.isContainReduce = true;
}
}
for (IConnection connection : node.getOutgoingSortedConnections()) {
if (connection.getTarget().isActivate()) {
@@ -227,10 +288,30 @@ public class NodesSubTree {
}
}
}
boolean isOutNode = false;
if (!node.isVirtualGenerateNode()) {
List<? extends IConnection> outConns = node.getOutgoingConnections();
isOutNode = true;
for (IConnection conn : outConns) {
if (conn.getLineStyle().hasConnectionCategory(IConnectionCategory.DATA)) {
isOutNode = false;
break;
}
}
}
if (isOutNode && !this.outNodes.contains(node)) {
this.outNodes.add(node);
}
visitedNodesMainCode.put(node, 0);
visitedNodesBeginCode.put(node, 0);
visitedNodesEndCode.put(node, 0);
nodes.add(node);
}
public boolean isReduceJob() {
return this.isContainReduce;
}
private void buildSubTree(INode node, boolean breakWhenMerge) {
if (DEBUG) {
System.out.print(node.getUniqueName());
@@ -452,24 +533,24 @@ public class NodesSubTree {
@Override
public int compare(INode node1, INode node2) {
Map<INode,Integer> mergeInfo1=node1.getLinkedMergeInfo();
Map<INode,Integer> mergeInfo2=node2.getLinkedMergeInfo();
for(INode mNode:mergeNodes){
if (mergeInfo1.get(mNode)!=null && mergeInfo2.get(mNode)!=null) {
if(mergeInfo1.get(mNode) > mergeInfo2.get(mNode)){
return 1;
}else{
return -1;
}
}
if (mergeInfo1.get(mNode)!=null && mergeInfo2.get(mNode)==null) {
return -1;
}
if (mergeInfo1.get(mNode)==null && mergeInfo2.get(mNode)!=null) {
return 1;
}
}
return -1;
Map<INode, Integer> mergeInfo1 = node1.getLinkedMergeInfo();
Map<INode, Integer> mergeInfo2 = node2.getLinkedMergeInfo();
for (INode mNode : mergeNodes) {
if (mergeInfo1.get(mNode) != null && mergeInfo2.get(mNode) != null) {
if (mergeInfo1.get(mNode) > mergeInfo2.get(mNode)) {
return 1;
} else {
return -1;
}
}
if (mergeInfo1.get(mNode) != null && mergeInfo2.get(mNode) == null) {
return -1;
}
if (mergeInfo1.get(mNode) == null && mergeInfo2.get(mNode) != null) {
return 1;
}
}
return -1;
}
});
@@ -481,14 +562,14 @@ public class NodesSubTree {
public boolean isMergeSubTree() {
return this.isMergeSubTree;
}
public List<INode> getMergeNodes() {
return mergeNodes;
}
// public INode getMergeNode() {
// return this.mergeNode;
// }
public List<INode> getMergeNodes() {
return mergeNodes;
}
// public INode getMergeNode() {
// return this.mergeNode;
// }
/**
* Getter for allMainSubTreeConnections.
@@ -551,4 +632,8 @@ public class NodesSubTree {
public void setRefNodes(List<INode> refNodes) {
this.refNodes = refNodes;
}
public List<INode> getOutNodes() {
return this.outNodes;
}
}

View File

@@ -435,6 +435,15 @@ public final class CodeGeneratorEmittersPoolFactory {
if (component.getAvailableCodeParts().contains(ECodePart.MRCONFIG)) {
initComponent(codeLanguage, jetBeans, ECodePart.MRCONFIG, component);
}
if (component.getAvailableCodeParts().contains(ECodePart.MRBEGIN)) {
initComponent(codeLanguage, jetBeans, ECodePart.MRBEGIN, component);
}
if (component.getAvailableCodeParts().contains(ECodePart.MRMAIN)) {
initComponent(codeLanguage, jetBeans, ECodePart.MRMAIN, component);
}
if (component.getAvailableCodeParts().contains(ECodePart.MREND)) {
initComponent(codeLanguage, jetBeans, ECodePart.MREND, component);
}
}
}

View File

@@ -154,7 +154,7 @@ public class TalendJavaEditor extends CompilationUnitEditor implements ISyntaxCh
private void placeCursorToSelection() {
String mainPart;
if (process != null && (ComponentCategory.CATEGORY_4_MAPREDUCE.getName().equals(process.getComponentsType()))) {
mainPart = "[" + currentSelection + " mrconfig ] start"; //$NON-NLS-1$ //$NON-NLS-2$
mainPart = "[" + currentSelection + " mrmain ] start"; //$NON-NLS-1$ //$NON-NLS-2$
} else {
mainPart = "[" + currentSelection + " main ] start"; //$NON-NLS-1$ //$NON-NLS-2$
}

View File

@@ -57,7 +57,6 @@ import org.talend.designer.mapper.external.data.ExternalMapperData;
import org.talend.designer.mapper.external.data.ExternalMapperTable;
import org.talend.designer.mapper.external.data.ExternalMapperTableEntry;
import org.talend.designer.mapper.external.data.ExternalMapperUiProperties;
import org.talend.designer.mapper.i18n.Messages;
import org.talend.designer.mapper.language.LanguageProvider;
import org.talend.designer.mapper.language.generation.GenerationManager;
import org.talend.designer.mapper.language.generation.GenerationManagerFactory;
@@ -104,6 +103,7 @@ public class MapperComponent extends AbstractMapComponent implements IHashableIn
*
* @see org.talend.core.model.process.AbstractExternalNode#initialize()
*/
@Override
public void initialize() {
super.initialize();
initMapperMain(false);
@@ -120,6 +120,7 @@ public class MapperComponent extends AbstractMapComponent implements IHashableIn
*
* @see org.talend.designer.core.model.components.IExternalComponent#getPersistentData()
*/
@Override
public IExternalData getExternalData() {
if (this.externalData == null) {
this.externalData = new ExternalMapperData();
@@ -213,6 +214,7 @@ public class MapperComponent extends AbstractMapComponent implements IHashableIn
/**
* Restore mapper model from internal stored data.
*/
@Override
public void restoreMapperModelFromInternalData() {
super.restoreMapperModelFromInternalData();
mapperMain.loadModelFromInternalData();
@@ -225,7 +227,7 @@ public class MapperComponent extends AbstractMapComponent implements IHashableIn
/**
* Sort outgoingConnections for code generation as visible output zone of tMap.
*/
@SuppressWarnings("unchecked")//$NON-NLS-1$
@SuppressWarnings("unchecked")
private void sortOutputsConnectionsLikeVisualOrder() {
List<IConnection> outgoingConnections = (List<IConnection>) getOutgoingConnections();
Map<String, IConnection> connectionNameToOutgoingConnection = new HashMap<String, IConnection>();
@@ -298,6 +300,7 @@ public class MapperComponent extends AbstractMapComponent implements IHashableIn
*
* @see org.talend.core.model.process.INode#getMetadataList()
*/
@Override
public List<IMetadataTable> getMetadataList() {
return this.metadataListOut;
}
@@ -307,6 +310,7 @@ public class MapperComponent extends AbstractMapComponent implements IHashableIn
*
* @see org.talend.core.model.process.INode#setMetadataList(java.util.List)
*/
@Override
public void setMetadataList(List<IMetadataTable> metadataTablesOut) {
this.metadataListOut = metadataTablesOut;
}
@@ -381,6 +385,7 @@ public class MapperComponent extends AbstractMapComponent implements IHashableIn
// }
// }
@Override
public void buildExternalData(AbstractExternalData abstractData) {
externalData = new ExternalMapperData();
if (abstractData instanceof MapperData) {
@@ -461,6 +466,7 @@ public class MapperComponent extends AbstractMapComponent implements IHashableIn
}
}
@Override
public AbstractExternalData getExternalEmfData() {
final MapperData emfMapperData = MapperFactory.eINSTANCE.createMapperData();
initMapperMain(false);
@@ -510,6 +516,7 @@ public class MapperComponent extends AbstractMapComponent implements IHashableIn
}
}
@Override
protected void renameMetadataColumnName(String conectionName, String oldColumnName, String newColumnName) {
if (conectionName == null || oldColumnName == null || newColumnName == null) {
throw new NullPointerException();
@@ -607,8 +614,7 @@ public class MapperComponent extends AbstractMapComponent implements IHashableIn
}
TableEntryLocation[] tableEntryLocations = dataMapExpressionParser.parseTableEntryLocations(currentExpression);
// loop on all locations of current expression
for (int i = 0; i < tableEntryLocations.length; i++) {
TableEntryLocation currentLocation = tableEntryLocations[i];
for (TableEntryLocation currentLocation : tableEntryLocations) {
if (tableRenamed && oldLocation.tableName.equals(currentLocation.tableName)) {
oldLocation.columnName = currentLocation.columnName;
newLocation.columnName = currentLocation.columnName;
@@ -626,6 +632,7 @@ public class MapperComponent extends AbstractMapComponent implements IHashableIn
*
* @see org.talend.core.model.process.AbstractExternalNode#getProblems()
*/
@Override
public List<Problem> getProblems() {
initMapperMain(false);
ProblemsAnalyser problemsAnalyser = new ProblemsAnalyser(mapperMain.getMapperManager());
@@ -667,9 +674,11 @@ public class MapperComponent extends AbstractMapComponent implements IHashableIn
return this.generationManager;
}
@Override
public List<BlockCode> getBlocksCodeToClose() {
if (generationManager == null) {
throw new IllegalStateException(Messages.getString("MapperComponent.generationNotInitial")); //$NON-NLS-1$
return null;
// throw new IllegalStateException(Messages.getString("MapperComponent.generationNotInitial")); //$NON-NLS-1$
}
return this.generationManager.getBlocksCodeToClose();
}
@@ -742,6 +751,7 @@ public class MapperComponent extends AbstractMapComponent implements IHashableIn
* @param renameAction true to rename in all expressions, false to get boolean if present in one of the expressions
* @return
*/
@Override
protected boolean hasOrRenameData(String oldName, String newName, boolean renameAction) {
if (oldName == null || newName == null && renameAction) {
throw new NullPointerException();
@@ -825,6 +835,7 @@ public class MapperComponent extends AbstractMapComponent implements IHashableIn
*
* @see org.talend.core.model.process.INode#isUseLoopOnConditionalOutput(java.lang.String)
*/
@Override
public boolean isUseLoopOnConditionalOutput(String outputName) {
for (ExternalMapperTable table : externalData.getOutputTables()) {
if (table.getIsJoinTableOf() != null && table.getIsJoinTableOf().equals(outputName)) {
@@ -834,6 +845,7 @@ public class MapperComponent extends AbstractMapComponent implements IHashableIn
return false;
}
@Override
public List<String> checkNeededRoutines(List<String> possibleRoutines, String additionalString) {
List<String> routinesToAdd = new ArrayList<String>();
for (String routine : possibleRoutines) {

View File

@@ -1127,7 +1127,7 @@ public class MapperManager extends AbstractMapperManager {
defaultSettingMap.put(DataMapTableView.SCHEMA_ID, null);
defaultSettingMap.put(MapperSettingsManager.DIE_ON_ERROR, true);
defaultSettingMap.put(MapperSettingsManager.REPLICATED_JOIN, false);
// boolean parallel = false;
// IElementParameter paraEle = getAbstractMapComponent().getElementParameter("LKUP_PARALLELIZE");
// if (paraEle != null) {

View File

@@ -24,6 +24,8 @@ public class MapperSettingsManager {
public static final String DIE_ON_ERROR = "DIE_ON_ERROR"; //$NON-NLS-1$
public static final String REPLICATED_JOIN = "REPLICATED_JOIN"; //$NON-NLS-1$
public static final String TEMPORARY_DATA_DIRECTORY = "TEMPORARY_DATA_DIRECTORY"; //$NON-NLS-1$
public static final String ROWS_BUFFER_SIZE = "ROWS_BUFFER_SIZE"; //$NON-NLS-1$
@@ -81,6 +83,7 @@ public class MapperSettingsManager {
private void initDefaultModel() {
defaultModel = new MapperSettingModel();
defaultModel.setDieOnError((Boolean) manager.getDefaultSetting().get(DIE_ON_ERROR));
defaultModel.setReplicateJoin((Boolean) manager.getDefaultSetting().get(REPLICATED_JOIN));
defaultModel.setLookInParallel((Boolean) manager.getDefaultSetting().get(LOOKUP_IN_PARALLEL));
defaultModel.setTempDataDir(String.valueOf(manager.getDefaultSetting().get(TEMPORARY_DATA_DIRECTORY)));
defaultModel.setRowBufferSize(String.valueOf(manager.getDefaultSetting().get(ROWS_BUFFER_SIZE)));
@@ -93,6 +96,10 @@ public class MapperSettingsManager {
if (parameter != null && parameter.getValue() != null && parameter.getValue() instanceof Boolean) {
currentModel.setDieOnError((Boolean) parameter.getValue());
}
parameter = component.getElementParameter(REPLICATED_JOIN);
if (parameter != null && parameter.getValue() != null && parameter.getValue() instanceof Boolean) {
currentModel.setReplicateJoin((Boolean) parameter.getValue());
}
parameter = component.getElementParameter(TEMPORARY_DATA_DIRECTORY);
if (parameter != null && parameter.getValue() != null) {
currentModel.setTempDataDir(String.valueOf(parameter.getValue()));
@@ -139,6 +146,10 @@ public class MapperSettingsManager {
if (parameter != null) {
parameter.setValue(currentModel.isDieOnError());
}
parameter = component.getElementParameter(REPLICATED_JOIN);
if (parameter != null) {
parameter.setValue(currentModel.isReplicateJoin());
}
parameter = component.getElementParameter(TEMPORARY_DATA_DIRECTORY);
if (parameter != null) {
parameter.setValue(currentModel.getTempDataDir());

View File

@@ -19,6 +19,8 @@ public class MapperSettingModel implements Cloneable {
private boolean isDieOnError;
private boolean isReplicateJoin;
private boolean isLookInParallel;
private String tempDataDir;
@@ -43,6 +45,24 @@ public class MapperSettingModel implements Cloneable {
this.isDieOnError = isDieOnError;
}
/**
* Getter for isReplicateJoin.
*
* @return the isReplicateJoin
*/
public boolean isReplicateJoin() {
return this.isReplicateJoin;
}
/**
* Sets the isReplicateJoin.
*
* @param isReplicateJoin the isReplicateJoin to set
*/
public void setReplicateJoin(boolean isReplicateJoin) {
this.isReplicateJoin = isReplicateJoin;
}
/**
* Getter for isLookInParallel.
*
@@ -99,52 +119,70 @@ public class MapperSettingModel implements Cloneable {
public int hasDifferNumWith(MapperSettingModel other) {
int num = 0;
if (other == null)
if (other == null) {
return 4;
if (this.isDieOnError != other.isDieOnError)
}
if (this.isDieOnError != other.isDieOnError) {
num++;
if (this.isLookInParallel != other.isLookInParallel)
}
if (this.isLookInParallel != other.isLookInParallel) {
num++;
}
if (this.tempDataDir == null) {
if (other.tempDataDir != null)
if (other.tempDataDir != null) {
num++;
} else if (!this.tempDataDir.equals(other.tempDataDir))
}
} else if (!this.tempDataDir.equals(other.tempDataDir)) {
num++;
}
if (this.rowBufferSize == null) {
if (other.rowBufferSize != null)
if (other.rowBufferSize != null) {
num++;
} else if (!this.rowBufferSize.equals(other.rowBufferSize))
}
} else if (!this.rowBufferSize.equals(other.rowBufferSize)) {
num++;
}
return num;
}
@Override
public boolean equals(Object obj) {
if (this == obj)
if (this == obj) {
return true;
if (obj == null)
}
if (obj == null) {
return false;
if (getClass() != obj.getClass())
}
if (getClass() != obj.getClass()) {
return false;
}
final MapperSettingModel other = (MapperSettingModel) obj;
if (this.isDieOnError != other.isDieOnError)
if (this.isDieOnError != other.isDieOnError) {
return false;
if (this.isLookInParallel != other.isLookInParallel)
}
if (this.isLookInParallel != other.isLookInParallel) {
return false;
}
if (this.tempDataDir == null) {
if (other.tempDataDir != null)
if (other.tempDataDir != null) {
return false;
} else if (!this.tempDataDir.equals(other.tempDataDir))
}
} else if (!this.tempDataDir.equals(other.tempDataDir)) {
return false;
}
if (this.rowBufferSize == null) {
if (other.rowBufferSize != null)
if (other.rowBufferSize != null) {
return false;
} else if (!this.rowBufferSize.equals(other.rowBufferSize))
}
} else if (!this.rowBufferSize.equals(other.rowBufferSize)) {
return false;
}
return true;
}
@Override
public int hashCode() {
int hash = 1;
final int prime = 31;
@@ -155,6 +193,7 @@ public class MapperSettingModel implements Cloneable {
return hash;
}
@Override
public MapperSettingModel clone() throws CloneNotSupportedException {
return (MapperSettingModel) super.clone();
}

View File

@@ -16,6 +16,7 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import org.talend.core.model.process.ElementParameterParser;
import org.talend.core.model.process.IConnection;
import org.talend.core.model.process.INode;
import org.talend.designer.mapper.external.data.ExternalMapperData;
@@ -57,6 +58,12 @@ public class MapperHelper {
int sizeInputTables = inputTables.size();
String isFromMRtMap = ElementParameterParser.getValue(mapperNode, "__REPLICATED_JOIN__"); //$NON-NLS-1$
if (isFromMRtMap != null && "false".equals(isFromMRtMap)) { //$NON-NLS-1$
return sizeInputTables > 1;
}
HashMap<String, IConnection> hNameToConnection = new HashMap<String, IConnection>();
for (IConnection connection : incomingConnections) {
hNameToConnection.put(connection.getName(), connection);