Compare commits

...

1 Commits

Author SHA1 Message Date
wang wei
140e109fde fix(TDI-39076): Temp files created on disk while look up in tmap do not
clean on killing the job automatically
2017-07-18 16:54:12 +08:00
8 changed files with 246 additions and 39 deletions

View File

@@ -0,0 +1,45 @@
<%@ jet
imports="
org.talend.core.model.process.INode
org.talend.core.model.process.IConnection
org.talend.designer.codegen.config.CodeGeneratorArgument
org.talend.core.model.process.ElementParameterParser
org.talend.core.model.process.IHashableInputConnections
org.talend.core.model.process.IHashConfiguration
org.talend.core.model.process.IDataConnection
java.util.List
"
%>
<%
CodeGeneratorArgument codeGenArgument = (CodeGeneratorArgument) argument;
INode node = (INode)codeGenArgument.getArgument();
String cid = node.getUniqueName();
List<IConnection> connections = (List<IConnection>) node.getIncomingConnections();
if (connections != null && connections.size() > 0) {
for (IConnection connection : connections) {
String connectionName = connection.getName();
INode validTarget = ((IDataConnection) connection).getLinkNodeForHash();
if(validTarget != null) {
IHashConfiguration hashConfiguration = null;
boolean bSortOnDisk = "true".equals(ElementParameterParser.getValue(node, "__SORT_ON_DISK__"));
if (validTarget instanceof IHashableInputConnections){
IHashableInputConnections target = (IHashableInputConnections) validTarget;
hashConfiguration = target.getHashConfiguration(connection.getName());
}
if (hashConfiguration != null && hashConfiguration.isPersistent() || bSortOnDisk) {
%>
Object lookupManager_<%=connectionName%> = globalMap.get("tHash_Lookup_<%=connectionName%>");
if(lookupManager_<%=connectionName%>!=null) {
((org.talend.designer.components.lookup.persistent.IPersistentLookupManager)lookupManager_<%=connectionName%>).endPut();
}
<%
}
}
}
}
%>

View File

@@ -933,8 +933,8 @@ for (int iInputTable = 0; iInputTable < sizeInputTables - 1; iInputTable++) { //
};
fsi_<%=uniqueNameComponent%>_<%=currentJoinedTableIndex%>.initPut();
resourceMap.put("fsi_<%=uniqueNameComponent%>_<%=currentJoinedTableIndex%>", fsi_<%=uniqueNameComponent%>_<%=currentJoinedTableIndex%>);
<%
} // T_TM_B_103

View File

@@ -0,0 +1,93 @@
<%@ jet
imports="
java.util.ArrayList
java.util.List
java.util.Map
java.util.HashMap
org.talend.core.model.metadata.IMetadataTable
org.talend.core.model.metadata.IMetadataColumn
org.talend.core.model.metadata.MetadataTalendType
org.talend.core.model.metadata.types.JavaTypesManager
org.talend.core.model.metadata.types.JavaType
org.talend.designer.mapper.MapperMain
org.talend.designer.codegen.config.CodeGeneratorArgument
org.talend.designer.mapper.MapperComponent
org.talend.designer.mapper.external.data.ExternalMapperData
org.talend.designer.mapper.external.data.ExternalMapperTable
org.talend.designer.mapper.external.data.ExternalMapperTableEntry
org.talend.core.model.process.IConnection
org.talend.designer.mapper.language.ILanguage
org.talend.designer.mapper.language.generation.GenerationManagerFactory
org.talend.designer.mapper.language.generation.JavaGenerationManager
org.talend.designer.mapper.language.LanguageProvider
org.talend.core.model.process.ElementParameterParser
org.talend.core.model.process.EConnectionType
org.talend.core.model.process.INode
org.talend.designer.components.lookup.common.ICommonLookup.MATCHING_MODE
org.talend.core.model.utils.TalendTextUtils
"
skeleton="tMap_commons.skeleton"
%>
<%
CodeGeneratorArgument codeGenArgument = (CodeGeneratorArgument) argument;
MapperComponent node = (MapperComponent) codeGenArgument.getArgument();
String componentName = node.getUniqueName();
ExternalMapperData data = (ExternalMapperData) node.getExternalData();
boolean isLog4jEnabled = ("true").equals(ElementParameterParser.getValue(node.getProcess(), "__LOG4J_ACTIVATE__"));
boolean isVirtualIn = componentName.endsWith("TMAP_IN");
boolean isVirtualOut = componentName.endsWith("TMAP_OUT");
String uniqueNameComponent = componentName.replaceAll("_TMAP_IN", "");
uniqueNameComponent = uniqueNameComponent.replaceAll("_TMAP_OUT", "");
if(isVirtualIn) {
return stringBuffer.toString();
}
List<IConnection> inputConnections = (List<IConnection>) node.getIncomingConnections();
HashMap<String, IConnection> hNameToConnection = new HashMap<String, IConnection>();
for (IConnection connection : inputConnections) {
hNameToConnection.put(connection.getName(), connection);
}
List<ExternalMapperTable> inputTablesWithInvalid = new ArrayList<ExternalMapperTable>(data.getInputTables());
List<ExternalMapperTable> inputTables = new ArrayList<ExternalMapperTable>();
for(int i=0; i < inputTablesWithInvalid.size(); i++) {
ExternalMapperTable currentLoopTable = inputTablesWithInvalid.get(i);
if(hNameToConnection.get(currentLoopTable.getName()) != null) {
inputTables.add(currentLoopTable);
}
}
int lstSizeInputs = inputTables.size();
for (int i = 1; i < lstSizeInputs; i++) {
ExternalMapperTable inputTable = (ExternalMapperTable) inputTables.get(i);
String tableName = inputTable.getName();
List<ExternalMapperTableEntry> tableEntries = inputTable.getMetadataTableEntries();
if (tableEntries == null) {
continue;
}
if(inputTable.isPersistent()) {
%>
Object lookupManager_<%=tableName%> = globalMap.get("tHash_Lookup_<%=tableName%>");
if(lookupManager_<%=tableName%>!=null) {
((org.talend.designer.components.lookup.persistent.IPersistentLookupManager)tHash_Lookup_<%=tableName%>).endGet();
}
Object fsi_<%=uniqueNameComponent%>_<%=i%> = resourceMap.get("fsi_<%=uniqueNameComponent%>_<%=i%>");
if(fsi_<%=uniqueNameComponent%>_<%=i%>!=null) {
((org.talend.designer.components.lookup.persistent.PersistentRowSorterIterator)fsi_<%=uniqueNameComponent%>_<%=i%>).endPut();
((org.talend.designer.components.lookup.persistent.PersistentRowSorterIterator)fsi_<%=uniqueNameComponent%>_<%=i%>).endGet();
}
<%
}
}
%>

