Compare commits


1 Commit

Author SHA1 Message Date
unknown 8b7ef5e95c feat(TBD-10769): delta Features for R08 2020-08-11 17:20:06 +02:00
1421 changed files with 23902 additions and 46799 deletions

View File

@@ -584,11 +584,13 @@ EParameterName.jdbcURL=JDBC URL
EParameterName.driverJar=Driver jar
EParameterName.className=Class name
EParameterName.mappingFile=Mapping file
SetupProcessDependenciesRoutinesAction.title=Setup Codes Dependencies
SetupProcessDependenciesRoutinesAction.title=Setup routine dependencies
SetupProcessDependenciesRoutinesDialog.systemRoutineLabel=System routines
SetupProcessDependenciesRoutinesDialog.userRoutineLabel=User routines
PerformancePreferencePage.addAllSystemRoutines=Add all system routines to job dependencies, when creating a new job
PerformancePreferencePage.addAllUserRoutines=Add all user routines to job dependencies, when creating a new job
ShowRoutineItemsDialog.systemTitle=Select Sytem Routines
ShowRoutineItemsDialog.title=Select Routines
AbstractMultiPageTalendEditor_pleaseWait=Saving Please Wait....
DocumentationPreferencePage.use_css_template=Use CSS file as a template when export to HTML
DocumentationPreferencePage.css_file=CSS File

View File

@@ -1,5 +1,5 @@
NavigatorContent.contexts=Contexts
NavigatorContent.routines=Global Routines
NavigatorContent.routines=Routines
NavigatorContent.sqlTemplates=SQL Templates
NavigatorContent.documentation=Documentation
NavigatorContent.activation=di.fake.for.activation

View File

@@ -6,7 +6,7 @@
<license url="http://www.example.com/license">[Enter License Description here.]</license>
<requires>
<import feature="org.eclipse.test" version="0.0.0" match="greaterOrEqual"/>
<import plugin="org.junit" version="4.13.2" match="greaterOrEqual"/>
<import plugin="org.junit" version="0.0.0" match="greaterOrEqual"/>
<import plugin="org.talend.commons.runtime" version="0.0.0" match="greaterOrEqual"/>
<import plugin="org.talend.commons.ui" version="0.0.0" match="greaterOrEqual"/>
<import plugin="org.talend.core" version="0.0.0" match="greaterOrEqual"/>

View File

@@ -16,9 +16,11 @@
</requires>
<plugin id="org.talend.libraries.apache" download-size="0" install-size="0" version="0.0.0"/>
<plugin id="org.talend.libraries.apache.axis2" download-size="0" install-size="0" version="0.0.0"/>
<plugin id="org.talend.libraries.apache.batik" download-size="0" install-size="0" version="0.0.0"/>
<plugin id="org.talend.libraries.apache.chemistry" download-size="0" install-size="0" version="0.0.0"/>
<plugin id="org.talend.libraries.apache.common" download-size="0" install-size="0" version="0.0.0"/>
<plugin id="org.talend.libraries.apache.cxf" download-size="0" install-size="0" version="0.0.0"/>
<plugin id="org.talend.libraries.apache.google" download-size="0" install-size="0" version="0.0.0"/>
<plugin id="org.talend.libraries.apache.http" download-size="0" install-size="0" version="0.0.0"/>
<plugin id="org.talend.libraries.apache.lucene" download-size="0" install-size="0" version="0.0.0"/>
<plugin id="org.talend.libraries.apache.xml" download-size="0" install-size="0" version="0.0.0"/>
@@ -50,4 +52,5 @@
<plugin id="org.talend.libraries.slf4j" download-size="0" install-size="0" version="0.0.0"/>
<plugin id="org.talend.libraries.xml" download-size="0" install-size="0" version="0.0.0"/>
<plugin id="org.talend.libraries.zmq" download-size="0" install-size="0" version="0.0.0"/>
<plugin id="org.talend.libraries.zookeeper" download-size="0" install-size="0" version="0.0.0"/>
</feature>

View File

@@ -50,11 +50,8 @@ List<IConnection> allSubProcessConnection = codeGenArgument.getAllMainSubTreeCon
String cid = node.getUniqueName();
List<? extends INode> jobCatcherNodes = process.getNodesOfType("tJobStructureCatcher");
boolean jobCatcherExists = jobCatcherNodes != null && !jobCatcherNodes.isEmpty();
INode jobCatcherNode = jobCatcherExists ? jobCatcherNodes.get(0) : null;
boolean enableLogStash = !Boolean.getBoolean("deactivate_extended_component_log") && jobCatcherExists;
boolean logstashCurrent = enableLogStash && !cid.startsWith("tJobStructureCatcher") && !cid.startsWith("talend");
boolean enableLogStash = jobCatcherNodes != null && !jobCatcherNodes.isEmpty();
boolean logstashCurrent = !cid.startsWith("tJobStructureCatcher") && !cid.startsWith("talend") && enableLogStash;
if((codePart.equals(ECodePart.END))&&(stat || logstashCurrent)){
boolean iterateInVFComp = (node.getVirtualLinkTo() != null && node.getVirtualLinkTo() == EConnectionType.ITERATE);
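Note on this hunk: one side gates the extended component log behind both a tJobStructureCatcher lookup and a JVM-level kill switch; the other only checks for the catcher node. A minimal standalone sketch of the kill-switch idiom (class name illustrative, not from the codebase):

    public class KillSwitchDemo {
        public static void main(String[] args) {
            // Boolean.getBoolean reads a system property, not an environment
            // variable: it is true only when the JVM was started with
            // -Ddeactivate_extended_component_log=true, so the log stays on by default.
            boolean deactivated = Boolean.getBoolean("deactivate_extended_component_log");
            System.out.println("extended component log enabled: " + !deactivated);
        }
    }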
@@ -91,10 +88,10 @@ if((codePart.equals(ECodePart.END))&&(stat || logstashCurrent)){
String sourceNodeId = source.getUniqueName();
String sourceLabel = ElementParameterParser.getValue(source, "__LABEL__");
String sourceNodeLabel = ((sourceLabel==null || "__UNIQUE_NAME__".equals(sourceLabel) || sourceLabel.contains("\"")) ? sourceNodeId : sourceLabel.trim());
String sourceNodeLabel = ((sourceLabel==null || "__UNIQUE_NAME__".equals(sourceLabel) || sourceLabel.contains("\"")) ? sourceNodeId : sourceLabel);
String targetLabel = ElementParameterParser.getValue(node, "__LABEL__");
String targetNodeLabel = ((targetLabel==null || "__UNIQUE_NAME__".equals(targetLabel) || targetLabel.contains("\"")) ? node.getUniqueName() : targetLabel.trim());
String targetNodeLabel = ((targetLabel==null || "__UNIQUE_NAME__".equals(targetLabel) || targetLabel.contains("\"")) ? node.getUniqueName() : targetLabel);
String sourceNodeComponent = source.getComponent().getName();
for (INode jobStructureCatcher : jobCatcherNodes) {
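Note: the paired sourceNodeLabel/targetNodeLabel lines here and in the later hunks differ only in the trailing .trim(); both sides fall back to the unique node name when the label is null, is the __UNIQUE_NAME__ placeholder, or contains a double quote (which would break the generated string literal). A sketch of the shared check (helper name hypothetical):

    // Mirrors the ternary above: unusable labels fall back to the unique name.
    static String resolveLabel(String label, String uniqueName, boolean trim) {
        if (label == null || "__UNIQUE_NAME__".equals(label) || label.contains("\"")) {
            return uniqueName;
        }
        return trim ? label.trim() : label; // the two diff sides differ only here
    }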
@@ -125,10 +122,10 @@ if((codePart.equals(ECodePart.END))&&(stat || logstashCurrent)){
String sourceNodeId = source.getUniqueName();
String sourceLabel = ElementParameterParser.getValue(source, "__LABEL__");
String sourceNodeLabel = ((sourceLabel==null || "__UNIQUE_NAME__".equals(sourceLabel) || sourceLabel.contains("\"")) ? sourceNodeId : sourceLabel.trim());
String sourceNodeLabel = ((sourceLabel==null || "__UNIQUE_NAME__".equals(sourceLabel) || sourceLabel.contains("\"")) ? sourceNodeId : sourceLabel);
String targetLabel = ElementParameterParser.getValue(node, "__LABEL__");
String targetNodeLabel = ((targetLabel==null || "__UNIQUE_NAME__".equals(targetLabel) || targetLabel.contains("\"")) ? node.getUniqueName() : targetLabel.trim());
String targetNodeLabel = ((targetLabel==null || "__UNIQUE_NAME__".equals(targetLabel) || targetLabel.contains("\"")) ? node.getUniqueName() : targetLabel);
String sourceNodeComponent = source.getComponent().getName();
for (INode jobStructureCatcher : jobCatcherNodes) {
@@ -167,7 +164,6 @@ if((codePart.equals(ECodePart.END))&&(stat || logstashCurrent)){
}
}
}
List<IMetadataTable> metadatas = node.getMetadataList();
if ((!node.isSubProcessStart())&&(NodeUtil.isDataAutoPropagated(node))) {
if (inputColName!=null) {
@@ -193,117 +189,6 @@ if((codePart.equals(ECodePart.END))&&(stat || logstashCurrent)){
}
}
}
//log runtime lineage
boolean enable_runtime_lineage_log = NodeUtil.isJobUsingRuntimeLineage(process) && jobCatcherExists && !cid.startsWith("tJobStructureCatcher") && !cid.startsWith("talend");
if(enable_runtime_lineage_log) {//}
List<? extends IConnection> outConns = node.getOutgoingConnections();
if(!outConns.isEmpty()) {
%>
if(tos_count_<%=node.getUniqueName() %> == 0) {
<%
//}
}
for (IConnection conn : outConns) {
if(!conn.getLineStyle().equals(EConnectionType.FLOW_MAIN) && !conn.getLineStyle().equals(EConnectionType.FLOW_MERGE) && !conn.getLineStyle().equals(EConnectionType.FLOW_REF)) {
continue;
}
IMetadataTable metadata = conn.getMetadataTable();
if (metadata==null) {
continue;
}
List<IMetadataColumn> columns = metadata.getListColumns();
if(columns == null || columns.isEmpty()) {
continue;
}
%>
class SchemaUtil_<%=conn.getUniqueName()%>_<%=conn.getMetadataTable().getTableName()%> {
public java.util.List<java.util.Map<String, String>> getSchema(final <%=NodeUtil.getPrivateConnClassName(conn) %>Struct <%=conn.getName()%>) {
java.util.List<java.util.Map<String, String>> schema = new java.util.ArrayList<>();
if(<%=conn.getName()%> == null) {
return schema;
}
java.util.Map<String, String> field = null;
<%
for(IMetadataColumn column : columns){
if("id_Dynamic".equals(column.getTalendType())) {
%>
routines.system.Dynamic dynamic = <%=conn.getName()%>.<%=column.getLabel()%>;
if(dynamic != null) {
for(routines.system.DynamicMetadata metadata : dynamic.metadatas) {
field = new java.util.HashMap<>();
field.put("name", metadata.getName());
field.put("origin_name", metadata.getDbName());
field.put("iskey", "" + metadata.isKey());
field.put("talend_type", metadata.getType());
field.put("type", metadata.getDbType());
field.put("nullable", "" + metadata.isNullable());
field.put("pattern", metadata.getFormat());
field.put("length", "" + metadata.getLength());
field.put("precision", "" + metadata.getPrecision());
schema.add(field);
}
}
<%
continue;
}
String pattern = column.getPattern();
if(pattern == null || pattern.isEmpty() || pattern.equals("\"\"")) {
pattern = "\"\"";
}
%>
field = new java.util.HashMap<>();
field.put("name", "<%=column.getLabel()%>");
field.put("origin_name", "<%=column.getOriginalDbColumnName()%>");
field.put("iskey", "<%=column.isKey()%>");
field.put("talend_type", "<%=column.getTalendType()%>");
field.put("type", "<%=column.getType()%>");
field.put("nullable", "<%=column.isNullable()%>");
field.put("pattern", <%=pattern%>);
field.put("length", "<%=column.getLength()%>");
field.put("precision", "<%=column.getPrecision()%>");
schema.add(field);
<%
}
%>
return schema;
}
}
java.util.List<java.util.Map<String, String>> schema_<%=conn.getUniqueName()%>_<%=conn.getMetadataTable().getTableName()%> = new SchemaUtil_<%=conn.getUniqueName()%>_<%=conn.getMetadataTable().getTableName()%>().getSchema(<%=conn.getName()%>);
<%
INode target = conn.getTarget();
String targetNodeId = target.getUniqueName();
String targetNodeComponent = target.getComponent().getName();
%>
<%=jobCatcherNode.getUniqueName()%>.addConnectionSchemaMessage("<%=node.getUniqueName()%>","<%=node.getComponent().getName()%>",
"<%=targetNodeId%>","<%=targetNodeComponent%>", "<%=conn.getUniqueName()%>" + iterateId, schema_<%=conn.getUniqueName()%>_<%=conn.getMetadataTable().getTableName()%>);
<%=jobCatcherNode.getDesignSubjobStartNode().getUniqueName() %>Process(globalMap);
<%
}
if(!outConns.isEmpty()) {
//{
%>
}
<%
}
//{
}
//======================================TDI-17183 end=====================================
boolean traceCodeGenerated = false;
for (IConnection conn : node.getOutgoingConnections()) {
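Note: the SchemaUtil_* block above is JET template code; the <%= %> expressions are evaluated at code-generation time, once per qualifying connection. Assuming a connection row1 on table table1 with a single String column name, the generated class would look roughly like this (all identifiers illustrative):

    class SchemaUtil_row1_table1 {
        public java.util.List<java.util.Map<String, String>> getSchema(final row1Struct row1) {
            java.util.List<java.util.Map<String, String>> schema = new java.util.ArrayList<>();
            if (row1 == null) {
                return schema;
            }
            java.util.Map<String, String> field = new java.util.HashMap<>();
            field.put("name", "name");
            field.put("origin_name", "name");
            field.put("iskey", "false");
            field.put("talend_type", "id_String");
            field.put("nullable", "true");
            field.put("pattern", "");
            schema.add(field);
            return schema;
        }
    }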

View File

@@ -8,9 +8,6 @@
org.talend.core.model.metadata.IMetadataColumn
org.talend.core.model.process.EConnectionType
org.talend.core.model.process.ElementParameterParser
org.talend.core.model.process.EParameterFieldType
org.talend.designer.core.model.components.EParameterName
org.talend.designer.core.model.components.ElementParameter
org.talend.designer.codegen.config.CodeGeneratorArgument
org.talend.core.model.utils.NodeUtil
org.talend.core.model.process.IConnectionCategory
@@ -143,14 +140,10 @@
connSet.addAll(node.getIncomingConnections(EConnectionType.FLOW_MAIN));
connSet.addAll(node.getIncomingConnections(EConnectionType.FLOW_MERGE));
String cid = node.getUniqueName();
List<? extends INode> jobCatcherNodes = node.getProcess().getNodesOfType("tJobStructureCatcher");
boolean jobCatcherExists = jobCatcherNodes != null && !jobCatcherNodes.isEmpty();
INode jobCatcherNode = jobCatcherExists ? jobCatcherNodes.get(0) : null;
boolean enableLogStash = !Boolean.getBoolean("deactivate_extended_component_log") && jobCatcherExists;
boolean logstashCurrent = enableLogStash && !cid.startsWith("tJobStructureCatcher") && !cid.startsWith("talend");
boolean enableLogStash = jobCatcherNodes != null && !jobCatcherNodes.isEmpty();
String cid = node.getUniqueName();
boolean logstashCurrent = !cid.startsWith("tJobStructureCatcher") && !cid.startsWith("talend") && enableLogStash;
//about performance monitor, no way to support more than one job catcher component, also that is not necessary
final String subprocessName4Catcher = logstashCurrent ? jobCatcherNodes.get(0).getDesignSubjobStartNode().getUniqueName() : null;
@@ -197,10 +190,10 @@
String sourceNodeId = source.getUniqueName();
String sourceLabel = ElementParameterParser.getValue(source, "__LABEL__");
String sourceNodeLabel = ((sourceLabel==null || "__UNIQUE_NAME__".equals(sourceLabel) || sourceLabel.contains("\"")) ? sourceNodeId : sourceLabel.trim());
String sourceNodeLabel = ((sourceLabel==null || "__UNIQUE_NAME__".equals(sourceLabel) || sourceLabel.contains("\"")) ? sourceNodeId : sourceLabel);
String targetLabel = ElementParameterParser.getValue(node, "__LABEL__");
String targetNodeLabel = ((targetLabel==null || "__UNIQUE_NAME__".equals(targetLabel) || targetLabel.contains("\"")) ? node.getUniqueName() : targetLabel.trim());
String targetNodeLabel = ((targetLabel==null || "__UNIQUE_NAME__".equals(targetLabel) || targetLabel.contains("\"")) ? node.getUniqueName() : targetLabel);
String sourceNodeComponent = source.getComponent().getName();
%>
@@ -233,10 +226,10 @@
String sourceNodeId = source.getUniqueName();
String sourceLabel = ElementParameterParser.getValue(source, "__LABEL__");
String sourceNodeLabel = ((sourceLabel==null || "__UNIQUE_NAME__".equals(sourceLabel) || sourceLabel.contains("\"")) ? sourceNodeId : sourceLabel.trim());
String sourceNodeLabel = ((sourceLabel==null || "__UNIQUE_NAME__".equals(sourceLabel) || sourceLabel.contains("\"")) ? sourceNodeId : sourceLabel);
String targetLabel = ElementParameterParser.getValue(node, "__LABEL__");
String targetNodeLabel = ((targetLabel==null || "__UNIQUE_NAME__".equals(targetLabel) || targetLabel.contains("\"")) ? node.getUniqueName() : targetLabel.trim());
String targetNodeLabel = ((targetLabel==null || "__UNIQUE_NAME__".equals(targetLabel) || targetLabel.contains("\"")) ? node.getUniqueName() : targetLabel);
String sourceNodeComponent = source.getComponent().getName();
%>
@@ -260,10 +253,10 @@
String sourceNodeId = source.getUniqueName();
String sourceLabel = ElementParameterParser.getValue(source, "__LABEL__");
String sourceNodeLabel = ((sourceLabel==null || "__UNIQUE_NAME__".equals(sourceLabel) || sourceLabel.contains("\"")) ? sourceNodeId : sourceLabel.trim());
String sourceNodeLabel = ((sourceLabel==null || "__UNIQUE_NAME__".equals(sourceLabel) || sourceLabel.contains("\"")) ? sourceNodeId : sourceLabel);
String targetLabel = ElementParameterParser.getValue(node, "__LABEL__");
String targetNodeLabel = ((targetLabel==null || "__UNIQUE_NAME__".equals(targetLabel) || targetLabel.contains("\"")) ? node.getUniqueName() : targetLabel.trim());
String targetNodeLabel = ((targetLabel==null || "__UNIQUE_NAME__".equals(targetLabel) || targetLabel.contains("\"")) ? node.getUniqueName() : targetLabel);
String sourceNodeComponent = source.getComponent().getName();
@@ -296,10 +289,10 @@
String sourceNodeId = source.getUniqueName();
String sourceLabel = ElementParameterParser.getValue(source, "__LABEL__");
String sourceNodeLabel = ((sourceLabel==null || "__UNIQUE_NAME__".equals(sourceLabel) || sourceLabel.contains("\"")) ? sourceNodeId : sourceLabel.trim());
String sourceNodeLabel = ((sourceLabel==null || "__UNIQUE_NAME__".equals(sourceLabel) || sourceLabel.contains("\"")) ? sourceNodeId : sourceLabel);
String targetLabel = ElementParameterParser.getValue(node, "__LABEL__");
String targetNodeLabel = ((targetLabel==null || "__UNIQUE_NAME__".equals(targetLabel) || targetLabel.contains("\"")) ? node.getUniqueName() : targetLabel.trim());
String targetNodeLabel = ((targetLabel==null || "__UNIQUE_NAME__".equals(targetLabel) || targetLabel.contains("\"")) ? node.getUniqueName() : targetLabel);
String sourceNodeComponent = source.getComponent().getName();
@@ -338,104 +331,11 @@
<%
log.startWork();
log.logCompSetting();
boolean enable_runtime_lineage_log = NodeUtil.isJobUsingRuntimeLineage(node.getProcess()) && jobCatcherExists && !cid.startsWith("tJobStructureCatcher") && !cid.startsWith("talend");
if(enable_runtime_lineage_log) {
%>
class ParameterUtil_<%=cid%>{
public java.util.Map<String, String> getParameter() throws Exception{
java.util.Map<String, String> component_parameters = new java.util.HashMap<>();
<%
java.util.Set<org.talend.core.model.process.EParameterFieldType> ignoredParamsTypes = new java.util.HashSet<org.talend.core.model.process.EParameterFieldType>();
ignoredParamsTypes.addAll(
java.util.Arrays.asList(
org.talend.core.model.process.EParameterFieldType.SCHEMA_TYPE,
org.talend.core.model.process.EParameterFieldType.SCHEMA_REFERENCE,
org.talend.core.model.process.EParameterFieldType.LABEL,
org.talend.core.model.process.EParameterFieldType.EXTERNAL,
org.talend.core.model.process.EParameterFieldType.MAPPING_TYPE,
org.talend.core.model.process.EParameterFieldType.IMAGE,
org.talend.core.model.process.EParameterFieldType.TNS_EDITOR,
org.talend.core.model.process.EParameterFieldType.WSDL2JAVA,
org.talend.core.model.process.EParameterFieldType.GENERATEGRAMMARCONTROLLER,
org.talend.core.model.process.EParameterFieldType.GENERATE_SURVIVORSHIP_RULES_CONTROLLER,
org.talend.core.model.process.EParameterFieldType.REFRESH_REPORTS,
org.talend.core.model.process.EParameterFieldType.BROWSE_REPORTS,
org.talend.core.model.process.EParameterFieldType.PALO_DIM_SELECTION,
org.talend.core.model.process.EParameterFieldType.GUESS_SCHEMA,
org.talend.core.model.process.EParameterFieldType.MATCH_RULE_IMEX_CONTROLLER,
org.talend.core.model.process.EParameterFieldType.MEMO_PERL,
org.talend.core.model.process.EParameterFieldType.DBTYPE_LIST,
org.talend.core.model.process.EParameterFieldType.VERSION,
org.talend.core.model.process.EParameterFieldType.TECHNICAL,
org.talend.core.model.process.EParameterFieldType.ICON_SELECTION,
org.talend.core.model.process.EParameterFieldType.JAVA_COMMAND,
org.talend.core.model.process.EParameterFieldType.TREE_TABLE,
org.talend.core.model.process.EParameterFieldType.VALIDATION_RULE_TYPE,
org.talend.core.model.process.EParameterFieldType.DCSCHEMA,
org.talend.core.model.process.EParameterFieldType.SURVIVOR_RELATION,
org.talend.core.model.process.EParameterFieldType.REST_RESPONSE_SCHEMA_TYPE,
org.talend.core.model.process.EParameterFieldType.BUTTON
)
);
for(org.talend.core.model.process.IElementParameter ep : org.talend.core.model.utils.NodeUtil.getDisplayedParameters(node)){
if(!ep.isLog4JEnabled() || ignoredParamsTypes.contains(ep.getFieldType())){
continue;
}
ElementParameter p = (ElementParameter)ep;
Object pluginValue = p.getTaggedValue("org.talend.sdk.component.source");
if(pluginValue != null && String.class.cast(pluginValue).equalsIgnoreCase("tacokit")) {
try {
if (!(Boolean) org.talend.core.runtime.IAdditionalInfo.class.cast(p).func("isPersisted")) {
continue;
}
} catch (Exception ex) {
//do nothing
}
%>
<%@ include file="./tacokit_runtime_log.javajet"%>
<%
continue;
}
String name = ep.getName();
java.util.Set<String> ignoredParamsNames = new java.util.HashSet<String>();
ignoredParamsNames.add("SQLPATTERN_VALUE");
ignoredParamsNames.add("ADDITIONAL_INSERT_COLUMNS");
ignoredParamsNames.add("ADDITIONAL_UPDATE_COLUMNS");
ignoredParamsNames.add("SELECTION_TABLE");
ignoredParamsNames.add("DIFFER_MESSAGE");
ignoredParamsNames.add("NO_DIFFER_MESSAGE");
if(ignoredParamsNames.contains(name)) {
//do nothing
} else if(org.talend.core.model.process.EParameterFieldType.PASSWORD.equals(ep.getFieldType()) || org.talend.core.model.process.EParameterFieldType.HIDDEN_TEXT.equals(ep.getFieldType())){
//not log password
}else{
String value = org.talend.core.model.utils.NodeUtil.getRuntimeParameterValue(node, ep);
%>
component_parameters.put("<%=name%>", String.valueOf(<%=value%>));
<%
}
}
%>
return component_parameters;
}
}
<%=jobCatcherNode.getUniqueName()%>.addComponentParameterMessage("<%=node.getUniqueName()%>", "<%=node.getComponent().getName()%>",
new ParameterUtil_<%=cid%>().getParameter());
<%=jobCatcherNode.getDesignSubjobStartNode().getUniqueName() %>Process(globalMap);
<%
}
if(logstashCurrent) {
for (INode jobStructureCatcher : jobCatcherNodes) {
String label = ElementParameterParser.getValue(node, "__LABEL__");
String nodeLabel = ((label==null || "__UNIQUE_NAME__".equals(label) || label.contains("\"")) ? node.getUniqueName() : label.trim());
String nodeLabel = ((label==null || "__UNIQUE_NAME__".equals(label) || label.contains("\"")) ? node.getUniqueName() : label);
%>
if(enableLogStash) {
<%=jobStructureCatcher.getUniqueName() %>.addCM("<%=node.getUniqueName()%>", "<%=nodeLabel%>", "<%=node.getComponent().getName()%>");
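Note: like the schema template, ParameterUtil_<cid> above expands once per component; password/hidden fields and the listed ignored parameter types and names are skipped. A hypothetical expansion for a component tFileInputDelimited_1 with a single loggable FILENAME parameter:

    class ParameterUtil_tFileInputDelimited_1 {
        public java.util.Map<String, String> getParameter() throws Exception {
            java.util.Map<String, String> component_parameters = new java.util.HashMap<>();
            // one put per displayed, non-ignored, non-password parameter
            component_parameters.put("FILENAME", String.valueOf("/tmp/in.csv"));
            return component_parameters;
        }
    }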

View File

@@ -24,7 +24,6 @@
org.talend.core.model.process.ProcessUtils
org.talend.core.model.components.IComponent
org.talend.core.model.components.EComponentType
org.talend.core.model.utils.NodeUtil
"
class="Footer"
skeleton="footer_java.skeleton"
@@ -126,16 +125,8 @@
boolean exist_tSCP = false;
List<INode> scpComponentsList = (List<INode>)process.getNodesOfType("tSCPConnection");
String parameterNames = "";
int scpsize = scpComponentsList.size();
if (scpsize > 0) {
if (scpComponentsList.size() > 0) {
exist_tSCP = true;
for (int i = 0; i < scpsize; i++) {
parameterNames += "\"conn_" + scpComponentsList.get(i).getUniqueName() + "\"";
if(i < scpsize-1){
parameterNames += ",";
}
}
}
boolean exist_tCassandra = false;
@@ -527,63 +518,8 @@
break;
}
}
boolean enableLogStash = !Boolean.getBoolean("deactivate_extended_component_log") && (jobCatcherNode!=null);
boolean enable_runtime_lineage_log = NodeUtil.isJobUsingRuntimeLineage(process) && (jobCatcherNode!=null);
if(enable_runtime_lineage_log) {
%>
java.util.Properties p_<%=jobCatcherNode.getUniqueName()%> = new java.util.Properties();
p_<%=jobCatcherNode.getUniqueName()%>.setProperty("root.logger", "runtime");
p_<%=jobCatcherNode.getUniqueName()%>.setProperty("encoding", "UTF-8");
p_<%=jobCatcherNode.getUniqueName()%>.setProperty("application.name", "Talend Studio");
p_<%=jobCatcherNode.getUniqueName()%>.setProperty("service.name", "Talend Studio Job");
p_<%=jobCatcherNode.getUniqueName()%>.setProperty("instance.name", "Talend Studio Job Instance");
p_<%=jobCatcherNode.getUniqueName()%>.setProperty("propagate.appender.exceptions", "none");
p_<%=jobCatcherNode.getUniqueName()%>.setProperty("log.appender", "file");
p_<%=jobCatcherNode.getUniqueName()%>.setProperty("appender.file.path", "runtime.json");
p_<%=jobCatcherNode.getUniqueName()%>.setProperty("appender.file.maxsize", "52428800");
p_<%=jobCatcherNode.getUniqueName()%>.setProperty("appender.file.maxbackup", "20");
p_<%=jobCatcherNode.getUniqueName()%>.setProperty("host", "false");
final String runtime_dir_<%=jobCatcherNode.getUniqueName()%> = System.getProperty("runtime.lineage.outputpath");
final String runtime_path_<%=jobCatcherNode.getUniqueName()%> = System.getProperty("runtime.lineage.appender.file.path");
if(runtime_path_<%=jobCatcherNode.getUniqueName()%>==null || runtime_path_<%=jobCatcherNode.getUniqueName()%>.isEmpty()) {
if(runtime_dir_<%=jobCatcherNode.getUniqueName()%>!=null && !runtime_dir_<%=jobCatcherNode.getUniqueName()%>.isEmpty()) {
System.setProperty("runtime.lineage.appender.file.path",
new StringBuilder().append(runtime_dir_<%=jobCatcherNode.getUniqueName()%>)
.append((runtime_dir_<%=jobCatcherNode.getUniqueName()%>.endsWith("/") || runtime_dir_<%=jobCatcherNode.getUniqueName()%>.endsWith("\\")) ? "" : java.io.File.separator)
.append(projectName)
.append(java.io.File.separatorChar)
.append(jobName)
.append(java.io.File.separatorChar)
.append(jobVersion)
.append(java.io.File.separatorChar)
.append("runtime_log_")
.append(new java.text.SimpleDateFormat("yyyyMMddHHmmss").format(new java.util.Date()))
.append(".json")
.toString()
);
}
}
System.getProperties().stringPropertyNames().stream()
.filter(it -> it.startsWith("runtime.lineage.") && !"runtime.lineage.outputpath".equals(it))
.forEach(key -> p_<%=jobCatcherNode.getUniqueName()%>.setProperty(key.substring("runtime.lineage.".length()), System.getProperty(key)));
<%if(isLog4j1Enabled) {%>
org.apache.log4j.Logger.getLogger(p_<%=jobCatcherNode.getUniqueName()%>.getProperty("root.logger")).setLevel(org.apache.log4j.Level.DEBUG);
<%}%>
<%if(isLog4j2Enabled) {%>
org.apache.logging.log4j.core.config.Configurator.setLevel(p_<%=jobCatcherNode.getUniqueName()%>.getProperty("root.logger"), org.apache.logging.log4j.Level.DEBUG);
<%}%>
runtime_lineage_logger_<%=jobCatcherNode.getUniqueName()%> = org.talend.job.audit.JobEventAuditLoggerFactory.createJobAuditLogger(p_<%=jobCatcherNode.getUniqueName()%>);
<%
}
if(enableLogStash) {
String location = ElementParameterParser.getValue(jobCatcherNode, "__LOCATION__");
if(jobCatcherNode!=null) {
String location = ElementParameterParser.getValue(jobCatcherNode, "__LOCATION__");
%>
if(enableLogStash) {
java.util.Properties properties_<%=jobCatcherNode.getUniqueName()%> = new java.util.Properties();
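Note: two details of the removed lineage-logger setup are easy to miss: the output file path is derived from runtime.lineage.outputpath plus project/job/version and a timestamp, and every other runtime.lineage.* system property is copied into the logger Properties with the prefix stripped. A runnable sketch of that forwarding step (names illustrative):

    public class LineagePropertyForwarding {
        public static void main(String[] args) {
            java.util.Properties p = new java.util.Properties();
            // e.g. -Druntime.lineage.appender.file.maxbackup=5 -> appender.file.maxbackup=5
            System.getProperties().stringPropertyNames().stream()
                    .filter(k -> k.startsWith("runtime.lineage.") && !"runtime.lineage.outputpath".equals(k))
                    .forEach(k -> p.setProperty(k.substring("runtime.lineage.".length()), System.getProperty(k)));
            System.out.println(p);
        }
    }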
@@ -614,7 +550,7 @@
}
<%
}
%>
%>
if(clientHost == null) {
clientHost = defaultClientHost;
@@ -650,16 +586,6 @@
}
%>
boolean inOSGi = routines.system.BundleUtils.inOSGi();
if (inOSGi) {
java.util.Dictionary<String, Object> jobProperties = routines.system.BundleUtils.getJobProperties(jobName);
if (jobProperties != null && jobProperties.get("context") != null) {
contextStr = (String)jobProperties.get("context");
}
}
try {
//call job/subjob with an existing context, like: --context=production. if without this parameter, there will use the default context instead.
java.io.InputStream inContext = <%=className%>.class.getClassLoader().getResourceAsStream("<%=jobClassPackageFolder%>/contexts/" + contextStr + ".properties");
@@ -667,15 +593,13 @@
inContext = <%=className%>.class.getClassLoader().getResourceAsStream("config/contexts/" + contextStr + ".properties");
}
if (inContext != null) {
try {
//defaultProps is in order to keep the original context value
if(context != null && context.isEmpty()) {
//defaultProps is in order to keep the original context value
if(context != null && context.isEmpty()) {
defaultProps.load(inContext);
context = new ContextProperties(defaultProps);
}
} finally {
inContext.close();
}
inContext.close();
} else if (!isDefaultContext) {
//print info and job continue to run, for case: context_param is not empty.
System.err.println("Could not find the context " + contextStr);
@@ -741,39 +665,34 @@
<%
} else if(typeToGenerate.equals("java.util.Date")) {
%>
try{
String context_<%=ctxParam.getName()%>_value = context.getProperty("<%=ctxParam.getName()%>");
try{
if (context_<%=ctxParam.getName()%>_value == null){
context_<%=ctxParam.getName()%>_value = "";
}
int context_<%=ctxParam.getName()%>_pos = context_<%=ctxParam.getName()%>_value.indexOf(";");
String context_<%=ctxParam.getName()%>_pattern = "yyyy-MM-dd HH:mm:ss";
if(context_<%=ctxParam.getName()%>_pos > -1){
context_<%=ctxParam.getName()%>_pattern = context_<%=ctxParam.getName()%>_value.substring(0, context_<%=ctxParam.getName()%>_pos);
context_<%=ctxParam.getName()%>_value = context_<%=ctxParam.getName()%>_value.substring(context_<%=ctxParam.getName()%>_pos + 1);
}
context.<%=ctxParam.getName()%>=(java.util.Date)(new java.text.SimpleDateFormat(context_<%=ctxParam.getName()%>_pattern).parse(context_<%=ctxParam.getName()%>_value));
} catch(ParseException e) {
try { <% /*try to check if date passed as long also*/ %>
long context_<%=ctxParam.getName()%>_longValue = Long.valueOf(context_<%=ctxParam.getName()%>_value);
context.<%=ctxParam.getName()%> = new java.util.Date(context_<%=ctxParam.getName()%>_longValue);
} catch (NumberFormatException cantParseToLongException) {
<%
if (isLog4jEnabled) {
%>
log.warn(String.format("<%=warningMessageFormat %>", "<%=ctxParam.getName() %>", "Can't parse date string: " + e.getMessage() + " and long: " + cantParseToLongException.getMessage()));
<%
} else {
%>
System.err.println(String.format("<%=warningMessageFormat %>", "<%=ctxParam.getName() %>", "Can't parse date string: " + e.getMessage() + " and long: " + cantParseToLongException.getMessage()));
<%
}
%>
context.<%=ctxParam.getName()%>=null;
}
if (context_<%=ctxParam.getName()%>_value == null){
context_<%=ctxParam.getName()%>_value = "";
}
int context_<%=ctxParam.getName()%>_pos = context_<%=ctxParam.getName()%>_value.indexOf(";");
String context_<%=ctxParam.getName()%>_pattern = "yyyy-MM-dd HH:mm:ss";
if(context_<%=ctxParam.getName()%>_pos > -1){
context_<%=ctxParam.getName()%>_pattern = context_<%=ctxParam.getName()%>_value.substring(0, context_<%=ctxParam.getName()%>_pos);
context_<%=ctxParam.getName()%>_value = context_<%=ctxParam.getName()%>_value.substring(context_<%=ctxParam.getName()%>_pos + 1);
}
context.<%=ctxParam.getName()%>=(java.util.Date)(new java.text.SimpleDateFormat(context_<%=ctxParam.getName()%>_pattern).parse(context_<%=ctxParam.getName()%>_value));
} catch(ParseException e) {
<%
if (isLog4jEnabled) {
%>
log.warn(String.format("<%=warningMessageFormat %>", "<%=ctxParam.getName() %>", e.getMessage()));
<%
} else {
%>
System.err.println(String.format("<%=warningMessageFormat %>", "<%=ctxParam.getName() %>", e.getMessage()));
<%
}
%>
context.<%=ctxParam.getName()%>=null;
}
<%
} else if(typeToGenerate.equals("Object")||typeToGenerate.equals("String")||typeToGenerate.equals("java.lang.String")) {
%>
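Note: the longer side of this hunk gives java.util.Date context parameters a second chance: if SimpleDateFormat fails, the raw value is retried as epoch milliseconds. A standalone sketch of that fallback, assuming the "pattern;value" convention visible in the template:

    import java.text.ParseException;
    import java.text.SimpleDateFormat;
    import java.util.Date;

    public class ContextDateParsing {
        static Date parse(String raw) {
            String value = (raw == null) ? "" : raw;
            String pattern = "yyyy-MM-dd HH:mm:ss";
            int pos = value.indexOf(';');
            if (pos > -1) { // optional "pattern;value" form
                pattern = value.substring(0, pos);
                value = value.substring(pos + 1);
            }
            try {
                return new SimpleDateFormat(pattern).parse(value);
            } catch (ParseException e) {
                try {
                    return new Date(Long.parseLong(value)); // retry as epoch millis
                } catch (NumberFormatException nfe) {
                    return null; // template logs a warning and sets the context value to null
                }
            }
        }

        public static void main(String[] args) {
            System.out.println(parse("dd/MM/yyyy;25/12/2019"));
            System.out.println(parse("1577232000000"));
        }
    }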
@@ -1027,7 +946,7 @@ this.globalResumeTicket = true;//to run tPreJob
<%
}
if(enableLogStash) {
if(jobCatcherNode!=null) {
%>
if(enableLogStash) {
<%=jobCatcherNode.getUniqueName() %>.addJobStartMessage();
@@ -1180,7 +1099,7 @@ this.globalResumeTicket = true;//to run tPostJob
<%
}
if(enableLogStash) {
if(jobCatcherNode!=null) {
%>
if(enableLogStash) {
<%=jobCatcherNode.getUniqueName() %>.addJobEndMessage(startTime, end, status);
@@ -1198,12 +1117,9 @@ this.globalResumeTicket = true;//to run tPostJob
closeJmsConnections();
<% } %>
<% if (exist_tSCP) {
%>
closeCloseableConnections(<%=parameterNames%>);
<%
}
%>
<% if (exist_tSCP) { %>
closeScpConnections();
<% } %>
<%
if (stats) {
@@ -1222,24 +1138,6 @@ if (execStat) {
}
%>
int returnCode = 0;
<%
if (isRunInMultiThread) {
%>
Integer localErrorCode = (Integer)(((java.util.Map)threadLocal.get()).get("errorCode"));
String localStatus = (String)(((java.util.Map)threadLocal.get()).get("status"));
if (localErrorCode != null) {
if (errorCode == null || localErrorCode.compareTo(errorCode) > 0) {
errorCode = localErrorCode;
}
}
if (localStatus != null && !status.equals("failure")){
status = localStatus;
}
<%
}
%>
if(errorCode == null) {
returnCode = status != null && status.equals("failure") ? 1 : 0;
} else {
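Note: the block removed in this hunk merges per-thread outcomes before computing returnCode in multi-thread runs: each thread stores its errorCode/status in a ThreadLocal map, the highest errorCode wins, and a "failure" status is sticky. Sketch of the merge rule (types simplified):

    // merge one thread-local result into the job-level outcome
    static Integer mergeErrorCode(Integer global, Integer local) {
        return (local != null && (global == null || local.compareTo(global) > 0)) ? local : global;
    }

    static String mergeStatus(String global, String local) {
        // once "failure", always "failure"
        return (local != null && !"failure".equals(global)) ? local : global;
    }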
@@ -1257,7 +1155,7 @@ if (execStat) {
closeJmsConnections();
<% } %>
<% if(exist_tSCP) { %>
closeCloseableConnections(<%=parameterNames%>);
closeScpConnections();
<% } %>
<% if (exist_tSQLDB) { %>
closeSqlDbConnections();
@@ -1325,17 +1223,22 @@ if (execStat) {
<%
if(exist_tSCP) {
%>
private void closeCloseableConnections(String... names) {
java.util.Arrays.stream(names).forEach(name-> {
try {
Object obj_conn = globalMap.remove(name);
if(obj_conn != null){
((java.io.Closeable)obj_conn).close();
}
} catch (IOException ioException) {
private void closeScpConnections() {
try {
Object obj_conn;
<%
for (INode scpNode : scpComponentsList) {
%>
obj_conn = globalMap.remove("conn_<%=scpNode.getUniqueName() %>");
if (null != obj_conn) {
((ch.ethz.ssh2.Connection) obj_conn).close();
}
});
}
<%
}
%>
} catch (java.lang.Exception e) {
}
}
<%
}
%>
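Note: the two cleanup strategies in this hunk differ in coupling: closeCloseableConnections treats each SCP connection generically as a java.io.Closeable looked up in globalMap by key, while closeScpConnections hard-codes a cast to ch.ethz.ssh2.Connection per tSCPConnection node. Cleaned-up sketch of the generic variant (globalMap as in the generated job class):

    // best-effort cleanup: close whatever Closeable is registered under each key
    private void closeCloseableConnections(String... names) {
        java.util.Arrays.stream(names).forEach(name -> {
            try {
                Object conn = globalMap.remove(name);
                if (conn != null) {
                    ((java.io.Closeable) conn).close();
                }
            } catch (java.io.IOException e) {
                // ignore: the job is shutting down
            }
        });
    }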
@@ -1445,7 +1348,6 @@ if (execStat) {
if ("sftp".equals(type)) { %>
((com.jcraft.jsch.ChannelSftp) obj_conn).quit();
<%} else { %>
((org.apache.commons.net.ftp.FTPClient) obj_conn).logout();
((org.apache.commons.net.ftp.FTPClient) obj_conn).disconnect();
<%}%>
}

View File

@@ -56,28 +56,10 @@ if ((metadatas != null) && (metadatas.size() > 0)) { // metadata
// Set up the component definition, and the properties for all types of
// components.
List<? extends IConnection> allInLineJobConns = NodeUtil.getFirstIncomingLineConnectionsOfType(node, "tRESTRequestIn");
%>
boolean doesNodeBelongToRequest_<%=cid%> = <%= allInLineJobConns.size() %> == 0;
@SuppressWarnings("unchecked")
java.util.Map<String, Object> restRequest_<%=cid%> = (java.util.Map<String, Object>)globalMap.get("restRequest");
String currentTRestRequestOperation_<%=cid%> = (String)(restRequest_<%=cid%> != null ? restRequest_<%=cid%>.get("OPERATION") : null);
<%
for (IConnection inLineConn : allInLineJobConns) {
%>
if("<%= inLineConn.getName() %>".equals(currentTRestRequestOperation_<%=cid%>)) {
doesNodeBelongToRequest_<%=cid%> = true;
}
<%
}
%>
org.talend.components.api.component.ComponentDefinition def_<%=cid %> =
new <%= def.getClass().getName()%>();
org.talend.components.api.component.runtime.Writer writer_<%=cid%> = null;
org.talend.components.api.component.runtime.Reader reader_<%=cid%> = null;
<%
List<Component.CodegenPropInfo> propsToProcess = component.getCodegenPropInfos(componentProps);
%>
@@ -163,7 +145,7 @@ globalMap.put("TALEND_COMPONENTS_VERSION", "<%=component.getVersion()%>");
boolean isParallelize ="true".equalsIgnoreCase(ElementParameterParser.getValue(node, "__PARALLELIZE__"));
if (isParallelize) {
%>
final String buffersSizeKey_<%=cid%> = "buffersSizeKey_<%=cid%>_" + Thread.currentThread().getId();
final String buffersSizeKey_<%=cid%> = "buffersSizeKey_<%=cid%>_" + Thread.currentThread().getId();
<%
}
%>
@@ -233,11 +215,9 @@ if(componentRuntime_<%=cid%> instanceof org.talend.components.api.component.runt
org.talend.components.api.component.runtime.SourceOrSink sourceOrSink_<%=cid%> = null;
if(componentRuntime_<%=cid%> instanceof org.talend.components.api.component.runtime.SourceOrSink) {
sourceOrSink_<%=cid%> = (org.talend.components.api.component.runtime.SourceOrSink)componentRuntime_<%=cid%>;
if (doesNodeBelongToRequest_<%=cid%>) {
org.talend.daikon.properties.ValidationResult vr_<%=cid%> = sourceOrSink_<%=cid%>.validate(container_<%=cid%>);
if (vr_<%=cid%>.getStatus() == org.talend.daikon.properties.ValidationResult.Result.ERROR ) {
throw new RuntimeException(vr_<%=cid%>.getMessage());
}
org.talend.daikon.properties.ValidationResult vr_<%=cid%> = sourceOrSink_<%=cid%>.validate(container_<%=cid%>);
if (vr_<%=cid%>.getStatus() == org.talend.daikon.properties.ValidationResult.Result.ERROR ) {
throw new RuntimeException(vr_<%=cid%>.getMessage());
}
}
@@ -260,11 +240,11 @@ if(isTopologyNone) {
if (hasOutputOnly || asInputComponent) {
%>
if (sourceOrSink_<%=cid%> instanceof org.talend.components.api.component.runtime.Source) {
org.talend.components.api.component.runtime.Source source_<%=cid%> =
(org.talend.components.api.component.runtime.Source)sourceOrSink_<%=cid%>;
reader_<%=cid%> = source_<%=cid%>.createReader(container_<%=cid%>);
reader_<%=cid%> = new org.talend.codegen.flowvariables.runtime.FlowVariablesReader(reader_<%=cid%>, container_<%=cid%>);
org.talend.components.api.component.runtime.Source source_<%=cid%> =
(org.talend.components.api.component.runtime.Source)sourceOrSink_<%=cid%>;
org.talend.components.api.component.runtime.Reader reader_<%=cid%> =
source_<%=cid%>.createReader(container_<%=cid%>);
reader_<%=cid%> = new org.talend.codegen.flowvariables.runtime.FlowVariablesReader(reader_<%=cid%>, container_<%=cid%>);
<%
IConnection main = null;
@@ -286,19 +266,19 @@ if (hasOutputOnly || asInputComponent) {
IConnection schemaSourceConnector = (main!=null ? main : reject);
String schemaSourceConnectorName = schemaSourceConnector.getMetadataTable().getAttachedConnector();
%>
boolean multi_output_is_allowed_<%=cid%> = false;
boolean multi_output_is_allowed_<%=cid%> = false;
<% //take care SourceOrSink.validate will change the schema if it contains include-all-fields, so need to get design Avro schema before validate %>
org.talend.components.api.component.Connector c_<%=cid%> = null;
for (org.talend.components.api.component.Connector currentConnector : props_<%=cid %>.getAvailableConnectors(null, true)) {
if (currentConnector.getName().equals("<%=schemaSourceConnectorName%>")) {
c_<%=cid%> = currentConnector;
}
if (currentConnector.getName().equals("REJECT")) {//it's better to move the code to javajet
multi_output_is_allowed_<%=cid%> = true;
}
org.talend.components.api.component.Connector c_<%=cid%> = null;
for (org.talend.components.api.component.Connector currentConnector : props_<%=cid %>.getAvailableConnectors(null, true)) {
if (currentConnector.getName().equals("<%=schemaSourceConnectorName%>")) {
c_<%=cid%> = currentConnector;
}
org.apache.avro.Schema schema_<%=cid%> = props_<%=cid %>.getSchema(c_<%=cid%>, true);
if (currentConnector.getName().equals("REJECT")) {//it's better to move the code to javajet
multi_output_is_allowed_<%=cid%> = true;
}
}
org.apache.avro.Schema schema_<%=cid%> = props_<%=cid %>.getSchema(c_<%=cid%>, true);
<%
irToRow = new IndexedRecordToRowStructGenerator(cid, null, columnList);
@@ -306,123 +286,117 @@ if (hasOutputOnly || asInputComponent) {
}
%>
// Iterate through the incoming data.
boolean available_<%=cid%> = reader_<%=cid%>.start();
// Iterate through the incoming data.
boolean available_<%=cid%> = reader_<%=cid%>.start();
resourceMap.put("reader_<%=cid%>", reader_<%=cid%>);
resourceMap.put("reader_<%=cid%>", reader_<%=cid%>);
for (; available_<%=cid%>; available_<%=cid%> = reader_<%=cid%>.advance()) {
nb_line_<%=cid %>++;
for (; available_<%=cid%>; available_<%=cid%> = reader_<%=cid%>.advance()) {
nb_line_<%=cid %>++;
<%if(hasDataOutput) {%>
if (multi_output_is_allowed_<%=cid%>) {
<%if(main!=null){%>
<%=main.getName()%> = null;
<%}%>
<%if(hasDataOutput) {%>
if (multi_output_is_allowed_<%=cid%>) {
<%if(main!=null){%>
<%=main.getName()%> = null;
<%}%>
<%if(reject!=null){%>
<%=reject.getName()%> = null;
<%}%>
}
<%}%>
<%if(reject!=null){%>
<%=reject.getName()%> = null;
<%}%>
}
<%}%>
try {
Object data_<%=cid%> = reader_<%=cid%>.getCurrent();
<%
if (main != null) {
%>
try {
Object data_<%=cid%> = reader_<%=cid%>.getCurrent();
<%
if (main != null) {
%>
if(multi_output_is_allowed_<%=cid%>) {
<%=main.getName()%> = new <%=main.getName() %>Struct();
}
if(multi_output_is_allowed_<%=cid%>) {
<%=main.getName()%> = new <%=main.getName() %>Struct();
}
<%
irToRow.generateConvertRecord("data_" + cid, main.getName(), main.getMetadataTable().getListColumns());
}
%>
} catch (org.talend.components.api.exception.DataRejectException e_<%=cid%>) {
java.util.Map<String,Object> info_<%=cid%> = e_<%=cid%>.getRejectInfo();
<%
if (reject!=null) {
%>
Object data_<%=cid%> = info_<%=cid%>.get("talend_record");
<%
irToRow.generateConvertRecord("data_" + cid, main.getName(), main.getMetadataTable().getListColumns());
}
%>
} catch (org.talend.components.api.exception.DataRejectException e_<%=cid%>) {
java.util.Map<String,Object> info_<%=cid%> = e_<%=cid%>.getRejectInfo();
<%
if (reject!=null) {
%>
Object data_<%=cid%> = info_<%=cid%>.get("talend_record");
if (multi_output_is_allowed_<%=cid%>) {
<%=reject.getName()%> = new <%=reject.getName() %>Struct();
}
try{
<%
irToRow.generateConvertRecord("data_" + cid, reject.getName());
%>
}catch(java.lang.Exception e){
// do nothing
}
<%
Set<String> commonColumns = new HashSet<String>();
if (multi_output_is_allowed_<%=cid%>) {
<%=reject.getName()%> = new <%=reject.getName() %>Struct();
}
try{
<%
irToRow.generateConvertRecord("data_" + cid, reject.getName());
%>
}catch(java.lang.Exception e){
// do nothing
}
<%
Set<String> commonColumns = new HashSet<String>();
for (IMetadataColumn column : columnList) {
commonColumns.add(column.getLabel());
}
for (IMetadataColumn column : columnList) {
commonColumns.add(column.getLabel());
}
//pass error columns
List<IMetadataColumn> rejectColumns = reject.getMetadataTable().getListColumns();
for(IMetadataColumn column : rejectColumns) {
String columnName = column.getLabel();
//pass error columns
List<IMetadataColumn> rejectColumns = reject.getMetadataTable().getListColumns();
for(IMetadataColumn column : rejectColumns) {
String columnName = column.getLabel();
// JavaType javaType = JavaTypesManager.getJavaTypeFromId(column.getTalendType());
String typeToGenerate = JavaTypesManager.getTypeToGenerate(column.getTalendType(), column.isNullable());
// JavaType javaType = JavaTypesManager.getJavaTypeFromId(column.getTalendType());
String typeToGenerate = JavaTypesManager.getTypeToGenerate(column.getTalendType(), column.isNullable());
//error columns
if(!commonColumns.contains(columnName)) {
%>
<%=reject.getName()%>.<%=columnName%> = (<%=typeToGenerate%>)info_<%=cid%>.get("<%=columnName%>");
<%
}
}
} else {
%>
//TODO use a method instead of getting method by the special key "error/errorMessage"
Object errorMessage_<%=cid%> = null;
if(info_<%=cid%>.containsKey("error")){
errorMessage_<%=cid%> = info_<%=cid%>.get("error");
}else if(info_<%=cid%>.containsKey("errorMessage")){
errorMessage_<%=cid%> = info_<%=cid%>.get("errorMessage");
}else{
errorMessage_<%=cid%> = "Rejected but error message missing";
}
errorMessage_<%=cid%> = "Row "+ nb_line_<%=cid %> + ": "+errorMessage_<%=cid%>;
System.err.println(errorMessage_<%=cid%>);
<%
}
if (main != null) {
%>
// If the record is reject, the main line record should put NULL
<%=main.getName()%> = null;
<%
}
%>
} // end of catch
//error columns
if(!commonColumns.contains(columnName)) {
%>
<%=reject.getName()%>.<%=columnName%> = (<%=typeToGenerate%>)info_<%=cid%>.get("<%=columnName%>");
<%
}
}
} else {
%>
//TODO use a method instead of getting method by the special key "error/errorMessage"
Object errorMessage_<%=cid%> = null;
if(info_<%=cid%>.containsKey("error")){
errorMessage_<%=cid%> = info_<%=cid%>.get("error");
}else if(info_<%=cid%>.containsKey("errorMessage")){
errorMessage_<%=cid%> = info_<%=cid%>.get("errorMessage");
}else{
errorMessage_<%=cid%> = "Rejected but error message missing";
}
errorMessage_<%=cid%> = "Row "+ nb_line_<%=cid %> + ": "+errorMessage_<%=cid%>;
System.err.println(errorMessage_<%=cid%>);
<%
}
if (main != null) {
%>
// If the record is reject, the main line record should put NULL
<%=main.getName()%> = null;
<%
}
%>
}
<%
// The for loop around the incoming records from the reader is left open.
} else if (hasInput) {
%>
org.talend.codegen.enforcer.IncomingSchemaEnforcer incomingEnforcer_<%=cid%> = null;
if (sourceOrSink_<%=cid%> instanceof org.talend.components.api.component.runtime.Sink) {
org.talend.components.api.component.runtime.Sink sink_<%=cid%> =
(org.talend.components.api.component.runtime.Sink)sourceOrSink_<%=cid%>;
org.talend.components.api.component.runtime.WriteOperation writeOperation_<%=cid%> = sink_<%=cid%>.createWriteOperation();
if (doesNodeBelongToRequest_<%=cid%>) {
writeOperation_<%=cid%>.initialize(container_<%=cid%>);
}
writer_<%=cid%> = writeOperation_<%=cid%>.createWriter(container_<%=cid%>);
if (doesNodeBelongToRequest_<%=cid%>) {
writer_<%=cid%>.open("<%=cid%>");
}
org.talend.components.api.component.runtime.Sink sink_<%=cid%> =
(org.talend.components.api.component.runtime.Sink)sourceOrSink_<%=cid%>;
org.talend.components.api.component.runtime.WriteOperation writeOperation_<%=cid%> = sink_<%=cid%>.createWriteOperation();
writeOperation_<%=cid%>.initialize(container_<%=cid%>);
org.talend.components.api.component.runtime.Writer writer_<%=cid%> = writeOperation_<%=cid%>.createWriter(container_<%=cid%>);
writer_<%=cid%>.open("<%=cid%>");
resourceMap.put("writer_<%=cid%>", writer_<%=cid%>);
resourceMap.put("writer_<%=cid%>", writer_<%=cid%>);
} // end of "sourceOrSink_<%=cid%> instanceof ...Sink"
org.talend.components.api.component.Connector c_<%=cid%> = null;
for (org.talend.components.api.component.Connector currentConnector : props_<%=cid %>.getAvailableConnectors(null, false)) {
if (currentConnector.getName().equals("MAIN")) {
@@ -431,7 +405,8 @@ if (hasOutputOnly || asInputComponent) {
}
}
org.apache.avro.Schema designSchema_<%=cid%> = props_<%=cid %>.getSchema(c_<%=cid%>, false);
incomingEnforcer_<%=cid%> = new org.talend.codegen.enforcer.IncomingSchemaEnforcer(designSchema_<%=cid%>);
org.talend.codegen.enforcer.IncomingSchemaEnforcer incomingEnforcer_<%=cid%>
= new org.talend.codegen.enforcer.IncomingSchemaEnforcer(designSchema_<%=cid%>);
<%
List<? extends IConnection> outgoingConns = node.getOutgoingSortedConnections();
if (outgoingConns!=null){
@@ -467,8 +442,7 @@ if (hasOutputOnly || asInputComponent) {
}
}
}
%>
%>
java.lang.Iterable<?> outgoingMainRecordsList_<%=cid%> = new java.util.ArrayList<Object>();
java.util.Iterator outgoingMainRecordsIt_<%=cid%> = null;
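Note: the generated input path above follows the components API Reader lifecycle: start() opens the reader and positions it on the first record, advance() steps forward, getCurrent() returns the current datum, and the reader is parked in resourceMap so the end part can close it. Compressed sketch (identifiers illustrative):

    boolean available = reader.start();
    resourceMap.put("reader_tIn_1", reader);
    for (; available; available = reader.advance()) {
        nb_line++;
        Object data = reader.getCurrent(); // typically an Avro IndexedRecord
        // convert to the row struct; DataRejectException routes the row to the reject flow
    }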

View File

@@ -58,24 +58,13 @@ if(isTopologyNone) {
else if(hasOutputOnly || asInputComponent){
%>
} // while
<%
if (hasOutputOnly || asInputComponent) {
%>
} // end of "if (sourceOrSink_<%=cid%> instanceof ...Source)"
<% } %>
java.util.Map<String, Object> resultMap_<%=cid%> = null;
if (reader_<%=cid%> != null) {
reader_<%=cid%>.close();
resultMap_<%=cid%> = reader_<%=cid%>.getReturnValues();
}
reader_<%=cid%>.close();
final java.util.Map<String, Object> resultMap_<%=cid%> = reader_<%=cid%>.getReturnValues();
<%
}else if(hasInput){
%>
java.util.Map<String, Object> resultMap_<%=cid%> = null;
if (writer_<%=cid%> != null) {
org.talend.components.api.component.runtime.Result resultObject_<%=cid%> = (org.talend.components.api.component.runtime.Result)writer_<%=cid%>.close();
resultMap_<%=cid%> = writer_<%=cid%>.getWriteOperation().finalize(java.util.Arrays.<org.talend.components.api.component.runtime.Result>asList(resultObject_<%=cid%>), container_<%=cid%>);
}
org.talend.components.api.component.runtime.Result resultObject_<%=cid%> = (org.talend.components.api.component.runtime.Result)writer_<%=cid%>.close();
final java.util.Map<String, Object> resultMap_<%=cid%> = writer_<%=cid%>.getWriteOperation().finalize(java.util.Arrays.<org.talend.components.api.component.runtime.Result>asList(resultObject_<%=cid%>), container_<%=cid%>);
<%
} else {
return stringBuffer.toString();
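Note: this end-part hunk trades null-guarded teardown for unconditional calls. On the write side, closing the Writer yields a Result, which WriteOperation.finalize turns into the component's return-value map. Sketch (names as in the template):

    org.talend.components.api.component.runtime.Result result =
            (org.talend.components.api.component.runtime.Result) writer.close();
    java.util.Map<String, Object> resultMap = writer.getWriteOperation()
            .finalize(java.util.Arrays.asList(result), container);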

View File

@@ -84,7 +84,7 @@ if(hasInput){
for (int i = 0; i < input_columnList.size(); i++) {
if(!input_columnList.get(i).getTalendType().equals("id_Dynamic")) {
%>
if (incomingEnforcer_<%=cid%> != null && incomingEnforcer_<%=cid%>.getDesignSchema().getField("<%=input_columnList.get(i)%>") == null){
if (incomingEnforcer_<%=cid%>.getDesignSchema().getField("<%=input_columnList.get(i)%>") == null){
incomingEnforcer_<%=cid%>.addIncomingNodeField("<%=input_columnList.get(i)%>", ((Object) <%=inputConn.getName()%>.<%=input_columnList.get(i)%>).getClass().getCanonicalName());
shouldCreateRuntimeSchemaForIncomingNode = true;
}
@@ -92,7 +92,7 @@ if(hasInput){
}
}
%>
if (shouldCreateRuntimeSchemaForIncomingNode && incomingEnforcer_<%=cid%> != null){
if (shouldCreateRuntimeSchemaForIncomingNode){
incomingEnforcer_<%=cid%>.createRuntimeSchema();
}
<%
@@ -111,7 +111,7 @@ if(hasInput){
if (dynamicPos != -1) {
%>
if (incomingEnforcer_<%=cid%> != null && !incomingEnforcer_<%=cid%>.areDynamicFieldsInitialized()) {
if (!incomingEnforcer_<%=cid%>.areDynamicFieldsInitialized()) {
// Initialize the dynamic columns when they are first encountered.
for (routines.system.DynamicMetadata dm_<%=cid%> : <%=inputConn.getName()%>.<%=input_columnList.get(dynamicPos).getLabel()%>.metadatas) {
incomingEnforcer_<%=cid%>.addDynamicField(
@@ -120,8 +120,7 @@ if(hasInput){
dm_<%=cid%>.getLogicalType(),
dm_<%=cid%>.getFormat(),
dm_<%=cid%>.getDescription(),
dm_<%=cid%>.isNullable(),
dm_<%=cid%>.isKey());
dm_<%=cid%>.isNullable());
}
incomingEnforcer_<%=cid%>.createRuntimeSchema();
}
@@ -129,26 +128,22 @@ if(hasInput){
}
%>
if (incomingEnforcer_<%=cid%> != null) {
incomingEnforcer_<%=cid%>.createNewRecord();
}
incomingEnforcer_<%=cid%>.createNewRecord();
<%
for (int i = 0; i < input_columnList.size(); i++) { // column
IMetadataColumn column = input_columnList.get(i);
if (dynamicPos != i) {
%>
//skip the put action if the input column doesn't appear in component runtime schema
if (incomingEnforcer_<%=cid%> != null && incomingEnforcer_<%=cid%>.getRuntimeSchema().getField("<%=input_columnList.get(i)%>") != null){
if (incomingEnforcer_<%=cid%>.getRuntimeSchema().getField("<%=input_columnList.get(i)%>") != null){
incomingEnforcer_<%=cid%>.put("<%=column.getLabel()%>", <%=inputConn.getName()%>.<%=column.getLabel()%>);
}
<%
} else {
%>
if (incomingEnforcer_<%=cid%> != null) {
for (int i = 0; i < <%=inputConn.getName()%>.<%=column.getLabel()%>.getColumnCount(); i++) {
incomingEnforcer_<%=cid%>.put(<%=inputConn.getName()%>.<%=column.getLabel()%>.getColumnMetadata(i).getName(),
<%=inputConn.getName()%>.<%=column.getLabel()%>.getColumnValue(i));
}
for (int i = 0; i < <%=inputConn.getName()%>.<%=column.getLabel()%>.getColumnCount(); i++) {
incomingEnforcer_<%=cid%>.put(<%=inputConn.getName()%>.<%=column.getLabel()%>.getColumnMetadata(i).getName(),
<%=inputConn.getName()%>.<%=column.getLabel()%>.getColumnValue(i));
}
<%
}
@@ -182,11 +177,7 @@ if(hasInput){
} // propInfo
%>
org.apache.avro.generic.IndexedRecord data_<%=cid%> = null;
if (incomingEnforcer_<%=cid%> != null) {
data_<%=cid%> = incomingEnforcer_<%=cid%>.getCurrentRecord();
}
org.apache.avro.generic.IndexedRecord data_<%=cid%> = incomingEnforcer_<%=cid%>.getCurrentRecord();
<%
boolean isParallelize ="true".equalsIgnoreCase(ElementParameterParser.getValue(node, "__PARALLELIZE__"));
@@ -199,9 +190,8 @@ if(hasInput){
}
}
%>
if (writer_<%=cid%> != null && data_<%=cid%> != null) {
writer_<%=cid%>.write(data_<%=cid%>);
}
writer_<%=cid%>.write(data_<%=cid%>);
nb_line_<%=cid %>++;
<%if(hasMainOutput){
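Note: every hunk in this file toggles the same pattern: null guards on incomingEnforcer_<cid> and writer_<cid>. The guarded side pairs with the doesNodeBelongToRequest_<cid> logic from the earlier file, under which a node outside the active tRESTRequest operation never initializes its enforcer or opens its writer. Minimal sketch of the guarded write path (names as in the template):

    if (writer != null && data != null) {
        writer.write(data); // skipped when the node is not part of the active REST operation
    }
    nb_line++;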

View File

@@ -73,9 +73,6 @@ import pigudf.<%=routine%>;
import routines.<%=routine%>;
<% }
}%>
<%for (String codesJar : CodeGeneratorRoutine.getRequiredCodesJarName(process)) {%>
import <%=codesJar%>;
<%}%>
import routines.system.*;
import routines.system.api.*;
import java.text.ParseException;
@@ -392,26 +389,13 @@ public <%=JavaTypesManager.getTypeToGenerate(ctxParam.getType(),true)%> get<%=Ch
break;
}
}
boolean enableLogStash = !Boolean.getBoolean("deactivate_extended_component_log") && (jobCatcherNode!=null);
boolean enable_runtime_lineage_log = NodeUtil.isJobUsingRuntimeLineage(process) && (jobCatcherNode!=null);
if(jobCatcherNode!=null) {
%>
private final JobStructureCatcherUtils <%=jobCatcherNode.getUniqueName() %> = new JobStructureCatcherUtils(jobName, "<%=process.getId() %>", "<%=process.getVersion() %>");
<%
}
if(enable_runtime_lineage_log) {
%>
private org.talend.job.audit.JobAuditLogger runtime_lineage_logger_<%=jobCatcherNode.getUniqueName()%> = null;
<%
}
boolean enableLogStash = jobCatcherNode != null;
if (enableLogStash) {
%>
private final JobStructureCatcherUtils <%=jobCatcherNode.getUniqueName() %> = new JobStructureCatcherUtils(jobName, "<%=process.getId() %>", "<%=process.getVersion() %>");
private org.talend.job.audit.JobAuditLogger auditLogger_<%=jobCatcherNode.getUniqueName()%> = null;
private RunStat runStat = new RunStat(<%=jobCatcherNode.getUniqueName() %>, System.getProperty("audit.interval"));
private RunStat runStat = new RunStat(<%=jobCatcherNode.getUniqueName() %>);
<%
} else if(stats) {
%>
@@ -440,20 +424,6 @@ private RunTrace runTrace = new RunTrace();
globalMap.put(KEY_DB_DATASOURCES, talendDataSources);
globalMap.put(KEY_DB_DATASOURCES_RAW, new java.util.HashMap<String, javax.sql.DataSource>(dataSources));
}
public void setDataSourceReferences(List serviceReferences) throws Exception{
java.util.Map<String, routines.system.TalendDataSource> talendDataSources = new java.util.HashMap<String, routines.system.TalendDataSource>();
java.util.Map<String, javax.sql.DataSource> dataSources = new java.util.HashMap<String, javax.sql.DataSource>();
for (java.util.Map.Entry<String, javax.sql.DataSource> entry : BundleUtils.getServices(serviceReferences, javax.sql.DataSource.class).entrySet()) {
dataSources.put(entry.getKey(), entry.getValue());
talendDataSources.put(entry.getKey(), new routines.system.TalendDataSource(entry.getValue()));
}
globalMap.put(KEY_DB_DATASOURCES, talendDataSources);
globalMap.put(KEY_DB_DATASOURCES_RAW, new java.util.HashMap<String, javax.sql.DataSource>(dataSources));
}
<%
for (INode logCatcher : process.getNodesOfType("tLogCatcher")) {

View File

@@ -12,7 +12,6 @@
org.talend.designer.codegen.config.CodeGeneratorArgument
org.talend.designer.codegen.config.NodesSubTree
org.talend.core.model.process.IProcess
org.talend.core.model.process.ProcessUtils
org.talend.core.model.utils.NodeUtil
org.talend.core.model.process.IContextParameter
java.util.List
@@ -29,13 +28,7 @@ INode node = (INode)codeGenArgument.getArgument();
boolean containsTPartitioner = node.getProcess().getNodesOfType("tPartitioner").size() > 0 ? true : false;
boolean isRunJob = "tRunJob".equals(node.getComponent().getName());
IProcess process = node.getProcess();
boolean isTestContainer=ProcessUtils.isTestContainer(process);
String className = process.getName();
if (isTestContainer) {
className = className + "Test";
}
NodesSubTree subTree = (NodesSubTree) codeGenArgument.getSubTree();
ECodePart codePart = codeGenArgument.getCodePart();
//boolean trace = codeGenArgument.isTrace();
@@ -85,10 +78,10 @@ for (IConnection iterateConn : iterateConnSet) { //1
@Override
public Object put(String key, Object value) {
<%if(!isRunInMultiThread){%>
synchronized (<%=className%>.this.obj) {
synchronized (<%=process.getName()%>.this.obj) {
<%}%>
super.put(key, value);
return <%=className%>.this.globalMap.put(key, value);
return <%=process.getName()%>.this.globalMap.put(key, value);
<%if(!isRunInMultiThread){%>
}
<%}%>
@@ -165,7 +158,7 @@ for (IConnection iterateConn : iterateConnSet) { //1
synchronized (globalMap) {
this.globalMap = java.util.Collections.synchronizedMap(new ThreadedMap(globalMap));
<%}else{%>
synchronized (<%=className%>.this.obj) {
synchronized (<%=process.getName()%>.this.obj) {
this.globalMap = new ThreadedMap(globalMap);
<%}%>
}

View File

@@ -44,7 +44,7 @@
INode startNode = subTree.getRootNode();
String startNodeId = startNode.getUniqueName();
if ("tCollector".equals( startNode.getComponent().getOriginalName() )) {
if(startNodeId!=null && startNodeId.startsWith("tCollector")) {
List<? extends INode> departitioners = startNode.getProcess().getNodesOfType("tDepartitioner");
if(departitioners!=null) {
for(INode departitioner : departitioners) {
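Note: the two conditions in this hunk are not equivalent in strength: one matches the component type via its original name, the other infers it from the generated unique-name prefix (tCollector_1, tCollector_2, ...). Sketch of the contrast (illustrative):

    boolean byType   = "tCollector".equals(startNode.getComponent().getOriginalName());
    boolean byPrefix = startNode.getUniqueName().startsWith("tCollector");
    // byPrefix also matches any other component whose unique name happens to start with "tCollector"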

View File

@@ -1,121 +0,0 @@
<%
//copy from configuration.javajet for tacokit
%>
<%
//TODO: modify this part for Maps and nested lists.
if (p.getFieldType() == EParameterFieldType.TABLE || p.getFieldType() == EParameterFieldType.TACOKIT_SUGGESTABLE_TABLE) {
java.util.List<java.util.Map<String, String>> tableValues = ElementParameterParser.createTableValues((java.util.List<java.util.Map<String, Object>>) p.getValue(), p);
String[] items = p.getListItemsDisplayCodeName();
String tableName = p.getName().replace('$', '.');
boolean primitiveTable = items.length == 1 && items[0].equals(tableName + "[]");
String tableNamePrefix = tableName + "[]";
for (int i = 0; i < tableValues.size(); i++) {
java.util.Map<String, String> lineValues = tableValues.get(i);
for (int j = 0; j < items.length; j++) {
String key = tableName + "[" + i + "]";
if (!primitiveTable) {
final String columnName = items[j].substring(tableNamePrefix.length(), items[j].length());
key = key + columnName;
}
String value = lineValues.get(items[j]);
if (!org.talend.core.model.utils.ContextParameterUtils.isDynamic(value)) {
value = org.talend.core.model.utils.TalendTextUtils.removeQuotes(value);
value = org.talend.core.model.utils.TalendTextUtils.addQuotes(value);
}
if(value==null || "null".equals(value.trim())) {
value = "(Object)null";
}
%>
component_parameters.put("<%=key%>",String.valueOf(<%=value%>));
<%
}
}
} else if(p.getFieldType() == EParameterFieldType.SCHEMA_TYPE) {
final String parameterName = p.getName();
IConnection connection = null;
final List<? extends IConnection> connections = NodeUtil.getOutgoingConnections(node, p.getContext());
if(connections != null && !connections.isEmpty()) {
connection = connections.get(0);
}
if(connection != null) {
IMetadataTable metaTable = connection.getMetadataTable();
List<IMetadataColumn> columns = metaTable.getListColumns();
for(int i = 0; i < columns.size(); i++) {
IMetadataColumn column = columns.get(i);
%>
component_parameters.put("<%=parameterName%>[<%=i%>]", "<%=column.getLabel()%>");
<%
}
}
} else if (p.getFieldType() == EParameterFieldType.TACOKIT_INPUT_SCHEMA) {
final String parameterName = p.getName();
IConnection connection = null;
final List<? extends IConnection> connections = NodeUtil.getIncomingConnections(node, p.getContext());
if(connections != null && !connections.isEmpty()) {
connection = connections.get(0);
}
if(connection != null) {
IMetadataTable metaTable = connection.getMetadataTable();
List<IMetadataColumn> columns = metaTable.getListColumns();
for(int i = 0; i < columns.size(); i++) {
IMetadataColumn column = columns.get(i);
%>
component_parameters.put("<%=parameterName%>[<%=i%>]", "<%=column.getLabel()%>");
<%
}
}
} else {
final String key;
if(!p.getName().contains("$")){
key = p.getName();
}else{
final StringBuilder keyBuilder = new StringBuilder();
for (String part : p.getName().split("\\.")) {
if (keyBuilder.length() != 0) {
keyBuilder.append(".");
}
if (part.contains("$") && !part.startsWith("$")) {
keyBuilder.append(part.replace("$", "."));
} else {
keyBuilder.append(part);
}
}
key = keyBuilder.toString();
}
String value = null;
if(p.getFieldType() == EParameterFieldType.PASSWORD) {
continue;
} else {
value = ElementParameterParser.getStringElementParameterValue(p);
if (!org.talend.core.model.utils.ContextParameterUtils.isDynamic(value)) {
value = org.talend.core.model.utils.TalendTextUtils.removeQuotes(value);
value = org.talend.core.model.utils.TalendTextUtils.addQuotes(value);
}
}
if (value != null) {
if(key.endsWith("$maxBatchSize")){
%>
<%
} else if(p.getFieldType() == EParameterFieldType.CLOSED_LIST) {
String valueTemp = org.talend.core.model.utils.TalendTextUtils.removeQuotes(value);
if ("".equals(valueTemp)) {
String[] listItemsDisplayCodeValue = p.getListItemsDisplayCodeName();
if(listItemsDisplayCodeValue != null && listItemsDisplayCodeValue.length > 0){
valueTemp = listItemsDisplayCodeValue[0];
value = org.talend.core.model.utils.TalendTextUtils.addQuotes(valueTemp);
}
}
}
if(value==null || "null".equals(value.trim())) {
value = "(Object)null";
}
%>
component_parameters.put("<%=key%>", String.valueOf(<%=value%>));
<%
} // else do not put value in configuration
}
%>
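The else branch above normalizes TaCoKit parameter names before they are written into component_parameters: '$' acts as a nesting separator and is rewritten to '.' unless a path part is a placeholder starting with '$'. A standalone sketch of just that normalization (class name is illustrative):

public class KeyFlattenSketch {

    static String flatten(String name) {
        if (!name.contains("$")) {
            return name;
        }
        StringBuilder keyBuilder = new StringBuilder();
        for (String part : name.split("\\.")) {
            if (keyBuilder.length() != 0) {
                keyBuilder.append(".");
            }
            if (part.contains("$") && !part.startsWith("$")) {
                keyBuilder.append(part.replace("$", ".")); // nested configuration path
            } else {
                keyBuilder.append(part); // placeholder parts are kept as-is
            }
        }
        return keyBuilder.toString();
    }

    public static void main(String[] args) {
        System.out.println(flatten("configuration$dataset$connection")); // configuration.dataset.connection
        System.out.println(flatten("plain.name"));                       // plain.name
    }
}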

View File

@@ -68,14 +68,6 @@
id="org.talend.designer.components.model.UserComponentsProvider">
</ComponentsProvider>
</extension>
<extension
point="org.talend.core.components_provider">
<ComponentsProvider
class="org.talend.designer.codegen.components.model.SharedStudioUserComponentProvider"
folderName="user"
id="org.talend.designer.codegen.components.model.SharedStudioUserComponentProvider">
</ComponentsProvider>
</extension>
<extension
point="org.eclipse.core.runtime.preferences">
<initializer

View File

@@ -69,15 +69,6 @@ public class JavaRoutineSynchronizer extends AbstractRoutineSynchronizer {
syncRoutineItems(getRoutines(true), true);
}
@Override
public void syncAllInnerCodes() throws SystemException {
syncInnerCodeItems(false);
}
@Override
public void syncAllInnerCodesForLogOn() throws SystemException {
syncInnerCodeItems(true);
}
private void syncRoutineItems(Collection<RoutineItem> routineObjects, boolean forceUpdate) throws SystemException {
for (RoutineItem routineItem : routineObjects) {

View File

@@ -26,8 +26,10 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.ResourceBundle;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -73,7 +75,6 @@ import org.talend.core.model.components.IComponentsFactory;
import org.talend.core.model.components.IComponentsHandler;
import org.talend.core.model.components.filters.ComponentsFactoryProviderManager;
import org.talend.core.model.components.filters.IComponentFactoryFilter;
import org.talend.core.runtime.util.ComponentsLocationProvider;
import org.talend.core.ui.IJobletProviderService;
import org.talend.core.ui.ISparkJobletProviderService;
import org.talend.core.ui.ISparkStreamingJobletProviderService;
@@ -82,6 +83,8 @@ import org.talend.core.ui.images.CoreImageProvider;
import org.talend.core.utils.TalendCacheUtils;
import org.talend.designer.codegen.CodeGeneratorActivator;
import org.talend.designer.codegen.i18n.Messages;
import org.talend.designer.core.ITisLocalProviderService;
import org.talend.designer.core.ITisLocalProviderService.ResClassLoader;
import org.talend.designer.core.model.components.ComponentBundleToPath;
import org.talend.designer.core.model.components.ComponentFilesNaming;
import org.talend.designer.core.model.components.EmfComponent;
@@ -161,11 +164,7 @@ public class ComponentsFactory implements IComponentsFactory {
throw new RuntimeException(e);
}
isInitialising.set(true);
try {
removeOldComponentsUserFolder();
} catch (IOException ex) {
ExceptionHandler.process(ex);
} // not used anymore
removeOldComponentsUserFolder(); // not used anymore
long startTime = System.currentTimeMillis();
// TimeMeasure.display = true;
@@ -388,12 +387,10 @@ public class ComponentsFactory implements IComponentsFactory {
ComponentManager.saveResource();
}
private void removeOldComponentsUserFolder() throws IOException {
private void removeOldComponentsUserFolder() {
String userPath = IComponentsFactory.COMPONENTS_INNER_FOLDER + File.separatorChar
+ ComponentUtilities.getExtFolder(OLD_COMPONENTS_USER_INNER_FOLDER);
ComponentsProviderManager componentsProviderManager = ComponentsProviderManager.getInstance();
AbstractComponentsProvider componentsProvider = componentsProviderManager.loadUserComponentsProvidersFromExtension();
File componentsLocation = getComponentsLocation(componentsProvider, userPath);
File componentsLocation = getComponentsLocation(userPath);
if (componentsLocation != null && componentsLocation.exists()) {
FilesUtils.removeFolder(componentsLocation, true);
}
@@ -674,38 +671,114 @@ public class ComponentsFactory implements IComponentsFactory {
*
* @param currentFolder
* @return
* @throws IOException
* @throws BusinessException
*/
private File getComponentsLocation(AbstractComponentsProvider componentsProvider, String folder) throws IOException {
if (componentsProvider instanceof ComponentsLocationProvider) {
return componentsProvider.getInstallationFolder();
} else {
String componentsPath = IComponentsFactory.COMPONENTS_LOCATION;
IBrandingService breaningService = (IBrandingService) GlobalServiceRegister.getDefault()
.getService(IBrandingService.class);
if (breaningService.isPoweredOnlyCamel()) {
componentsPath = IComponentsFactory.CAMEL_COMPONENTS_LOCATION;
}
Bundle b = Platform.getBundle(componentsPath);
private File getComponentsLocation(String folder) {
String componentsPath = IComponentsFactory.COMPONENTS_LOCATION;
IBrandingService breaningService = (IBrandingService) GlobalServiceRegister.getDefault()
.getService(IBrandingService.class);
if (breaningService.isPoweredOnlyCamel()) {
componentsPath = IComponentsFactory.CAMEL_COMPONENTS_LOCATION;
}
Bundle b = Platform.getBundle(componentsPath);
File file = null;
try {
File file = null;
try {
URL url = FileLocator.find(b, new Path(folder), null);
if (url == null) {
return null;
}
URL fileUrl = FileLocator.toFileURL(url);
file = new File(fileUrl.getPath());
} catch (Exception e) {
// e.printStackTrace();
ExceptionHandler.process(e);
}
return file;
}
private File getComponentsLocation(String folder, AbstractComponentsProvider provider) {
File file = null;
try {
if (provider != null) {
file = provider.getInstallationFolder();
} else {
String componentsPath = IComponentsFactory.COMPONENTS_LOCATION;
Bundle b = Platform.getBundle(componentsPath);
IBrandingService breaningService = (IBrandingService) GlobalServiceRegister.getDefault()
.getService(IBrandingService.class);
if (breaningService.isPoweredOnlyCamel()) {
componentsPath = IComponentsFactory.CAMEL_COMPONENTS_LOCATION;
}
URL url = FileLocator.find(b, new Path(folder), null);
if (url == null) {
return null;
}
URL fileUrl = FileLocator.toFileURL(url);
file = new File(fileUrl.getPath());
} catch (Exception e) {
// e.printStackTrace();
ExceptionHandler.process(e);
}
return file;
}
}
} catch (Exception e) {
ExceptionHandler.process(e);
}
return file;
}
private ResourceBundle getComponentResourceBundle(IComponent currentComp, String source, String cachedPathSource,
AbstractComponentsProvider provider) {
try {
AbstractComponentsProvider currentProvider = provider;
if (currentProvider == null) {
ComponentsProviderManager componentsProviderManager = ComponentsProviderManager.getInstance();
Collection<AbstractComponentsProvider> providers = componentsProviderManager.getProviders();
for (AbstractComponentsProvider curProvider : providers) {
String path = new Path(curProvider.getInstallationFolder().toString()).toPortableString();
if (source.startsWith(path)) {
// fix for TDI-19889 and TDI-20507 to get the correct component provider
if (cachedPathSource != null) {
if (path.contains(cachedPathSource)) {
currentProvider = curProvider;
break;
}
} else {
currentProvider = curProvider;
break;
}
}
}
}
String installPath = currentProvider.getInstallationFolder().toString();
String label = ComponentFilesNaming.getInstance().getBundleName(currentComp.getName(),
installPath.substring(installPath.lastIndexOf(IComponentsFactory.COMPONENTS_INNER_FOLDER)));
if (currentProvider.isUseLocalProvider()) {
// if the component use local provider as storage (for user / ecosystem components)
// then get the bundle resource from the current main component provider.
// note: code here to review later, service like this shouldn't be used...
ResourceBundle bundle = null;
IBrandingService brandingService = (IBrandingService) GlobalServiceRegister.getDefault()
.getService(IBrandingService.class);
if (brandingService.isPoweredOnlyCamel()) {
bundle = currentProvider.getResourceBundle(label);
} else {
ITisLocalProviderService service = (ITisLocalProviderService) GlobalServiceRegister.getDefault()
.getService(ITisLocalProviderService.class);
bundle = service.getResourceBundle(label);
}
return bundle;
} else {
ResourceBundle bundle = ResourceBundle.getBundle(label, Locale.getDefault(),
new ResClassLoader(currentProvider.getClass().getClassLoader()));
return bundle;
}
} catch (IOException e) {
ExceptionHandler.process(e);
}
return null;
}
private String getCodeLanguageSuffix() {
@@ -1009,13 +1082,5 @@ public class ComponentsFactory implements IComponentsFactory {
public void setComponentsHandler(IComponentsHandler componentsHandler) {
this.componentsHandler = componentsHandler;
}
public String getCustomComponentBundlePath() {
ComponentsProviderManager componentsProviderManager = ComponentsProviderManager.getInstance();
AbstractComponentsProvider componentsProvider = componentsProviderManager.loadUserComponentsProvidersFromExtension();
String bundle = componentsProvider.getComponentsBundle();
return ComponentBundleToPath.getPathFromBundle(bundle);
}
}

View File

@@ -23,7 +23,6 @@ import org.eclipse.core.runtime.IExtensionRegistry;
import org.eclipse.core.runtime.Platform;
import org.talend.core.GlobalServiceRegister;
import org.talend.core.model.components.AbstractComponentsProvider;
import org.talend.core.runtime.util.SharedStudioInfoProvider;
import org.talend.core.ui.branding.IBrandingService;
import org.talend.designer.codegen.i18n.Messages;
@@ -70,9 +69,6 @@ public final class ComponentsProviderManager {
try {
AbstractComponentsProvider componentsProvider = (AbstractComponentsProvider) configurationElement
.createExecutableExtension("class"); //$NON-NLS-1$
if (componentsProvider instanceof SharedStudioInfoProvider && !((SharedStudioInfoProvider)componentsProvider).isSupportCurrentMode()) {
continue;
}
componentsProvider.setId(id);
componentsProvider.setFolderName(folderName);
componentsProvider.setContributer(contributerName);
@@ -85,15 +81,15 @@ public final class ComponentsProviderManager {
}
}
public AbstractComponentsProvider loadUserComponentsProvidersFromExtension() {
if (providers == null) {
loadComponentsProvidersFromExtension();
}
for (AbstractComponentsProvider provider : providers) {
if (provider instanceof UserComponentsProvider) {
return provider;
}
}
return null;
}
public AbstractComponentsProvider loadUserComponentsProvidersFromExtension() {
if (providers == null) {
loadComponentsProvidersFromExtension();
}
for (AbstractComponentsProvider provider : providers) {
if ("org.talend.designer.components.model.UserComponentsProvider".equals(provider.getId())) {
return provider;
}
}
return null;
}
}

View File

@@ -1,61 +0,0 @@
package org.talend.designer.codegen.components.model;
//============================================================================
//
//Copyright (C) 2006-2019 Talend Inc. - www.talend.com
//
//This source code is available under agreement available at
//%InstallDIR%\features\org.talend.rcp.branding.%PRODUCTNAME%\%PRODUCTNAME%license.txt
//
//You should have received a copy of the agreement
//along with this program; if not, write to Talend SA
//9 rue Pages 92150 Suresnes, France
//
//============================================================================
import java.io.File;
import java.io.IOException;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.ResourceBundle;
import org.eclipse.core.runtime.IPath;
import org.eclipse.core.runtime.Path;
import org.eclipse.core.runtime.Platform;
import org.talend.core.model.components.ComponentUtilities;
import org.talend.core.model.components.IComponentsFactory;
import org.talend.core.runtime.util.ComponentsLocationProvider;
import org.talend.core.runtime.util.SharedStudioUtils;
import org.talend.designer.core.model.components.ComponentBundleToPath;
public class SharedStudioUserComponentProvider extends UserComponentsProvider implements ComponentsLocationProvider{
@Override
public File getInstallationFolder() throws IOException {
File componentFolder = SharedStudioUtils.getSharedStudioComponentsParentFolder();
IPath path = new Path(IComponentsFactory.COMPONENTS_INNER_FOLDER);
path = path.append(IComponentsFactory.EXTERNAL_COMPONENTS_INNER_FOLDER).append(ComponentUtilities.getExtFolder(getFolderName()));
File installationFolder = new File (componentFolder, path.toOSString());
return installationFolder;
}
public String getComponentsBundle() {
return ComponentBundleToPath.SHARED_STUDIO_CUSTOM_COMPONENT_BUNDLE;
}
public boolean isSupportCurrentMode() {
if (SharedStudioUtils.isSharedStudioMode()) {
return true;
}
return false;
}
@Override
public ResourceBundle getResourceBundle(String label) {
URL configFolderUrl = Platform.getConfigurationLocation().getURL();
URLClassLoader urlLoader = new URLClassLoader(new java.net.URL[]{configFolderUrl});
java.util.ResourceBundle bundle = java.util.ResourceBundle.getBundle( label ,
java.util.Locale.getDefault(), urlLoader );
return bundle;
}
}

View File

@@ -34,15 +34,13 @@ import org.talend.core.model.components.ComponentUtilities;
import org.talend.core.model.components.IComponentsFactory;
import org.talend.core.model.general.Project;
import org.talend.core.model.repository.ERepositoryObjectType;
import org.talend.core.runtime.util.SharedStudioInfoProvider;
import org.talend.core.runtime.util.SharedStudioUtils;
import org.talend.core.ui.branding.IBrandingService;
import org.talend.designer.codegen.CodeGeneratorActivator;
import org.talend.designer.codegen.components.ui.IComponentPreferenceConstant;
import org.talend.repository.ProjectManager;
/***/
public class UserComponentsProvider extends AbstractCustomComponentsProvider implements SharedStudioInfoProvider{
public class UserComponentsProvider extends AbstractCustomComponentsProvider {
@Override
protected File getExternalComponentsLocation() {
@@ -149,11 +147,5 @@ public class UserComponentsProvider extends AbstractCustomComponentsProvider imp
public String getComponentsBundle() {
return IComponentsFactory.COMPONENTS_LOCATION;
}
public boolean isSupportCurrentMode() {
if (SharedStudioUtils.isSharedStudioMode()) {
return false;
}
return true;
}
}

View File

@@ -18,7 +18,6 @@ import java.util.Map;
import org.eclipse.core.runtime.Platform;
import org.talend.commons.exception.ExceptionHandler;
import org.talend.commons.utils.StringUtils;
import org.talend.designer.core.model.components.ComponentBundleToPath;
/**
* Jet container for a particular component.
@@ -214,17 +213,8 @@ public class JetBean {
if (pluginIdToBundle.containsKey(pluginId)) {
base = pluginIdToBundle.get(pluginId);
} else {
if (ComponentBundleToPath.SHARED_STUDIO_CUSTOM_COMPONENT_BUNDLE.equals(pluginId)) {
base = ComponentBundleToPath.getPathFromBundle(pluginId);
if (!base.endsWith("/")) {
base = base + "/";
}
pluginIdToBundle.put(pluginId, base);
} else {
base = Platform.getBundle(pluginId).getEntry("/").toString(); //$NON-NLS-1$
pluginIdToBundle.put(pluginId, base);
}
base = Platform.getBundle(pluginId).getEntry("/").toString(); //$NON-NLS-1$
pluginIdToBundle.put(pluginId, base);
}
String result = base + relativeUri;
return result;

View File

@@ -136,11 +136,13 @@ public class TalendJETCompiler extends JETCompiler {
// get the plugin name from fileURI
String refPluginName = matcher.group(1);
// retrieve the plugin URI by pluginName.
String realURI = TemplateUtil.getPlatformUrlOfBundle(refPluginName);
if (realURI != null) {
Bundle refBundle = Platform.getBundle(refPluginName);
if (refBundle != null) {
String realURI = TemplateUtil.getPlatformUrlOfBundle(refPluginName);
// replace the old fileURI to new one by pluginURI
String newFileURI = fileURI.replaceFirst(PLUGIN_VAR_PATTERN.pattern(), realURI);
return newFileURI;
}
}
}

View File

@@ -14,7 +14,6 @@ package org.talend.designer.codegen.config;
import org.eclipse.core.runtime.Platform;
import org.osgi.framework.Bundle;
import org.talend.designer.core.model.components.ComponentBundleToPath;
/**
* CodeGenerator Templates Ressources Utils.
@@ -162,25 +161,10 @@ public class TemplateUtil {
* @return
*/
public static String getPlatformUrlOfBundle(String bundleName) {
if (ComponentBundleToPath.SHARED_STUDIO_CUSTOM_COMPONENT_BUNDLE.equals(bundleName)) {
String basePath = ComponentBundleToPath.getPathFromBundle(bundleName);
if (!basePath.endsWith("/")) {
basePath = basePath + "/";
}
return basePath;
} else {
Bundle bundle = Platform.getBundle(bundleName);
if (bundle == null) {
return null;
}
StringBuilder sb = new StringBuilder();
sb.append("platform:/plugin/");
sb.append(bundle.getSymbolicName());
sb.append("_");
sb.append(bundle.getVersion().toString());
sb.append("/");
return sb.toString();
}
Bundle bundle = Platform.getBundle(bundleName);
if (bundle == null) {
return null;
}
return "platform:/plugin/" + bundle.getSymbolicName() + "_" + bundle.getVersion().toString() + "/";
}
}
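The simplified method resolves a bundle name to an Eclipse platform URL of the form platform:/plugin/<symbolic-name>_<version>/. A sketch of the string it builds, with the OSGi Bundle lookup stubbed out (the bundle name and version below are illustrative):

public class PlatformUrlSketch {

    // Stand-in for Platform.getBundle(name).getSymbolicName()/getVersion() outside an OSGi runtime.
    static String platformUrl(String symbolicName, String version) {
        return "platform:/plugin/" + symbolicName + "_" + version + "/";
    }

    public static void main(String[] args) {
        // prints: platform:/plugin/org.talend.designer.codegen_7.3.1/
        System.out.println(platformUrl("org.talend.designer.codegen", "7.3.1"));
    }
}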

View File

@@ -47,7 +47,6 @@ import org.talend.core.ui.component.ComponentsFactoryProvider;
import org.talend.designer.codegen.CodeGeneratorActivator;
import org.talend.designer.codegen.config.TemplateUtil;
import org.talend.designer.codegen.i18n.Messages;
import org.talend.designer.core.model.components.ComponentBundleToPath;
/**
* DOC xtan
@@ -257,9 +256,10 @@ public final class JetSkeletonManager {
};
for (TemplateUtil template : CodeGeneratorInternalTemplatesFactoryProvider.getInstance().getTemplates()) {
Bundle b = Platform.getBundle(template.getJetPluginRepository());
URL resourcesUrl = null;
try {
resourcesUrl = FileLocator.toFileURL(ComponentBundleToPath.findComponentsBundleURL(template.getJetPluginRepository(), new Path(template.getTemplateRelativeUri()), null));
resourcesUrl = FileLocator.toFileURL(FileLocator.find(b, new Path(template.getTemplateRelativeUri()), null));
} catch (IOException e) {
ExceptionHandler.process(e);
}

View File

@@ -8,7 +8,6 @@ Require-Bundle: org.eclipse.core.runtime,
org.eclipse.ui,
org.apache.log4j,
org.apache.commons.collections,
org.apache.commons.discovery,
org.apache.commons.logging,
org.apache.commons.beanutils,
org.apache.commons.io,
@@ -26,6 +25,7 @@ Require-Bundle: org.eclipse.core.runtime,
org.talend.repository,
org.talend.core.repository,
org.talend.updates.runtime,
org.apache.axis,
org.eclipse.ui.intro,
org.eclipse.ui.forms,
org.eclipse.jface.text

View File

@@ -9,14 +9,6 @@
id="org.talend.designer.components.exchange.ExchangeComponentsProvider">
</ComponentsProvider>
</extension>
<extension
point="org.talend.core.components_provider">
<ComponentsProvider
class="org.talend.designer.components.exchange.SharedStudioExchangeComponentsProvider"
folderName="exchange"
id="org.talend.designer.components.exchange.SharedStudioExchangeComponentsProvider">
</ComponentsProvider>
</extension>
<extension
point="org.talend.core.runtime.service">
<Service

View File

@@ -28,15 +28,13 @@ import org.talend.core.GlobalServiceRegister;
import org.talend.core.model.components.AbstractComponentsProvider;
import org.talend.core.model.components.ComponentUtilities;
import org.talend.core.model.components.IComponentsFactory;
import org.talend.core.runtime.util.SharedStudioInfoProvider;
import org.talend.core.runtime.util.SharedStudioUtils;
import org.talend.core.ui.branding.IBrandingService;
import org.talend.designer.components.exchange.util.ExchangeUtils;
/**
* DOC hcyi class global comment. Detailed comment
*/
public class ExchangeComponentsProvider extends AbstractComponentsProvider implements SharedStudioInfoProvider{
public class ExchangeComponentsProvider extends AbstractComponentsProvider {
/**
* ExchangeComponentsProvider constructor.
@@ -186,10 +184,4 @@ public class ExchangeComponentsProvider extends AbstractComponentsProvider imple
return IComponentsFactory.COMPONENTS_LOCATION;
}
public boolean isSupportCurrentMode() {
if (SharedStudioUtils.isSharedStudioMode()) {
return false;
}
return true;
}
}

View File

@@ -1,59 +0,0 @@
package org.talend.designer.components.exchange;
//============================================================================
//
//Copyright (C) 2006-2019 Talend Inc. - www.talend.com
//
//This source code is available under agreement available at
//%InstallDIR%\features\org.talend.rcp.branding.%PRODUCTNAME%\%PRODUCTNAME%license.txt
//
//You should have received a copy of the agreement
//along with this program; if not, write to Talend SA
//9 rue Pages 92150 Suresnes, France
//
//============================================================================
import java.io.File;
import java.io.IOException;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.ResourceBundle;
import org.eclipse.core.runtime.IPath;
import org.eclipse.core.runtime.Path;
import org.eclipse.core.runtime.Platform;
import org.talend.core.model.components.ComponentUtilities;
import org.talend.core.model.components.IComponentsFactory;
import org.talend.core.runtime.util.ComponentsLocationProvider;
import org.talend.core.runtime.util.SharedStudioUtils;
import org.talend.designer.core.model.components.ComponentBundleToPath;
public class SharedStudioExchangeComponentsProvider extends ExchangeComponentsProvider implements ComponentsLocationProvider{
@Override
public File getInstallationFolder() throws IOException {
File componentFolder = SharedStudioUtils.getSharedStudioComponentsParentFolder();
IPath path = new Path(IComponentsFactory.COMPONENTS_INNER_FOLDER);
path = path.append(IComponentsFactory.EXTERNAL_COMPONENTS_INNER_FOLDER).append(ComponentUtilities.getExtFolder(getFolderName()));
File installationFolder = new File (componentFolder, path.toOSString());
return installationFolder;
}
public String getComponentsBundle() {
return ComponentBundleToPath.SHARED_STUDIO_CUSTOM_COMPONENT_BUNDLE;
}
public boolean isSupportCurrentMode() {
if (SharedStudioUtils.isSharedStudioMode()) {
return true;
}
return false;
}
@Override
public ResourceBundle getResourceBundle(String label) {
URL configFolderUrl = Platform.getConfigurationLocation().getURL();
URLClassLoader urlLoader = new URLClassLoader(new java.net.URL[]{configFolderUrl});
java.util.ResourceBundle bundle = java.util.ResourceBundle.getBundle( label ,
java.util.Locale.getDefault(), urlLoader );
return bundle;
}
}

View File

@@ -0,0 +1,66 @@
// ============================================================================
//
// Copyright (C) 2006-2019 Talend Inc. - www.talend.com
//
// This source code is available under agreement available at
// %InstallDIR%\features\org.talend.rcp.branding.%PRODUCTNAME%\%PRODUCTNAME%license.txt
//
// You should have received a copy of the agreement
// along with this program; if not, write to Talend SA
// 9 rue Pages 92150 Suresnes, France
//
// ============================================================================
package org.talend.designer.components.exchange.proxy;
import org.apache.commons.lang.StringUtils;
/**
*
* DOC hcyi class global comment. Detailed comment
*/
public class DefaultHTTPSTransportClientProperties extends DefaultHTTPTransportClientProperties {
/**
* @see org.apache.axis.components.net.TransportClientProperties#getProxyHost()
*/
@Override
public String getProxyHost() {
return StringUtils.trimToEmpty(System.getProperty("https.proxyHost")); //$NON-NLS-1$
}
/**
* @see org.apache.axis.components.net.TransportClientProperties#getNonProxyHosts()
*/
@Override
public String getNonProxyHosts() {
return StringUtils.trimToEmpty(System.getProperty("https.nonProxyHosts")); //$NON-NLS-1$
}
/**
* @see org.apache.axis.components.net.TransportClientProperties#getProxyPort()
*/
@Override
public String getProxyPort() {
return StringUtils.trimToEmpty(System.getProperty("https.proxyPort")); //$NON-NLS-1$
}
/**
* @see org.apache.axis.components.net.TransportClientProperties#getProxyUser()
*/
@Override
public String getProxyUser() {
return StringUtils.trimToEmpty(System.getProperty("https.proxyUser")); //$NON-NLS-1$
}
/**
* @see org.apache.axis.components.net.TransportClientProperties#getProxyPassword()
*/
@Override
public String getProxyPassword() {
return StringUtils.trimToEmpty(System.getProperty("https.proxyPassword")); //$NON-NLS-1$
}
}

View File

@@ -0,0 +1,58 @@
// ============================================================================
//
// Copyright (C) 2006-2019 Talend Inc. - www.talend.com
//
// This source code is available under agreement available at
// %InstallDIR%\features\org.talend.rcp.branding.%PRODUCTNAME%\%PRODUCTNAME%license.txt
//
// You should have received a copy of the agreement
// along with this program; if not, write to Talend SA
// 9 rue Pages 92150 Suresnes, France
//
// ============================================================================
package org.talend.designer.components.exchange.proxy;
import org.apache.axis.components.net.TransportClientProperties;
import org.apache.commons.lang.StringUtils;
/**
*
* DOC hcyi class global comment. Detailed comment
*/
public class DefaultHTTPTransportClientProperties implements TransportClientProperties {
/**
* @see org.apache.axis.components.net.TransportClientProperties#getProxyHost()
*/
public String getProxyHost() {
return StringUtils.trimToEmpty(System.getProperty("http.proxyHost")); //$NON-NLS-1$
}
/**
* @see org.apache.axis.components.net.TransportClientProperties#getNonProxyHosts()
*/
public String getNonProxyHosts() {
return StringUtils.trimToEmpty(System.getProperty("http.nonProxyHosts")); //$NON-NLS-1$
}
/**
* @see org.apache.axis.components.net.TransportClientProperties#getProxyPort()
*/
public String getProxyPort() {
return StringUtils.trimToEmpty(System.getProperty("http.proxyPort")); //$NON-NLS-1$
}
/**
* @see org.apache.axis.components.net.TransportClientProperties#getProxyUser()
*/
public String getProxyUser() {
return StringUtils.trimToEmpty(System.getProperty("http.proxyUser")); //$NON-NLS-1$
}
/**
* @see org.apache.axis.components.net.TransportClientProperties#getProxyPassword()
*/
public String getProxyPassword() {
return StringUtils.trimToEmpty(System.getProperty("http.proxyPassword")); //$NON-NLS-1$
}
}
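Both transport-properties classes read the standard JVM proxy system properties and trim null down to an empty string. A small sketch of how a caller would set and consume them (host and port values are illustrative):

public class ProxyPropsSketch {

    public static void main(String[] args) {
        // Values normally passed as -Dhttp.proxyHost=... on the JVM command line.
        System.setProperty("http.proxyHost", "proxy.example.com"); // illustrative host
        System.setProperty("http.proxyPort", "8080");              // illustrative port

        // trimToEmpty semantics: a missing or whitespace-only property becomes "".
        String host = System.getProperty("http.proxyHost", "").trim();
        String port = System.getProperty("http.proxyPort", "").trim();
        if (!host.isEmpty()) {
            System.out.println("using proxy " + host + ":" + port);
        }
    }
}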

View File

@@ -52,7 +52,6 @@ import org.talend.core.download.IDownloadHelper;
import org.talend.core.model.components.ComponentManager;
import org.talend.core.model.components.IComponent;
import org.talend.core.model.components.IComponentsFactory;
import org.talend.core.runtime.util.SharedStudioUtils;
import org.talend.core.ui.component.ComponentPaletteUtilities;
import org.talend.core.ui.component.ComponentsFactoryProvider;
import org.talend.designer.codegen.ICodeGeneratorService;
@@ -313,54 +312,51 @@ public class DownloadComponenentsAction extends Action implements IIntroAction {
protected void afterDownload(IProgressMonitor monitor, ComponentExtension extension, File localZipFile) throws Exception {
if (UpdatesHelper.isComponentUpdateSite(localZipFile)) {
if (!SharedStudioUtils.isSharedStudioMode()) {
final File workFolder = org.talend.utils.files.FileUtils.createTmpFolder("downloadedComponents", ""); //$NON-NLS-1$ //$NON-NLS-2$
final File workFolder = org.talend.utils.files.FileUtils.createTmpFolder("downloadedComponents", ""); //$NON-NLS-1$ //$NON-NLS-2$
try {
FilesUtils.copyFile(localZipFile, new File(workFolder, localZipFile.getName()));
try {
FilesUtils.copyFile(localZipFile, new File(workFolder, localZipFile.getName()));
ComponentsInstallComponent component = LocalComponentInstallHelper.getComponent();
if (component != null) {
try {
component.setComponentFolder(workFolder);
if (component.install()) {
ComponentsInstallComponent component = LocalComponentInstallHelper.getComponent();
if (component != null) {
try {
component.setComponentFolder(workFolder);
if (component.install()) {
if (component.needRelaunch()) {
askReboot();
} else {
MessageDialog.openInformation(DisplayUtils.getDefaultShell(),
Messages.getString("DownloadComponenentsAction.installComponentsTitle"),
component.getInstalledMessages());
}
} else {// install failure
MessageDialog.openWarning(DisplayUtils.getDefaultShell(),
Messages.getString("DownloadComponenentsAction_failureTitle"), //$NON-NLS-1$
Messages.getString("DownloadComponenentsAction_failureMessage", extension.getLabel())); //$NON-NLS-1$
if (component.needRelaunch()) {
askReboot();
} else {
MessageDialog.openInformation(DisplayUtils.getDefaultShell(),
Messages.getString("DownloadComponenentsAction.installComponentsTitle"),
component.getInstalledMessages());
}
} finally {
// after install, clear the setting for service.
component.setComponentFolder(null);
}
}
} catch (Exception e) {
// Pop up a dialog warning the user that the install failed.
Display.getDefault().syncExec(new Runnable() {
@Override
public void run() {
MessageDialog.openError(DisplayUtils.getDefaultShell(false),
} else {// install failure
MessageDialog.openWarning(DisplayUtils.getDefaultShell(),
Messages.getString("DownloadComponenentsAction_failureTitle"), //$NON-NLS-1$
Messages.getString("DownloadComponenentsAction_failureMessage", extension.getLabel())); //$NON-NLS-1$
}
});
throw e;
} finally {
FilesUtils.deleteFolder(workFolder, true);
} finally {
// after install, clear the setting for service.
component.setComponentFolder(null);
}
}
monitor.done();
ExchangeManager.getInstance().saveDownloadedExtensionsToFile(extension);
}
} catch (Exception e) {
// Pop up a dialog warning the user that the install failed.
Display.getDefault().syncExec(new Runnable() {
@Override
public void run() {
MessageDialog.openError(DisplayUtils.getDefaultShell(false),
Messages.getString("DownloadComponenentsAction_failureTitle"), //$NON-NLS-1$
Messages.getString("DownloadComponenentsAction_failureMessage", extension.getLabel())); //$NON-NLS-1$
}
});
throw e;
} finally {
FilesUtils.deleteFolder(workFolder, true);
}
monitor.done();
ExchangeManager.getInstance().saveDownloadedExtensionsToFile(extension);
} else {
File installedLocation = ComponentInstaller.unzip(localZipFile.getAbsolutePath(), getComponentsFolder()
.getAbsolutePath());

View File

@@ -37,7 +37,6 @@ import org.eclipse.swt.widgets.Shell;
import org.eclipse.ui.PlatformUI;
import org.talend.commons.ui.runtime.exception.ExceptionHandler;
import org.talend.core.download.DownloadHelper;
import org.talend.core.runtime.util.SharedStudioUtils;
import org.talend.designer.components.exchange.i18n.Messages;
import org.talend.designer.components.exchange.model.Category;
import org.talend.designer.components.exchange.model.VersionRevision;
@@ -106,7 +105,7 @@ public class ImportExchangeDialog extends Dialog {
@Override
protected void okPressed() {
IPath tempPath = SharedStudioUtils.getTempFolderPath();
IPath tempPath = new Path(System.getProperty("user.dir")).append("temp"); //$NON-NLS-1$ //$NON-NLS-2$
File pathFile = tempPath.toFile();
if (downloadproperty.getFileName() == null) {
MessageBox box = new MessageBox(Display.getCurrent().getActiveShell(), SWT.ICON_WARNING | SWT.OK);

View File

@@ -25,9 +25,11 @@ import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.axis.components.net.TransportClientProperties;
import org.apache.axis.components.net.TransportClientPropertiesFactory;
import org.apache.commons.beanutils.BeanUtils;
import org.apache.commons.collections.map.MultiValueMap;
import org.apache.commons.discovery.tools.ManagedProperties;
import org.apache.commons.httpclient.HostConfiguration;
import org.apache.commons.httpclient.HttpClient;
import org.apache.commons.httpclient.NameValuePair;
@@ -53,7 +55,6 @@ import org.talend.core.language.ECodeLanguage;
import org.talend.core.language.LanguageManager;
import org.talend.core.model.components.IComponentsFactory;
import org.talend.core.model.general.Project;
import org.talend.core.runtime.util.SharedStudioUtils;
import org.talend.core.ui.component.ComponentPaletteUtilities;
import org.talend.core.ui.component.ComponentsFactoryProvider;
import org.talend.designer.components.exchange.ExchangePlugin;
@@ -149,17 +150,14 @@ public class ExchangeUtils {
public static String sendGetRequest(String urlAddress) throws Exception {
HttpClient httpclient = new HttpClient();
GetMethod getMethod = new GetMethod(urlAddress);
String proxyUser = ManagedProperties.getProperty("http.proxyUser");
String proxyPassword = ManagedProperties.getProperty("http.proxyPassword");
String proxyHost = ManagedProperties.getProperty("http.proxyHost");
proxyHost = proxyHost != null ? proxyHost : "";
String proxyPort = ManagedProperties.getProperty("http.proxyPort");
if (proxyHost.length() != 0) {
TransportClientProperties tcp = TransportClientPropertiesFactory.create("http");
if (tcp.getProxyHost().length() != 0) {
UsernamePasswordCredentials creds = new UsernamePasswordCredentials(
proxyUser != null ? proxyUser : "", proxyPassword != null ? proxyPassword : "");
tcp.getProxyUser() != null ? tcp.getProxyUser() : "",
tcp.getProxyPassword() != null ? tcp.getProxyPassword() : "");
httpclient.getState().setProxyCredentials(AuthScope.ANY, creds);
HostConfiguration hcf = new HostConfiguration();
hcf.setProxy(proxyHost, Integer.parseInt(proxyPort));
hcf.setProxy(tcp.getProxyHost(), Integer.parseInt(tcp.getProxyPort()));
httpclient.executeMethod(hcf, getMethod);
} else {
httpclient.executeMethod(getMethod);
@@ -207,19 +205,14 @@ public class ExchangeUtils {
* @return
*/
public static File getComponentFolder(String componentfolder) {
if (SharedStudioUtils.isSharedStudioMode()) {
File componentFolder = SharedStudioUtils.getSharedStudioComponentsExtFolder();
return new File (componentFolder, componentfolder);
} else {
URL url = FileLocator.find(ExchangePlugin.getDefault().getBundle(), new Path(componentfolder), null);
try {
URL fileUrl = FileLocator.toFileURL(url);
return new File(fileUrl.getPath());
} catch (Exception e) {
ExceptionHandler.process(e);
}
return null;
}
URL url = FileLocator.find(ExchangePlugin.getDefault().getBundle(), new Path(componentfolder), null);
try {
URL fileUrl = FileLocator.toFileURL(url);
return new File(fileUrl.getPath());
} catch (Exception e) {
ExceptionHandler.process(e);
}
return null;
}
/**

View File

@@ -11,7 +11,7 @@
<!-- modification 2: compile classpath -->
<path id="compile.classpath">
<pathelement location="../../../../../../tcommon-studio-se/main/plugins/org.talend.libraries.dom4j-jaxen/lib/dom4j-2.1.3.jar" />
<pathelement location="../../../../../../tcommon-studio-se/main/plugins/org.talend.libraries.dom4j-jaxen/lib/dom4j-1.6.1.jar" />
<pathelement location="../../../../../../tcommon-studio-se/main/plugins/org.talend.libraries.apache.common/lib/commons-lang-2.6.jar" />
</path>

View File

@@ -2,6 +2,7 @@
<project name="org.talend.designer.components.libs" default="buildall" basedir=".">
<target name="buildall">
<ant antfile="talend_file_enhanced_20070724/build.xml" target="process" inheritall="no" />
<ant antfile="sugarCRMManagement/build.xml" target="process" inheritall="no" />
<ant antfile="TalendSAX/build.xml" target="process" inheritall="no" />
</target>

View File

@@ -1,8 +1,8 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.talend.components</groupId>
<artifactId>checkArchive</artifactId>
<version>1.2-20210901</version>
<groupId>org.talend.libraries</groupId>
<artifactId>checkArchive-1.1-20190917</artifactId>
<version>6.0.0</version>
<name>checkArchive</name>
<description>Dependency for tFileArchive and tFileUnarchive</description>
<url>http://maven.apache.org</url>
@@ -10,7 +10,7 @@
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<talend.nexus.url>https://artifacts-oss.talend.com</talend.nexus.url>
<java.source.version>1.8</java.source.version>
<java.source.version>1.7</java.source.version>
</properties>
<distributionManagement>
@@ -41,7 +41,7 @@
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-compress</artifactId>
<version>1.21</version>
<version>1.19</version>
</dependency>
</dependencies>
<build>

View File

@@ -3,32 +3,11 @@
<modelVersion>4.0.0</modelVersion>
<groupId>org.talend.components.lib</groupId>
<artifactId>commons-net-ftps-proxy</artifactId>
<version>3.6.1-talend-20200902</version>
<version>3.6.1-talend-20190819</version>
<name>commons-net-talend</name>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<talend.nexus.url>https://artifacts-oss.talend.com</talend.nexus.url>
<slf4.version>1.7.25</slf4.version>
<lombok.version>1.18.12</lombok.version>
</properties>
<dependencies>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>${lombok.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>${slf4.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>commons-net</groupId>
<artifactId>commons-net</artifactId>
@@ -36,6 +15,10 @@
</dependency>
</dependencies>
<properties>
<talend.nexus.url>https://artifacts-oss.talend.com</talend.nexus.url>
</properties>
<distributionManagement>
<snapshotRepository>
<id>talend_nexus_deployment</id>

View File

@@ -11,10 +11,8 @@ import javax.net.ssl.SSLSession;
import javax.net.ssl.SSLSessionContext;
import javax.net.ssl.SSLSocket;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.net.ftp.FTPSClient;
@Slf4j
public class SSLSessionReuseFTPSClient extends FTPSClient {
public SSLSessionReuseFTPSClient(boolean isImplicit, SSLContext context) {
@@ -26,12 +24,6 @@ public class SSLSessionReuseFTPSClient extends FTPSClient {
if (socket instanceof SSLSocket) {
final SSLSession session = ((SSLSocket) _socket_).getSession();
final SSLSessionContext context = session.getSessionContext();
if (context == null) {
// TDI-44654 (may be reproduced with Syncplify server)
log.info("SSL Session Context is null. SSL Session was re-initialized.");
return;
}
try {
final Field sessionHostPortCache = context.getClass().getDeclaredField("sessionHostPortCache");
sessionHostPortCache.setAccessible(true);
@@ -40,10 +32,10 @@ public class SSLSessionReuseFTPSClient extends FTPSClient {
putMethod.setAccessible(true);
InetAddress address = socket.getInetAddress();
int port = socket.getPort();
String key = String.format("%s:%s", address.getHostName(), String.valueOf(port)).toLowerCase(Locale.ROOT);
putMethod.invoke(cache, key, session);
key = String.format("%s:%s", address.getHostAddress(), String.valueOf(port)).toLowerCase(Locale.ROOT);
putMethod.invoke(cache, key, session);
} catch (Exception e) {

View File

@@ -2,9 +2,9 @@
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.talend.components</groupId>
<groupId>org.talend.libraries</groupId>
<artifactId>filecopy</artifactId>
<version>2.0.3</version>
<version>2.0.0</version>
<packaging>jar</packaging>
<name>talend-copy</name>
@@ -14,7 +14,6 @@
<talend.nexus.url>https://artifacts-oss.talend.com</talend.nexus.url>
<java.source.version>1.8</java.source.version>
<junit5.version>5.4.2</junit5.version>
<slf4j.version>1.7.28</slf4j.version>
</properties>
<distributionManagement>
@@ -53,12 +52,7 @@
<version>${junit5.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>${slf4j.version}</version>
<scope>provided</scope>
</dependency>
</dependencies>
<build>
<plugins>

View File

@@ -15,21 +15,13 @@ package org.talend;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import java.nio.file.attribute.FileTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* DOC Administrator class global comment. Detailled comment
*/
public class FileCopy {
static Logger logger = LoggerFactory.getLogger(FileCopy.class);
/** Private constructor, only static methods */
private FileCopy() {
}
@@ -42,57 +34,16 @@ public class FileCopy {
* @param delSrc : true if delete source.
* @throws IOException : if IO pb.
*/
public static void copyFile(String srcFileName, String desFileName, boolean delSrc, boolean keepModified)
throws IOException {
final Path source = Paths.get(srcFileName);
final Path destination = Paths.get(desFileName);
FileTime lastModifiedTime = null;
try {
lastModifiedTime = Files.getLastModifiedTime(source);
} catch (IOException e) {
logger.warn(e.getLocalizedMessage());
}
public static void copyFile(String srcFileName, String desFileName, boolean delSrc) throws IOException {
final File source = new File(srcFileName);
final File destination = new File(desFileName);
if (delSrc) {
// move: more efficient if in the same FS, and must delete the existing file.
Files.move(source, destination, StandardCopyOption.REPLACE_EXISTING);
Files.move(source.toPath(), destination.toPath(), StandardCopyOption.REPLACE_EXISTING);
} else {
Files.copy(source, destination, StandardCopyOption.REPLACE_EXISTING);
Files.copy(source.toPath(), destination.toPath(), StandardCopyOption.REPLACE_EXISTING);
}
if(keepModified){
try {
Files.setLastModifiedTime(destination,lastModifiedTime);
} catch (IOException e) {
logger.warn(e.getLocalizedMessage());
}
}
}
public static void copyFile(String srcFileName, String desFileName, boolean delSrc ) throws IOException {
copyFile(srcFileName,desFileName,delSrc,true);
}
/**
* Force Copy and Delete files.
*
* @param srcFileName : file name for source file.
* @param desFileName : file name for destination file.
* @throws IOException : if IO pb.
*/
public static void forceCopyAndDelete(String srcFileName, String desFileName, boolean keepModified) throws IOException {
final Path source = Paths.get(srcFileName);
final Path destination = Paths.get(desFileName);
final long lastModifiedTime = new File(srcFileName).lastModified();
Files.copy(source, destination, StandardCopyOption.REPLACE_EXISTING);
Files.delete(source);
if(keepModified){
destination.toFile().setLastModified(lastModifiedTime);
}
}
public static void forceCopyAndDelete(String srcFileName, String desFileName) throws IOException {
forceCopyAndDelete(srcFileName,desFileName,true);
}
}
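A short usage sketch of the restored three-argument copyFile, assuming org.talend.FileCopy above is on the classpath (file names are illustrative); with delSrc=true the call becomes a move with REPLACE_EXISTING semantics:

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import org.talend.FileCopy;

public class FileCopyUsageSketch {

    public static void main(String[] args) throws IOException {
        Path src = Files.createTempFile("filecopy-demo", ".txt"); // illustrative source
        Files.write(src, "hello".getBytes());
        Path dest = src.resolveSibling("filecopy-demo-copy.txt");

        FileCopy.copyFile(src.toString(), dest.toString(), false); // copy, keep the source
        System.out.println(Files.exists(src) + " " + Files.exists(dest)); // true true
    }
}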

View File

@@ -100,44 +100,6 @@ class FileCopyTest {
Assertions.assertEquals(referenceSize, copy.length(), "Size error");
}
@Test
void testForceCopyWithDelete() throws Exception {
final URL repCopy = Thread.currentThread().getContextClassLoader().getResource("copy");
File file = this.buildFile("fileToDelete.txt", 10L * 1024L);
file.deleteOnExit();
File copy = new File(repCopy.getPath(), "fileToDelete.txt");
long referenceSize = file.length();
if (!copy.exists()) {
copy.createNewFile();
}
copy.deleteOnExit();
FileCopy.forceCopyAndDelete(file.getPath(), copy.getPath());
Assertions.assertFalse(file.exists(), "file not deleted");
Assertions.assertTrue(copy.exists(), "small file : original file deleted");
Assertions.assertEquals(referenceSize, copy.length(), "Size error");
}
@Test
void testLastModifiedTime() throws Exception {
final URL repCopy = Thread.currentThread().getContextClassLoader().getResource("copy");
File file = this.buildFile("fileLMT.txt", 10L * 1024L);
file.deleteOnExit();
long referencceTime = 324723894L;
file.setLastModified(referencceTime);
File copy = new File(repCopy.getPath(), "fileLMTDestination.txt");
if (copy.exists()) {
copy.delete();
}
copy.deleteOnExit();
FileCopy.copyFile(file.getPath(), copy.getPath(), true);
Assertions.assertEquals(referencceTime, copy.lastModified(), "modified time is not identical");
}
/**
* Generate a new file for testing.
*
@@ -163,22 +125,4 @@ class FileCopyTest {
return generatedFile;
}
@Test
void testKeepLastModifiedTime() throws Exception {
final URL repCopy = Thread.currentThread().getContextClassLoader().getResource("copy");
File file = this.buildFile("fileLMT.txt", 10L * 1024L);
file.deleteOnExit();
long referencceTime = 324723894L;
file.setLastModified(referencceTime);
File copy = new File(repCopy.getPath(), "fileLMTDestination.txt");
if (copy.exists()) {
copy.delete();
}
copy.deleteOnExit();
FileCopy.copyFile(file.getPath(), copy.getPath(), true,true);
Assertions.assertEquals(referencceTime, copy.lastModified(), "modified time is not identical");
}
}

View File

@@ -11,7 +11,7 @@
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<cxf.version>3.3.10</cxf.version>
<cxf.version>3.1.2</cxf.version>
</properties>
<build>

View File

@@ -13,9 +13,9 @@
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.dom4j</groupId>
<groupId>dom4j</groupId>
<artifactId>dom4j</artifactId>
<version>2.1.3</version>
<version>1.6.1</version>
</dependency>
<dependency>
<groupId>commons-lang</groupId>

View File

@@ -2,9 +2,9 @@
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.talend.components</groupId>
<artifactId>simpleexcel</artifactId>
<version>2.4-20200923</version>
<groupId>org.talend.libraries</groupId>
<artifactId>simpleexcel-2.2-20190722</artifactId>
<version>6.0.0</version>
<packaging>jar</packaging>
<name>simpleexcel</name>
@@ -13,7 +13,7 @@
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<talend.nexus.url>https://artifacts-oss.talend.com</talend.nexus.url>
<java.source.version>1.8</java.source.version>
<java.source.version>1.6</java.source.version>
</properties>
<distributionManagement>
@@ -43,30 +43,48 @@
<dependency>
<groupId>org.apache.poi</groupId>
<artifactId>poi</artifactId>
<version>4.1.2</version>
<version>4.1.0</version>
</dependency>
<dependency>
<groupId>org.apache.poi</groupId>
<artifactId>poi-scratchpad</artifactId>
<version>4.1.2</version>
<version>4.1.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.poi/poi-ooxml -->
<dependency>
<groupId>org.apache.poi</groupId>
<artifactId>poi-ooxml</artifactId>
<version>4.1.2</version>
<version>4.1.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.poi/poi-ooxml-schemas -->
<dependency>
<groupId>org.apache.poi</groupId>
<artifactId>poi-ooxml-schemas</artifactId>
<version>4.1.2</version>
<version>4.1.0</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.geronimo.specs/geronimo-stax-api_1.0_spec -->
<dependency>
<groupId>org.apache.geronimo.specs</groupId>
<artifactId>geronimo-stax-api_1.0_spec</artifactId>
<version>1.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/dom4j/dom4j -->
<dependency>
<groupId>dom4j</groupId>
<artifactId>dom4j</artifactId>
<version>1.6.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.xmlbeans/xmlbeans -->
<dependency>
<groupId>org.apache.xmlbeans</groupId>
<artifactId>xmlbeans</artifactId>
<version>3.1.0</version>
</dependency>
</dependencies>
<build>
<resources>

View File

@@ -1,6 +1,6 @@
// ============================================================================
//
// Copyright (C) 2006-2020 Talend Inc. - www.talend.com
// Copyright (C) 2006-2019 Talend Inc. - www.talend.com
//
// This source code is available under agreement available at
// %InstallDIR%\features\org.talend.rcp.branding.%PRODUCTNAME%\%PRODUCTNAME%license.txt

View File

@@ -1,6 +1,6 @@
// ============================================================================
//
// Copyright (C) 2006-2020 Talend Inc. - www.talend.com
// Copyright (C) 2006-2019 Talend Inc. - www.talend.com
//
// This source code is available under agreement available at
// %InstallDIR%\features\org.talend.rcp.branding.%PRODUCTNAME%\%PRODUCTNAME%license.txt

View File

@@ -1,6 +1,6 @@
// ============================================================================
//
// Copyright (C) 2006-2020 Talend Inc. - www.talend.com
// Copyright (C) 2006-2019 Talend Inc. - www.talend.com
//
// This source code is available under agreement available at
// %InstallDIR%\features\org.talend.rcp.branding.%PRODUCTNAME%\%PRODUCTNAME%license.txt

View File

@@ -1,6 +1,6 @@
// ============================================================================
//
// Copyright (C) 2006-2020 Talend Inc. - www.talend.com
// Copyright (C) 2006-2019 Talend Inc. - www.talend.com
//
// This source code is available under agreement available at
// %InstallDIR%\features\org.talend.rcp.branding.%PRODUCTNAME%\%PRODUCTNAME%license.txt

View File

@@ -1,6 +1,6 @@
// ============================================================================
//
// Copyright (C) 2006-2020 Talend Inc. - www.talend.com
// Copyright (C) 2006-2019 Talend Inc. - www.talend.com
//
// This source code is available under agreement available at
// %InstallDIR%\features\org.talend.rcp.branding.%PRODUCTNAME%\%PRODUCTNAME%license.txt

View File

@@ -1,15 +1,16 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.talend.components</groupId>
<artifactId>components-soap</artifactId>
<version>2.3-20200918</version>
<groupId>org.talend.libraries</groupId>
<artifactId>talend-soap</artifactId>
<version>2.1-20190716</version>
<packaging>jar</packaging>
<name>talend-soap</name>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<cxf.version>3.1.1</cxf.version>
<talend.nexus.url>https://artifacts-oss.talend.com</talend.nexus.url>
</properties>
@@ -45,24 +46,29 @@
<systemPath>${java.home}/lib/rt.jar</systemPath>
</dependency>
<dependency>
<groupId>org.dom4j</groupId>
<artifactId>dom4j</artifactId>
<version>2.1.3</version>
<groupId>jdom</groupId>
<artifactId>jdom</artifactId>
<version>1.1</version>
</dependency>
<dependency>
<groupId>com.sun.xml.messaging.saaj</groupId>
<artifactId>saaj-impl</artifactId>
<version>1.5.2</version>
<groupId>com.sun.xml.messaging.saaj</groupId>
<artifactId>saaj-impl</artifactId>
<version>1.3.2</version>
</dependency>
<dependency>
<groupId>javax.activation</groupId>
<artifactId>activation</artifactId>
<version>1.1</version>
</dependency>
<dependency>
<groupId>xerces</groupId>
<artifactId>xercesImpl</artifactId>
<version>2.12.0</version>
<version>2.6.2</version>
</dependency>
<dependency>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
<version>1.14</version>
<version>1.9</version>
</dependency>
</dependencies>
<build>
@@ -102,4 +108,4 @@
</plugin>
</plugins>
</build>
</project>
</project>

View File

@@ -32,7 +32,8 @@ import javax.xml.transform.stream.StreamResult;
import javax.xml.transform.stream.StreamSource;
import org.apache.commons.codec.binary.Base64;
import org.dom4j.io.DOMReader;
import org.jdom.input.DOMBuilder;
import org.jdom.output.XMLOutputter;
import org.talend.soap.sun.SunNtlmAuthenticationUpdater;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
@@ -44,6 +45,8 @@ public class SOAPUtil {
private static final String vmVendor = System.getProperty("java.vendor.url");
private static final String ibmVmVendor = "http://www.ibm.com/";
private static final String sunVmVendor = "http://java.sun.com/";
private static final String oracleVmVendor = "http://java.oracle.com/";
@@ -137,7 +140,12 @@ public class SOAPUtil {
StreamSource preppedMsgSrc = new StreamSource(stream);
soapPart.setContent(preppedMsgSrc);
// InputStream stream = new FileInputStream(new File("d://soap.txt"));
// StreamSource preppedMsgSrc = new StreamSource(stream);
// soapPart.setContent(preppedMsgSrc);
message.saveChanges();
// Send the message
SOAPMessage reply = connection.call(message, destination);
@@ -218,7 +226,7 @@ public class SOAPUtil {
Node content;
Element headerRootElem = document.createElement("Header");
Iterator<javax.xml.soap.Node> childElements = header.getChildElements();
Iterator childElements = header.getChildElements();
org.w3c.dom.Node domNode = null;
while (childElements.hasNext()) {
domNode = (org.w3c.dom.Node) childElements.next();
@@ -237,11 +245,12 @@ public class SOAPUtil {
return reHeaderMessage;
}
private String Doc2StringWithoutDeclare(Document doc) {
DOMReader reader = new DOMReader();
org.dom4j.Document document = reader.read(doc);
return document.getRootElement().asXML();
}
private String Doc2StringWithoutDeclare(Document doc) {
DOMBuilder builder = new DOMBuilder();
org.jdom.Document jdomDoc = builder.build(doc);
XMLOutputter outputter = new XMLOutputter();
return outputter.outputString(jdomDoc.getRootElement());
}
/**
* invoke soap and return the response document
@@ -354,4 +363,4 @@ public class SOAPUtil {
headers.setHeader("Authorization", "Basic " + encodeUserInfo);
}
}
}
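Both versions of Doc2StringWithoutDeclare serialize the document's root element while omitting the XML declaration. For reference, plain JAXP from the JDK can produce the same output without dom4j or jdom; a minimal sketch (class and method names are illustrative):

import java.io.StringWriter;
import javax.xml.transform.OutputKeys;
import javax.xml.transform.Transformer;
import javax.xml.transform.TransformerFactory;
import javax.xml.transform.dom.DOMSource;
import javax.xml.transform.stream.StreamResult;
import org.w3c.dom.Document;

public class NoDeclSketch {

    static String rootWithoutDeclaration(Document doc) throws Exception {
        Transformer t = TransformerFactory.newInstance().newTransformer();
        t.setOutputProperty(OutputKeys.OMIT_XML_DECLARATION, "yes"); // drop <?xml ...?>
        StringWriter out = new StringWriter();
        t.transform(new DOMSource(doc.getDocumentElement()), new StreamResult(out));
        return out.toString();
    }
}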

View File

@@ -1,66 +0,0 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.talend.components.lib</groupId>
<artifactId>talend-aws</artifactId>
<version>1.0</version>
<packaging>jar</packaging>
<name>talend-aws</name>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<talend.nexus.url>https://artifacts-oss.talend.com</talend.nexus.url>
<java.source.version>1.8</java.source.version>
</properties>
<distributionManagement>
<snapshotRepository>
<id>talend_nexus_deployment</id>
<url>${talend.nexus.url}/nexus/content/repositories/TalendOpenSourceSnapshot/</url>
<snapshots>
<enabled>true</enabled>
</snapshots>
<releases>
<enabled>false</enabled>
</releases>
</snapshotRepository>
<repository>
<id>talend_nexus_deployment</id>
<url>${talend.nexus.url}/nexus/content/repositories/TalendOpenSourceRelease/</url>
<snapshots>
<enabled>false</enabled>
</snapshots>
<releases>
<enabled>true</enabled>
</releases>
</repository>
</distributionManagement>
<dependencies>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk</artifactId>
<version>1.11.848</version>
</dependency>
</dependencies>
<build>
<resources>
<resource>
<directory>src/main/java</directory>
</resource>
</resources>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>2.3.2</version>
<configuration>
<source>${java.source.version}</source>
<target>${java.source.version}</target>
</configuration>
</plugin>
</plugins>
</build>
</project>

View File

@@ -1,277 +0,0 @@
package org.talend.aws;
import static com.amazonaws.event.SDKProgressPublisher.publishProgress;
import java.util.Collection;
import java.util.LinkedList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import com.amazonaws.AmazonClientException;
import com.amazonaws.AmazonServiceException;
import com.amazonaws.event.ProgressEventType;
import com.amazonaws.event.ProgressListener;
import com.amazonaws.event.ProgressListenerChain;
import com.amazonaws.services.s3.model.LegacyS3ProgressListener;
import com.amazonaws.services.s3.transfer.Transfer;
import com.amazonaws.services.s3.transfer.TransferProgress;
import com.amazonaws.services.s3.transfer.internal.TransferMonitor;
import com.amazonaws.services.s3.transfer.internal.TransferStateChangeListener;
/**
* Abstract transfer implementation.
*/
public abstract class AbstractTransfer implements Transfer {
/** The current state of this transfer. */
protected volatile TransferState state = TransferState.Waiting;
protected TransferMonitor monitor;
/** The progress of this transfer. */
private final TransferProgress transferProgress;
private final String description;
/** Hook for adding/removing more progress listeners. */
protected final ProgressListenerChain listenerChain;
/** Collection of listeners to be notified for changes to the state of this transfer via setState() */
protected final Collection<TransferStateChangeListener> stateChangeListeners = new LinkedList<TransferStateChangeListener>();
AbstractTransfer(String description, TransferProgress transferProgress, ProgressListenerChain progressListenerChain) {
this(description, transferProgress, progressListenerChain, null);
}
AbstractTransfer(String description, TransferProgress transferProgress,
ProgressListenerChain progressListenerChain, TransferStateChangeListener stateChangeListener) {
this.description = description;
this.listenerChain = progressListenerChain;
this.transferProgress = transferProgress;
addStateChangeListener(stateChangeListener);
}
/**
* Returns whether or not the transfer is finished (i.e. completed successfully,
* failed, or was canceled). This method should never block.
*
* @return Returns <code>true</code> if this transfer is finished (i.e. completed successfully,
* failed, or was canceled); returns <code>false</code> otherwise.
*/
public final synchronized boolean isDone() {
return (state == TransferState.Failed ||
state == TransferState.Completed ||
state == TransferState.Canceled);
}
/**
* Waits for this transfer to complete. This is a blocking call; the current
* thread is suspended until this transfer completes.
*
* @throws AmazonClientException
* If any errors were encountered in the client while making the
* request or handling the response.
* @throws AmazonServiceException
* If any errors occurred in Amazon S3 while processing the
* request.
* @throws InterruptedException
* If this thread is interrupted while waiting for the transfer
* to complete.
*/
public void waitForCompletion()
throws AmazonClientException, AmazonServiceException, InterruptedException {
try {
Object result = null;
while (!monitor.isDone() || result == null) {
Future<?> f = monitor.getFuture();
result = f.get();
}
} catch (ExecutionException e) {
rethrowExecutionException(e);
}
}
/**
* Waits for this transfer to finish and returns any error that occurred, or
* returns <code>null</code> if no errors occurred.
* This is a blocking call; the current thread
* will be suspended until this transfer either fails or completes
* successfully.
*
* @return Any error that occurred while processing this transfer.
* Otherwise returns <code>null</code> if no errors occurred.
*
* @throws InterruptedException
* If this thread is interrupted while waiting for the transfer
* to complete.
*/
public AmazonClientException waitForException() throws InterruptedException {
try {
/**
* Do not remove the while loop. We need this as the future returned by
* monitor.getFuture() is set two times during the upload and copy operations.
*/
while (!monitor.isDone()) {
monitor.getFuture().get();
}
monitor.getFuture().get();
return null;
} catch (ExecutionException e) {
return unwrapExecutionException(e);
}
}
/**
* Returns a human-readable description of this transfer.
*
* @return A human-readable description of this transfer.
*/
public String getDescription() {
return description;
}
/**
* Returns the current state of this transfer.
*
* @return The current state of this transfer.
*/
public synchronized TransferState getState() {
return state;
}
/**
* Sets the current state of this transfer.
*/
public void setState(TransferState state) {
synchronized (this) {
this.state = state;
}
for ( TransferStateChangeListener listener : stateChangeListeners ) {
listener.transferStateChanged(this, state);
}
}
/**
* Notifies all the registered state change listeners of the state update.
*/
public void notifyStateChangeListeners(TransferState state) {
for ( TransferStateChangeListener listener : stateChangeListeners ) {
listener.transferStateChanged(this, state);
}
}
/**
* Adds the specified progress listener to the list of listeners
* receiving updates about this transfer's progress.
*
* @param listener
* The progress listener to add.
*/
public synchronized void addProgressListener(ProgressListener listener) {
listenerChain.addProgressListener(listener);
}
/**
* Removes the specified progress listener from the list of progress
* listeners receiving updates about this transfer's progress.
*
* @param listener
* The progress listener to remove.
*/
public synchronized void removeProgressListener(ProgressListener listener) {
listenerChain.removeProgressListener(listener);
}
/**
* @deprecated Replaced by {@link #addProgressListener(ProgressListener)}
*/
@Deprecated
public synchronized void addProgressListener(com.amazonaws.services.s3.model.ProgressListener listener) {
listenerChain.addProgressListener(new LegacyS3ProgressListener(listener));
}
/**
* @deprecated Replaced by {@link #removeProgressListener(ProgressListener)}
*/
@Deprecated
public synchronized void removeProgressListener(com.amazonaws.services.s3.model.ProgressListener listener) {
listenerChain.removeProgressListener(new LegacyS3ProgressListener(listener));
}
/**
* Adds the given state change listener to the collection of listeners.
*/
public synchronized void addStateChangeListener(TransferStateChangeListener listener) {
if ( listener != null )
stateChangeListeners.add(listener);
}
/**
* Removes the given state change listener from the collection of listeners.
*/
public synchronized void removeStateChangeListener(TransferStateChangeListener listener) {
if ( listener != null )
stateChangeListeners.remove(listener);
}
/**
* Returns progress information about this transfer.
*
* @return The progress information about this transfer.
*/
public TransferProgress getProgress() {
return transferProgress;
}
/**
* Sets the monitor used to poll for transfer completion.
*/
public void setMonitor(TransferMonitor monitor) {
this.monitor = monitor;
}
public TransferMonitor getMonitor() {
return monitor;
}
protected void fireProgressEvent(final ProgressEventType eventType) {
publishProgress(listenerChain, eventType);
}
/**
* Examines the cause of the specified ExecutionException and either
* rethrows it directly (if it's a type of AmazonClientException) or wraps
* it in an AmazonClientException and rethrows it.
*
* @param e
* The execution exception to examine.
*/
protected void rethrowExecutionException(ExecutionException e) {
throw unwrapExecutionException(e);
}
/**
* Unwraps the root exception that caused the specified ExecutionException
* and returns it. If it was not an instance of AmazonClientException, it is
* wrapped as an AmazonClientException.
*
* @param e
* The ExecutionException to unwrap.
*
* @return The root exception that caused the specified ExecutionException.
*/
protected AmazonClientException unwrapExecutionException(ExecutionException e) {
Throwable t = e;
while (t.getCause() != null && t instanceof ExecutionException) {
t = t.getCause();
}
if (t instanceof AmazonClientException) {
return (AmazonClientException) t;
}
return new AmazonClientException("Unable to complete transfer: " + t.getMessage(), t);
}
}
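For orientation, a minimal sketch of how the blocking-wait contract above is consumed, assuming the trimmed-down TransferManager that appears later in this diff; the bucket, key, and local path are made-up values, not part of this change:

import java.io.File;
import com.amazonaws.AmazonClientException;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.model.GetObjectRequest;
import org.talend.aws.Download;
import org.talend.aws.TransferManager;

public class WaitContractSketch {
    public static void main(String[] args) throws InterruptedException {
        AmazonS3 s3 = AmazonS3ClientBuilder.defaultClient();
        TransferManager tm = new TransferManager(s3);
        Download download = tm.download(
                new GetObjectRequest("my-bucket", "my-key"),  // made-up bucket/key
                new File("/tmp/my-key"), null, 0L, false);    // no listener, no timeout
        // Non-throwing variant of waitForCompletion(): the failure, if any,
        // comes back as a value instead of being thrown.
        AmazonClientException failure = download.waitForException();
        System.out.println(failure == null
                ? "state=" + download.getState()
                : "failed: " + failure.getMessage());
        tm.shutdownNow(true);
    }
}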

View File

@@ -1,39 +0,0 @@
package org.talend.aws;
import com.amazonaws.annotation.SdkInternalApi;
import com.amazonaws.services.s3.internal.ServiceUtils;
import com.amazonaws.services.s3.transfer.Transfer;
import java.io.File;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
/**
* Helper class to merge all the individual part files into a destinationFile.
*/
@SdkInternalApi
public class CompleteMultipartDownload implements Callable<File> {
private final List<Future<File>> partFiles;
private final File destinationFile;
private final DownloadImpl download;
private Integer currentPartNumber;
public CompleteMultipartDownload(List<Future<File>> files, File destinationFile, DownloadImpl download, Integer currentPartNumber) {
this.partFiles = files;
this.destinationFile = destinationFile;
this.download = download;
this.currentPartNumber = currentPartNumber;
}
@Override
public File call() throws Exception {
for (Future<File> file : partFiles) {
ServiceUtils.appendFile(file.get(), destinationFile);
download.updatePersistableTransfer(currentPartNumber++);
}
download.setState(Transfer.TransferState.Completed);
return destinationFile;
}
}
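A stripped-down illustration of the merge loop above, minus the DownloadImpl bookkeeping; it only assumes standard java.nio and the part futures produced by the per-part download tasks:

import java.io.File;
import java.nio.file.Files;
import java.nio.file.StandardOpenOption;
import java.util.List;
import java.util.concurrent.Future;

public class MergeSketch {
    static File merge(List<Future<File>> partFiles, File destination) throws Exception {
        for (Future<File> part : partFiles) {
            // Future.get() blocks until that part has finished downloading.
            Files.write(destination.toPath(),
                    Files.readAllBytes(part.get().toPath()),   // fine for a sketch; the SDK streams instead
                    StandardOpenOption.CREATE, StandardOpenOption.APPEND);
        }
        return destination;
    }
}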

View File

@@ -1,60 +0,0 @@
package org.talend.aws;
import java.io.IOException;
import com.amazonaws.services.s3.model.CryptoMode;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.transfer.Transfer;
import com.amazonaws.services.s3.transfer.exception.PauseException;
/**
* Represents an asynchronous download from Amazon S3.
*/
public interface Download extends Transfer {
/**
* Returns the ObjectMetadata for the object being downloaded.
*
* @return The ObjectMetadata for the object being downloaded.
*/
public ObjectMetadata getObjectMetadata();
/**
* The name of the bucket where the object is being downloaded from.
*
* @return The name of the bucket where the object is being downloaded from.
*/
public String getBucketName();
/**
* The key under which this object was stored in Amazon S3.
*
* @return The key under which this object was stored in Amazon S3.
*/
public String getKey();
/**
* Cancels this download.
*
* @throws IOException
*/
public void abort() throws IOException;
/**
* Pauses the current download operation and returns the information that can
* be used to resume the download at a later time.
*
* Resuming a download does not perform an ETag check, since a ranged GET is
* used to fetch the object's remaining contents.
*
* Resuming a download of an object encrypted using
* {@link CryptoMode#StrictAuthenticatedEncryption} results in an
* AmazonClientException, because authenticity cannot be guaranteed for a
* ranged GET operation.
*
* @throws PauseException
* If any errors were encountered while trying to pause the
* download.
*/
public PersistableDownload pause() throws PauseException;
}
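A small sketch of the pause contract just described: pause() either hands back a resume token or throws PauseException when no persistable state was captured. The helper name is hypothetical:

import com.amazonaws.services.s3.transfer.exception.PauseException;
import org.talend.aws.Download;
import org.talend.aws.PersistableDownload;

public class PauseSketch {
    static PersistableDownload pauseOrNull(Download download) {
        try {
            return download.pause();   // token carrying bucket/key/range/file
        } catch (PauseException e) {
            System.err.println("not pausable: " + e.getMessage());
            return null;
        }
    }
}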

View File

@@ -1,312 +0,0 @@
package org.talend.aws;
import java.io.File;
import java.io.RandomAccessFile;
import java.net.SocketException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import javax.net.ssl.SSLProtocolException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import com.amazonaws.AmazonClientException;
import com.amazonaws.SdkClientException;
import com.amazonaws.annotation.SdkInternalApi;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.internal.FileLocks;
import com.amazonaws.services.s3.internal.ServiceUtils;
import com.amazonaws.services.s3.internal.ServiceUtils.RetryableS3DownloadTask;
import com.amazonaws.services.s3.model.GetObjectRequest;
import com.amazonaws.services.s3.model.S3Object;
import com.amazonaws.services.s3.transfer.Transfer.TransferState;
import com.amazonaws.services.s3.transfer.exception.FileLockException;
import com.amazonaws.util.IOUtils;
@SdkInternalApi
final class DownloadCallable implements Callable<File> {
private static final Log LOG = LogFactory.getLog(DownloadCallable.class);
private final AmazonS3 s3;
private final CountDownLatch latch;
private final GetObjectRequest req;
private final boolean resumeExistingDownload;
private final DownloadImpl download;
private final File dstfile;
private final long origStartingByte;
private final long timeout;
private final ScheduledExecutorService timedExecutor;
/** The thread pool in which parts are downloaded. */
private final ExecutorService executor;
private final List<Future<File>> futureFiles;
private final boolean isDownloadParallel;
private Integer lastFullyMergedPartNumber;
private final boolean resumeOnRetry;
private long expectedFileLength;
DownloadCallable(AmazonS3 s3, CountDownLatch latch,
GetObjectRequest req, boolean resumeExistingDownload,
DownloadImpl download, File dstfile, long origStartingByte,
long expectedFileLength, long timeout,
ScheduledExecutorService timedExecutor,
ExecutorService executor,
Integer lastFullyDownloadedPartNumber, boolean isDownloadParallel, boolean resumeOnRetry)
{
if (s3 == null || latch == null || req == null || dstfile == null || download == null)
throw new IllegalArgumentException();
this.s3 = s3;
this.latch = latch;
this.req = req;
this.resumeExistingDownload = resumeExistingDownload;
this.download = download;
this.dstfile = dstfile;
this.origStartingByte = origStartingByte;
this.expectedFileLength = expectedFileLength;
this.timeout = timeout;
this.timedExecutor = timedExecutor;
this.executor = executor;
this.futureFiles = new ArrayList<Future<File>>();
this.lastFullyMergedPartNumber = lastFullyDownloadedPartNumber;
this.isDownloadParallel = isDownloadParallel;
this.resumeOnRetry = resumeOnRetry;
}
/**
* This method must return a non-null object, or else the existing
* implementation in {@link AbstractTransfer#waitForCompletion()}
* would block forever.
*
* @return the downloaded file
*/
@Override
public File call() throws Exception {
try {
latch.await();
if (isTimeoutEnabled()) {
timedExecutor.schedule(new Runnable() {
public void run() {
try {
if (download.getState() != TransferState.Completed) {
download.abort();
}
} catch(Exception e) {
throw new SdkClientException(
"Unable to abort download after timeout", e);
}
}
}, timeout, TimeUnit.MILLISECONDS);
}
download.setState(TransferState.InProgress);
ServiceUtils.createParentDirectoryIfNecessary(dstfile);
if (isDownloadParallel) {
downloadInParallel(ServiceUtils.getPartCount(req, s3));
} else {
S3Object s3Object = retryableDownloadS3ObjectToFile(dstfile,
new DownloadTaskImpl(s3, download, req));
updateDownloadStatus(s3Object);
}
return dstfile;
} catch (Throwable t) {
// Cancel all the futures
for (Future<File> f : futureFiles) {
f.cancel(true);
}
// Downloads aren't allowed to move from canceled to failed
if (download.getState() != TransferState.Canceled) {
download.setState(TransferState.Failed);
}
if (t instanceof Exception)
throw (Exception) t;
else
throw (Error) t;
}
}
/**
* Takes the result from serial download,
* updates the transfer state and monitor in downloadImpl object
* based on the result.
*/
private void updateDownloadStatus(S3Object result) {
if (result == null) {
download.setState(TransferState.Canceled);
download.setMonitor(new DownloadMonitor(download, null));
} else {
download.setState(TransferState.Completed);
}
}
/**
* Submits a download task for each part of the object, each writing to its own
* file, then submits a final task that merges the part files into one file.
*/
private void downloadInParallel(int partCount) throws Exception {
if (lastFullyMergedPartNumber == null) {
lastFullyMergedPartNumber = 0;
}
for (int i = lastFullyMergedPartNumber + 1; i <= partCount; i++) {
GetObjectRequest getPartRequest = new GetObjectRequest(req.getBucketName(), req.getKey(),
req.getVersionId()).withUnmodifiedSinceConstraint(req.getUnmodifiedSinceConstraint())
.withModifiedSinceConstraint(req.getModifiedSinceConstraint())
.withResponseHeaders(req.getResponseHeaders()).withSSECustomerKey(req.getSSECustomerKey())
.withGeneralProgressListener(req.getGeneralProgressListener());
getPartRequest.setMatchingETagConstraints(req.getMatchingETagConstraints());
getPartRequest.setNonmatchingETagConstraints(req.getNonmatchingETagConstraints());
getPartRequest.setRequesterPays(req.isRequesterPays());
futureFiles.add(
executor.submit(new DownloadPartCallable(s3, getPartRequest.withPartNumber(i), dstfile)));
}
truncateDestinationFileIfNecessary();
Future<File> future = executor.submit(new CompleteMultipartDownload(futureFiles, dstfile, download, ++lastFullyMergedPartNumber));
((DownloadMonitor) download.getMonitor()).setFuture(future);
}
/**
* If only a partial part was merged into the destination file (due to a pause
* operation), adjusts the file length so that the next part starts writing
* from the correct position.
*/
private void truncateDestinationFileIfNecessary() {
RandomAccessFile raf = null;
if (!FileLocks.lock(dstfile)) {
throw new FileLockException("Fail to lock " + dstfile);
}
try {
raf = new RandomAccessFile(dstfile, "rw");
if (lastFullyMergedPartNumber == 0) {
raf.setLength(0);
} else {
long lastByte = ServiceUtils.getLastByteInPart(s3, req, lastFullyMergedPartNumber);
if (dstfile.length() < lastByte) {
throw new SdkClientException(
"File " + dstfile.getAbsolutePath() + " has been modified since last pause.");
}
raf.setLength(lastByte + 1);
download.getProgress().updateProgress(lastByte + 1);
}
} catch (Exception e) {
throw new SdkClientException("Unable to append part file to dstfile " + e.getMessage(), e);
} finally {
IOUtils.closeQuietly(raf, LOG);
FileLocks.unlock(dstfile);
}
}
/**
* This method is called only if it is a resumed download.
*
* Adjusts the range of the GET request and the expected (i.e. current) file
* length of the destination file to append to.
*/
private void adjustRequest(GetObjectRequest req) {
long[] range = req.getRange();
long lastByte = range[1];
long totalBytesToDownload = lastByte - this.origStartingByte + 1;
if (dstfile.exists()) {
if (!FileLocks.lock(dstfile)) {
throw new FileLockException("Fail to lock " + dstfile
+ " for range adjustment");
}
try {
expectedFileLength = dstfile.length();
long startingByte = this.origStartingByte + expectedFileLength;
LOG.info("Adjusting request range from " + Arrays.toString(range)
+ " to "
+ Arrays.toString(new long[] { startingByte, lastByte })
+ " for file " + dstfile);
req.setRange(startingByte, lastByte);
totalBytesToDownload = lastByte - startingByte + 1;
} finally {
FileLocks.unlock(dstfile);
}
}
if (totalBytesToDownload < 0) {
throw new IllegalArgumentException(
"Unable to determine the range for download operation. lastByte="
+ lastByte + ", origStartingByte=" + origStartingByte
+ ", expectedFileLength=" + expectedFileLength
+ ", totalBytesToDownload=" + totalBytesToDownload);
}
}
private S3Object retryableDownloadS3ObjectToFile(File file,
RetryableS3DownloadTask retryableS3DownloadTask) {
boolean hasRetried = false;
S3Object s3Object;
for (;;) {
final boolean appendData = resumeExistingDownload || (resumeOnRetry && hasRetried);
if (appendData && hasRetried) {
// Need to adjust the get range or else we risk corrupting the downloaded file
adjustRequest(req);
}
s3Object = retryableS3DownloadTask.getS3ObjectStream();
if (s3Object == null)
return null;
try {
if (testing && resumeExistingDownload && !hasRetried) {
throw new SdkClientException("testing");
}
ServiceUtils.downloadToFile(s3Object, file,
retryableS3DownloadTask.needIntegrityCheck(),
appendData, expectedFileLength);
return s3Object;
} catch (AmazonClientException ace) {
if (!ace.isRetryable())
throw ace;
// Determine whether an immediate retry is needed according to the captured SdkClientException.
// (There are three cases when downloadObjectToFile() throws SdkClientException:
// 1) SocketException or SSLProtocolException when writing to disk (e.g. when user aborts the download)
// 2) Other IOException when writing to disk
// 3) MD5 hashes don't match
// For 1) If SocketException is the result of the client side resetting the connection, this is retried
// Cases 2) and 3) will always be retried
final Throwable cause = ace.getCause();
if ((cause instanceof SocketException && !cause.getMessage().equals("Connection reset"))
|| (cause instanceof SSLProtocolException)) {
throw ace;
} else {
if (hasRetried)
throw ace;
else {
LOG.info("Retry the download of object " + s3Object.getKey() + " (bucket " + s3Object.getBucketName() + ")", ace);
hasRetried = true;
}
}
} finally {
s3Object.getObjectContent().abort();
}
}
}
private boolean isTimeoutEnabled() {
return timeout > 0;
}
private static boolean testing;
/**
* Used for testing purpose only.
*/
static void setTesting(boolean b) {
testing = b;
}
}
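A compact restatement of the retry decision spelled out in the comments of retryableDownloadS3ObjectToFile above; behavior only, no S3 calls, and the class name is made up:

import java.net.SocketException;
import javax.net.ssl.SSLProtocolException;
import com.amazonaws.AmazonClientException;

public class RetrySketch {
    static boolean shouldRetryOnce(AmazonClientException ace, boolean hasRetried) {
        if (!ace.isRetryable() || hasRetried) {
            return false;                                   // at most one immediate retry
        }
        Throwable cause = ace.getCause();
        if (cause instanceof SSLProtocolException) {
            return false;                                   // TLS-level failures are not retried
        }
        if (cause instanceof SocketException
                && !"Connection reset".equals(cause.getMessage())) {
            return false;                                   // client-side aborts are not retried
        }
        return true;                                        // disk IO errors, MD5 mismatch, connection reset
    }
}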

View File

@@ -1,202 +0,0 @@
package org.talend.aws;
import java.io.File;
import java.io.IOException;
import com.amazonaws.annotation.SdkInternalApi;
import com.amazonaws.event.ProgressEventType;
import com.amazonaws.event.ProgressListenerChain;
import com.amazonaws.services.s3.model.GetObjectRequest;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.S3Object;
import com.amazonaws.services.s3.transfer.TransferProgress;
import com.amazonaws.services.s3.transfer.exception.PauseException;
import com.amazonaws.services.s3.transfer.internal.S3ProgressPublisher;
import com.amazonaws.services.s3.transfer.internal.TransferManagerUtils;
import com.amazonaws.services.s3.transfer.internal.TransferStateChangeListener;
public class DownloadImpl extends AbstractTransfer implements Download {
private S3Object s3Object;
/**
* Information to resume if the download is paused.
*/
private PersistableDownload persistableDownload;
/**
* The last part that has been successfully written into the downloaded file.
*/
private Integer lastFullyDownloadedPartNumber;
private final GetObjectRequest getObjectRequest;
private final File file;
private final ObjectMetadata objectMetadata;
private final ProgressListenerChain progressListenerChain;
@Deprecated
public DownloadImpl(String description, TransferProgress transferProgress,
ProgressListenerChain progressListenerChain, S3Object s3Object, TransferStateChangeListener listener,
GetObjectRequest getObjectRequest, File file) {
this(description, transferProgress, progressListenerChain, s3Object, listener,
getObjectRequest, file, null, false);
}
public DownloadImpl(String description, TransferProgress transferProgress,
ProgressListenerChain progressListenerChain, S3Object s3Object, TransferStateChangeListener listener,
GetObjectRequest getObjectRequest, File file,
ObjectMetadata objectMetadata, boolean isDownloadParallel) {
super(description, transferProgress, progressListenerChain, listener);
this.s3Object = s3Object;
this.objectMetadata = objectMetadata;
this.getObjectRequest = getObjectRequest;
this.file = file;
this.progressListenerChain = progressListenerChain;
this.persistableDownload = captureDownloadState(getObjectRequest, file);
S3ProgressPublisher.publishTransferPersistable(progressListenerChain, persistableDownload);
}
/**
* Returns the ObjectMetadata for the object being downloaded.
*
* @return The ObjectMetadata for the object being downloaded.
*/
public synchronized ObjectMetadata getObjectMetadata() {
if (s3Object != null) {
return s3Object.getObjectMetadata();
}
return objectMetadata;
}
/**
* The name of the bucket where the object is being downloaded from.
*
* @return The name of the bucket where the object is being downloaded from.
*/
public String getBucketName() {
return getObjectRequest.getBucketName();
}
/**
* The key under which this object was stored in Amazon S3.
*
* @return The key under which this object was stored in Amazon S3.
*/
public String getKey() {
return getObjectRequest.getKey();
}
/**
* For internal use only.
* For parallel downloads, updates the persistableTransfer each time a
* part is successfully merged into the download file,
* then notifies the listeners that a new persistableTransfer is available.
*/
@SdkInternalApi
public void updatePersistableTransfer(Integer lastFullyDownloadedPartNumber) {
synchronized (this) {
this.lastFullyDownloadedPartNumber = lastFullyDownloadedPartNumber;
}
persistableDownload = captureDownloadState(getObjectRequest, file);
S3ProgressPublisher.publishTransferPersistable(progressListenerChain, persistableDownload);
}
/**
* For parallel downloads, returns the last part number that was
* successfully written into the download file.
* Returns null for serial downloads.
*/
public synchronized Integer getLastFullyDownloadedPartNumber() {
return lastFullyDownloadedPartNumber;
}
/**
* Cancels this download.
*
* @throws IOException
*/
public synchronized void abort() throws IOException {
this.monitor.getFuture().cancel(true);
if ( s3Object != null ) {
s3Object.getObjectContent().abort();
}
setState(TransferState.Canceled);
}
/**
* Cancels this download, but skips notifying the state change listeners.
*
* @throws IOException
*/
public synchronized void abortWithoutNotifyingStateChangeListener() throws IOException {
this.monitor.getFuture().cancel(true);
this.state = TransferState.Canceled;
}
/**
* Set the S3 object to download.
*/
public synchronized void setS3Object(S3Object s3Object) {
this.s3Object = s3Object;
}
/**
* This method is also responsible for firing the COMPLETED signal to the
* listeners.
*/
@Override
public void setState(TransferState state) {
super.setState(state);
switch (state) {
case Completed :
fireProgressEvent(ProgressEventType.TRANSFER_COMPLETED_EVENT);
break;
case Canceled:
fireProgressEvent(ProgressEventType.TRANSFER_CANCELED_EVENT);
break;
case Failed:
fireProgressEvent(ProgressEventType.TRANSFER_FAILED_EVENT);
break;
default:
break;
}
}
/**
* Returns the captured state of the download, or null if it should not be
* captured (for security reasons).
*/
private PersistableDownload captureDownloadState(
final GetObjectRequest getObjectRequest, final File file) {
if (getObjectRequest.getSSECustomerKey() == null) {
return new PersistableDownload(
getObjectRequest.getBucketName(), getObjectRequest.getKey(),
getObjectRequest.getVersionId(), getObjectRequest.getRange(),
getObjectRequest.getResponseHeaders(), getObjectRequest.isRequesterPays(),
file.getAbsolutePath(), getLastFullyDownloadedPartNumber(),
getObjectMetadata().getLastModified().getTime());
}
return null;
}
/*
* (non-Javadoc)
*
* @see com.amazonaws.services.s3.transfer.Download#pause()
*/
@Override
public PersistableDownload pause() throws PauseException {
boolean forceCancel = true;
TransferState currentState = getState();
this.monitor.getFuture().cancel(true);
if (persistableDownload == null) {
throw new PauseException(TransferManagerUtils.determinePauseStatus(
currentState, forceCancel));
}
return persistableDownload;
}
}
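A sketch of pausing the download above and persisting the resume token. PersistableTransfer.serialize() is the stock SDK serializer; the token path and helper name are illustrative:

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import org.talend.aws.DownloadImpl;
import org.talend.aws.PersistableDownload;

public class PauseAndPersistSketch {
    static void pauseTo(DownloadImpl download, String tokenPath) throws IOException {
        PersistableDownload token = download.pause();        // may throw PauseException
        // PersistableTransfer.serialize() renders the token as JSON.
        Files.write(Paths.get(tokenPath),
                token.serialize().getBytes(StandardCharsets.UTF_8));
    }
}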

View File

@@ -1,30 +0,0 @@
package org.talend.aws;
import com.amazonaws.services.s3.transfer.internal.TransferMonitor;
import java.util.concurrent.Future;
public class DownloadMonitor implements TransferMonitor {
private Future<?> future;
private final DownloadImpl download;
public DownloadMonitor(DownloadImpl download, Future<?> future) {
this.download = download;
this.future = future;
}
@Override
public synchronized Future<?> getFuture() {
return future;
}
public synchronized void setFuture(Future<?> future) {
this.future = future;
}
@Override
public boolean isDone() {
return download.isDone();
}
}

View File

@@ -1,52 +0,0 @@
package org.talend.aws;
import com.amazonaws.util.StringUtils;
import java.io.File;
import java.util.UUID;
import java.util.concurrent.Callable;
import com.amazonaws.SdkClientException;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.GetObjectRequest;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
* Helper class that gets a part from S3,
* writes the part data to a temporary file and
* returns the temporary file.
*/
public class DownloadPartCallable implements Callable<File> {
private static final Log LOG = LogFactory.getLog(DownloadPartCallable.class);
private static final String TEMP_FILE_MIDDLE_NAME = ".part.";
private final AmazonS3 s3;
private final GetObjectRequest getPartRequest;
private final File destinationFile;
private final String destinationFilePath;
public DownloadPartCallable(AmazonS3 s3, GetObjectRequest getPartRequest, File destinationFile) {
this.s3 = s3;
this.getPartRequest = getPartRequest;
this.destinationFile = destinationFile;
this.destinationFilePath = destinationFile.getAbsolutePath();
}
public File call() throws Exception {
final File partFile = File.createTempFile(
UUID.nameUUIDFromBytes(destinationFile.getName().getBytes(StringUtils.UTF8)).toString(),
TEMP_FILE_MIDDLE_NAME + getPartRequest.getPartNumber().toString(),
new File(destinationFilePath.substring(0, destinationFilePath.lastIndexOf(File.separator))));
try {
partFile.deleteOnExit();
} catch (SecurityException exception) {
LOG.warn("SecurityException denied delete access to file " + partFile.getAbsolutePath());
}
if (s3.getObject(getPartRequest, partFile) == null) {
throw new SdkClientException(
"There is no object in S3 satisfying this request. The getObject method returned null");
}
return partFile;
}
}
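The temp-file naming above is deterministic in its prefix: a type-3 UUID derived from the destination file name, so every part of the same download is easy to spot on disk. A self-contained illustration (file name and part number are made up):

import java.nio.charset.StandardCharsets;
import java.util.UUID;

public class PartNameSketch {
    public static void main(String[] args) {
        String destinationName = "report.csv";   // made-up destination file name
        int partNumber = 3;
        String prefix = UUID.nameUUIDFromBytes(
                destinationName.getBytes(StandardCharsets.UTF_8)).toString();
        // File.createTempFile(prefix, ".part." + partNumber, dir) then inserts
        // a random number between the prefix and the suffix.
        System.out.println("prefix=" + prefix + "  suffix=.part." + partNumber);
    }
}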

View File

@@ -1,37 +0,0 @@
package org.talend.aws;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3Encryption;
import com.amazonaws.services.s3.internal.ServiceUtils;
import com.amazonaws.services.s3.internal.SkipMd5CheckStrategy;
import com.amazonaws.services.s3.model.GetObjectRequest;
import com.amazonaws.services.s3.model.S3Object;
final class DownloadTaskImpl implements
ServiceUtils.RetryableS3DownloadTask
{
private final AmazonS3 s3;
private final DownloadImpl download;
private final GetObjectRequest getObjectRequest;
private final SkipMd5CheckStrategy skipMd5CheckStrategy = SkipMd5CheckStrategy.INSTANCE;
DownloadTaskImpl(AmazonS3 s3, DownloadImpl download,
GetObjectRequest getObjectRequest) {
this.s3 = s3;
this.download = download;
this.getObjectRequest = getObjectRequest;
}
@Override
public S3Object getS3ObjectStream() {
S3Object s3Object = s3.getObject(getObjectRequest);
download.setS3Object(s3Object);
return s3Object;
}
@Override
public boolean needIntegrityCheck() {
// Don't perform the integrity check if the checksum won't match up.
return !(s3 instanceof AmazonS3Encryption) && !skipMd5CheckStrategy.skipClientSideValidationPerRequest(getObjectRequest);
}
}

View File

@@ -1,159 +0,0 @@
package org.talend.aws;
import com.amazonaws.services.s3.model.ResponseHeaderOverrides;
import com.amazonaws.services.s3.transfer.PersistableTransfer;
import com.fasterxml.jackson.annotation.JsonProperty;
/**
* An opaque token that holds some private state and can be used to resume a
* paused download operation.
*/
public final class PersistableDownload extends PersistableTransfer {
static final String TYPE = "download";
@JsonProperty
private final String pauseType = TYPE;
/** The bucket name in Amazon S3 from where the object has to be downloaded. */
@JsonProperty
private final String bucketName;
/** The name of the object in Amazon S3 that has to be downloaded. */
@JsonProperty
private final String key;
/** The version id of the object in Amazon S3 to download. */
@JsonProperty
private final String versionId;
/** Optional member indicating the byte range of data to retrieve */
@JsonProperty
private final long[] range;
/**
* Optional field that overrides headers on the response.
*/
@JsonProperty
private final ResponseHeaderOverrides responseHeaders;
/**
* If enabled, the requester is charged for downloading the data from
* Requester Pays Buckets.
*/
@JsonProperty
private final boolean isRequesterPays;
/**
* File where the downloaded data is written.
*/
@JsonProperty
private final String file;
/**
* The last part that has been successfully written into the downloaded file.
*/
@JsonProperty
private final Integer lastFullyDownloadedPartNumber;
/**
* Last Modified/created time on Amazon S3 for this object.
*/
@JsonProperty
private final long lastModifiedTime;
public PersistableDownload() {
this(null, null, null, null, null, false, null, null, 0L);
}
public PersistableDownload(
@JsonProperty(value = "bucketName") String bucketName,
@JsonProperty(value = "key") String key,
@JsonProperty(value = "versionId") String versionId,
@JsonProperty(value = "range") long[] range,
@JsonProperty(value = "responseHeaders") ResponseHeaderOverrides responseHeaders,
@JsonProperty(value = "isRequesterPays") boolean isRequesterPays,
@JsonProperty(value = "file") String file,
@JsonProperty(value = "lastFullyDownloadedPartNumber") Integer lastFullyDownloadedPartNumber,
@JsonProperty(value = "lastModifiedTime") long lastModifiedTime) {
this.bucketName = bucketName;
this.key = key;
this.versionId = versionId;
this.range = range == null ? null : range.clone();
this.responseHeaders = responseHeaders;
this.isRequesterPays = isRequesterPays;
this.file = file;
this.lastFullyDownloadedPartNumber = lastFullyDownloadedPartNumber;
this.lastModifiedTime = lastModifiedTime;
}
/**
* Returns the name of the bucket.
*/
String getBucketName() {
return bucketName;
}
/**
* Returns the name of the object.
*/
String getKey() {
return key;
}
/**
* Returns the version id of the object.
*/
String getVersionId() {
return versionId;
}
/**
* Returns the byte range of the object to download.
*/
long[] getRange() {
return range == null ? null : range.clone();
}
/**
* Returns the optional response headers.
*/
ResponseHeaderOverrides getResponseHeaders() {
return responseHeaders;
}
/**
* Returns true if RequesterPays is enabled on the Amazon S3 bucket,
* false otherwise.
*/
boolean isRequesterPays() {
return isRequesterPays;
}
/**
* Returns the file where the object is to be downloaded.
*/
String getFile() {
return file;
}
String getPauseType() {
return pauseType;
}
/**
* Returns the last part number that was successfully written into the downloaded file.
*/
Integer getLastFullyDownloadedPartNumber() {
return lastFullyDownloadedPartNumber;
}
/**
* Returns the last modified/created time of the object represented by
* the bucketName and key.
*/
Long getlastModifiedTime() {
return lastModifiedTime;
}
}

View File

@@ -1,17 +0,0 @@
package org.talend.aws;
import com.amazonaws.event.ProgressEvent;
import com.amazonaws.event.ProgressEventFilter;
import com.amazonaws.event.ProgressEventType;
final class TransferCompletionFilter implements ProgressEventFilter {
@Override
public ProgressEvent filter(ProgressEvent progressEvent) {
// Block COMPLETE events from the low-level GetObject operation,
// but we still want to keep the BytesTransferred events
return progressEvent.getEventType() == ProgressEventType.TRANSFER_COMPLETED_EVENT
? null // discard this event
: progressEvent
;
}
}
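The filter above is wired in front of the listener chain handed to the low-level GetObject request (mirroring doDownload further down), so the COMPLETED event is swallowed at that level and re-fired later by the high-level download. A sketch, with a hypothetical class name:

package org.talend.aws;   // the filter is package-private, so this sketch sits beside it

import com.amazonaws.event.ProgressListenerChain;
import com.amazonaws.services.s3.model.GetObjectRequest;

class FilterWiringSketch {
    static void wire(GetObjectRequest request, ProgressListenerChain downstream) {
        // The first link filters out TRANSFER_COMPLETED_EVENT; everything else
        // (including BytesTransferred) flows through to the downstream chain.
        request.setGeneralProgressListener(
                new ProgressListenerChain(new TransferCompletionFilter(), downstream));
    }
}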

View File

@@ -1,233 +0,0 @@
package org.talend.aws;
import com.amazonaws.AmazonClientException;
import com.amazonaws.AmazonWebServiceRequest;
import com.amazonaws.event.ProgressListenerChain;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.internal.FileLocks;
import com.amazonaws.services.s3.internal.RequestCopyUtils;
import com.amazonaws.services.s3.internal.ServiceUtils;
import com.amazonaws.services.s3.model.GetObjectMetadataRequest;
import com.amazonaws.services.s3.model.GetObjectRequest;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.transfer.TransferManagerConfiguration;
import com.amazonaws.services.s3.transfer.TransferProgress;
import com.amazonaws.services.s3.transfer.exception.FileLockException;
import com.amazonaws.services.s3.transfer.internal.S3ProgressListener;
import com.amazonaws.services.s3.transfer.internal.S3ProgressListenerChain;
import com.amazonaws.services.s3.transfer.internal.TransferManagerUtils;
import com.amazonaws.services.s3.transfer.internal.TransferStateChangeListener;
import com.amazonaws.services.s3.transfer.internal.TransferProgressUpdatingListener;
import com.amazonaws.util.VersionInfoUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import java.io.File;
import java.util.Date;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
public class TransferManager {
private static final Log log = LogFactory.getLog(TransferManager.class);
private final AmazonS3 s3;
private final ExecutorService executorService;
private final TransferManagerConfiguration configuration;
private final boolean shutDownThreadPools;
public TransferManager(AmazonS3 s3) {
this.s3 = s3;
this.executorService = TransferManagerUtils.createDefaultExecutorService();
this.configuration = resolveConfiguration();
this.shutDownThreadPools = true;
}
private TransferManagerConfiguration resolveConfiguration() {
TransferManagerConfiguration configuration = new TransferManagerConfiguration();
configuration.setDisableParallelDownloads(false);
return configuration;
}
public Download download(GetObjectRequest getObjectRequest, File file, S3ProgressListener progressListener,
long timeoutMillis, boolean resumeOnRetry) {
return doDownload(getObjectRequest, file, null, progressListener, ServiceUtils.OVERWRITE_MODE, timeoutMillis, null, 0L,
resumeOnRetry);
}
private Download doDownload(final GetObjectRequest getObjectRequest,
final File file, final TransferStateChangeListener stateListener,
final S3ProgressListener s3progressListener,
final boolean resumeExistingDownload,
final long timeoutMillis,
final Integer lastFullyDownloadedPart,
final long lastModifiedTimeRecordedDuringPause,
final boolean resumeOnRetry)
{
assertParameterNotNull(getObjectRequest,
"A valid GetObjectRequest must be provided to initiate download");
assertParameterNotNull(file,
"A valid file must be provided to download into");
appendSingleObjectUserAgent(getObjectRequest);
String description = "Downloading from " + getObjectRequest.getBucketName() + "/" + getObjectRequest.getKey();
TransferProgress transferProgress = new TransferProgress();
// S3 progress listener to capture the persistable transfer when available
S3ProgressListenerChain listenerChain = new S3ProgressListenerChain(
// The listener for updating transfer progress
new TransferProgressUpdatingListener(transferProgress),
getObjectRequest.getGeneralProgressListener(),
s3progressListener); // Listeners included in the original request
// The listener chain used by the low-level GetObject request.
// This listener chain ignores any COMPLETE event, so that we could
// delay firing the signal until the high-level download fully finishes.
getObjectRequest
.setGeneralProgressListener(new ProgressListenerChain(new TransferCompletionFilter(), listenerChain));
GetObjectMetadataRequest getObjectMetadataRequest = RequestCopyUtils.createGetObjectMetadataRequestFrom(getObjectRequest);
final ObjectMetadata objectMetadata = s3.getObjectMetadata(getObjectMetadataRequest);
// Used to check if the object is modified between pause and resume
long lastModifiedTime = objectMetadata.getLastModified().getTime();
long startingByte = 0;
long lastByte;
long[] range = getObjectRequest.getRange();
if (range != null && range.length == 2) {
startingByte = range[0];
lastByte = range[1];
} else {
lastByte = objectMetadata.getContentLength() - 1;
}
final long origStartingByte = startingByte;
final boolean isDownloadParallel = !configuration.isDisableParallelDownloads()
&& TransferManagerUtils.isDownloadParallelizable(s3, getObjectRequest, ServiceUtils.getPartCount(getObjectRequest, s3));
// We still pass the unfiltered listener chain into DownloadImpl
final DownloadImpl download = new DownloadImpl(description, transferProgress, listenerChain, null,
stateListener, getObjectRequest, file, objectMetadata, isDownloadParallel);
long totalBytesToDownload = lastByte - startingByte + 1;
transferProgress.setTotalBytesToTransfer(totalBytesToDownload);
// Range information is needed for auto retry of downloads so a retry
// request can start at the last downloaded location in the range.
//
// For obvious reasons, setting a Range header only makes sense if the
// object actually has content because it's inclusive, otherwise S3
// responds with 4xx
//
// In addition, we only set the range if the download was *NOT*
// determined to be parallelizable above. One of the conditions for
// parallel downloads is that getRange() returns null so preserve that.
if (totalBytesToDownload > 0 && !isDownloadParallel) {
getObjectRequest.withRange(startingByte, lastByte);
}
long fileLength = -1;
if (resumeExistingDownload) {
if (isS3ObjectModifiedSincePause(lastModifiedTime, lastModifiedTimeRecordedDuringPause)) {
throw new AmazonClientException("The requested object in bucket " + getObjectRequest.getBucketName()
+ " with key " + getObjectRequest.getKey() + " is modified on Amazon S3 since the last pause.");
}
// There's still a chance the object is modified while the request
// is in flight. Set this header so S3 fails the request if this happens.
getObjectRequest.setUnmodifiedSinceConstraint(new Date(lastModifiedTime));
if (!isDownloadParallel) {
if (!FileLocks.lock(file)) {
throw new FileLockException("Fail to lock " + file + " for resume download");
}
try {
if (file.exists()) {
fileLength = file.length();
startingByte = startingByte + fileLength;
getObjectRequest.setRange(startingByte, lastByte);
transferProgress.updateProgress(Math.min(fileLength, totalBytesToDownload));
totalBytesToDownload = lastByte - startingByte + 1;
if (log.isDebugEnabled()) {
log.debug("Resume download: totalBytesToDownload=" + totalBytesToDownload
+ ", origStartingByte=" + origStartingByte + ", startingByte=" + startingByte
+ ", lastByte=" + lastByte + ", numberOfBytesRead=" + fileLength + ", file: "
+ file);
}
}
} finally {
FileLocks.unlock(file);
}
}
}
if (totalBytesToDownload < 0) {
throw new IllegalArgumentException(
"Unable to determine the range for download operation.");
}
final CountDownLatch latch = new CountDownLatch(1);
Future<?> future = executorService.submit(
new DownloadCallable(s3, latch,
getObjectRequest, resumeExistingDownload,
download, file, origStartingByte, fileLength, timeoutMillis, timedThreadPool,
executorService, lastFullyDownloadedPart, isDownloadParallel, resumeOnRetry));
download.setMonitor(new DownloadMonitor(download, future));
latch.countDown();
return download;
}
public void shutdownNow(boolean shutDownS3Client) {
if (shutDownThreadPools) {
executorService.shutdownNow();
timedThreadPool.shutdownNow();
}
if (shutDownS3Client) {
s3.shutdown();
}
}
private void assertParameterNotNull(Object parameterValue, String errorMessage) {
if (parameterValue == null) throw new IllegalArgumentException(errorMessage);
}
public static <X extends AmazonWebServiceRequest> X appendSingleObjectUserAgent(X request) {
request.getRequestClientOptions().appendUserAgent(USER_AGENT);
return request;
}
private static final String USER_AGENT = TransferManager.class.getName() + "/" + VersionInfoUtils.getVersion();
private boolean isS3ObjectModifiedSincePause(final long lastModifiedTimeRecordedDuringResume,
long lastModifiedTimeRecordedDuringPause) {
return lastModifiedTimeRecordedDuringResume != lastModifiedTimeRecordedDuringPause;
}
private final ScheduledExecutorService timedThreadPool = new ScheduledThreadPoolExecutor(1, daemonThreadFactory);
private static final ThreadFactory daemonThreadFactory = new ThreadFactory() {
final AtomicInteger threadCount = new AtomicInteger( 0 );
public Thread newThread(Runnable r) {
int threadNumber = threadCount.incrementAndGet();
Thread thread = new Thread(r);
thread.setDaemon(true);
thread.setName("S3TransferManagerTimedThread-" + threadNumber);
return thread;
}
};
@Override
protected void finalize() throws Throwable {
shutdownThreadPools();
}
private void shutdownThreadPools() {
if (shutDownThreadPools) {
executorService.shutdown();
timedThreadPool.shutdown();
}
}
}
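The byte-range arithmetic in doDownload above, restated on its own: honor an explicit two-element range, otherwise span the whole object; the HTTP Range header is inclusive at both ends, hence the -1/+1 bookkeeping. The class and method names are illustrative:

public class RangeSketch {
    static long[] resolveRange(long[] requestedRange, long contentLength) {
        long startingByte = 0;
        long lastByte;
        if (requestedRange != null && requestedRange.length == 2) {
            startingByte = requestedRange[0];
            lastByte = requestedRange[1];
        } else {
            lastByte = contentLength - 1;        // inclusive upper bound
        }
        long totalBytesToDownload = lastByte - startingByte + 1;
        return new long[] { startingByte, lastByte, totalBytesToDownload };
    }
}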

View File

@@ -7,21 +7,21 @@
<groupId>org.talend.libraries</groupId>
<artifactId>talend-codegen-utils</artifactId>
<!-- release for revert version of library -->
<version>0.31.0</version>
<version>0.28.0</version>
<packaging>jar</packaging>
<properties>
<avro.version>1.8.0</avro.version>
<components.version>0.30.0</components.version>
<daikon.version>0.31.11</daikon.version>
<components.version>0.25.0-SNAPSHOT</components.version>
<daikon.version>0.26.0-SNAPSHOT</daikon.version>
<hamcrest.version>1.3</hamcrest.version>
<junit.version>4.12</junit.version>
<java-formatter.plugin.version>0.1.0</java-formatter.plugin.version>
<formatter.plugin.version>1.6.0-SNAPSHOT</formatter.plugin.version>
<mockito.version>2.2.15</mockito.version>
<jacoco.plugin.version>0.7.8</jacoco.plugin.version>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<maven.compiler.source>1.7</maven.compiler.source>
<maven.compiler.target>1.7</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<talend.nexus.url>https://artifacts-oss.talend.com</talend.nexus.url>
</properties>

View File

@@ -24,10 +24,8 @@ import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TimeZone;
import org.apache.avro.Schema;
@@ -35,11 +33,9 @@ import org.apache.avro.Schema.Field;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.IndexedRecord;
import org.apache.avro.SchemaParseException;
import org.talend.codegen.DiSchemaConstants;
import org.talend.daikon.avro.AvroUtils;
import org.talend.daikon.avro.LogicalTypeUtils;
import org.talend.daikon.avro.NameUtil;
import org.talend.daikon.avro.SchemaConstants;
/**
@@ -137,7 +133,6 @@ public class IncomingSchemaEnforcer {
}
}
// TODO: remove this method, as nothing in the javajet templates uses it now
/**
* Take all of the parameters from the dynamic metadata and adapt it to a field for the runtime Schema.
*
@@ -149,12 +144,6 @@ public class IncomingSchemaEnforcer {
addDynamicField(name, type, null, format, description, isNullable);
}
private Set<String> existNames;
private Map<String, String> unvalidName2ValidName;
private int index = 0;
/**
* Recreates dynamic field from parameters retrieved from DI dynamic metadata
*
@@ -164,10 +153,9 @@ public class IncomingSchemaEnforcer {
* @param fieldPattern dynamic field date format
* @param description dynamic field description
* @param isNullable defines whether dynamic field may contain <code>null</code> value
* @param isKey defines whether dynamic field is key field
*/
public void addDynamicField(String name, String diType, String logicalType, String fieldPattern, String description,
boolean isNullable, boolean isKey) {
boolean isNullable) {
if (!needsInitDynamicColumns())
return;
Schema fieldSchema = diToAvro(diType, logicalType);
@@ -175,52 +163,14 @@ public class IncomingSchemaEnforcer {
if (isNullable) {
fieldSchema = SchemaBuilder.nullable().type(fieldSchema);
}
Schema.Field field;
try {
field = new Schema.Field(name, fieldSchema, description, (Object) null);
} catch (SchemaParseException e) {
// The name contains a special character, such as $ or #, that fails the Avro name
// check. (Unicode such as Japanese passes the check, though that is not expected.)
if (existNames == null) {
existNames = new HashSet<>();
unvalidName2ValidName = new HashMap<>();
}
String validName = NameUtil.correct(name, index++, existNames);
existNames.add(validName);
unvalidName2ValidName.put(name, validName);
field = new Schema.Field(validName, fieldSchema, description, (Object) null);
field.addProp(SchemaConstants.TALEND_COLUMN_DB_COLUMN_NAME, name);
}
Schema.Field field = new Schema.Field(name, fieldSchema, description, (Object) null);
// Set pattern for date type
if ("id_Date".equals(diType) && fieldPattern != null) {
field.addProp(SchemaConstants.TALEND_COLUMN_PATTERN, fieldPattern);
}
if (isKey) {
field.addProp(SchemaConstants.TALEND_COLUMN_IS_KEY, "true");
}
dynamicFields.add(field);
}
/**
* Recreates dynamic field from parameters retrieved from DI dynamic metadata
*
* @param name dynamic field name
* @param diType di column type
* @param logicalType dynamic field logical type; could be null
* @param fieldPattern dynamic field date format
* @param description dynamic field description
* @param isNullable defines whether dynamic field may contain <code>null</code> value
*/
@Deprecated
public void addDynamicField(String name, String diType, String logicalType, String fieldPattern, String description,
boolean isNullable) {
addDynamicField(name, diType, logicalType, fieldPattern, description, isNullable, false);
}
public void addIncomingNodeField(String name, String className) {
String diType = "id_String";
switch (className) {
@@ -300,8 +250,6 @@ public class IncomingSchemaEnforcer {
fieldSchema = AvroUtils._decimal();
} else if ("id_Date".equals(diType)) {
fieldSchema = AvroUtils._date();
} else if ("id_byte[]".equals(diType)) {
fieldSchema = AvroUtils._bytes();
} else {
throw new UnsupportedOperationException("Unrecognized type " + diType);
}
@@ -421,9 +369,6 @@ public class IncomingSchemaEnforcer {
return designSchema;
}
// Here we apply special processing to dynamic input names. In fact the same issue
// (Japanese or special characters used as a column label) also affects basic Talend
// columns, not just dynamic ones, but that case is not handled here. TODO.
/**
* Converts DI data value to Avro format and put it into record by field name
*
@@ -431,16 +376,9 @@ public class IncomingSchemaEnforcer {
* @param diValue data value
*/
public void put(String name, Object diValue) {
if (unvalidName2ValidName != null) {
String validName = unvalidName2ValidName.get(name);
if (validName != null) {
name = validName;
}
}
put(columnToFieldIndex.get(name), diValue);
}
// TODO: make this private; it is only called from within the current class?
/**
* Converts DI data value to Avro format and put it into record by field index
*

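For reference, the special-name handling removed in this hunk boiled down to the following, using the same NameUtil.correct(name, index, existingNames) helper the code itself relied on; the sample column names are illustrative:

import java.util.HashSet;
import java.util.Set;
import org.talend.daikon.avro.NameUtil;

public class NameCorrectionSketch {
    public static void main(String[] args) {
        Set<String> existing = new HashSet<>();
        int index = 0;
        for (String raw : new String[] { "address#", "comment$", "性别" }) {
            String valid = NameUtil.correct(raw, index++, existing);
            existing.add(valid);
            // The removed code stored `raw` in TALEND_COLUMN_DB_COLUMN_NAME and
            // redirected put(name, value) lookups through unvalidName2ValidName.
            System.out.println(raw + " -> " + valid);
        }
    }
}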
View File

@@ -52,8 +52,6 @@ public class IncomingSchemaEnforcerTest {
*/
private IndexedRecord componentRecord;
private IndexedRecord componentRecordWithSpecialName;
@Rule
public ExpectedException thrown = ExpectedException.none();
@@ -74,29 +72,9 @@ public class IncomingSchemaEnforcerTest {
componentRecord.put(3, true);
componentRecord.put(4, "Main Street");
componentRecord.put(5, "This is a record with six columns.");
Schema componentSchemaWithSpecialName = SchemaBuilder.builder().record("Record").fields() //
.name("id").type().intType().noDefault() //
.name("name").type().stringType().noDefault() //
.name("age").type().intType().noDefault() //
.name("性别").type().booleanType().noDefault() // why doesn't this store the original name? It passes the Avro name check; this is an Avro bug
.name("address_").prop(SchemaConstants.TALEND_COLUMN_DB_COLUMN_NAME, "address#").type().stringType().noDefault() //
.name("comment_").prop(SchemaConstants.TALEND_COLUMN_DB_COLUMN_NAME, "comment$").type().stringType().noDefault() //
.endRecord();
componentRecordWithSpecialName = new GenericData.Record(componentSchemaWithSpecialName);
componentRecordWithSpecialName.put(0, 1);
componentRecordWithSpecialName.put(1, "User");
componentRecordWithSpecialName.put(2, 100);
componentRecordWithSpecialName.put(3, true);
componentRecordWithSpecialName.put(4, "Main Street");
componentRecordWithSpecialName.put(5, "This is a record with six columns.");
}
private void checkEnforcerWithComponentRecordData(IncomingSchemaEnforcer enforcer) {
checkEnforcerWithComponentRecordData(enforcer, false);
}
private void checkEnforcerWithComponentRecordData(IncomingSchemaEnforcer enforcer, boolean specialName) {
// The enforcer must be ready to receive values.
assertThat(enforcer.needsInitDynamicColumns(), is(false));
@@ -110,25 +88,15 @@ public class IncomingSchemaEnforcerTest {
IndexedRecord adapted = enforcer.createIndexedRecord();
// Ensure that the result is the same as the expected component record.
if (specialName) {
assertThat(adapted, is(componentRecordWithSpecialName));
} else {
assertThat(adapted, is(componentRecord));
}
assertThat(adapted, is(componentRecord));
// Ensure that we create a new instance when we give it another value.
enforcer.put("id", 2);
enforcer.put("name", "User2");
enforcer.put("age", 200);
if (specialName) {
enforcer.put("性别", false);
enforcer.put("address#", "2 Main Street");
enforcer.put("comment$", "2 This is a record with six columns.");
} else {
enforcer.put("valid", false);
enforcer.put("address", "2 Main Street");
enforcer.put("comment", "2 This is a record with six columns.");
}
enforcer.put("valid", false);
enforcer.put("address", "2 Main Street");
enforcer.put("comment", "2 This is a record with six columns.");
IndexedRecord adapted2 = enforcer.createIndexedRecord();
// It should have the same schema, but not be the same instance.
@@ -424,39 +392,6 @@ public class IncomingSchemaEnforcerTest {
checkEnforcerWithComponentRecordData(enforcer);
}
@Test
public void testDynamicColumnWithSpecialName() {
Schema designSchema = SchemaBuilder.builder().record("Record") //
.prop(DiSchemaConstants.TALEND6_DYNAMIC_COLUMN_POSITION, "3") //
.prop(SchemaConstants.INCLUDE_ALL_FIELDS, "true") //
.fields() //
.name("id").type().intType().noDefault() //
.name("name").type().stringType().noDefault() //
.name("age").type().intType().noDefault() //
.endRecord();
IncomingSchemaEnforcer enforcer = new IncomingSchemaEnforcer(designSchema);
// The enforcer isn't usable yet.
assertThat(enforcer.getDesignSchema(), is(designSchema));
assertFalse(enforcer.areDynamicFieldsInitialized());
assertThat(enforcer.getRuntimeSchema(), nullValue());
enforcer.addDynamicField("性别", "id_Boolean", null, null, null, false, false);
enforcer.addDynamicField("address#", "id_String", null, null, null, false, false);
enforcer.addDynamicField("comment$", "id_String", null, null, null, false, false);
assertFalse(enforcer.areDynamicFieldsInitialized());
enforcer.createRuntimeSchema();
assertTrue(enforcer.areDynamicFieldsInitialized());
// Check the run-time schema was created.
assertThat(enforcer.getDesignSchema(), is(designSchema));
assertThat(enforcer.getRuntimeSchema(), not(nullValue()));
// Put values into the enforcer and get them as an IndexedRecord.
checkEnforcerWithComponentRecordData(enforcer, true);
}
@Test
public void testTypeConversion_toDate() {
// The expected schema after enforcement.
@@ -764,28 +699,6 @@ public class IncomingSchemaEnforcerTest {
assertThat(record.get(1), is((Object) new Date(1234567891011L)));
}
/**
* Checks key field setting
*/
@Test
public void testAddDynamicFieldKey() {
Schema expectedRuntimeSchema = SchemaBuilder.builder().record("Record").fields().name("id")
.prop(SchemaConstants.TALEND_COLUMN_IS_KEY, "true").type().intType().noDefault().endRecord();
Schema designSchema = SchemaBuilder.builder().record("Record").prop(SchemaConstants.INCLUDE_ALL_FIELDS, "true")
.prop(DiSchemaConstants.TALEND6_DYNAMIC_COLUMN_POSITION, "0").fields().endRecord();
IncomingSchemaEnforcer enforcer = new IncomingSchemaEnforcer(designSchema);
enforcer.addDynamicField("id", "id_Integer", null, null, null, false, true);
enforcer.createRuntimeSchema();
assertTrue(enforcer.areDynamicFieldsInitialized());
Schema actualRuntimeSchema = enforcer.getRuntimeSchema();
assertEquals(expectedRuntimeSchema, actualRuntimeSchema);
}
/**
* Checks that {@link IncomingSchemaEnforcer#put()} converts a string value to a date according to the pattern specified in the dynamic field
* TODO (iv.gonchar): this is incorrect behavior, because an avro record should not contain a java.util.Date value. It should store

View File

@@ -4,7 +4,7 @@
<groupId>org.talend</groupId>
<artifactId>talend-httputil</artifactId>
<name>talend-httputil</name>
<version>1.0.6</version>
<version>1.0.5</version>
<properties>
<talend.nexus.url>https://artifacts-oss.talend.com</talend.nexus.url>
@@ -20,7 +20,7 @@
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.11.4</version>
<version>2.10.1</version>
</dependency>
<dependency>

View File

@@ -6,11 +6,11 @@
<groupId>org.talend.components.lib</groupId>
<artifactId>job-audit</artifactId>
<version>1.4</version>
<version>1.2</version>
<properties>
<talend.nexus.url>https://artifacts-oss.talend.com</talend.nexus.url>
<daikon.audit.version>1.16.1</daikon.audit.version>
<daikon.audit.version>1.16.0</daikon.audit.version>
</properties>
<repositories>

View File

@@ -24,11 +24,5 @@ public interface JobAuditLogger extends EventAuditLogger {
@AuditEvent(category = "flowexecution", message = "connection : {connection_name}, row : {rows}, cost : {duration}", level = LogLevel.INFO)
void flowExecution(Context context);
@AuditEvent(category = "componentparameters", message = "Component {connector_id} parameters", level = LogLevel.INFO)
void componentParameters(Context context);
@AuditEvent(category = "schema", message = "{connection_name} : {schema} from {source_id} to {target_id}", level = LogLevel.INFO)
void schema(Context context);
}

View File

@@ -15,11 +15,6 @@ public class JobContextBuilder {
return new JobContextBuilder(ContextBuilder.create());
}
public JobContextBuilder custom(String key, String value) {
builder.with(key, value);
return this;
}
public JobContextBuilder jobName(String job_name) {
builder.with("job_name", job_name);
return this;
@@ -166,14 +161,4 @@ public class JobContextBuilder {
return this;
}
public JobContextBuilder connectorParameters(String connector_parameters) {
builder.with("connector_parameters", connector_parameters);
return this;
}
public JobContextBuilder schema(String schema) {
builder.with("schema", schema);
return this;
}
}

View File

@@ -10,6 +10,7 @@
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<cxf.version>3.1.1</cxf.version>
<odata.version>4.3.0</odata.version>
<slf4j.version>1.7.12</slf4j.version>
<talend.nexus.url>https://artifacts-oss.talend.com</talend.nexus.url>

View File

@@ -1,191 +0,0 @@
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.talend.components</groupId>
<artifactId>talend-orc</artifactId>
<version>1.0-20211008</version>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<hadoop.version>3.2.2</hadoop.version>
<apache.orc.version>1.7.0</apache.orc.version>
<junit.jupiter.version>5.7.2</junit.jupiter.version>
<hamcrest.version>1.3</hamcrest.version>
<talend.nexus.url>https://artifacts-oss.talend.com</talend.nexus.url>
</properties>
<distributionManagement>
<snapshotRepository>
<id>talend_nexus_deployment</id>
<url>${talend.nexus.url}/nexus/content/repositories/TalendOpenSourceSnapshot/</url>
<snapshots>
<enabled>true</enabled>
</snapshots>
<releases>
<enabled>false</enabled>
</releases>
</snapshotRepository>
<repository>
<id>talend_nexus_deployment</id>
<url>${talend.nexus.url}/nexus/content/repositories/TalendOpenSourceRelease/</url>
<snapshots>
<enabled>false</enabled>
</snapshots>
<releases>
<enabled>true</enabled>
</releases>
</repository>
</distributionManagement>
<dependencies>
<dependency>
<groupId>org.apache.orc</groupId>
<artifactId>orc-core</artifactId>
<version>${apache.orc.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>${hadoop.version}</version>
<exclusions>
<exclusion>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-server</artifactId>
</exclusion>
<exclusion>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-servlet</artifactId>
</exclusion>
<exclusion>
<artifactId>jersey-server</artifactId>
<groupId>com.sun.jersey</groupId>
</exclusion>
<exclusion>
<artifactId>jersey-servlet</artifactId>
<groupId>com.sun.jersey</groupId>
</exclusion>
<exclusion>
<artifactId>slf4j-log4j12</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
<exclusion>
<artifactId>log4j</artifactId>
<groupId>log4j</groupId>
</exclusion>
<exclusion>
<artifactId>zookeeper</artifactId>
<groupId>org.apache.zookeeper</groupId>
</exclusion>
<exclusion>
<artifactId>jetty-webapp</artifactId>
<groupId>org.eclipse.jetty</groupId>
</exclusion>
<exclusion>
<artifactId>javax.servlet-api</artifactId>
<groupId>javax.servlet</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs-client</artifactId>
<version>${hadoop.version}</version>
<exclusions>
<exclusion>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-server</artifactId>
</exclusion>
<exclusion>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-servlet</artifactId>
</exclusion>
<exclusion>
<artifactId>jersey-server</artifactId>
<groupId>com.sun.jersey</groupId>
</exclusion>
<exclusion>
<artifactId>jersey-servlet</artifactId>
<groupId>com.sun.jersey</groupId>
</exclusion>
<exclusion>
<artifactId>slf4j-log4j12</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
<exclusion>
<artifactId>log4j</artifactId>
<groupId>log4j</groupId>
</exclusion>
<exclusion>
<artifactId>zookeeper</artifactId>
<groupId>org.apache.zookeeper</groupId>
</exclusion>
<exclusion>
<artifactId>jetty-webapp</artifactId>
<groupId>org.eclipse.jetty</groupId>
</exclusion>
<exclusion>
<artifactId>javax.servlet-api</artifactId>
<groupId>javax.servlet</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter</artifactId>
<version>${junit.jupiter.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
<version>${junit.jupiter.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-engine</artifactId>
<version>${junit.jupiter.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-params</artifactId>
<version>${junit.jupiter.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.platform</groupId>
<artifactId>junit-platform-launcher</artifactId>
<version>1.7.2</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-library</artifactId>
<version>${hamcrest.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.0</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>
</project>
View File
@@ -1,24 +0,0 @@
package org.talend.orc;
import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.MapColumnVector;
abstract class ORCCommonUtils {
/**
* Check that the map type for the key is BYTES, LONG or DOUBLE and that the key
* type is LONG, DOUBLE, BYTES, DECIMAL or TIMESTAMP.
*
* @param mapVector a MapColumnVector
* @return true if the key and value types conform to the limits described
* above.
*/
public static boolean checkMapColumnVectorTypes(MapColumnVector mapVector) {
ColumnVector.Type keyType = mapVector.keys.type;
ColumnVector.Type valueType = mapVector.values.type;
return (keyType == ColumnVector.Type.BYTES || keyType == ColumnVector.Type.LONG
|| keyType == ColumnVector.Type.DOUBLE)
&& (valueType == ColumnVector.Type.LONG || valueType == ColumnVector.Type.DOUBLE
|| valueType == ColumnVector.Type.BYTES || valueType == ColumnVector.Type.DECIMAL
|| valueType == ColumnVector.Type.TIMESTAMP);
}
}
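A minimal sketch of the check above, assuming a map&lt;string,bigint&gt; column: string keys map to a BYTES vector and bigint values to a LONG vector, so the check passes.

TypeDescription t = TypeDescription.fromString("struct<m:map<string,bigint>>");
MapColumnVector mv = (MapColumnVector) t.createRowBatch().cols[0];
boolean ok = ORCCommonUtils.checkMapColumnVectorTypes(mv); // true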
View File
@@ -1,442 +0,0 @@
package org.talend.orc;
import java.math.BigDecimal;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.MapColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.UnionColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.orc.TypeDescription;
public class ORCReadUtils {
public static Object readColumnByName(VectorizedRowBatch batch, String columnName, TypeDescription schema,
int rowNum) {
List<String> allColumnNames = schema.getFieldNames();
int colIndex = allColumnNames.indexOf(columnName);
if (colIndex < 0 || colIndex > batch.cols.length - 1) {
return null;
} else {
org.apache.hadoop.hive.ql.exec.vector.ColumnVector colVector = batch.cols[colIndex];
TypeDescription fieldType = schema.getChildren().get(colIndex);
int colRow = colVector.isRepeating ? 0 : rowNum;
Object value = readColumn(colVector, fieldType, colRow);
return value;
}
}
public static Object readColumn(ColumnVector colVec, TypeDescription colType, int rowNum) {
Object columnObj = null;
if (!colVec.isNull[rowNum]) {
switch (colVec.type) {
case BYTES:
columnObj = readBytesVal(colVec, colType, rowNum);
break;
case DECIMAL:
columnObj = readDecimalVal(colVec, rowNum);
break;
case DOUBLE:
columnObj = readDoubleVal(colVec, colType, rowNum);
break;
case LIST:
columnObj = readListVal(colVec, colType, rowNum);
break;
case LONG:
columnObj = readLongVal(colVec, colType, rowNum);
break;
case MAP:
columnObj = readMapVal(colVec, colType, rowNum);
break;
case STRUCT:
columnObj = readStructVal(colVec, colType, rowNum);
break;
case TIMESTAMP:
columnObj = readTimestampVal(colVec, colType, rowNum);
break;
case UNION:
columnObj = readUnionVal(colVec, colType, rowNum);
break;
default:
throw new RuntimeException("readColumn: unsupported ORC file column type: " + colVec.type.name());
}
}
return columnObj;
}
private static Object readListVal(ColumnVector colVec, TypeDescription colType, int rowNum) {
Object listValues = null;
if (!colVec.isNull[rowNum]) {
ListColumnVector listVector = (ListColumnVector) colVec;
ColumnVector listChildVector = listVector.child;
TypeDescription childType = colType.getChildren().get(0);
switch (listChildVector.type) {
case BYTES:
listValues = readBytesListValues(listVector, childType, rowNum);
break;
case DECIMAL:
listValues = readDecimalListValues(listVector, rowNum);
break;
case DOUBLE:
listValues = readDoubleListValues(listVector, rowNum);
break;
case LONG:
listValues = readLongListValues(listVector, childType, rowNum);
break;
case TIMESTAMP:
listValues = readTimestampListValues(listVector, childType, rowNum);
break;
default:
throw new RuntimeException(listChildVector.type.name() + " is not supported for ListColumnVectors");
}
}
return listValues;
}
private static List<Object> readLongListVector(LongColumnVector longVector, TypeDescription childType, int offset,
int numValues) {
List<Object> longList = new ArrayList<>();
for (int i = 0; i < numValues; i++) {
if (!longVector.isNull[offset + i]) {
long longVal = longVector.vector[offset + i];
if (childType.getCategory() == TypeDescription.Category.BOOLEAN) {
Boolean boolVal = longVal == 0 ? Boolean.valueOf(false) : Boolean.valueOf(true);
longList.add(boolVal);
} else if (childType.getCategory() == TypeDescription.Category.INT) {
Integer intObj = (int) longVal;
longList.add(intObj);
} else {
longList.add(longVal);
}
} else {
longList.add(null);
}
}
return longList;
}
private static Object readLongListValues(ListColumnVector listVector, TypeDescription childType, int rowNum) {
int offset = (int) listVector.offsets[rowNum];
int numValues = (int) listVector.lengths[rowNum];
LongColumnVector longVector = (LongColumnVector) listVector.child;
return readLongListVector(longVector, childType, offset, numValues);
}
private static Object readTimestampListVector(TimestampColumnVector timestampVector, TypeDescription childType,
int offset, int numValues) {
List<Object> timestampList = new ArrayList<>();
for (int i = 0; i < numValues; i++) {
if (!timestampVector.isNull[offset + i]) {
int nanos = timestampVector.nanos[offset + i];
long millisec = timestampVector.time[offset + i];
Timestamp timestamp = new Timestamp(millisec);
timestamp.setNanos(nanos);
if (childType.getCategory() == TypeDescription.Category.DATE) {
Date date = new Date(timestamp.getTime());
timestampList.add(date);
} else {
timestampList.add(timestamp);
}
} else {
timestampList.add(null);
}
}
return timestampList;
}
/**
* Read either Timestamp or Date values, depending on the definition in the
* schema.
*/
private static Object readTimestampListValues(ListColumnVector listVector, TypeDescription childType, int rowNum) {
int offset = (int) listVector.offsets[rowNum];
int numValues = (int) listVector.lengths[rowNum];
TimestampColumnVector timestampVec = (TimestampColumnVector) listVector.child;
return readTimestampListVector(timestampVec, childType, offset, numValues);
}
private static Object readDecimalListVector(DecimalColumnVector decimalVector, int offset, int numValues,
int batchRowNum) {
List<Object> decimalList = new ArrayList<>();
for (int i = 0; i < numValues; i++) {
if (!decimalVector.isNull[offset + i]) {
// read each element at its own position in the child vector (indexing by batchRowNum repeated one value for the whole list)
BigDecimal bigDecimal = decimalVector.vector[offset + i].getHiveDecimal().bigDecimalValue();
decimalList.add(bigDecimal);
} else {
decimalList.add(null);
}
}
return decimalList;
}
private static Object readDecimalListValues(ListColumnVector listVector, int rowNum) {
int offset = (int) listVector.offsets[rowNum];
int numValues = (int) listVector.lengths[rowNum];
DecimalColumnVector decimalVec = (DecimalColumnVector) listVector.child;
return readDecimalListVector(decimalVec, offset, numValues, rowNum);
}
private static Object readBytesListVector(BytesColumnVector bytesVec, TypeDescription childType, int offset,
int numValues) {
List<Object> bytesValList = new ArrayList<>();
for (int i = 0; i < numValues; i++) {
if (!bytesVec.isNull[offset + i]) {
byte[] byteArray = bytesVec.vector[offset + i];
int vecLen = bytesVec.length[offset + i];
int vecStart = bytesVec.start[offset + i];
byte[] vecCopy = Arrays.copyOfRange(byteArray, vecStart, vecStart + vecLen);
if (childType.getCategory() == TypeDescription.Category.STRING) {
String str = new String(vecCopy);
bytesValList.add(str);
} else {
bytesValList.add(vecCopy);
}
} else {
bytesValList.add(null);
}
}
return bytesValList;
}
private static Object readBytesListValues(ListColumnVector listVector, TypeDescription childType, int rowNum) {
int offset = (int) listVector.offsets[rowNum];
int numValues = (int) listVector.lengths[rowNum];
BytesColumnVector bytesVec = (BytesColumnVector) listVector.child;
return readBytesListVector(bytesVec, childType, offset, numValues);
}
private static Object readDoubleListVector(DoubleColumnVector doubleVec, int offset, int numValues) {
List<Object> doubleList = new ArrayList<>();
for (int i = 0; i < numValues; i++) {
if (!doubleVec.isNull[offset + i]) {
Double doubleVal = doubleVec.vector[offset + i];
doubleList.add(doubleVal);
} else {
doubleList.add(null);
}
}
return doubleList;
}
private static Object readDoubleListValues(ListColumnVector listVector, int rowNum) {
int offset = (int) listVector.offsets[rowNum];
int numValues = (int) listVector.lengths[rowNum];
DoubleColumnVector doubleVec = (DoubleColumnVector) listVector.child;
return readDoubleListVector(doubleVec, offset, numValues);
}
@SuppressWarnings("unchecked")
private static List<Object> readMapVector(ColumnVector mapVector, TypeDescription childType, int offset,
int numValues, int rowNum) {
List<Object> mapList = null;
switch (mapVector.type) {
case BYTES:
mapList = (List<Object>) readBytesListVector((BytesColumnVector) mapVector, childType, offset, numValues);
break;
case DECIMAL:
mapList = (List<Object>) readDecimalListVector((DecimalColumnVector) mapVector, offset, numValues, rowNum);
break;
case DOUBLE:
mapList = (List<Object>) readDoubleListVector((DoubleColumnVector) mapVector, offset, numValues);
break;
case LONG:
mapList = readLongListVector((LongColumnVector) mapVector, childType, offset, numValues);
break;
case TIMESTAMP:
mapList = (List<Object>) readTimestampListVector((TimestampColumnVector) mapVector, childType, offset,
numValues);
break;
default:
throw new RuntimeException(mapVector.type.name() + " is not supported for MapColumnVectors");
}
return mapList;
}
/**
* <p>
* Read a Map column value (e.g., a set of keys and their associated values).
* </p>
* <p>
* The Map key and value types are the first and second children in the children
* TypeDescription List. From the TypeDescription source:
* </p>
*
* <pre>
* result.children.add(keyType);
* result.children.add(valueType);
* </pre>
*/
private static Object readMapVal(ColumnVector colVec, TypeDescription colType, int rowNum) {
Map<Object, Object> objMap = new HashMap<>();
MapColumnVector mapVector = (MapColumnVector) colVec;
if (ORCCommonUtils.checkMapColumnVectorTypes(mapVector)) {
int mapSize = (int) mapVector.lengths[rowNum];
int offset = (int) mapVector.offsets[rowNum];
List<TypeDescription> mapTypes = colType.getChildren();
TypeDescription keyType = mapTypes.get(0);
TypeDescription valueType = mapTypes.get(1);
ColumnVector keyChild = mapVector.keys;
ColumnVector valueChild = mapVector.values;
List<Object> keyList = readMapVector(keyChild, keyType, offset, mapSize, rowNum);
List<Object> valueList = readMapVector(valueChild, valueType, offset, mapSize, rowNum);
assert (keyList.size() == valueList.size());
for (int i = 0; i < keyList.size(); i++) {
objMap.put(keyList.get(i), valueList.get(i));
}
} else {
throw new RuntimeException("readMapVal: unsupported key or value types");
}
return objMap;
}
private static Object readUnionVal(ColumnVector colVec, TypeDescription colType, int rowNum) {
Pair<TypeDescription, Object> columnValuePair;
UnionColumnVector unionVector = (UnionColumnVector) colVec;
int tagVal = unionVector.tags[rowNum];
List<TypeDescription> unionFieldTypes = colType.getChildren();
if (tagVal < unionFieldTypes.size()) {
TypeDescription fieldType = unionFieldTypes.get(tagVal);
if (tagVal < unionVector.fields.length) {
ColumnVector fieldVector = unionVector.fields[tagVal];
int colRow = fieldVector.isRepeating ? 0 : rowNum;
Object unionValue = readColumn(fieldVector, fieldType, colRow);
columnValuePair = new ImmutablePair<>(fieldType, unionValue);
} else {
throw new RuntimeException("readUnionVal: union tag value out of range for union column vectors");
}
} else {
throw new RuntimeException("readUnionVal: union tag value out of range for union types");
}
return columnValuePair;
}
private static Object readStructVal(ColumnVector colVec, TypeDescription colType, int rowNum) {
Object structObj = null;
if (!colVec.isNull[rowNum]) {
List<Object> fieldValList = new ArrayList<>();
StructColumnVector structVector = (StructColumnVector) colVec;
ColumnVector[] fieldVec = structVector.fields;
List<TypeDescription> fieldTypes = colType.getChildren();
assert (fieldVec.length == fieldTypes.size());
for (int i = 0; i < fieldVec.length; i++) {
int colRow = fieldVec[i].isRepeating ? 0 : rowNum;
Object fieldObj = readColumn(fieldVec[i], fieldTypes.get(i), colRow);
fieldValList.add(fieldObj);
}
structObj = fieldValList;
}
return structObj;
}
private static Object readTimestampVal(ColumnVector colVec, TypeDescription colType, int rowNum) {
Object timestampVal = null;
if (!colVec.isNull[rowNum]) {
TimestampColumnVector timestampVec = (TimestampColumnVector) colVec;
int nanos = timestampVec.nanos[rowNum];
long millisec = timestampVec.time[rowNum];
Timestamp timestamp = new Timestamp(millisec);
timestamp.setNanos(nanos);
timestampVal = timestamp;
if (colType.getCategory() == TypeDescription.Category.DATE) {
timestampVal = new Date(timestamp.getTime());
}
}
return timestampVal;
}
private static Object readDecimalVal(ColumnVector colVec, int rowNum) {
Object decimalObj = null;
if (!colVec.isNull[rowNum]) {
DecimalColumnVector decimalVec = (DecimalColumnVector) colVec;
decimalObj = decimalVec.vector[rowNum].getHiveDecimal().bigDecimalValue().setScale(decimalVec.scale);
}
return decimalObj;
}
/**
* Read a Long or Boolean value
*
* @param colVec the column vector
* @param colType the type of the column
* @param rowNum the ORC file row number.
* @return a Boolean or Long object
*/
private static Object readLongVal(ColumnVector colVec, TypeDescription colType, int rowNum) {
Object colObj = null;
if (!colVec.isNull[rowNum]) {
LongColumnVector longVec = (LongColumnVector) colVec;
Long longVal = longVec.vector[rowNum];
colObj = longVal;
if (colType.getCategory() == TypeDescription.Category.INT) {
colObj = longVal.intValue();
} else if (colType.getCategory() == TypeDescription.Category.BOOLEAN) {
colObj = longVal == 1 ? Boolean.TRUE : Boolean.FALSE;
} else if (colType.getCategory() == TypeDescription.Category.DATE) {
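// ORC stores DATE as days since the Unix epoch; 86400000 ms per day converts it to a java.util.Date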
colObj = new Date(longVal * 86400000);
} else if (colType.getCategory() == TypeDescription.Category.BYTE) {
colObj = longVal.byteValue();
} else if (colType.getCategory() == TypeDescription.Category.SHORT) {
colObj = longVal.shortValue();
}
}
return colObj;
}
/**
* Read a Double or Float value
*
* @param colVec the column vector
* @param colType the type of the column
* @param rowNum the ORC file row number.
* @return a Double or Float object
*/
private static Object readDoubleVal(ColumnVector colVec, TypeDescription colType, int rowNum) {
Object colObj = null;
if (!colVec.isNull[rowNum]) {
DoubleColumnVector longVec = (DoubleColumnVector) colVec;
Double doubleVal = longVec.vector[rowNum];
colObj = doubleVal;
if (colType.getCategory() == TypeDescription.Category.FLOAT) {
colObj = doubleVal.floatValue();
}
}
return colObj;
}
private static Object readBytesVal(ColumnVector colVec, TypeDescription colType, int rowNum) {
Object bytesObj = null;
if (!colVec.isNull[rowNum]) {
BytesColumnVector bytesVector = (BytesColumnVector) colVec;
byte[] columnBytes = bytesVector.vector[rowNum];
int vecLen = bytesVector.length[rowNum];
int vecStart = bytesVector.start[rowNum];
byte[] vecCopy = Arrays.copyOfRange(columnBytes, vecStart, vecStart + vecLen);
if (colType.getCategory() == TypeDescription.Category.STRING
|| colType.getCategory() == TypeDescription.Category.VARCHAR
|| colType.getCategory() == TypeDescription.Category.CHAR) {
bytesObj = new String(vecCopy);
} else {
bytesObj = vecCopy;
}
}
return bytesObj;
}
}
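A minimal reader-side sketch driving readColumnByName; the file path and column name are illustrative:

Reader reader = OrcFile.createReader(new Path("/tmp/example.orc"), OrcFile.readerOptions(new Configuration()));
TypeDescription schema = reader.getSchema();
VectorizedRowBatch batch = schema.createRowBatch();
try (RecordReader rows = reader.rows()) {
while (rows.nextBatch(batch)) {
for (int r = 0; r < batch.size; r++) {
Object id = ORCReadUtils.readColumnByName(batch, "id", schema, r);
}
}
}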
View File
@@ -1,884 +0,0 @@
package org.talend.orc;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.sql.Timestamp;
import java.time.LocalDate;
import java.util.Calendar;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TimeZone;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.hive.common.type.HiveDecimal;
import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.DateColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.MapColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.MultiValuedColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.UnionColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
import org.apache.orc.TypeDescription;
public class ORCWriteUtils {
private static final Predicate<Object> isInteger = Integer.class::isInstance;
private static final Predicate<Object> isLong = Long.class::isInstance;
private static final Predicate<Object> isDouble = Double.class::isInstance;
private static final Predicate<Object> isString = String.class::isInstance;
private static final Predicate<Object> isBigDecimal = BigDecimal.class::isInstance;
private static final Predicate<Object> isDate = Date.class::isInstance;
public static void setColumnByName(VectorizedRowBatch batch, String columnName, TypeDescription schema,
Object colVal, int rowNum) {
List<String> allColumnNames = schema.getFieldNames();
int colIndex = allColumnNames.indexOf(columnName);
if (colIndex < 0 || colIndex > batch.cols.length - 1) {
return;
} else {
org.apache.hadoop.hive.ql.exec.vector.ColumnVector colVector = batch.cols[colIndex];
TypeDescription fieldType = schema.getChildren().get(colIndex);
setColumn(colVal, fieldType, columnName, colVector, rowNum);
}
}
/**
* Add a column value that is a String or a byte[] array.
*
* @param colVal the column value object
* @param fieldName the name of the field (for error reporting)
* @param bytesColVector the BytesColumnVector that the byte array will be added
* to.
* @param rowNum the ORC file row number
*/
private static void setByteColumnVector(Object colVal, String fieldName, BytesColumnVector bytesColVector,
int rowNum) {
if (colVal instanceof byte[] || colVal instanceof String || colVal instanceof Character) {
byte[] byteVec;
if (colVal instanceof String) {
String strVal = (String) colVal;
byteVec = strVal.getBytes(StandardCharsets.UTF_8);
} else if (colVal instanceof Character) {
String strVal = String.valueOf((char) colVal);
byteVec = strVal.getBytes(StandardCharsets.UTF_8);
} else {
byteVec = (byte[]) colVal;
}
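// setRef stores a reference to byteVec rather than copying it, so the buffer must not be mutated afterwards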
bytesColVector.setRef(rowNum, byteVec, 0, byteVec.length);
} else {
throw new RuntimeException(orcExceptionMsg("byte[] or String type expected for field ", fieldName, rowNum));
}
}
private static void setDecimalVector(Object colVal, String fieldName, DecimalColumnVector decimalColVector,
int rowNum) {
if (colVal instanceof BigDecimal) {
BigDecimal bigDecimal = (BigDecimal) colVal;
decimalColVector.precision = (short) bigDecimal.precision();
decimalColVector.scale = (short) bigDecimal.scale();
HiveDecimal hiveDecimal = HiveDecimal.create(bigDecimal);
HiveDecimalWritable writeableDecimal = new HiveDecimalWritable(hiveDecimal);
decimalColVector.vector[rowNum] = writeableDecimal;
} else {
throw new RuntimeException(orcExceptionMsg("BigDecimal type expected for field ", fieldName, rowNum));
}
}
private static void setDoubleVector(Object colVal, String fieldName, DoubleColumnVector doubleVector, int rowNum) {
if (colVal instanceof Double) {
doubleVector.vector[rowNum] = (Double) colVal;
} else if (colVal instanceof Float) {
Float fltVal = (Float) colVal;
doubleVector.vector[rowNum] = fltVal.doubleValue();
} else if (colVal instanceof BigDecimal) {
doubleVector.vector[rowNum] = ((BigDecimal) colVal).doubleValue();
} else {
throw new RuntimeException(orcExceptionMsg("Double/Float/BigDecimal type expected for field ", fieldName, rowNum));
}
}
/**
* Initialize a LongColumnVector value.
*
* @param colVal an object of type Boolean, Integer, Long or BigInteger.
* @param fieldName the field name in the schema
* @param longVector the LongColumnVector
* @param rowNum the row number
*/
private static void setLongColumnVector(Object colVal, String fieldName, LongColumnVector longVector, int rowNum) {
if (colVal instanceof Boolean) {
Boolean bool = (Boolean) colVal;
longVector.vector[rowNum] = (bool.equals(Boolean.TRUE)) ? Long.valueOf(1) : Long.valueOf(0);
} else if (colVal instanceof Byte) {
longVector.vector[rowNum] = (Byte) colVal;
} else if (colVal instanceof Short) {
longVector.vector[rowNum] = (Short) colVal;
} else if (colVal instanceof Integer) {
longVector.vector[rowNum] = (Integer) colVal;
} else if (colVal instanceof Long) {
longVector.vector[rowNum] = (Long) colVal;
} else if (colVal instanceof BigInteger) {
BigInteger bigInt = (BigInteger) colVal;
longVector.vector[rowNum] = bigInt.longValue();
} else {
throw new RuntimeException(orcExceptionMsg("Long or Integer type expected for field ", fieldName, rowNum));
}
}
private static void setDateColumnVector(Object colVal, String fieldName, DateColumnVector dateVector, int rowNum) {
if (colVal instanceof Date) {
Date dateVal = (Date) colVal;
Calendar cal = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
cal.setTime(dateVal);
long epochDay = LocalDate.of(cal.get(Calendar.YEAR), cal.get(Calendar.MONTH)+1, cal.get(Calendar.DAY_OF_MONTH)).toEpochDay();
dateVector.vector[rowNum] = epochDay;
} else {
throw new RuntimeException(orcExceptionMsg("Date type expected for field ", fieldName, rowNum));
}
}
private static void setTimestampVector(Object colVal, String fieldName, TimestampColumnVector timestampVector,
int rowNum) {
if (colVal instanceof Timestamp) {
timestampVector.set(rowNum, (Timestamp) colVal);
} else if (colVal instanceof Date) {
Date date = (Date) colVal;
Timestamp ts = new Timestamp(date.getTime());
timestampVector.set(rowNum, ts);
} else {
throw new RuntimeException(
orcExceptionMsg("Date or Timestamp type expected for field ", fieldName, rowNum));
}
}
/**
* <p>
* A union column can contain column vectors of more than one type. In the
* TypeDescription createUnion() is called to create a TypeDescription for a
* union column. The union values are added by calling the addUnionChild()
* method on this TypeDescription object.
* </p>
* <p>
* The class fields in the UnionColumnVector are shown below:
* </p>
*
* <pre>
* public class UnionColumnVector extends ColumnVector {
* public int[] tags;
* public ColumnVector[] fields;
* </pre>
* <p>
* A tag value (
*
* <pre>
* tags[rowNum]
* </pre>
*
* ) is associated with each field value (
*
* <pre>
* fields[rowNum])
* </pre>
*
* . The tag value serves as an index for the field type. For example, if there
* are three field types defined:
* <ol>
* <li>Long</li>
* <li>Double</li>
* <li>String</li>
* </ol>
* The tag will have a value in the range of [0..2]
* </p>
* <p>
* The tag value is needed to initialize the ColumnVector since without the tag
* there is no way to know which union child should be initialized.
* </p>
*
* @param colVal a Pair&lt;ColumnVector.Type, Object&gt; object with the
* union type and the object that will be used to initialize
* the union child ColumnVector.
* @param fieldName The name of the union field
* @param unionVector The UnionColumnVector to be initialized
* @param rowNum the ORC file row number.
*/
private static void setUnionColumnVector(Object colVal, TypeDescription unionTypeDesc, String fieldName,
UnionColumnVector unionVector, int rowNum) {
@SuppressWarnings("unchecked")
Pair<TypeDescription, Object> unionValuePair = (Pair<TypeDescription, Object>) colVal;
TypeDescription unionValType = unionValuePair.getLeft();
List<TypeDescription> unionChildrenTypes = unionTypeDesc.getChildren();
Object unionColVal = unionValuePair.getRight();
boolean found = false;
for (int i = 0; i < unionChildrenTypes.size(); i++) {
if (unionChildrenTypes.get(i).getCategory() == unionValType.getCategory()) {
unionVector.tags[rowNum] = i;
ColumnVector unionFieldVec = unionVector.fields[i];
setColumn(unionColVal, unionChildrenTypes.get(i), fieldName, unionFieldVec, rowNum);
found = true;
break;
}
}
if (!found) {
throw new RuntimeException("writeUnionColumnVector: Bad type enumeration "
+ unionValType.getCategory().getName() + " passed for field " + fieldName);
}
}
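As the javadoc above describes, a union value is passed in as a Pair of the value's TypeDescription and the value itself; a minimal sketch (ImmutablePair and the field/vector names are illustrative):

Pair<TypeDescription, Object> u = new ImmutablePair<>(TypeDescription.createLong(), 123L);
setColumn(u, unionTypeDesc, "u_field", unionVector, rowNum);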
private static void setLongListVector(List<Object> longValList, LongColumnVector longVector, int offset,
String fieldName) {
for (int i = 0; i < longValList.size(); i++) {
Object objVal = longValList.get(i);
if (objVal != null) {
if (objVal instanceof Integer) {
longVector.vector[offset + i] = (Integer) objVal;
} else if (objVal instanceof Long) {
longVector.vector[offset + i] = (Long) objVal;
} else {
throw new RuntimeException("List<Integer> expected for field " + fieldName);
}
} else {
longVector.isNull[offset + i] = true;
longVector.noNulls = false;
}
}
}
private static void setLongList(List<Object> colValList, ListColumnVector listVector, String fieldName,
int rowNum) {
LongColumnVector longVector = (LongColumnVector) listVector.child;
int offset = (int) listVector.offsets[rowNum];
setLongListVector(colValList, longVector, offset, fieldName);
}
private static void setDoubleListVector(List<Object> doubleValList, DoubleColumnVector doubleVector, int offset,
String fieldName) {
for (int i = 0; i < doubleValList.size(); i++) {
Object objVal = doubleValList.get(i);
if (objVal != null) {
if (objVal instanceof Double) {
doubleVector.vector[offset + i] = (Double) objVal;
} else if (objVal instanceof Float) {
Float fltVal = (Float) objVal;
doubleVector.vector[offset + i] = fltVal.doubleValue();
} else {
throw new RuntimeException("List<Double> expected for field " + fieldName);
}
} else {
doubleVector.isNull[offset + i] = true;
doubleVector.noNulls = false;
}
}
}
private static void setDoubleList(List<Object> doubleValList, ListColumnVector listVector, String fieldName,
int rowNum) {
DoubleColumnVector vecChild = (DoubleColumnVector) listVector.child;
int offset = (int) listVector.offsets[rowNum];
setDoubleListVector(doubleValList, vecChild, offset, fieldName);
}
private static void setTimestampListVector(List<Object> valueList, TimestampColumnVector timestampVector,
int offset, String fieldName) {
for (int i = 0; i < valueList.size(); i++) {
Object objVal = valueList.get(i);
if (objVal != null) {
if (objVal instanceof Date) {
Timestamp ts = (objVal instanceof Timestamp) ? (Timestamp) objVal
: new Timestamp(((Date) objVal).getTime());
timestampVector.time[offset + i] = ts.getTime();
timestampVector.nanos[offset + i] = ts.getNanos();
} else {
throw new RuntimeException("List<Date> or List<Timestamp> expected for field " + fieldName);
}
} else {
timestampVector.isNull[offset + i] = true;
timestampVector.noNulls = false;
}
}
}
/**
* Initialize the vector values for a ListColumnVector of Date or Timestamp
* values.
*
* @param colValList a list of Timestamp or java.util.Date objects
* @param listVector A ListColumnVector with a child that will contain the
* vector values.
* @param fieldName The field name in the schema for this ORC element
* @param rowNum The ORC file row number
*/
private static void setTimestampList(List<Object> colValList, ListColumnVector listVector, String fieldName,
int rowNum) {
TimestampColumnVector timestampVector = (TimestampColumnVector) listVector.child;
int offset = (int) listVector.offsets[rowNum];
setTimestampListVector(colValList, timestampVector, offset, fieldName);
}
private static void setDecimalListVector(List<Object> decimalValList, DecimalColumnVector decimalVector, int offset,
String fieldName) {
for (int i = 0; i < decimalValList.size(); i++) {
Object objVal = decimalValList.get(i);
if (objVal != null) {
if (objVal instanceof BigDecimal) {
BigDecimal bigDecimal = (BigDecimal) objVal;
decimalVector.precision = (short) bigDecimal.precision();
decimalVector.scale = (short) bigDecimal.scale();
HiveDecimal hiveDecimal = HiveDecimal.create(bigDecimal);
HiveDecimalWritable writeableDecimal = new HiveDecimalWritable(hiveDecimal);
decimalVector.vector[offset + i] = writeableDecimal;
} else {
throw new RuntimeException("BigDecimal value expected for field " + fieldName);
}
} else {
decimalVector.isNull[offset + i] = true;
decimalVector.noNulls = false;
}
}
}
/**
*
* @param colValList a list of BigDecimal values to initialize the
* ListColumnVector child
* @param listVector the ListColumnVector with the DecimalColumnVector child
* @param fieldName the field name for the ListColumnVector/DecimalColumnVector
* column
* @param rowNum the ORC file row number
*/
private static void setDecimalList(List<Object> colValList, ListColumnVector listVector, String fieldName,
int rowNum) {
DecimalColumnVector decimalVector = (DecimalColumnVector) listVector.child;
int offset = (int) listVector.offsets[rowNum];
setDecimalListVector(colValList, decimalVector, offset, fieldName);
}
private static void setBytesListVector(List<Object> valueList, BytesColumnVector bytesVector, int offset,
String fieldName) {
for (int i = 0; i < valueList.size(); i++) {
Object objVal = valueList.get(i);
if (objVal != null) {
if (objVal instanceof byte[] || objVal instanceof String) {
byte[] byteVec = (objVal instanceof byte[]) ? (byte[]) objVal
: ((String) objVal).getBytes(StandardCharsets.UTF_8);
bytesVector.vector[offset + i] = byteVec;
bytesVector.length[offset + i] = byteVec.length;
} else {
throw new RuntimeException("String or byte[] value expected for field " + fieldName);
}
} else {
bytesVector.isNull[offset + i] = true;
bytesVector.length[offset + i] = 0;
bytesVector.noNulls = false;
}
}
}
/**
* Initialize a ListColumnVector with a BytesColumnVector child with byte[]
* values.
*
* @param colValList a list of byte[] or String values
* @param listVector the parent ListColumnVector
* @param fieldName the field name for the ORC column that contains the
* ListColumnVector
* @param rowNum the ORC file row number
*/
private static void setBytesList(List<Object> colValList, ListColumnVector listVector, String fieldName,
int rowNum) {
BytesColumnVector bytesVector = (BytesColumnVector) listVector.child;
int offset = (int) listVector.offsets[rowNum];
setBytesListVector(colValList, bytesVector, offset, fieldName);
}
private static void setMultiValuedVectorParameters(MultiValuedColumnVector multiVector, int vecLength, int rowNum) {
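// lengths[rowNum] records this row's element count; offsets[rowNum] becomes the running prefix sum of the previous lengths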
multiVector.lengths[rowNum] = vecLength;
if (rowNum > 0) {
multiVector.offsets[rowNum] = multiVector.lengths[rowNum - 1] + multiVector.offsets[rowNum - 1];
}
}
private static void setListVectorParameters(ListColumnVector listVec, int maxBatchSize, int vecLength, int rowNum) {
setMultiValuedVectorParameters(listVec, vecLength, rowNum);
listVec.child.ensureSize(maxBatchSize * vecLength, true);
}
/**
* Initialize a ListColumnVector. The child of the vector is limited to the
* scalar types long, double, String (or byte[])), BigDecimal or Date (or
* Timestamp).
*
* @param colVal a List&lt;Object&gt;
* @param typeDesc the schema definition for this column
* @param fieldName the column field name
* @param listVector the ListColumnVector parent of the vector type child
* @param rowNum the ORC file row number.
*/
private static void setListColumnVector(Object colVal, TypeDescription typeDesc, String fieldName,
ListColumnVector listVector, int rowNum) {
if (colVal instanceof List) {
@SuppressWarnings("unchecked")
List<Object> objValList = (List<Object>) colVal;
final int maxBatchSize = typeDesc.createRowBatch().getMaxSize();
setListVectorParameters(listVector, maxBatchSize, objValList.size(), rowNum);
ColumnVector.Type childType = listVector.child.type;
switch (childType) {
case LONG:
setLongList(objValList, listVector, fieldName, rowNum);
break;
case DOUBLE:
setDoubleList(objValList, listVector, fieldName, rowNum);
break;
case BYTES:
setBytesList(objValList, listVector, fieldName, rowNum);
break;
case DECIMAL:
setDecimalList(objValList, listVector, fieldName, rowNum);
break;
case TIMESTAMP:
setTimestampList(objValList, listVector, fieldName, rowNum);
break;
default:
throw new RuntimeException(childType.name() + " is not supported for ListColumnVector columns");
}
} else {
throw new RuntimeException("List value expected for field " + fieldName);
}
}
/**
* Test that all elements in an Object list are of a particular type
*
* @param objList the Object list that is tested
* @param typeTest a function that compares against a particular Object type
* @return true if all elements are of the test type, false if one or more
* elements are not of that type.
*/
private static boolean isListType(List<Object> objList, Predicate<Object> typeTest) {
return objList.stream().allMatch(typeTest);
}
/**
* Initialize a ColumnVector with Long values.
*
* @param valueList a list of Long values
* @param colVector the LongColumnVector that will be initialized with the Long
* values
* @param offset the offset[rownum] value for the array
* @param fieldName the field name for the Map column
*/
private static void setLongMapValues(List<Object> valueList, ColumnVector colVector, int offset, String fieldName) {
if (isListType(valueList, isLong) || isListType(valueList, isInteger)) {
LongColumnVector longVector = (LongColumnVector) colVector;
setLongListVector(valueList, longVector, offset, fieldName);
} else {
throw new RuntimeException("For field " + fieldName + " Long values expected");
}
}
/**
* Initialize a ColumnVector with Double values.
*
* @param valueList a list of Double values
* @param colVector the DoubleColumnVector that will be initialized with the
* Double values
* @param offset the offset[rownum] value for the array
* @param fieldName the field name for the Map column
*/
private static void setDoubleMapValues(List<Object> valueList, ColumnVector colVector, int offset,
String fieldName) {
if (isListType(valueList, isDouble)) {
DoubleColumnVector doubleVector = (DoubleColumnVector) colVector;
setDoubleListVector(valueList, doubleVector, offset, fieldName);
} else {
throw new RuntimeException("For field " + fieldName + " Double values expected");
}
}
/**
* Initialize a ColumnVector with String values.
*
* @param valueList a list of String values
* @param colVector the BytesColumnVector that will be initialized with the
* String values
* @param offset the offset[rownum] value for the array
* @param fieldName the field name for the Map column
*/
private static void setStringMapValues(List<Object> valueList, ColumnVector colVector, int offset,
String fieldName) {
if (isListType(valueList, isString)) {
BytesColumnVector bytesVector = (BytesColumnVector) colVector;
setBytesListVector(valueList, bytesVector, offset, fieldName);
} else {
throw new RuntimeException("For field " + fieldName + " String values expected");
}
}
/**
* Initialize a ColumnVector with BigDecimal values.
*
* @param valueList a list of BigDecimal
* @param colVector the DecimalColumnVector that will be initialized with the
* BigDecimal values
* @param offset the offset[rownum] value for the array
* @param fieldName the field name for the Map column
*/
private static void setDecimalMapValues(List<Object> valueList, ColumnVector colVector, int offset,
String fieldName) {
if (isListType(valueList, isBigDecimal)) {
DecimalColumnVector decimalVector = (DecimalColumnVector) colVector;
setDecimalListVector(valueList, decimalVector, offset, fieldName);
} else {
throw new RuntimeException("For field " + fieldName + " BigDecimal values expected");
}
}
/**
* Initialize a ColumnVector with timestamp values.
*
* @param valueList a list of Date (or Timestamp) objects
* @param colVector the TimestampColumnVector that will be initialized with the
* Timestamp values
* @param offset the offset[rownum] value for the array
* @param fieldName the field name for the Map column
*/
private static void setTimestampMapValues(List<Object> valueList, ColumnVector colVector, int offset,
String fieldName) {
if (isListType(valueList, isDate)) {
TimestampColumnVector timestampVector = (TimestampColumnVector) colVector;
setTimestampListVector(valueList, timestampVector, offset, fieldName);
} else {
throw new RuntimeException("For field " + fieldName + " Date or Timestamp values expected");
}
}
/**
* Set the MapColumn value array vector. The type for this vector is limited to
* long, double, bytes (String), Decimal and Timestamp.
*
* @param valueList a list of Objects to initialize the Map column value array.
* @param colVector the column array vector to be initialized with the map
* values.
* @param offset the offset[rowNum] from the parent MapColumnVector
* @param fieldName the name of the field for the MapColumnVector.
*/
private static void setMapValueVector(List<Object> valueList, ColumnVector colVector, int offset,
String fieldName) {
switch (colVector.type) {
case LONG:
setLongMapValues(valueList, colVector, offset, fieldName);
break;
case DOUBLE:
setDoubleMapValues(valueList, colVector, offset, fieldName);
break;
case BYTES:
setStringMapValues(valueList, colVector, offset, fieldName);
break;
case DECIMAL:
setDecimalMapValues(valueList, colVector, offset, fieldName);
break;
case TIMESTAMP:
setTimestampMapValues(valueList, colVector, offset, fieldName);
break;
default:
throw new RuntimeException(
"For field " + fieldName + " values must be long, double, String, BigDecimal or Timestamp");
}
}
/**
* <p>
* Initialize a MapColumnVector with Long key values.
* </p>
*
* @param mapSet a set of {key, value} pairs, where the key values are Long
* objects. The elements of this set will be used to initialize
* the key and value array column vectors that are children of
* the MapColumnVector.
* @param mapVector the MapColumnVector. This ColumnVector has children for the
* key and value arrays.
* @param fieldName the field name for the map column vector column.
* @param rowNum the ORC file row number.
*/
private static void setLongKeyMap(Set<Map.Entry<Object, Object>> mapSet, MapColumnVector mapVector,
String fieldName, int rowNum) {
List<Object> keyValueList = mapSet.stream().map(Map.Entry::getKey).collect(Collectors.toList());
if (isListType(keyValueList, isLong)) {
LongColumnVector longVector = (LongColumnVector) mapVector.keys;
int offset = (int) mapVector.offsets[rowNum];
// set the key vector
setLongListVector(keyValueList, longVector, offset, fieldName);
// set the value vector
ColumnVector valueVector = mapVector.values;
List<Object> valueList = mapSet.stream().map(Map.Entry::getValue).collect(Collectors.toList());
setMapValueVector(valueList, valueVector, offset, fieldName);
} else {
throw new RuntimeException("For field " + fieldName + " Long key type expected to match schema");
}
}
/**
* <p>
* Initialize a MapColumnVector with Double key values.
* </p>
*
* @param mapSet a set of {key, value} pairs, where the key values are Double
* objects. The elements of this set will be used to initialize
* the key and value array column vectors that are children of
* the MapColumnVector.
* @param mapVector the MapColumnVector. This ColumnVector has children for the
* key and value arrays.
* @param fieldName the field name for the map column vector column.
* @param rowNum the ORC file row number.
*/
private static void setDoubleKeyMap(Set<Map.Entry<Object, Object>> mapSet, MapColumnVector mapVector,
String fieldName, int rowNum) {
List<Object> keyValueList = mapSet.stream().map(Map.Entry::getKey).collect(Collectors.toList());
if (isListType(keyValueList, isDouble)) {
DoubleColumnVector doubleVector = (DoubleColumnVector) mapVector.keys;
int offset = (int) mapVector.offsets[rowNum];
// set the key vector
setDoubleListVector(keyValueList, doubleVector, offset, fieldName);
// set the value vector
ColumnVector valueVector = mapVector.values;
List<Object> valueList = mapSet.stream().map(Map.Entry::getValue).collect(Collectors.toList());
setMapValueVector(valueList, valueVector, offset, fieldName);
} else {
throw new RuntimeException("For field " + fieldName + " Long key type expected to match schema");
}
}
/**
* <p>
* Initialize a MapColumnVector with String key values.
* </p>
*
* @param mapSet a set of {key, value} pairs, where the key values are String
* objects. The elements of this set will be used to initialize
* the key and value array column vectors that are children of
* the MapColumnVector.
* @param mapVector the MapColumnVector. This ColumnVector has children for the
* key and value arrays.
* @param fieldName the field name for the map column vector column.
* @param rowNum the ORC file row number.
*/
private static void setStringKeyMap(Set<Map.Entry<Object, Object>> mapSet, MapColumnVector mapVector,
String fieldName, int rowNum) {
List<Object> keyValueList = mapSet.stream().map(Map.Entry::getKey).collect(Collectors.toList());
if (isListType(keyValueList, isString)) {
BytesColumnVector byteVector = (BytesColumnVector) mapVector.keys;
int offset = (int) mapVector.offsets[rowNum];
// set the key array vector
setBytesListVector(keyValueList, byteVector, offset, fieldName);
// set the value array vector
ColumnVector valueVector = mapVector.values;
List<Object> valueList = mapSet.stream().map(Map.Entry::getValue).collect(Collectors.toList());
setMapValueVector(valueList, valueVector, offset, fieldName);
} else {
throw new RuntimeException("For field " + fieldName + " Long key type expected to match schema");
}
}
private static void setMapVectorParameters(MapColumnVector mapVec, int maxBatchSize, int vecLength, int rowNum) {
setMultiValuedVectorParameters(mapVec, vecLength, rowNum);
// grow both children to hold up to maxBatchSize rows of vecLength entries each, mirroring setListVectorParameters
mapVec.keys.ensureSize(maxBatchSize * vecLength, true);
mapVec.values.ensureSize(maxBatchSize * vecLength, true);
}
/**
* <p>
* Set the Map key and value elements for a MapColumnVector
* </p>
* <p>
* A MapColumnVector has a single ColumnVector type for each of the map key and
* map values. For example, the ColumnVector for the key values could be a
* BytesColumnVector (a set of String keys). The values could be a
* LongColumnVector.
* </p>
* <p>
* In the documentation there is no restriction given for the map key type. This
* code limits the key types to scalar values: string, long, double.
* </p>
* <p>
* The documentation does not limit the map value types. This code limits the
* map values to the same types that are supported for ListColumnVectors: long,
* double, bytes (String), Decimal and Timestamp.
* </p>
*
* @param colVal a HashMap object
* @param typeDesc the schema description for the MapColumnVector column
* @param fieldName the field name of the MapColumnVector column
* @param mapVector The parent MapColumnVector
* @param rowNum the ORC file column number.
*/
private static void setMapColumnVector(Object colVal, TypeDescription typeDesc, String fieldName,
MapColumnVector mapVector, int rowNum) {
if (colVal == null) {
mapVector.isNull[rowNum] = true;
mapVector.noNulls = false;
} else {
if (colVal instanceof HashMap) {
@SuppressWarnings("unchecked")
Map<Object, Object> rawMap = (HashMap<Object, Object>) colVal;
int mapLen = rawMap.size();
final int maxBatchSize = typeDesc.createRowBatch().getMaxSize();
setMapVectorParameters(mapVector, maxBatchSize, mapLen, rowNum);
if (ORCCommonUtils.checkMapColumnVectorTypes(mapVector)) {
Set<Map.Entry<Object, Object>> mapSet = rawMap.entrySet();
switch (mapVector.keys.type) {
case LONG:
setLongKeyMap(mapSet, mapVector, fieldName, rowNum);
break;
case DOUBLE:
setDoubleKeyMap(mapSet, mapVector, fieldName, rowNum);
break;
case BYTES:
setStringKeyMap(mapSet, mapVector, fieldName, rowNum);
break;
default: {
/* This block left intentionally empty: checkMapColumnVectorTypes only admits LONG, DOUBLE and BYTES keys */
break;
}
}
} else {
throw new RuntimeException(
"For field " + fieldName + " key types are limited to string, long and double. "
+ "value types are limited to long, double, String, decimal and timestamp");
}
}
}
}
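Note that the instanceof check above accepts only HashMap instances; other Map implementations are silently skipped. A minimal sketch for a map&lt;string,bigint&gt; column (names illustrative):

Map<Object, Object> m = new HashMap<>();
m.put("a", 1L);
m.put("b", 2L);
setColumn(m, mapTypeDesc, "m_field", mapVector, rowNum);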
/**
* Set a column value in an ORC a row that will be written to the ORC file.
*
* @param colVal an Object containing the values to be written to the column
* @param typeDesc the TypeDescription from the schema that defines the column
* @param fieldName the column field name
* @param vector the ColumnVector that will be initialized with the values in
* the colVal argument.
* @param rowNum the ORC file row number.
*/
public static void setColumn(Object colVal, TypeDescription typeDesc, String fieldName, ColumnVector vector,
int rowNum) {
if (colVal == null) {
vector.isNull[rowNum] = true;
vector.noNulls = false;
} else {
switch (vector.type) {
case LONG: {
if (vector instanceof DateColumnVector) {
DateColumnVector dateVector = (DateColumnVector) vector;
setDateColumnVector(colVal, fieldName, dateVector, rowNum);
} else {
LongColumnVector longVector = (LongColumnVector) vector;
setLongColumnVector(colVal, fieldName, longVector, rowNum);
}
break;
}
case DOUBLE: {
DoubleColumnVector doubleVector = (DoubleColumnVector) vector;
setDoubleVector(colVal, fieldName, doubleVector, rowNum);
break;
}
case BYTES: {
BytesColumnVector bytesColVector = (BytesColumnVector) vector;
setByteColumnVector(colVal, fieldName, bytesColVector, rowNum);
break;
}
case DECIMAL: {
DecimalColumnVector decimalVector = (DecimalColumnVector) vector;
setDecimalVector(colVal, fieldName, decimalVector, rowNum);
break;
}
case DECIMAL_64:
throw new RuntimeException("Field: " + fieldName + ", Decimal64ColumnVector is not supported");
case TIMESTAMP: {
TimestampColumnVector timestampVector = (TimestampColumnVector) vector;
setTimestampVector(colVal, fieldName, timestampVector, rowNum);
break;
}
case INTERVAL_DAY_TIME:
throw new RuntimeException("Field: " + fieldName + ", HiveIntervalDayTime is not supported");
case STRUCT: {
// Struct values are not written: setStructColumnVector is not implemented in this class.
// StructColumnVector structVector = (StructColumnVector) vector;
// setStructColumnVector(colVal, typeDesc, fieldName, structVector, rowNum);
break;
}
case LIST: {
ListColumnVector listVector = (ListColumnVector) vector;
setListColumnVector(colVal, typeDesc, fieldName, listVector, rowNum);
break;
}
case MAP: {
MapColumnVector mapVector = (MapColumnVector) vector;
setMapColumnVector(colVal, typeDesc, fieldName, mapVector, rowNum);
break;
}
case UNION: {
UnionColumnVector unionVector = (UnionColumnVector) vector;
setUnionColumnVector(colVal, typeDesc, fieldName, unionVector, rowNum);
break;
}
default:
throw new RuntimeException("setColumn: Internal error: unexpected ColumnVector subtype");
} // switch
} // else
} // setColumn
private static String orcExceptionMsg(String prefixMsg, String fieldName, int rowNum) {
return prefixMsg + fieldName + " in row " + rowNum;
}
public static TypeDescription detectType(Object value) {
TypeDescription type = null;
if (value != null) {
if (value instanceof Boolean) {
type = TypeDescription.createBoolean();
} else if (value instanceof Short) {
type = TypeDescription.createShort();
} else if (value instanceof Integer) {
type = TypeDescription.createInt();
} else if (value instanceof Long) {
type = TypeDescription.createLong();
} else if (value instanceof Timestamp) {
type = TypeDescription.createTimestamp();
} else if (value instanceof BigDecimal) {
type = TypeDescription.createDecimal();
} else if (value instanceof Byte) {
type = TypeDescription.createByte();
} else if (value instanceof Float) {
type = TypeDescription.createFloat();
} else if (value instanceof Double) {
type = TypeDescription.createDouble();
} else if (value instanceof String) {
type = TypeDescription.createString();
} else if (value instanceof Date) {
type = TypeDescription.createDate();
} else if (value instanceof byte[]) {
type = TypeDescription.createBinary();
} else {
throw new RuntimeException(
value.getClass().getName() + " is not supported for ListColumnVector columns");
}
} else {
type = TypeDescription.createString();
}
return type;
}
}
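A minimal writer-side sketch driving setColumnByName; the schema and file path are illustrative:

TypeDescription schema = TypeDescription.fromString("struct<id:int,name:string>");
Writer writer = OrcFile.createWriter(new Path("/tmp/example.orc"), OrcFile.writerOptions(new Configuration()).setSchema(schema));
VectorizedRowBatch batch = schema.createRowBatch();
int row = batch.size++;
ORCWriteUtils.setColumnByName(batch, "id", schema, 42, row);
ORCWriteUtils.setColumnByName(batch, "name", schema, "alice", row);
writer.addRowBatch(batch);
writer.close();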
View File
@@ -1,253 +0,0 @@
package org.talend.orc;
import java.io.File;
import java.io.IOException;
import java.math.BigDecimal;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.orc.CompressionKind;
import org.apache.orc.OrcFile;
import org.apache.orc.OrcFile.WriterOptions;
import org.apache.orc.Reader;
import org.apache.orc.RecordReader;
import org.apache.orc.TypeDescription;
import org.apache.orc.Writer;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
class ORCUtilsTest {
private static File localFolder;
@BeforeAll
static void setup() throws IOException {
localFolder = createTempDirectory();
}
@AfterAll
static void dispose() {
localFolder.delete();
}
/**
* Test that an exception is thrown if a Date type is written.
*
* At the time this test was written, the ORC writer did not correctly write the
* date epoch value to the ORC file. The value was written as a 32-bit int,
* instead of a 64 bit long. As a result, the date is incorrect. A timestamp
* value should be used instead.
*
* @param tempDirPath
* @throws ORCFileException
* @throws InterruptedException
*/
@Test
void testAllDataTypes() throws Throwable {
String filePath = localFolder.getAbsolutePath() + "/testAllTypes.orc";
writeData(filePath);
readAndCheckData(filePath);
}
@Test
void testDetectType() {
Assertions.assertEquals(TypeDescription.Category.BOOLEAN, ORCWriteUtils.detectType(true).getCategory());
Assertions.assertEquals(TypeDescription.Category.SHORT,
ORCWriteUtils.detectType(Short.valueOf("1")).getCategory());
Assertions.assertEquals(TypeDescription.Category.INT, ORCWriteUtils.detectType(1).getCategory());
Assertions.assertEquals(TypeDescription.Category.LONG, ORCWriteUtils.detectType(1L).getCategory());
Assertions.assertEquals(TypeDescription.Category.TIMESTAMP,
ORCWriteUtils.detectType(new Timestamp(System.currentTimeMillis())).getCategory());
Assertions.assertEquals(TypeDescription.Category.DECIMAL,
ORCWriteUtils.detectType(new BigDecimal("1")).getCategory());
Assertions.assertEquals(TypeDescription.Category.BYTE,
ORCWriteUtils.detectType(Byte.valueOf("1")).getCategory());
Assertions.assertEquals(TypeDescription.Category.FLOAT, ORCWriteUtils.detectType(1.0f).getCategory());
Assertions.assertEquals(TypeDescription.Category.DOUBLE, ORCWriteUtils.detectType(1.0).getCategory());
Assertions.assertEquals(TypeDescription.Category.STRING, ORCWriteUtils.detectType("test").getCategory());
Assertions.assertEquals(TypeDescription.Category.DATE, ORCWriteUtils.detectType(new Date()).getCategory());
Assertions.assertEquals(TypeDescription.Category.BINARY,
ORCWriteUtils.detectType("test".getBytes()).getCategory());
}
private void writeData(String filePath) throws Throwable {
TypeDescription schema = TypeDescription.createStruct();
schema.addField("t_boolean", TypeDescription.createBoolean());
schema.addField("t_byte", TypeDescription.createByte());
schema.addField("t_bytes", TypeDescription.createBinary());
schema.addField("t_char", TypeDescription.createChar());
schema.addField("t_date", TypeDescription.createDate());
schema.addField("t_ts", TypeDescription.createTimestamp());
schema.addField("t_double", TypeDescription.createDouble());
schema.addField("t_float", TypeDescription.createFloat());
schema.addField("t_decimal", TypeDescription.createDecimal().withPrecision(18).withScale(5));
schema.addField("t_int", TypeDescription.createInt());
schema.addField("t_long", TypeDescription.createLong());
schema.addField("t_short", TypeDescription.createShort());
schema.addField("t_string", TypeDescription.createString());
schema.addField("t_list", TypeDescription.createList(TypeDescription.createString()));
WriterOptions writerOption = OrcFile.writerOptions(new Configuration()) //
.overwrite(true) //
.compress(CompressionKind.ZLIB) //
.setSchema(schema);
Writer writer = OrcFile.createWriter(new Path(filePath), writerOption);
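// Rows are accumulated in a VectorizedRowBatch (capacity 100) and flushed to
// the writer whenever the batch fills up; a final partial batch is flushed
// after the loop.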
VectorizedRowBatch batch = schema.createRowBatch(100);
for (int r = 0; r < 1000; ++r) {
int row = batch.size++;
for (int i = 0; i < batch.cols.length; i++) {
ColumnVector vector = batch.cols[i];
TypeDescription type = schema.getChildren().get(i);
switch (vector.type) {
case BYTES:
if (type.getCategory() == TypeDescription.Category.BINARY) {
ORCWriteUtils.setColumn(("this is byte[] " + r).getBytes(), null, "t_bytes", vector, row);
} else if (type.getCategory() == TypeDescription.Category.STRING) {
if (r == 666) {
ORCWriteUtils.setColumn(null, null, "t_string", vector, row);
} else {
ORCWriteUtils.setColumn(("this is String " + r), null, "t_string", vector, row);
}
} else if (type.getCategory() == TypeDescription.Category.CHAR) {
ORCWriteUtils.setColumn("talend".charAt(r % 6), null, "t_char", vector, row);
} else {
throw new RuntimeException(type.getCategory() + " is not supported as BYTES vector");
}
break;
case DECIMAL:
ORCWriteUtils.setColumn(new BigDecimal(r + ".12345"), null, "t_decimal", vector, row);
break;
case DOUBLE:
if (type.getCategory() == TypeDescription.Category.DOUBLE) {
ORCWriteUtils.setColumn(r + 0.123, null, "t_double", vector, row);
} else if (type.getCategory() == TypeDescription.Category.FLOAT) {
ORCWriteUtils.setColumn(r + 0.456f, null, "t_float", vector, row);
} else {
throw new RuntimeException(type.getCategory() + " is not supported as DOUBLE vector");
}
break;
case LONG:
if (type.getCategory() == TypeDescription.Category.BOOLEAN) {
ORCWriteUtils.setColumn(true, null, "t_boolean", vector, row);
} else if (type.getCategory() == TypeDescription.Category.BYTE) {
ORCWriteUtils.setColumn((byte)(r % 128), null, "t_byte", vector, row);
} else if (type.getCategory() == TypeDescription.Category.INT) {
ORCWriteUtils.setColumn(r, null, "t_int", vector, row);
} else if (type.getCategory() == TypeDescription.Category.SHORT) {
ORCWriteUtils.setColumn((short)(r % 256), null, "t_short", vector, row);
} else if (type.getCategory() == TypeDescription.Category.LONG) {
ORCWriteUtils.setColumn(r * 1000L, null, "t_long", vector, row);
} else if (type.getCategory() == TypeDescription.Category.DATE) {
Date d = new Date(1633687854031L);
ORCWriteUtils.setColumn(d, null, "t_date", vector, row);
} else {
throw new RuntimeException(type.getCategory() + " is not supported as LONG vector");
}
break;
case TIMESTAMP:
Timestamp ts = new java.sql.Timestamp(1633687854031L);
ts.setNanos(123456789);
ORCWriteUtils.setColumn(ts, null, "t_ts", vector, row);
break;
case LIST:
List<String> values = new ArrayList<>();
values.add("v1_" + r);
values.add("v2_" + r);
values.add("v3_" + r);
ORCWriteUtils.setColumn(values, ORCWriteUtils.detectType("v1_" + r), "t_list", vector, row);
break;
default:
throw new RuntimeException(vector.type + " is not supported");
}
}
if (batch.size == batch.getMaxSize()) {
writer.addRowBatch(batch);
batch.reset();
}
}
if (batch.size != 0) {
writer.addRowBatch(batch);
}
writer.close();
}
private void readAndCheckData(String filePath) throws Throwable {
Reader reader = OrcFile.createReader(new Path(filePath), OrcFile.readerOptions(new Configuration()));
TypeDescription schema = reader.getSchema();
VectorizedRowBatch batch = schema.createRowBatch();
RecordReader rowIterator = reader.rows(reader.options().schema(schema));
int numberLine = 0;
List<Object> nb_500 = new ArrayList<>();
List<Object> nb_666 = new ArrayList<>();
while (rowIterator.nextBatch(batch)) {
ColumnVector[] colVectors = batch.cols;
for (int row = 0; row < batch.size; ++row) {
numberLine++;
for (String columnName : schema.getFieldNames()) {
ColumnVector colVector = colVectors[schema.getFieldNames().indexOf(columnName)];
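// A repeating vector stores a single value at index 0 that applies to every row in the batch.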
int colRow = colVector.isRepeating ? 0 : row;
Object value = ORCReadUtils.readColumnByName(batch, columnName, schema, colRow);
if (numberLine == 500) {
nb_500.add(value);
} else if (numberLine == 667) {
nb_666.add(value);
}
}
}
}
Assertions.assertEquals(true, nb_500.get(0));
Assertions.assertEquals(Byte.valueOf("115"), nb_500.get(1));
Assertions.assertEquals("this is byte[] 499", new String((byte[]) nb_500.get(2)));
Assertions.assertEquals("a", nb_500.get(3));
Date t_date = (Date) nb_500.get(4);
Assertions.assertEquals(1633687854000L / 86400000, t_date.getTime() / 86400000);
Timestamp t_ts = (Timestamp) nb_500.get(5);
Assertions.assertEquals(1633687854123L, t_ts.getTime());
Assertions.assertEquals(123456789, t_ts.getNanos());
Assertions.assertEquals(499.123, nb_500.get(6));
Assertions.assertEquals(499.456f, (((float) nb_500.get(7)) * 1000) / 1000f);
Assertions.assertEquals(new BigDecimal("499.12345"), nb_500.get(8));
Assertions.assertEquals(499, nb_500.get(9));
Assertions.assertEquals(499000L, nb_500.get(10));
Assertions.assertEquals(Short.valueOf("243"), nb_500.get(11));
Assertions.assertEquals("this is String 499", nb_500.get(12));
Assertions.assertArrayEquals(Arrays.asList("v1_499", "v2_499", "v3_499").toArray(),
((List<Object>) nb_500.get(13)).toArray());
// Row r == 666 (line 667): the t_string column was written as null
Assertions.assertNull(nb_666.get(12));
rowIterator.close();
}
public static File createTempDirectory() throws IOException {
final File temp;
temp = File.createTempFile("temp", Long.toString(System.nanoTime()));
if (!temp.delete()) {
throw new IOException("Could not delete temp file: " + temp.getAbsolutePath());
}
if (!temp.mkdir()) {
throw new IOException("Could not create temp directory: " + temp.getAbsolutePath());
}
return temp;
}
}

View File

@@ -1,98 +0,0 @@
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.talend.components</groupId>
<artifactId>talend-parquet</artifactId>
<version>1.3</version>
<properties>
<parquet.version>1.10.1</parquet.version>
<hadoop.version>3.2.2</hadoop.version>
<jodd.version>6.0.1</jodd.version>
<hamcrest.version>1.3</hamcrest.version>
<junit.version>4.13.2</junit.version>
<talend.nexus.url>https://artifacts-oss.talend.com</talend.nexus.url>
</properties>
<distributionManagement>
<snapshotRepository>
<id>talend_nexus_deployment</id>
<url>${talend.nexus.url}/nexus/content/repositories/TalendOpenSourceSnapshot/</url>
<snapshots>
<enabled>true</enabled>
</snapshots>
<releases>
<enabled>false</enabled>
</releases>
</snapshotRepository>
<repository>
<id>talend_nexus_deployment</id>
<url>${talend.nexus.url}/nexus/content/repositories/TalendOpenSourceRelease/</url>
<snapshots>
<enabled>false</enabled>
</snapshots>
<releases>
<enabled>true</enabled>
</releases>
</repository>
</distributionManagement>
<dependencies>
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-hadoop</artifactId>
<version>${parquet.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
<scope>provided</scope>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.jodd</groupId>
<artifactId>jodd-util</artifactId>
<version>${jodd.version}</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>${junit.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-library</artifactId>
<version>${hamcrest.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.0</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>
</project>

View File

@@ -1,141 +0,0 @@
/*
* Copyright (C) 2006-2021 Talend Inc. - www.talend.com
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.talend.parquet.data;
import org.talend.parquet.data.simple.NanoTime;
import org.apache.parquet.io.api.Binary;
import org.apache.parquet.io.api.RecordConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
abstract public class Group extends GroupValueSource {
private static final Logger LOG = LoggerFactory.getLogger(Group.class);
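// The String-keyed overloads below resolve the field name to an index once and
// delegate to the abstract index-based variants implemented by subclasses.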
public void add(String field, int value) {
add(getType().getFieldIndex(field), value);
}
public void add(String field, long value) {
add(getType().getFieldIndex(field), value);
}
public void add(String field, float value) {
add(getType().getFieldIndex(field), value);
}
public void add(String field, double value) {
add(getType().getFieldIndex(field), value);
}
public void add(String field, String value) {
add(getType().getFieldIndex(field), value);
}
public void add(String field, NanoTime value) {
add(getType().getFieldIndex(field), value);
}
public void add(String field, boolean value) {
add(getType().getFieldIndex(field), value);
}
public void add(String field, Binary value) {
add(getType().getFieldIndex(field), value);
}
public void add(String field, Group value) {
add(getType().getFieldIndex(field), value);
}
public Group addGroup(String field) {
if (LOG.isDebugEnabled()) {
LOG.debug("add group {} to {}", field, getType().getName());
}
return addGroup(getType().getFieldIndex(field));
}
@Override
public Group getGroup(String field, int index) {
return getGroup(getType().getFieldIndex(field), index);
}
abstract public void add(int fieldIndex, int value);
abstract public void add(int fieldIndex, long value);
abstract public void add(int fieldIndex, String value);
abstract public void add(int fieldIndex, boolean value);
abstract public void add(int fieldIndex, NanoTime value);
abstract public void add(int fieldIndex, Binary value);
abstract public void add(int fieldIndex, float value);
abstract public void add(int fieldIndex, double value);
abstract public void add(int fieldIndex, Group value);
abstract public Group addGroup(int fieldIndex);
@Override
abstract public Group getGroup(int fieldIndex, int index);
public Group asGroup() {
return this;
}
public Group append(String fieldName, int value) {
add(fieldName, value);
return this;
}
public Group append(String fieldName, float value) {
add(fieldName, value);
return this;
}
public Group append(String fieldName, double value) {
add(fieldName, value);
return this;
}
public Group append(String fieldName, long value) {
add(fieldName, value);
return this;
}
public Group append(String fieldName, NanoTime value) {
add(fieldName, value);
return this;
}
public Group append(String fieldName, String value) {
add(fieldName, Binary.fromString(value));
return this;
}
public Group append(String fieldName, boolean value) {
add(fieldName, value);
return this;
}
public Group append(String fieldName, Binary value) {
add(fieldName, value);
return this;
}
abstract public void writeValue(int field, int index, RecordConsumer recordConsumer);
}

View File

@@ -1,19 +0,0 @@
/*
* Copyright (C) 2006-2021 Talend Inc. - www.talend.com
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.talend.parquet.data;
abstract public class GroupFactory {
abstract public Group newGroup();
}

View File

@@ -1,83 +0,0 @@
/*
* Copyright (C) 2006-2021 Talend Inc. - www.talend.com
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.talend.parquet.data;
import org.apache.parquet.io.api.Binary;
import org.apache.parquet.schema.GroupType;
abstract public class GroupValueSource {
public int getFieldRepetitionCount(String field) {
return getFieldRepetitionCount(getType().getFieldIndex(field));
}
public GroupValueSource getGroup(String field, int index) {
return getGroup(getType().getFieldIndex(field), index);
}
public String getString(String field, int index) {
return getString(getType().getFieldIndex(field), index);
}
public int getInteger(String field, int index) {
return getInteger(getType().getFieldIndex(field), index);
}
public long getLong(String field, int index) {
return getLong(getType().getFieldIndex(field), index);
}
public double getDouble(String field, int index) {
return getDouble(getType().getFieldIndex(field), index);
}
public float getFloat(String field, int index) {
return getFloat(getType().getFieldIndex(field), index);
}
public boolean getBoolean(String field, int index) {
return getBoolean(getType().getFieldIndex(field), index);
}
public Binary getBinary(String field, int index) {
return getBinary(getType().getFieldIndex(field), index);
}
public Binary getInt96(String field, int index) {
return getInt96(getType().getFieldIndex(field), index);
}
abstract public int getFieldRepetitionCount(int fieldIndex);
abstract public GroupValueSource getGroup(int fieldIndex, int index);
abstract public String getString(int fieldIndex, int index);
abstract public Integer getInteger(int fieldIndex, int index);
abstract public Long getLong(int fieldIndex, int index);
abstract public Double getDouble(int fieldIndex, int index);
abstract public Float getFloat(int fieldIndex, int index);
abstract public Boolean getBoolean(int fieldIndex, int index);
abstract public Binary getBinary(int fieldIndex, int index);
abstract public Binary getInt96(int fieldIndex, int index);
abstract public String getValueToString(int fieldIndex, int index);
abstract public GroupType getType();
}

View File

@@ -1,56 +0,0 @@
/*
* Copyright (C) 2006-2021 Talend Inc. - www.talend.com
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.talend.parquet.data;
import org.apache.parquet.io.api.RecordConsumer;
import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.Type;
public class GroupWriter {
private final RecordConsumer recordConsumer;
private final GroupType schema;
public GroupWriter(RecordConsumer recordConsumer, GroupType schema) {
this.recordConsumer = recordConsumer;
this.schema = schema;
}
public void write(Group group) {
recordConsumer.startMessage();
writeGroup(group, schema);
recordConsumer.endMessage();
}
private void writeGroup(Group group, GroupType type) {
int fieldCount = type.getFieldCount();
for (int field = 0; field < fieldCount; ++field) {
int valueCount = group.getFieldRepetitionCount(field);
if (valueCount > 0) {
Type fieldType = type.getType(field);
String fieldName = fieldType.getName();
recordConsumer.startField(fieldName, field);
for (int index = 0; index < valueCount; ++index) {
if (fieldType.isPrimitive()) {
group.writeValue(field, index, recordConsumer);
} else {
recordConsumer.startGroup();
writeGroup(group.getGroup(field, index), fieldType.asGroupType());
recordConsumer.endGroup();
}
}
recordConsumer.endField(fieldName, field);
}
}
}
}

View File

@@ -1,45 +0,0 @@
/*
* Copyright (C) 2006-2021 Talend Inc. - www.talend.com
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.talend.parquet.data.simple;
import org.apache.parquet.io.api.Binary;
import org.apache.parquet.io.api.RecordConsumer;
public class BinaryValue extends Primitive {
private final Binary binary;
public BinaryValue(Binary binary) {
this.binary = binary;
}
@Override
public Binary getBinary() {
return binary;
}
@Override
public String getString() {
return binary.toStringUsingUTF8();
}
@Override
public void writeValue(RecordConsumer recordConsumer) {
recordConsumer.addBinary(binary);
}
@Override
public String toString() {
return getString();
}
}

View File

@@ -1,39 +0,0 @@
/*
* Copyright (C) 2006-2021 Talend Inc. - www.talend.com
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.talend.parquet.data.simple;
import org.apache.parquet.io.api.RecordConsumer;
public class BooleanValue extends Primitive {
private final boolean bool;
public BooleanValue(boolean bool) {
this.bool = bool;
}
@Override
public String toString() {
return String.valueOf(bool);
}
@Override
public boolean getBoolean() {
return bool;
}
@Override
public void writeValue(RecordConsumer recordConsumer) {
recordConsumer.addBoolean(bool);
}
}

View File

@@ -1,39 +0,0 @@
/*
* Copyright (C) 2006-2021 Talend Inc. - www.talend.com
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.talend.parquet.data.simple;
import org.apache.parquet.io.api.RecordConsumer;
public class DoubleValue extends Primitive {
private final double value;
public DoubleValue(double value) {
this.value = value;
}
@Override
public double getDouble() {
return value;
}
@Override
public void writeValue(RecordConsumer recordConsumer) {
recordConsumer.addDouble(value);
}
@Override
public String toString() {
return String.valueOf(value);
}
}

View File

@@ -1,39 +0,0 @@
/*
* Copyright (C) 2006-2021 Talend Inc. - www.talend.com
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.talend.parquet.data.simple;
import org.apache.parquet.io.api.RecordConsumer;
public class FloatValue extends Primitive {
private final float value;
public FloatValue(float value) {
this.value = value;
}
@Override
public float getFloat() {
return value;
}
@Override
public void writeValue(RecordConsumer recordConsumer) {
recordConsumer.addFloat(value);
}
@Override
public String toString() {
return String.valueOf(value);
}
}

View File

@@ -1,40 +0,0 @@
/*
* Copyright (C) 2006-2021 Talend Inc. - www.talend.com
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.talend.parquet.data.simple;
import org.apache.parquet.io.api.Binary;
import org.apache.parquet.io.api.RecordConsumer;
public class Int96Value extends Primitive {
private final Binary value;
public Int96Value(Binary value) {
this.value = value;
}
@Override
public Binary getInt96() {
return value;
}
@Override
public void writeValue(RecordConsumer recordConsumer) {
recordConsumer.addBinary(value);
}
@Override
public String toString() {
return "Int96Value{" + String.valueOf(value) + "}";
}
}

View File

@@ -1,39 +0,0 @@
/*
* Copyright (C) 2006-2021 Talend Inc. - www.talend.com
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.talend.parquet.data.simple;
import org.apache.parquet.io.api.RecordConsumer;
public class IntegerValue extends Primitive {
private final int value;
public IntegerValue(int value) {
this.value = value;
}
@Override
public String toString() {
return String.valueOf(value);
}
@Override
public int getInteger() {
return value;
}
@Override
public void writeValue(RecordConsumer recordConsumer) {
recordConsumer.addInteger(value);
}
}

View File

@@ -1,39 +0,0 @@
/*
* Copyright (C) 2006-2021 Talend Inc. - www.talend.com
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.talend.parquet.data.simple;
import org.apache.parquet.io.api.RecordConsumer;
public class LongValue extends Primitive {
private final long value;
public LongValue(long value) {
this.value = value;
}
@Override
public String toString() {
return String.valueOf(value);
}
@Override
public long getLong() {
return value;
}
@Override
public void writeValue(RecordConsumer recordConsumer) {
recordConsumer.addLong(value);
}
}

View File

@@ -1,74 +0,0 @@
/*
* Copyright (C) 2006-2021 Talend Inc. - www.talend.com
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.talend.parquet.data.simple;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import org.apache.parquet.Preconditions;
import org.apache.parquet.io.api.Binary;
import org.apache.parquet.io.api.RecordConsumer;
public class NanoTime extends Primitive {
private final int julianDay;
private final long timeOfDayNanos;
public static NanoTime fromBinary(Binary bytes) {
Preconditions.checkArgument(bytes.length() == 12, "Must be 12 bytes");
ByteBuffer buf = bytes.toByteBuffer();
buf.order(ByteOrder.LITTLE_ENDIAN);
long timeOfDayNanos = buf.getLong();
int julianDay = buf.getInt();
return new NanoTime(julianDay, timeOfDayNanos);
}
public static NanoTime fromInt96(Int96Value int96) {
// Delegate to fromBinary so the little-endian layout (nanos first, then
// Julian day) matches what toBinary() writes.
return fromBinary(int96.getInt96());
}
public NanoTime(int julianDay, long timeOfDayNanos) {
this.julianDay = julianDay;
this.timeOfDayNanos = timeOfDayNanos;
}
public int getJulianDay() {
return julianDay;
}
public long getTimeOfDayNanos() {
return timeOfDayNanos;
}
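// Parquet INT96 timestamps are 12 bytes, little-endian: 8 bytes of
// nanoseconds within the day followed by a 4-byte Julian day number.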
public Binary toBinary() {
ByteBuffer buf = ByteBuffer.allocate(12);
buf.order(ByteOrder.LITTLE_ENDIAN);
buf.putLong(timeOfDayNanos);
buf.putInt(julianDay);
buf.flip();
return Binary.fromConstantByteBuffer(buf);
}
public Int96Value toInt96() {
return new Int96Value(toBinary());
}
@Override
public void writeValue(RecordConsumer recordConsumer) {
recordConsumer.addBinary(toBinary());
}
@Override
public String toString() {
return "NanoTime{julianDay=" + julianDay + ", timeOfDayNanos=" + timeOfDayNanos + "}";
}
}

View File

@@ -1,54 +0,0 @@
/*
* Copyright (C) 2006-2021 Talend Inc. - www.talend.com
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.talend.parquet.data.simple;
import org.apache.parquet.io.api.Binary;
import org.apache.parquet.io.api.RecordConsumer;
public abstract class Primitive {
public String getString() {
throw new UnsupportedOperationException();
}
public int getInteger() {
throw new UnsupportedOperationException();
}
public long getLong() {
throw new UnsupportedOperationException();
}
public boolean getBoolean() {
throw new UnsupportedOperationException();
}
public Binary getBinary() {
throw new UnsupportedOperationException();
}
public Binary getInt96() {
throw new UnsupportedOperationException();
}
public float getFloat() {
throw new UnsupportedOperationException();
}
public double getDouble() {
throw new UnsupportedOperationException();
}
abstract public void writeValue(RecordConsumer recordConsumer);
}

View File

@@ -1,274 +0,0 @@
/*
* Copyright (C) 2006-2021 Talend Inc. - www.talend.com
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.talend.parquet.data.simple;
import java.util.ArrayList;
import java.util.List;
import org.talend.parquet.data.Group;
import org.apache.parquet.io.api.Binary;
import org.apache.parquet.io.api.RecordConsumer;
import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.Type;
public class SimpleGroup extends Group {
private final GroupType schema;
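// One value list per schema field; REPEATED fields may accumulate several entries.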
private final List<Object>[] data;
@SuppressWarnings("unchecked")
public SimpleGroup(GroupType schema) {
this.schema = schema;
this.data = new List[schema.getFields().size()];
for (int i = 0; i < schema.getFieldCount(); i++) {
this.data[i] = new ArrayList<>();
}
}
@Override
public String toString() {
return toString("");
}
private StringBuilder appendToString(StringBuilder builder, String indent) {
int i = 0;
for (Type field : schema.getFields()) {
String name = field.getName();
List<Object> values = data[i];
++i;
if (values != null && !values.isEmpty()) {
for (Object value : values) {
builder.append(indent).append(name);
if (value == null) {
builder.append(": NULL\n");
} else if (value instanceof Group) {
builder.append('\n');
((SimpleGroup) value).appendToString(builder, indent + " ");
} else {
builder.append(": ").append(value.toString()).append('\n');
}
}
}
}
return builder;
}
public String toString(String indent) {
StringBuilder builder = new StringBuilder();
appendToString(builder, indent);
return builder.toString();
}
@Override
public Group addGroup(int fieldIndex) {
SimpleGroup g = new SimpleGroup(schema.getType(fieldIndex).asGroupType());
add(fieldIndex, g);
return g;
}
@Override
public Group getGroup(int fieldIndex, int index) {
return (Group) getValue(fieldIndex, index);
}
private Object getValue(int fieldIndex, int index) {
List<Object> list;
try {
list = data[fieldIndex];
} catch (IndexOutOfBoundsException e) {
throw new RuntimeException(
"not found " + fieldIndex + "(" + schema.getFieldName(fieldIndex) + ") in group:\n" + this);
}
try {
if(list == null || list.isEmpty()) {
return null;
}
return list.get(index);
} catch (IndexOutOfBoundsException e) {
throw new RuntimeException("not found " + fieldIndex + "(" + schema.getFieldName(fieldIndex)
+ ") element number " + index + " in group:\n" + this);
}
}
private void add(int fieldIndex, Primitive value) {
Type type = schema.getType(fieldIndex);
List<Object> list = data[fieldIndex];
if (!type.isRepetition(Type.Repetition.REPEATED) && !list.isEmpty()) {
throw new IllegalStateException(
"field " + fieldIndex + " (" + type.getName() + ") can not have more than one value: " + list);
}
list.add(value);
}
@Override
public int getFieldRepetitionCount(int fieldIndex) {
List<Object> list = data[fieldIndex];
return list == null ? 0 : list.size();
}
@Override
public String getValueToString(int fieldIndex, int index) {
Object value = getValue(fieldIndex, index);
if(value == null) {
return null;
}
return String.valueOf(value);
}
@Override
public String getString(int fieldIndex, int index) {
Object value = getValue(fieldIndex, index);
if(value == null) {
return null;
}
return ((BinaryValue) value).getString();
}
@Override
public Integer getInteger(int fieldIndex, int index) {
Object value = getValue(fieldIndex, index);
if(value == null) {
return null;
}
return ((IntegerValue)value).getInteger();
}
@Override
public Long getLong(int fieldIndex, int index) {
Object value = getValue(fieldIndex, index);
if(value == null) {
return null;
}
return ((LongValue)value).getLong();
}
@Override
public Double getDouble(int fieldIndex, int index) {
Object value = getValue(fieldIndex, index);
if(value == null) {
return null;
}
return ((DoubleValue)value).getDouble();
}
@Override
public Float getFloat(int fieldIndex, int index) {
Object value = getValue(fieldIndex, index);
if(value == null) {
return null;
}
return ((FloatValue)value).getFloat();
}
@Override
public Boolean getBoolean(int fieldIndex, int index) {
Object value = getValue(fieldIndex, index);
if(value == null) {
return null;
}
return ((BooleanValue) value).getBoolean();
}
@Override
public Binary getBinary(int fieldIndex, int index) {
Object value = getValue(fieldIndex, index);
if(value == null) {
return null;
}
return ((BinaryValue) value).getBinary();
}
public NanoTime getTimeNanos(int fieldIndex, int index) {
Object value = getValue(fieldIndex, index);
if(value == null) {
return null;
}
return NanoTime.fromInt96((Int96Value) value);
}
@Override
public Binary getInt96(int fieldIndex, int index) {
Object value = getValue(fieldIndex, index);
if(value == null) {
return null;
}
return ((Int96Value) value).getInt96();
}
@Override
public void add(int fieldIndex, int value) {
add(fieldIndex, new IntegerValue(value));
}
@Override
public void add(int fieldIndex, long value) {
add(fieldIndex, new LongValue(value));
}
@Override
public void add(int fieldIndex, String value) {
add(fieldIndex, new BinaryValue(Binary.fromString(value)));
}
@Override
public void add(int fieldIndex, NanoTime value) {
add(fieldIndex, value.toInt96());
}
@Override
public void add(int fieldIndex, boolean value) {
add(fieldIndex, new BooleanValue(value));
}
@Override
public void add(int fieldIndex, Binary value) {
switch (getType().getType(fieldIndex).asPrimitiveType().getPrimitiveTypeName()) {
case BINARY:
case FIXED_LEN_BYTE_ARRAY:
add(fieldIndex, new BinaryValue(value));
break;
case INT96:
add(fieldIndex, new Int96Value(value));
break;
default:
throw new UnsupportedOperationException(
getType().getType(fieldIndex).asPrimitiveType().getPrimitiveTypeName() + " not supported for Binary");
}
}
@Override
public void add(int fieldIndex, float value) {
add(fieldIndex, new FloatValue(value));
}
@Override
public void add(int fieldIndex, double value) {
add(fieldIndex, new DoubleValue(value));
}
@Override
public void add(int fieldIndex, Group value) {
data[fieldIndex].add(value);
}
@Override
public GroupType getType() {
return schema;
}
@Override
public void writeValue(int field, int index, RecordConsumer recordConsumer) {
((Primitive) getValue(field, index)).writeValue(recordConsumer);
}
}

View File

@@ -1,32 +0,0 @@
/*
* Copyright (C) 2006-2021 Talend Inc. - www.talend.com
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.talend.parquet.data.simple;
import org.talend.parquet.data.Group;
import org.talend.parquet.data.GroupFactory;
import org.apache.parquet.schema.MessageType;
public class SimpleGroupFactory extends GroupFactory {
private final MessageType schema;
public SimpleGroupFactory(MessageType schema) {
this.schema = schema;
}
@Override
public Group newGroup() {
return new SimpleGroup(schema);
}
}

View File

@@ -1,51 +0,0 @@
/*
* Copyright (C) 2006-2021 Talend Inc. - www.talend.com
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.talend.parquet.data.simple.convert;
import org.talend.parquet.data.Group;
import org.talend.parquet.data.simple.SimpleGroupFactory;
import org.apache.parquet.io.api.GroupConverter;
import org.apache.parquet.io.api.RecordMaterializer;
import org.apache.parquet.schema.MessageType;
public class GroupRecordConverter extends RecordMaterializer<Group> {
private final SimpleGroupFactory simpleGroupFactory;
private SimpleGroupConverter root;
public GroupRecordConverter(MessageType schema) {
this.simpleGroupFactory = new SimpleGroupFactory(schema);
this.root = new SimpleGroupConverter(null, 0, schema) {
@Override
public void start() {
this.current = simpleGroupFactory.newGroup();
}
@Override
public void end() {
}
};
}
@Override
public Group getCurrentRecord() {
return root.getCurrentRecord();
}
@Override
public GroupConverter getRootConverter() {
return root;
}
}

View File

@@ -1,61 +0,0 @@
/*
* Copyright (C) 2006-2021 Talend Inc. - www.talend.com
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.talend.parquet.data.simple.convert;
import org.talend.parquet.data.Group;
import org.apache.parquet.io.api.Converter;
import org.apache.parquet.io.api.GroupConverter;
import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.Type;
class SimpleGroupConverter extends GroupConverter {
private final SimpleGroupConverter parent;
private final int index;
protected Group current;
private Converter[] converters;
SimpleGroupConverter(SimpleGroupConverter parent, int index, GroupType schema) {
this.parent = parent;
this.index = index;
converters = new Converter[schema.getFieldCount()];
for (int i = 0; i < converters.length; i++) {
final Type type = schema.getType(i);
if (type.isPrimitive()) {
converters[i] = new SimplePrimitiveConverter(this, i);
} else {
converters[i] = new SimpleGroupConverter(this, i, type.asGroupType());
}
}
}
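// One converter per field: primitives append values to the current record,
// while nested groups get their own SimpleGroupConverter that creates a
// sub-group on start().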
@Override
public void start() {
current = parent.getCurrentRecord().addGroup(index);
}
@Override
public Converter getConverter(int fieldIndex) {
return converters[fieldIndex];
}
@Override
public void end() {
}
public Group getCurrentRecord() {
return current;
}
}

View File

@@ -1,88 +0,0 @@
/*
* Copyright (C) 2006-2021 Talend Inc. - www.talend.com
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.talend.parquet.data.simple.convert;
import org.apache.parquet.io.api.Binary;
import org.apache.parquet.io.api.PrimitiveConverter;
class SimplePrimitiveConverter extends PrimitiveConverter {
private final SimpleGroupConverter parent;
private final int index;
SimplePrimitiveConverter(SimpleGroupConverter parent, int index) {
this.parent = parent;
this.index = index;
}
/**
* {@inheritDoc}
*
* @see org.apache.parquet.io.api.PrimitiveConverter#addBinary(Binary)
*/
@Override
public void addBinary(Binary value) {
parent.getCurrentRecord().add(index, value);
}
/**
* {@inheritDoc}
*
* @see org.apache.parquet.io.api.PrimitiveConverter#addBoolean(boolean)
*/
@Override
public void addBoolean(boolean value) {
parent.getCurrentRecord().add(index, value);
}
/**
* {@inheritDoc}
*
* @see org.apache.parquet.io.api.PrimitiveConverter#addDouble(double)
*/
@Override
public void addDouble(double value) {
parent.getCurrentRecord().add(index, value);
}
/**
* {@inheritDoc}
*
* @see org.apache.parquet.io.api.PrimitiveConverter#addFloat(float)
*/
@Override
public void addFloat(float value) {
parent.getCurrentRecord().add(index, value);
}
/**
* {@inheritDoc}
*
* @see org.apache.parquet.io.api.PrimitiveConverter#addInt(int)
*/
@Override
public void addInt(int value) {
parent.getCurrentRecord().add(index, value);
}
/**
* {@inheritDoc}
*
* @see org.apache.parquet.io.api.PrimitiveConverter#addLong(long)
*/
@Override
public void addLong(long value) {
parent.getCurrentRecord().add(index, value);
}
}

View File

@@ -1,40 +0,0 @@
/*
* Copyright (C) 2006-2021 Talend Inc. - www.talend.com
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.talend.parquet.hadoop;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.hadoop.api.ReadSupport;
import org.apache.parquet.io.api.RecordMaterializer;
import org.apache.parquet.schema.MessageType;
import org.talend.parquet.data.Group;
import org.talend.parquet.data.simple.convert.GroupRecordConverter;
public class TalendGroupReadSupport extends ReadSupport<Group> {
@Override
public org.apache.parquet.hadoop.api.ReadSupport.ReadContext init(Configuration configuration,
Map<String, String> keyValueMetaData, MessageType fileSchema) {
String partialSchemaString = configuration.get(ReadSupport.PARQUET_READ_SCHEMA);
MessageType requestedProjection = getSchemaForRead(fileSchema, partialSchemaString);
return new ReadContext(requestedProjection);
}
@Override
public RecordMaterializer<Group> prepareForRead(Configuration configuration, Map<String, String> keyValueMetaData,
MessageType fileSchema, org.apache.parquet.hadoop.api.ReadSupport.ReadContext readContext) {
return new GroupRecordConverter(readContext.getRequestedSchema());
}
}

View File

@@ -1,81 +0,0 @@
/*
* Copyright (C) 2006-2021 Talend Inc. - www.talend.com
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.talend.parquet.hadoop;
import static org.apache.parquet.schema.MessageTypeParser.parseMessageType;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.hadoop.api.WriteSupport;
import org.apache.parquet.io.api.RecordConsumer;
import org.apache.parquet.schema.MessageType;
import org.talend.parquet.data.Group;
import org.talend.parquet.data.GroupWriter;
public class TalendGroupWriteSupport extends WriteSupport<Group> {
public static final String PARQUET_SCHEMA = "parquet.talend.schema";
public static void setSchema(MessageType schema, Configuration configuration) {
configuration.set(PARQUET_SCHEMA, schema.toString());
}
public static MessageType getSchema(Configuration configuration) {
return parseMessageType(Objects.requireNonNull(configuration.get(PARQUET_SCHEMA), PARQUET_SCHEMA));
}
private MessageType schema;
private GroupWriter groupWriter;
private Map<String, String> extraMetaData;
public TalendGroupWriteSupport() {
this(null, new HashMap<String, String>());
}
TalendGroupWriteSupport(MessageType schema) {
this(schema, new HashMap<String, String>());
}
TalendGroupWriteSupport(MessageType schema, Map<String, String> extraMetaData) {
this.schema = schema;
this.extraMetaData = extraMetaData;
}
@Override
public String getName() {
return "Talend";
}
@Override
public org.apache.parquet.hadoop.api.WriteSupport.WriteContext init(Configuration configuration) {
// if present, prefer the schema passed to the constructor
if (schema == null) {
schema = getSchema(configuration);
}
return new WriteContext(schema, this.extraMetaData);
}
@Override
public void prepareForWrite(RecordConsumer recordConsumer) {
groupWriter = new GroupWriter(recordConsumer, schema);
}
@Override
public void write(Group record) {
groupWriter.write(record);
}
}

View File

@@ -1,30 +0,0 @@
/*
* Copyright (C) 2006-2021 Talend Inc. - www.talend.com
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.talend.parquet.hadoop;
import org.apache.parquet.hadoop.ParquetInputFormat;
import org.talend.parquet.data.Group;
/**
 * Example input format for reading Parquet files.
 *
 * This input format uses a rather inefficient data model but works
 * independently of higher-level abstractions.
 */
public class TalendInputFormat extends ParquetInputFormat<Group> {
public TalendInputFormat() {
super(TalendGroupReadSupport.class);
}
}

View File

@@ -1,54 +0,0 @@
/*
* Copyright (C) 2006-2021 Talend Inc. - www.talend.com
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.talend.parquet.hadoop;
import org.apache.hadoop.mapreduce.Job;
import org.apache.parquet.hadoop.ParquetOutputFormat;
import org.apache.parquet.hadoop.util.ContextUtil;
import org.apache.parquet.schema.MessageType;
import org.talend.parquet.data.Group;
/**
 * An example output format.
 *
 * The schema must be provided up front.
 *
 * @see TalendOutputFormat#setSchema(Job, MessageType)
 * @see TalendGroupWriteSupport#PARQUET_SCHEMA
 */
public class TalendOutputFormat extends ParquetOutputFormat<Group> {
/**
* set the schema being written to the job conf
*
* @param job a job
* @param schema the schema of the data
*/
public static void setSchema(Job job, MessageType schema) {
TalendGroupWriteSupport.setSchema(schema, ContextUtil.getConfiguration(job));
}
/**
* retrieve the schema from the conf
*
* @param job a job
* @return the schema
*/
public static MessageType getSchema(Job job) {
return TalendGroupWriteSupport.getSchema(ContextUtil.getConfiguration(job));
}
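// MapReduce wiring sketch (job setup shown for illustration only):
//
// Job job = Job.getInstance(new Configuration());
// job.setOutputFormatClass(TalendOutputFormat.class);
// TalendOutputFormat.setSchema(job, schema);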
public TalendOutputFormat() {
super(new TalendGroupWriteSupport());
}
}

View File

@@ -1,108 +0,0 @@
/*
* Copyright (C) 2006-2021 Talend Inc. - www.talend.com
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.talend.parquet.hadoop;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.api.WriteSupport;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.io.OutputFile;
import org.apache.parquet.schema.MessageType;
import org.talend.parquet.data.Group;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
/**
* An example file writer class.
*/
public class TalendParquetWriter extends ParquetWriter<Group> {
/**
* Creates a Builder for configuring ParquetWriter with the example object
*
* @param file the output file to create
* @return a {@link Builder} to create a {@link ParquetWriter}
*/
public static Builder builder(Path file) {
return new Builder(file);
}
/**
* Creates a Builder for configuring ParquetWriter with the example object
*
* @param file the output file to create
* @return a {@link Builder} to create a {@link ParquetWriter}
*/
public static Builder builder(OutputFile file) {
return new Builder(file);
}
/**
* Create a new {@link TalendParquetWriter}.
*
* @param file The file name to write to.
* @param writeSupport The schema to write with.
* @param compressionCodecName Compression codec to use, or
* CompressionCodecName.UNCOMPRESSED
* @param blockSize the block size threshold.
* @param pageSize See the Parquet write-up. Blocks are subdivided into
* pages for alignment and other purposes.
* @param enableDictionary Whether to use a dictionary to compress columns.
* @param conf The Configuration to use.
* @throws IOException
*/
TalendParquetWriter(Path file, WriteSupport<Group> writeSupport, CompressionCodecName compressionCodecName,
int blockSize, int pageSize, boolean enableDictionary, boolean enableValidation,
ParquetProperties.WriterVersion writerVersion, Configuration conf) throws IOException {
super(file, writeSupport, compressionCodecName, blockSize, pageSize, pageSize, enableDictionary,
enableValidation, writerVersion, conf);
}
public static class Builder extends ParquetWriter.Builder<Group, Builder> {
private MessageType type = null;
private Map<String, String> extraMetaData = new HashMap<String, String>();
private Builder(Path file) {
super(file);
}
private Builder(OutputFile file) {
super(file);
}
public Builder withType(MessageType type) {
this.type = type;
return this;
}
public Builder withExtraMetaData(Map<String, String> extraMetaData) {
this.extraMetaData = extraMetaData;
return this;
}
@Override
protected Builder self() {
return this;
}
@Override
protected WriteSupport<Group> getWriteSupport(Configuration conf) {
return new TalendGroupWriteSupport(type, extraMetaData);
}
}
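// Write-path sketch (path, schema, and codec are illustrative assumptions):
//
// MessageType schema = MessageTypeParser.parseMessageType(
//         "message example { required int32 id; optional binary name (UTF8); }");
// try (ParquetWriter<Group> writer = TalendParquetWriter.builder(new Path("/tmp/data.parquet"))
//         .withType(schema)
//         .withCompressionCodec(CompressionCodecName.SNAPPY)
//         .build()) {
//     writer.write(new SimpleGroupFactory(schema).newGroup()
//             .append("id", 1)
//             .append("name", "talend"));
// }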
}

Some files were not shown because too many files have changed in this diff.