Compare commits

...

2 Commits

Author SHA1 Message Date
wwang-talend
d47a4d0b8c fix(TDI-44250): fix the format 2020-06-01 10:46:35 +08:00
wwang-talend
acaf397cdb fix(TDI-44250): MongoDB components consuming more memory 2020-05-29 16:06:39 +08:00
8 changed files with 49 additions and 49 deletions

View File

@@ -48,24 +48,24 @@ if ((metadatas!=null)&&(metadatas.size()>0)) {
if(destination.indexOf("tCouchbaseOutput_")==0 || destination.indexOf("tCouchDBOutput_")==0){
%>
java.util.Map map_<%=cid%> = new java.util.HashMap();
java.util.Queue<java.util.Map> queue_<%=cid%> = new java.util.concurrent.ConcurrentLinkedQueue<java.util.Map>();
java.util.concurrent.BlockingQueue<java.util.Map> queue_<%=cid%> = new java.util.concurrent.LinkedBlockingQueue<java.util.Map>(1000);
<%
}else{
}else{
%>
java.util.Queue<<%= outputRowStructName %>> queue_<%=cid%> = new java.util.concurrent.ConcurrentLinkedQueue<<%= outputRowStructName %>>();
java.util.concurrent.BlockingQueue<<%= outputRowStructName %>> queue_<%=cid%> = new java.util.concurrent.LinkedBlockingQueue<<%= outputRowStructName %>>(1000);
<%
}
}
%>
class ThreadXMLField_<%=cid%> extends Thread {
<%
if(destination.indexOf("tCouchbaseOutput_")==0 || destination.indexOf("tCouchDBOutput_")==0){
%>
java.util.Queue<java.util.Map> queue;
java.util.concurrent.BlockingQueue<java.util.Map> queue;
<%
}else{
%>
java.util.Queue<<%= outputRowStructName %>> queue;
java.util.concurrent.BlockingQueue<<%= outputRowStructName %>> queue;
<%
}
%>
@@ -74,13 +74,13 @@ if ((metadatas!=null)&&(metadatas.size()>0)) {
java.lang.Error lastError;
String currentComponent;
ThreadXMLField_<%=cid%>(java.util.Queue q) {
ThreadXMLField_<%=cid%>(java.util.concurrent.BlockingQueue q) {
this.queue = q;
globalMap.put("queue_<%=virtualSourceCid%>", queue);
lastException = null;
}
ThreadXMLField_<%=cid%>(java.util.Queue q, java.util.List<java.util.Map<String,String>> l) {
ThreadXMLField_<%=cid%>(java.util.concurrent.BlockingQueue q, java.util.List<java.util.Map<String,String>> l) {
this.queue = q;
this.flows = l;
lastException = null;
@@ -120,14 +120,14 @@ if ((metadatas!=null)&&(metadatas.size()>0)) {
<%
}else{
if(destination!=null && (destination.indexOf("tCouchbaseOutput_")>=0 || destination.indexOf("tCouchDBOutput_")>=0)){
if(destination!=null && (destination.indexOf("tCouchbaseOutput_")>=0 || destination.indexOf("tCouchDBOutput_")>=0)){
%>
java.util.Map map_<%=cid%> = new java.util.HashMap();
java.util.Queue<java.util.Map> queue_<%=cid%> = (java.util.Queue<java.util.Map>) globalMap.get("queue_<%=cid%>");
java.util.concurrent.BlockingQueue<java.util.Map> queue_<%=cid%> = (java.util.concurrent.BlockingQueue<java.util.Map>) globalMap.get("queue_<%=cid%>");
<%
}else{
}else{
%>
java.util.Queue<<%= outputRowStructName %>> queue_<%=cid%> = (java.util.Queue<<%= outputRowStructName %>>) globalMap.get("queue_<%=cid%>");
java.util.concurrent.BlockingQueue<<%= outputRowStructName %>> queue_<%=cid%> = (java.util.concurrent.BlockingQueue<<%= outputRowStructName %>>) globalMap.get("queue_<%=cid%>");
<%
}
}

View File

@@ -34,7 +34,7 @@ if ((metadatas!=null)&&(metadatas.size()>0)) {
String outputRowStructName = conns.get(0).getName()+"Struct";//row2
if(destination!=null && (destination.indexOf("tCouchbaseOutput_")>=0 || destination.indexOf("tCouchDBOutput_")>=0)){
%>
map_<%=cid%> = queue_<%=cid %>.poll();
map_<%=cid%> = queue_<%=cid %>.take();
str_<%=cid %> = (String)map_<%=cid%>.get("json_<%=destination%>");
String columnValue_<%=cid%> = "";
<%
@@ -89,7 +89,7 @@ if ((metadatas!=null)&&(metadatas.size()>0)) {
}
}else{
%>
<%= outputRowStructName %> result_<%= cid %> = queue_<%=cid %>.poll();
<%= outputRowStructName %> result_<%= cid %> = queue_<%=cid %>.take();
str_<%=cid %> = result_<%= cid %>.<%= jsonField %>;
<%
List<Map<String,String>> groupbys = (List<Map<String,String>>)ElementParameterParser.getObjectValue(node, "__GROUPBYS__");

View File

@@ -35,25 +35,25 @@ if ((metadatas!=null)&&(metadatas.size()>0)) {
if(startNode != null){
startNodeCid = startNode.getUniqueName();
}
IConnection nextMergeConn = NodeUtil.getNextMergeConnection(node);
if(nextMergeConn != null && nextMergeConn.getInputId()>1 && startNodeCid != null){
%>
java.util.Queue<String> queue_<%=cid%> = new java.util.concurrent.ConcurrentLinkedQueue<String>();
IConnection nextMergeConn = NodeUtil.getNextMergeConnection(node);
if(nextMergeConn != null && nextMergeConn.getInputId()>1 && startNodeCid != null){
%>
java.util.concurrent.BlockingQueue<String> queue_<%=cid%> = new java.util.concurrent.LinkedBlockingQueue<String>(1000);
class ThreadXMLField_<%=cid%> extends Thread {
java.util.Queue<String> queue;
java.util.concurrent.BlockingQueue<String> queue;
java.util.List<java.util.Map<String,String>> flows;
java.lang.Exception lastException;
java.lang.Error lastError;
String currentComponent;
ThreadXMLField_<%=cid%>(java.util.Queue q) {
ThreadXMLField_<%=cid%>(java.util.concurrent.BlockingQueue q) {
this.queue = q;
globalMap.put("queue_<%=virtualSourceCid%>", queue);
lastException = null;
}
ThreadXMLField_<%=cid%>(java.util.Queue q, java.util.List<java.util.Map<String,String>> l) {
ThreadXMLField_<%=cid%>(java.util.concurrent.BlockingQueue q, java.util.List<java.util.Map<String,String>> l) {
this.queue = q;
this.flows = l;
lastException = null;
@@ -90,10 +90,10 @@ if ((metadatas!=null)&&(metadatas.size()>0)) {
ThreadXMLField_<%=cid%> txf_<%=cid%> = new ThreadXMLField_<%=cid%>(queue_<%=cid%>);
txf_<%=cid%>.start();
<%
}else{
<%
}else{
%>
java.util.Queue<String> queue_<%=cid%> = (java.util.Queue<String>) globalMap.get("queue_<%=cid%>");
java.util.concurrent.BlockingQueue<String> queue_<%=cid%> = (java.util.concurrent.BlockingQueue<String>) globalMap.get("queue_<%=cid%>");
<%
}
%>

View File

@@ -27,11 +27,11 @@ if ((metadatas!=null)&&(metadatas.size()>0)) {
if (conn.getLineStyle().hasConnectionCategory(IConnectionCategory.DATA)) {
if("id_Document".equals(metadata.getColumn(xmlField).getTalendType())) {
%>
<%=conn.getName()%>.<%=xmlField %> = ParserUtils.parseTo_Document(queue_<%=cid %>.poll());
<%=conn.getName()%>.<%=xmlField %> = ParserUtils.parseTo_Document(queue_<%=cid %>.take());
<%
} else {
%>
<%=conn.getName()%>.<%=xmlField %> = queue_<%=cid %>.poll();
<%=conn.getName()%>.<%=xmlField %> = queue_<%=cid %>.take();
<%
}
%>

View File

@@ -79,16 +79,16 @@ if ((metadatas!=null)&&(metadatas.size()>0)) {
if(nextMergeConn == null || nextMergeConn.getInputId()==1){
if(destination.indexOf("tCouchbaseOutput_")==0 || destination.indexOf("tCouchDBOutput_")==0){
%>
java.util.Queue<java.util.Map> listGroupby_<%=cid%> = new java.util.concurrent.ConcurrentLinkedQueue<java.util.Map>();
java.util.concurrent.BlockingQueue<java.util.Map> listGroupby_<%=cid%> = new java.util.concurrent.LinkedBlockingQueue<java.util.Map>(1000);
<%
}else{
if(istWriteJSONField){
%>
java.util.Queue<<%= rowStructNameOutput %>> listGroupby_<%=cid%> = new java.util.concurrent.ConcurrentLinkedQueue<<%= rowStructNameOutput %>>();
java.util.concurrent.BlockingQueue<<%= rowStructNameOutput %>> listGroupby_<%=cid%> = new java.util.concurrent.LinkedBlockingQueue<<%= rowStructNameOutput %>>(1000);
<%
}else{
%>
java.util.Queue<String> listGroupby_<%=cid%> = new java.util.concurrent.ConcurrentLinkedQueue<String>();
java.util.concurrent.BlockingQueue<String> listGroupby_<%=cid%> = new java.util.concurrent.LinkedBlockingQueue<String>(1000);
<%
}
}
@@ -104,16 +104,16 @@ if ((metadatas!=null)&&(metadatas.size()>0)) {
<%
if(destination.indexOf("tCouchbaseOutput_")==0 || destination.indexOf("tCouchDBOutput_")==0){
%>
java.util.Queue<java.util.Map> queue;
java.util.concurrent.BlockingQueue<java.util.Map> queue;
<%
}else{
if(istWriteJSONField){
%>
java.util.Queue<<%= rowStructNameOutput %>> queue;
java.util.concurrent.BlockingQueue<<%= rowStructNameOutput %>> queue;
<%
}else{
%>
java.util.Queue<String> queue;
java.util.concurrent.BlockingQueue<String> queue;
<%
}
}
@@ -123,13 +123,13 @@ if ((metadatas!=null)&&(metadatas.size()>0)) {
java.lang.Error lastError;
String currentComponent;
ThreadXMLField_<%=cid%>(java.util.Queue q) {
ThreadXMLField_<%=cid%>(java.util.concurrent.BlockingQueue q) {
this.queue = q;
globalMap.put("queue_<%=virtualTargetCid%>", queue);
lastException = null;
}
ThreadXMLField_<%=cid%>(java.util.Queue q, java.util.List<java.util.Map<String,String>> l) {
ThreadXMLField_<%=cid%>(java.util.concurrent.BlockingQueue q, java.util.List<java.util.Map<String,String>> l) {
this.queue = q;
this.flows = l;
lastException = null;
@@ -172,16 +172,16 @@ if ((metadatas!=null)&&(metadatas.size()>0)) {
}else{
if(destination.indexOf("tCouchbaseOutput_")==0 || destination.indexOf("tCouchDBOutput_")==0){
%>
java.util.Queue<java.util.Map> listGroupby_<%=cid%> = (java.util.Queue<java.util.Map>) globalMap.get("queue_<%=cid%>");
java.util.concurrent.BlockingQueue<java.util.Map> listGroupby_<%=cid%> = (java.util.concurrent.BlockingQueue<java.util.Map>) globalMap.get("queue_<%=cid%>");
<%
}else{
if(istWriteJSONField){
%>
java.util.Queue<<%= rowStructNameOutput %>> listGroupby_<%=cid%> = (java.util.Queue<<%= rowStructNameOutput %>>) globalMap.get("queue_<%=cid%>");
java.util.concurrent.BlockingQueue<<%= rowStructNameOutput %>> listGroupby_<%=cid%> = (java.util.concurrent.BlockingQueue<<%= rowStructNameOutput %>>) globalMap.get("queue_<%=cid%>");
<%
} else {
%>
java.util.Queue<String> listGroupby_<%=cid%> = (java.util.Queue<String>) globalMap.get("queue_<%=cid%>");
java.util.concurrent.BlockingQueue<String> listGroupby_<%=cid%> = (java.util.concurrent.BlockingQueue<String>) globalMap.get("queue_<%=cid%>");
<%
}
}

View File

@@ -211,14 +211,14 @@ if(nb_line_<%=cid %> > 0){
if(removeHeader_<%=cid %>.indexOf("<?xml") >=0 ){
removeHeader_<%=cid %> = removeHeader_<%=cid %>.substring(removeHeader_<%=cid %>.indexOf("?>")+3);
}
listGroupby_<%=cid %>.add(removeHeader_<%=cid %>);
listGroupby_<%=cid %>.put(removeHeader_<%=cid %>);
<%
}else{
String destination = ElementParameterParser.getValue(node, "__DESTINATION__");
if(destination.indexOf("tCouchbaseOutput_")==0 || destination.indexOf("tCouchDBOutput_")==0){
%>
map_<%=cid%>.put("json_<%=destination%>",strWriter_<%=cid %>.toString());
listGroupby_<%=cid %>.add(map_<%=cid%>);
listGroupby_<%=cid %>.put(map_<%=cid%>);
<%
}else{
if(istWriteJSONField){
@@ -236,11 +236,11 @@ if(nb_line_<%=cid %> > 0){
}
%>
row_<%= cid %>.<%= jsonField %> = strWriter_<%=cid %>.toString();
listGroupby_<%=cid %>.add(row_<%= cid %>);
listGroupby_<%=cid %>.put(row_<%= cid %>);
<%
}else{
%>
listGroupby_<%=cid %>.add(strWriter_<%=cid %>.toString());
listGroupby_<%=cid %>.put(strWriter_<%=cid %>.toString());
<%
}
}
@@ -293,7 +293,7 @@ if(nb_line_<%=cid %> > 0){
out_<%=cid%>.newLine();
out_<%=cid%>.close();
listGroupby_<%=cid %>.add(strWriter_<%=cid %>.toString());
listGroupby_<%=cid %>.put(strWriter_<%=cid %>.toString());
<%
}
%>

View File

@@ -24,14 +24,14 @@ if ((metadatas!=null)&&(metadatas.size()>0)) {
IConnection nextMergeConn = NodeUtil.getNextMergeConnection(node);
if(nextMergeConn == null || nextMergeConn.getInputId()==1){
%>
java.util.Queue listGroupby_<%=cid%> = (java.util.Queue)globalMap.get("queue_<%=virtualTargetCid%>");
java.util.concurrent.BlockingQueue listGroupby_<%=cid%> = (java.util.concurrent.BlockingQueue)globalMap.get("queue_<%=virtualTargetCid%>");
if(resourceMap.get("finish_<%=cid%>") == null){
globalMap.put("<%=virtualTargetCid%>_FINISH_WITH_EXCEPTION" + (listGroupby_<%=cid%>==null?"":listGroupby_<%=cid%>.hashCode()), "true");
}
<%
}else{
%>
java.util.Queue listGroupby_<%=cid%> = (java.util.Queue)globalMap.get("queue_<%=cid%>");
java.util.concurrent.BlockingQueue listGroupby_<%=cid%> = (java.util.concurrent.BlockingQueue)globalMap.get("queue_<%=cid%>");
<%
}
%>

View File

@@ -744,13 +744,13 @@ if(("Dom4j").equals(mode)){
if(removeHeader_<%=cid %>.indexOf("<?xml") >=0 ){
removeHeader_<%=cid %> = removeHeader_<%=cid %>.substring(removeHeader_<%=cid %>.indexOf("?>")+3);
}
listGroupby_<%=cid %>.add(removeHeader_<%=cid %>);
listGroupby_<%=cid %>.put(removeHeader_<%=cid %>);
<%
}else{
if(destination!=null && (destination.indexOf("tCouchbaseOutput_")==0) || destination.indexOf("tCouchDBOutput_")==0){
%>
map_<%=cid%>.put("json_<%=destination%>",strWriter_<%=cid %>.toString());
listGroupby_<%=cid %>.add(map_<%=cid%>);
listGroupby_<%=cid %>.put(map_<%=cid%>);
<%
}else{
if(istWriteJSONField){
@@ -768,11 +768,11 @@ if(("Dom4j").equals(mode)){
}
%>
row_<%= cid %>.<%= jsonField %> = strWriter_<%=cid %>.toString();
listGroupby_<%=cid %>.add(row_<%= cid %>);
listGroupby_<%=cid %>.put(row_<%= cid %>);
<%
}else{
%>
listGroupby_<%=cid %>.add(strWriter_<%=cid %>.toString());
listGroupby_<%=cid %>.put(strWriter_<%=cid %>.toString());
<%
}
}
@@ -988,7 +988,7 @@ else if(("Null").equals(mode)){
// endTabStrs_<%=cid%>.clear();
out_<%=cid %>.write("<%=rowSeparator%>");
out_<%=cid%>.close();
listGroupby_<%=cid %>.add(strWriter_<%=cid %>.toString());
listGroupby_<%=cid %>.put(strWriter_<%=cid %>.toString());
//create a new StringWriter and BufferWriter
//write the head title to the StringWriter