View File

@@ -0,0 +1,35 @@
<%@ jet
imports="
java.util.ArrayList
java.util.List
java.util.Map
java.util.HashMap
org.talend.core.model.metadata.IMetadataTable
org.talend.core.model.metadata.IMetadataColumn
org.talend.core.model.metadata.types.JavaTypesManager
org.talend.core.model.metadata.types.JavaType
org.talend.designer.codegen.config.CodeGeneratorArgument
org.talend.designer.mapper.MapperComponent
org.talend.designer.mapper.external.data.ExternalMapperData
org.talend.designer.mapper.external.data.ExternalMapperTable
org.talend.designer.mapper.external.data.ExternalMapperTableEntry
org.talend.core.model.process.IConnection
org.talend.designer.mapper.language.ILanguage
org.talend.designer.mapper.language.generation.GenerationManagerFactory
org.talend.designer.mapper.language.generation.JavaGenerationManager
org.talend.designer.mapper.language.LanguageProvider
org.talend.core.model.process.ElementParameterParser
org.talend.core.model.process.EConnectionType
org.talend.core.model.process.INode
org.talend.core.model.utils.TalendTextUtils
org.talend.designer.mapper.model.tableentry.TableEntryLocation
org.talend.designer.mapper.utils.DataMapExpressionParser
"
skeleton="../tMap/tMap_commons.skeleton"
%>
<%@ include file="../tMap/tMap_finally.inc.javajet" %>

View File

@@ -0,0 +1,36 @@
<%@ jet
imports="
java.util.ArrayList
java.util.List
java.util.Map
java.util.HashMap
org.talend.core.model.metadata.IMetadataTable
org.talend.core.model.metadata.IMetadataColumn
org.talend.core.model.metadata.types.JavaTypesManager
org.talend.core.model.metadata.types.JavaType
org.talend.designer.codegen.config.CodeGeneratorArgument
org.talend.designer.mapper.MapperComponent
org.talend.designer.mapper.external.data.ExternalMapperData
org.talend.designer.mapper.external.data.ExternalMapperTable
org.talend.designer.mapper.external.data.ExternalMapperTableEntry
org.talend.core.model.process.IConnection
org.talend.designer.mapper.language.ILanguage
org.talend.designer.mapper.language.generation.GenerationManagerFactory
org.talend.designer.mapper.language.generation.JavaGenerationManager
org.talend.designer.mapper.language.LanguageProvider
org.talend.core.model.process.ElementParameterParser
org.talend.core.model.process.EConnectionType
org.talend.core.model.process.INode
org.talend.core.model.utils.TalendTextUtils
org.talend.designer.mapper.model.tableentry.TableEntryLocation
org.talend.designer.mapper.utils.DataMapExpressionParser
"
skeleton="../tMap/tMap_commons.skeleton"
%>
<%@ include file="../tMap/tMap_finally.inc.javajet" %>

