210 lines
9.2 KiB
Plaintext
210 lines
9.2 KiB
Plaintext
<%@ jet
|
|
imports="
|
|
java.util.List
|
|
java.util.Set
|
|
java.util.HashSet
|
|
org.talend.components.api.component.ComponentDefinition
|
|
org.talend.designer.core.generic.model.Component
|
|
org.talend.core.model.metadata.IMetadataColumn
|
|
org.talend.core.model.metadata.IMetadataTable
|
|
org.talend.core.model.metadata.types.JavaType
|
|
org.talend.core.model.metadata.types.JavaTypesManager
|
|
org.talend.core.model.process.ElementParameterParser
|
|
org.talend.core.model.process.IConnection
|
|
org.talend.core.model.process.IConnectionCategory
|
|
org.talend.core.model.process.INode
|
|
org.talend.core.model.process.EConnectionType
|
|
org.talend.designer.codegen.config.CodeGeneratorArgument
|
|
org.talend.core.model.utils.TalendTextUtils
|
|
org.talend.core.model.utils.NodeUtil
|
|
org.talend.designer.core.generic.model.Component
|
|
java.util.ArrayList
|
|
java.util.HashMap
|
|
java.util.Map
|
|
org.talend.components.api.properties.ComponentProperties
|
|
org.talend.daikon.NamedThing
|
|
org.talend.daikon.properties.property.Property
|
|
org.talend.designer.core.generic.constants.IGenericConstants
|
|
org.talend.core.model.utils.ContextParameterUtils
|
|
"
|
|
%>
|
|
<%@include file="@{org.talend.designer.codegen}/jet_stub/generic/component_util_indexedrecord_to_rowstruct.javajet"%>
|
|
<%@include file="@{org.talend.designer.codegen}/jet_stub/generic/component_util_process_properties.javajet"%>
|
|
<%
|
|
CodeGeneratorArgument codeGenArgument = (CodeGeneratorArgument) argument;
|
|
INode node = (INode)codeGenArgument.getArgument();
|
|
String cid = node.getUniqueName();
|
|
Component component = (Component)node.getComponent();
|
|
ComponentDefinition def = component.getComponentDefinition();
|
|
|
|
boolean hasInput = !NodeUtil.getIncomingConnections(node, IConnectionCategory.DATA).isEmpty();
|
|
|
|
if(hasInput){
|
|
// These will be initialized if there are outgoing connections and will be
|
|
// null if there isn't a corresponding outgoing connection.
|
|
|
|
boolean hasMainOutput = false;
|
|
boolean hasRejectOutput = false;
|
|
|
|
List<? extends IConnection> outgoingConns = node.getOutgoingSortedConnections();
|
|
if (outgoingConns!=null){
|
|
for (int i = 0; i < outgoingConns.size(); i++) {
|
|
IConnection outgoingConn = outgoingConns.get(i);
|
|
if ("MAIN".equals(outgoingConn.getConnectorName())) {
|
|
hasMainOutput = true;
|
|
}
|
|
if ("REJECT".equals(outgoingConn.getConnectorName())) {
|
|
hasRejectOutput = true;
|
|
}
|
|
}
|
|
}
|
|
|
|
// Generate the code to handle the incoming records.
|
|
IConnection inputConn = null;
|
|
List< ? extends IConnection> inputConns = node.getIncomingConnections();
|
|
if(inputConns!=null) {
|
|
for (IConnection conn : inputConns) {
|
|
if (conn.getLineStyle().hasConnectionCategory(IConnectionCategory.DATA)) {
|
|
inputConn = conn;
|
|
}
|
|
}
|
|
}
|
|
|
|
boolean hasValidInput = inputConn!=null;
|
|
if (hasValidInput) {
|
|
List<IMetadataColumn> input_columnList = inputConn.getMetadataTable().getListColumns();
|
|
if(input_columnList == null) {
|
|
input_columnList = new ArrayList<IMetadataColumn>();
|
|
}
|
|
// add incoming (not present) columns to enforcer for this comps
|
|
if (cid.contains("tDataStewardship") || cid.contains("tMarkLogic")){
|
|
%>
|
|
boolean shouldCreateRuntimeSchemaForIncomingNode = false;
|
|
<%
|
|
for (int i = 0; i < input_columnList.size(); i++) {
|
|
if(!input_columnList.get(i).getTalendType().equals("id_Dynamic")) {
|
|
%>
|
|
if (incomingEnforcer_<%=cid%>.getDesignSchema().getField("<%=input_columnList.get(i)%>") == null){
|
|
incomingEnforcer_<%=cid%>.addIncomingNodeField("<%=input_columnList.get(i)%>", ((Object) <%=inputConn.getName()%>.<%=input_columnList.get(i)%>).getClass().getCanonicalName());
|
|
shouldCreateRuntimeSchemaForIncomingNode = true;
|
|
}
|
|
<%
|
|
}
|
|
}
|
|
%>
|
|
if (shouldCreateRuntimeSchemaForIncomingNode){
|
|
incomingEnforcer_<%=cid%>.createRuntimeSchema();
|
|
}
|
|
<%
|
|
}
|
|
// If there are dynamic columns in the schema, they need to be
|
|
// initialized into the runtime schema of the actual IndexedRecord
|
|
// provided to the component.
|
|
|
|
int dynamicPos = -1;
|
|
for (int i = 0; i < input_columnList.size(); i++) {
|
|
if (input_columnList.get(i).getTalendType().equals("id_Dynamic")) {
|
|
dynamicPos = i;
|
|
break;
|
|
}
|
|
}
|
|
|
|
if (dynamicPos != -1) {
|
|
%>
|
|
if (!incomingEnforcer_<%=cid%>.areDynamicFieldsInitialized()) {
|
|
// Initialize the dynamic columns when they are first encountered.
|
|
for (routines.system.DynamicMetadata dm_<%=cid%> : <%=inputConn.getName()%>.<%=input_columnList.get(dynamicPos).getLabel()%>.metadatas) {
|
|
incomingEnforcer_<%=cid%>.addDynamicField(
|
|
dm_<%=cid%>.getName(),
|
|
dm_<%=cid%>.getType(),
|
|
dm_<%=cid%>.getLogicalType(),
|
|
dm_<%=cid%>.getFormat(),
|
|
dm_<%=cid%>.getDescription(),
|
|
dm_<%=cid%>.isNullable());
|
|
}
|
|
incomingEnforcer_<%=cid%>.createRuntimeSchema();
|
|
}
|
|
<%
|
|
}
|
|
|
|
%>
|
|
incomingEnforcer_<%=cid%>.createNewRecord();
|
|
<%
|
|
for (int i = 0; i < input_columnList.size(); i++) { // column
|
|
IMetadataColumn column = input_columnList.get(i);
|
|
if (dynamicPos != i) {
|
|
%>
|
|
//skip the put action if the input column doesn't appear in component runtime schema
|
|
if (incomingEnforcer_<%=cid%>.getRuntimeSchema().getField("<%=input_columnList.get(i)%>") != null){
|
|
incomingEnforcer_<%=cid%>.put("<%=column.getLabel()%>", <%=inputConn.getName()%>.<%=column.getLabel()%>);
|
|
}
|
|
<%
|
|
} else {
|
|
%>
|
|
for (int i = 0; i < <%=inputConn.getName()%>.<%=column.getLabel()%>.getColumnCount(); i++) {
|
|
incomingEnforcer_<%=cid%>.put(<%=inputConn.getName()%>.<%=column.getLabel()%>.getColumnMetadata(i).getName(),
|
|
<%=inputConn.getName()%>.<%=column.getLabel()%>.getColumnValue(i));
|
|
}
|
|
<%
|
|
}
|
|
} // column
|
|
|
|
// If necesary, generate the code to handle outgoing connections.
|
|
// TODO: For now, this can only handle one outgoing record for
|
|
// each incoming record. To handle multiple outgoing records, code
|
|
// generation needs to occur in component_begin in order to open
|
|
// a for() loop.
|
|
|
|
// There will be a ClassCastException if the output component does
|
|
// not implement WriterWithFeedback, but permits outgoing
|
|
// connections.
|
|
|
|
ComponentProperties componentProps = node.getComponentProperties();
|
|
ProcessPropertiesGenerator generator = new ProcessPropertiesGenerator(cid, component);
|
|
List<Component.CodegenPropInfo> propsToProcess = component.getCodegenPropInfos(componentProps);
|
|
for (Component.CodegenPropInfo propInfo : propsToProcess) { // propInfo
|
|
List<NamedThing> properties = propInfo.props.getProperties();
|
|
for (NamedThing prop : properties) { // property
|
|
if (prop instanceof Property) { // if, only deal with valued Properties
|
|
Property property = (Property)prop;
|
|
if (property.getFlags() != null && (property.getFlags().contains(Property.Flags.DESIGN_TIME_ONLY) || property.getFlags().contains(Property.Flags.HIDDEN)))
|
|
continue;
|
|
if(property.getTaggedValue(IGenericConstants.DYNAMIC_PROPERTY_VALUE)!=null && Boolean.valueOf(String.valueOf(property.getTaggedValue(IGenericConstants.DYNAMIC_PROPERTY_VALUE)))) {
|
|
generator.setPropertyValues(property, propInfo, null, false, false);
|
|
}
|
|
}
|
|
} // property
|
|
} // propInfo
|
|
|
|
%>
|
|
org.apache.avro.generic.IndexedRecord data_<%=cid%> = incomingEnforcer_<%=cid%>.getCurrentRecord();
|
|
|
|
<%
|
|
boolean isParallelize ="true".equalsIgnoreCase(ElementParameterParser.getValue(node, "__PARALLELIZE__"));
|
|
if (isParallelize) {
|
|
String sourceComponentId = inputConn.getSource().getUniqueName();
|
|
if(sourceComponentId!=null && sourceComponentId.contains("tAsyncIn")) {
|
|
%>
|
|
globalMap.put(buffersSizeKey_<%=cid%>, buffersSize_<%=sourceComponentId%>);
|
|
<%
|
|
}
|
|
}
|
|
%>
|
|
|
|
writer_<%=cid%>.write(data_<%=cid%>);
|
|
|
|
nb_line_<%=cid %>++;
|
|
<%if(hasMainOutput){
|
|
%>
|
|
if(!(writer_<%=cid%> instanceof org.talend.components.api.component.runtime.WriterWithFeedback)) {
|
|
// For no feedback writer,just pass the input record to the output
|
|
if (data_<%=cid%>!=null) {
|
|
outgoingMainRecordsList_<%=cid%> = java.util.Arrays.asList(data_<%=cid%>);
|
|
}
|
|
}
|
|
<%
|
|
}
|
|
}
|
|
} // canStart
|
|
%>
|