View File

@@ -96,16 +96,8 @@ public class PersistentLookupManager<B extends IPersistableRow<B>> implements IP
}
public void initPut() throws IOException {
// objectOutStream = new ObjectOutputStream(new BufferedOutputStream(new
// FileOutputStream(buildDataFilePath())));
File file = new File(buildDataFilePath());
file.deleteOnExit();
objectOutStream = new JBossObjectOutputStream(new BufferedOutputStream(new FileOutputStream(file)));
objectOutStream = new JBossObjectOutputStream(new BufferedOutputStream(new FileOutputStream(buildDataFilePath())));
this.dataInstance = this.rowCreator.createRowInstance();
}
private String buildDataFilePath() {
@@ -116,10 +108,12 @@ public class PersistentLookupManager<B extends IPersistableRow<B>> implements IP
bean.writeData(objectOutStream);
}
public void endPut() throws IOException {
objectOutStream.close();
public synchronized void endPut() throws IOException {
if(objectOutStream!=null) {
objectOutStream.close();
}
objectOutStream = null;
}
public void initGet() throws IOException {
@@ -162,14 +156,17 @@ public class PersistentLookupManager<B extends IPersistableRow<B>> implements IP
return dataInstance;
}
public void endGet() throws IOException {
public synchronized void endGet() throws IOException {
if (this.objectInStream != null) {
this.objectInStream.close();
}
this.objectInStream = null;
if (this.bufferedInStream != null) {
this.bufferedInStream.close();
}
this.bufferedInStream = null;
File file = new File(buildDataFilePath());
file.delete();

View File

@@ -239,9 +239,11 @@ public abstract class PersistentRowSorterIterator<V extends IPersistableRow> imp
beansCount++;
}
public void endPut() throws IOException {
if (bufferBeanIndex > INIT_BUFFER_INDEX) {
writeBuffer();
public synchronized void endPut() throws IOException {
if(buffer!=null) {
if (bufferBeanIndex > INIT_BUFFER_INDEX) {
writeBuffer();
}
}
buffer = null;
}
@@ -274,8 +276,6 @@ public abstract class PersistentRowSorterIterator<V extends IPersistableRow> imp
File file = new File(buildFilePath());
file.deleteOnExit();
count++;
// ObjectOutputStream rw = new ObjectOutputStream(new BufferedOutputStream(new FileOutputStream(file)));
ObjectOutputStream rw = new JBossObjectOutputStream(new BufferedOutputStream(new FileOutputStream(file)));
@@ -454,10 +454,12 @@ public abstract class PersistentRowSorterIterator<V extends IPersistableRow> imp
scArray = null;
// delete files
for (int i = 0; i < files.size(); i++) {
files.get(i).delete();
if(files!=null) {
for (int i = 0; i < files.size(); i++) {
files.get(i).delete();
}
}
files = null;
}
public V getNextFreeRow() {
@@ -481,7 +483,7 @@ public abstract class PersistentRowSorterIterator<V extends IPersistableRow> imp
*
* @see org.talend.designer.components.thash.io.IMapHashFile#endGet(java.lang.String)
*/
public void endGet() {
public synchronized void endGet() {
afterLoopFind();
}

View File

@@ -338,16 +338,14 @@ public class PersistentSortedLookupManager<B extends IPersistableComparableLooku
return removePropertyName;
}
public void endPut() throws IOException {
if (bufferBeanIndex > 0) {
writeBuffer();
public synchronized void endPut() throws IOException {
if (buffer != null) {
if (bufferBeanIndex > 0) {
writeBuffer();
}
}
// Arrays.fill(buffer, null);
buffer = null;
}
private void writeBuffer() throws IOException {
@@ -355,10 +353,8 @@ public class PersistentSortedLookupManager<B extends IPersistableComparableLooku
Arrays.sort(buffer, 0, bufferBeanIndex);
}
File keysDataFile = new File(buildKeysFilePath(fileIndex));
keysDataFile.deleteOnExit();
File valuesDataFile = new File(buildValuesFilePath(fileIndex));
valuesDataFile.deleteOnExit();
BufferedOutputStream keysBufferedOutputStream = new BufferedOutputStream(new FileOutputStream(keysDataFile));
ObjectOutputStream keysDataOutputStream = null;
@@ -581,11 +577,14 @@ public class PersistentSortedLookupManager<B extends IPersistableComparableLooku
}
}
public void endGet() throws IOException {
for (ILookupManagerUnit<B> orderedBeanLookup : lookupList) {
orderedBeanLookup.close();
public synchronized void endGet() throws IOException {
if (lookupList != null) {
for (ILookupManagerUnit<B> orderedBeanLookup : lookupList) {
orderedBeanLookup.close();
}
clear();
}
clear();
lookupList = null;
}