diff --git a/testdata/TableFlattener/.gitignore b/testdata/TableFlattener/.gitignore
new file mode 100644
index 000000000..0d3663d8e
--- /dev/null
+++ b/testdata/TableFlattener/.gitignore
@@ -0,0 +1,6 @@
+# IntelliJ
+.idea
+*.iml
+
+# Maven
+target
diff --git a/testdata/TableFlattener/README b/testdata/TableFlattener/README
new file mode 100644
index 000000000..96c7b83fc
--- /dev/null
+++ b/testdata/TableFlattener/README
@@ -0,0 +1,22 @@
+This is a tool to convert a nested dataset to an unnested dataset. The source and/or
+destination can be the local file system or HDFS.
+
+Structs get flattened into columns of the parent table (with long, concatenated
+names). Arrays and maps get converted to separate tables which can be joined with the
+parent table on the id column.
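+
+For example (column names are illustrative; FlattenedSchema defines the exact
+conventions), a record such as
+
+  {"id": 1, "tags": ["a", "b"]}
+
+flattens to a parent table with column (id) plus a child table with columns
+(parent id, idx, value), one row per array element, joined back to the parent on id.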
+
+$ mvn exec:java \
+ -Dexec.mainClass=org.apache.impala.infra.tableflattener.Main \
+ -Dexec.arguments="file:///tmp/in.parquet,file:///tmp/out,-sfile:///tmp/in.avsc"
+
+$ mvn exec:java \
+ -Dexec.mainClass=org.apache.impala.infra.tableflattener.Main \
+ -Dexec.arguments="hdfs://localhost:20500/nested.avro,file://$PWD/unnested"
+
+There are various options to specify the format and compression of the input file, but
+the output is always Snappy-compressed Parquet.
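+
+For example, to read a Snappy-compressed Avro file whose name lacks a recognizable
+extension (the flag values shown are illustrative):
+
+$ mvn exec:java \
+ -Dexec.mainClass=org.apache.impala.infra.tableflattener.Main \
+ -Dexec.arguments="file:///tmp/in.data,file:///tmp/out,-favro,-csnappy"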
+
+For additional help, use the following command:
+$ mvn exec:java \
+ -Dexec.mainClass=org.apache.impala.infra.tableflattener.Main -Dexec.arguments="--help"
+
+This is used by testdata/bin/generate-load-nested.sh.
diff --git a/testdata/TableFlattener/pom.xml b/testdata/TableFlattener/pom.xml
new file mode 100644
index 000000000..ea6d6ff42
--- /dev/null
+++ b/testdata/TableFlattener/pom.xml
@@ -0,0 +1,62 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <groupId>org.apache.impala</groupId>
+  <artifactId>nested-table-flattener</artifactId>
+  <name>Impala Nested Table Flattener</name>
+  <version>1.0-SNAPSHOT</version>
+  <packaging>jar</packaging>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-compiler-plugin</artifactId>
+        <version>3.3</version>
+        <configuration>
+          <source>1.7</source>
+          <target>1.7</target>
+        </configuration>
+      </plugin>
+    </plugins>
+  </build>
+
+  <repositories>
+    <repository>
+      <id>cloudera-repo-releases</id>
+      <url>https://repository.cloudera.com/artifactory/repo/</url>
+    </repository>
+  </repositories>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-client</artifactId>
+      <version>2.6.0</version>
+    </dependency>
+    <dependency>
+      <groupId>org.kitesdk</groupId>
+      <artifactId>kite-data-core</artifactId>
+      <version>1.0.0-cdh5.4.1</version>
+    </dependency>
+  </dependencies>
+</project>
diff --git a/testdata/TableFlattener/src/main/java/org/apache/impala/infra/tableflattener/FileMigrator.java b/testdata/TableFlattener/src/main/java/org/apache/impala/infra/tableflattener/FileMigrator.java
new file mode 100644
index 000000000..88d52fba3
--- /dev/null
+++ b/testdata/TableFlattener/src/main/java/org/apache/impala/infra/tableflattener/FileMigrator.java
@@ -0,0 +1,141 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.infra.tableflattener;
+
+import com.google.common.base.Preconditions;
+import org.apache.avro.Schema;
+import org.apache.avro.Schema.Field;
+import org.apache.avro.Schema.Type;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericData.Record;
+import org.apache.avro.generic.GenericRecord;
+import org.kitesdk.data.Dataset;
+
+import java.util.List;
+import java.util.ListIterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+public class FileMigrator {
+
+ // Migrates data from a nested "src" to a flat "dst".
+ public void migrate(Dataset<GenericRecord> src, FlattenedSchema dst) {
+ dst.open();
+ try {
+ for (GenericRecord record : src.newReader()) {
+ writeRecord(record, dst);
+ }
+ } finally {
+ dst.close();
+ }
+ }
+
+ private void writeRecord(GenericRecord srcRecord, FlattenedSchema dstDataset) {
+ Record dstRecord = createRecord(null, dstDataset);
+ writeRecordFields(srcRecord, dstRecord, dstDataset, "");
+ dstDataset.write(dstRecord);
+ }
+
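+ // Creates an empty destination record, filling in the parent foreign-key field when
+ // this dataset has a parent and assigning a newly generated id when the flattened
+ // schema defines an id field.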
+ private Record createRecord(Long dstParentId, FlattenedSchema dstDataset) {
+ Record record = new Record(dstDataset.getDataset().getDescriptor().getSchema());
+ if (dstDataset.getParentIdField() != null) {
+ Preconditions.checkNotNull(dstParentId);
+ record.put(dstDataset.getParentIdField().name(), dstParentId);
+ }
+ Field idField = record.getSchema().getField(dstDataset.getIdFieldName());
+ if (idField != null) record.put(idField.name(), dstDataset.nextId());
+ return record;
+ }
+
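+ // Copies each field of srcRecord into dstRecord, substituting the field's schema
+ // default when the source record lacks the field. fieldNamePrefix carries the
+ // concatenated names of enclosing records.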
+ private void writeRecordFields(GenericRecord srcRecord, Record dstRecord,
+ FlattenedSchema dstDataset, String fieldNamePrefix) {
+ for (Field field : srcRecord.getSchema().getFields()) {
+ Object value;
+ if (SchemaUtil.recordHasField(srcRecord, field.name())) {
+ value = srcRecord.get(field.name());
+ } else {
+ Preconditions.checkNotNull(field.defaultValue());
+ value = GenericData.get().getDefaultValue(field);
+ }
+ writeValue(value, field.schema(), field.name(), dstRecord, dstDataset,
+ fieldNamePrefix);
+ }
+ }
+
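+ // Writes one value under its flattened column name. Scalars are stored directly,
+ // nullable values get a companion "<name>_is_null" column, nested records are
+ // inlined into the current row with a longer name prefix, and arrays/maps are
+ // written to a child dataset that references this record's id.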
+ private void writeValue(Object srcValue, Schema srcSchema, String srcFieldName,
+ Record dstRecord, FlattenedSchema dstDataset, String fieldNamePrefix) {
+ String dstFieldName = fieldNamePrefix + (srcFieldName == null ?
+ dstDataset.getCollectionValueFieldName() : srcFieldName);
+ if (!SchemaUtil.schemaHasNesting(srcSchema)) {
+ dstRecord.put(dstFieldName, srcValue);
+ return;
+ }
+
+ if (SchemaUtil.isNullable(srcSchema)) {
+ dstRecord.put(dstDataset.getIsNullFieldName(dstFieldName), (srcValue == null));
+ if (srcValue == null) return;
+ if (srcSchema.getType() == Type.UNION) {
+ srcSchema = srcSchema.getTypes().get(
+ GenericData.get().resolveUnion(srcSchema, srcValue));
+ }
+ }
+
+ if (!SchemaUtil.requiresChildDataset(srcSchema)) {
+ writeRecordFields((GenericRecord)srcValue, dstRecord, dstDataset,
+ fieldNamePrefix
+ + (srcFieldName == null ?
+ dstDataset.getCollectionValueFieldName() : srcFieldName)
+ + dstDataset.getNameSeparator());
+ return;
+ }
+
+ Long dstParentId = (Long)dstRecord.get(dstDataset.getIdFieldName());
+ Preconditions.checkNotNull(dstParentId);
+ FlattenedSchema childDataset = (srcFieldName == null) ?
+ dstDataset.getChildOfCollection() : dstDataset.getChildOfRecord(srcFieldName);
+ if (srcSchema.getType() == Type.ARRAY) {
+ writeArray((List) srcValue, srcSchema.getElementType(), dstParentId, childDataset);
+ } else {
+ Preconditions.checkState(srcSchema.getType() == Type.MAP);
+ writeMap((Map) srcValue, srcSchema.getValueType(), dstParentId, childDataset);
+ }
+ }
+
+ private void writeArray(List srcValues, Schema srcSchema, Long dstParentId,
+ FlattenedSchema dstDataset) {
+ for (ListIterator it = srcValues.listIterator(); it.hasNext(); ) {
+ Object value = it.next();
+ Record record = createRecord(dstParentId, dstDataset);
+ record.put(dstDataset.getArrayIdxFieldName(), (long)it.previousIndex());
+ writeValue(value, srcSchema, null, record, dstDataset, "");
+ dstDataset.write(record);
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ private void writeMap(Map srcValues, Schema srcSchema, Long dstParentId,
+ FlattenedSchema dstDataset) {
+ for (Entry<Object, Object> entry :
+ (Set<Entry<Object, Object>>) srcValues.entrySet()) {
+ Record record = createRecord(dstParentId, dstDataset);
+ record.put(dstDataset.getMapKeyFieldName(), entry.getKey());
+ writeValue(entry.getValue(), srcSchema, null, record, dstDataset, "");
+ dstDataset.write(record);
+ }
+ }
+}
diff --git a/testdata/TableFlattener/src/main/java/org/apache/impala/infra/tableflattener/FlattenedSchema.java b/testdata/TableFlattener/src/main/java/org/apache/impala/infra/tableflattener/FlattenedSchema.java
new file mode 100644
index 000000000..b87511020
--- /dev/null
+++ b/testdata/TableFlattener/src/main/java/org/apache/impala/infra/tableflattener/FlattenedSchema.java
@@ -0,0 +1,155 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.infra.tableflattener;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import org.apache.avro.Schema.Field;
+import org.apache.avro.generic.GenericRecord;
+import org.kitesdk.data.Dataset;
+import org.kitesdk.data.DatasetWriter;
+
+import java.util.Map;
+
+// This class contains information about how unnested datasets are related.
+public class FlattenedSchema {
+
+ // If a dataset has nesting, an id field will be created so that records in the child
+ // dataset can reference the parent.
+ private long referenceId;
+
+ // If this dataset has a parent, the parentIdField_ indicates which field in this
+ // dataset is the foreign key to the parent dataset. If this dataset does not have a
+ // parent, this field is null.
+ private Field parentIdField_;
+
+ // The name of the dataset, mainly used to find a child dataset.
+ private String name_;
+ private Map<String, FlattenedSchema> childrenByName_ = Maps.newHashMap();
+
+ // The actual dataset object.
+ private Dataset<GenericRecord> dataset_;
+ private DatasetWriter<GenericRecord> datasetWriter_;
+
+ private final String idFieldName_ = "id";
+
+ public FlattenedSchema(String name) {
+ name_ = name;
+ referenceId = 0;
+ }
+
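+ // Creates a child dataset and registers it with its parent so that it can be found
+ // by name via getChildOfCollection() or getChildOfRecord().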
+ public FlattenedSchema(String name, FlattenedSchema parent) {
+ this(name);
+ parent.childrenByName_.put(name, this);
+ }
+
+ // Opens this dataset and all children for writing.
+ public void open() {
+ if (datasetWriter_ != null) return;
+ datasetWriter_ = dataset_.newWriter();
+ for (FlattenedSchema child : childrenByName_.values()) {
+ child.open();
+ }
+ }
+
+ // Write a record to this dataset.
+ public void write(GenericRecord record) {
+ Preconditions.checkNotNull(datasetWriter_, "open() must be called before writing");
+ datasetWriter_.write(record);
+ }
+
+ // Close this dataset and all children.
+ public void close() {
+ if (datasetWriter_ == null) return;
+ datasetWriter_.close();
+ for (FlattenedSchema child : childrenByName_.values()) {
+ child.close();
+ }
+ datasetWriter_ = null;
+ }
+
+ // Generates a new id for a new record in this dataset.
+ public Long nextId() { return ++referenceId; }
+
+ // Get the name to use when creating an id field.
+ public String getIdFieldName() { return idFieldName_; }
+
+ // Get the name of the field used to store the values of an array or map.
+ public String getCollectionValueFieldName() { return "value"; }
+
+ // Get the name of the field used to store the index of an array value.
+ public String getArrayIdxFieldName() { return "idx"; }
+
+ // Get the name of the field used to store the key of a map entry.
+ public String getMapKeyFieldName() { return "key"; }
+
+ // Get the name of a child dataset if this dataset corresponds to an array or map.
+ public String getChildOfCollectionName() {
+ return name_ + getNameSeparator() + "_values";
+ }
+
+ public String getIsNullFieldName(String fieldName) { return fieldName + "_is_null"; }
+
+ // Get the separator when concatenating field or dataset names.
+ public static String getNameSeparator() { return "_"; }
+
+ // Get the child of this dataset if this dataset corresponds to an array or map.
+ public FlattenedSchema getChildOfCollection() {
+ FlattenedSchema child = childrenByName_.get(getChildOfCollectionName());
+ Preconditions.checkNotNull(child);
+ return child;
+ }
+
+ // Get the name of a child dataset if this dataset corresponds to a record.
+ public String getChildOfRecordName(String parentFieldName) {
+ return name_ + getNameSeparator() + parentFieldName;
+ }
+
+ // Get the child of this dataset if this dataset corresponds to a record.
+ public FlattenedSchema getChildOfRecord(String parentFieldName) {
+ FlattenedSchema child = childrenByName_.get(getChildOfRecordName(parentFieldName));
+ Preconditions.checkNotNull(child);
+ return child;
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder builder = new StringBuilder();
+ builder.append(dataset_.getDescriptor().getSchema().toString(true));
+ for (FlattenedSchema child : childrenByName_.values()) {
+ builder.append("\n\nChild: ")
+ .append(child.name_)
+ .append("\n")
+ .append(child.toString());
+ }
+ return builder.toString();
+ }
+
+ public String getName() { return name_; }
+
+ public Field getParentIdField() { return parentIdField_; }
+
+ public void setParentIdField(Field parentIdField) {
+ parentIdField_ = parentIdField;
+ }
+
+ public Dataset<GenericRecord> getDataset() { return dataset_; }
+
+ public void setDataset(Dataset<GenericRecord> dataset) { dataset_ = dataset; }
+}
diff --git a/testdata/TableFlattener/src/main/java/org/apache/impala/infra/tableflattener/Main.java b/testdata/TableFlattener/src/main/java/org/apache/impala/infra/tableflattener/Main.java
new file mode 100644
index 000000000..68643c50a
--- /dev/null
+++ b/testdata/TableFlattener/src/main/java/org/apache/impala/infra/tableflattener/Main.java
@@ -0,0 +1,171 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.infra.tableflattener;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.cli.PosixParser;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.FilenameUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.kitesdk.data.CompressionType;
+import org.kitesdk.data.Dataset;
+import org.kitesdk.data.DatasetDescriptor;
+import org.kitesdk.data.Datasets;
+import org.kitesdk.data.Format;
+import org.kitesdk.data.Formats;
+import parquet.avro.AvroSchemaConverter;
+import parquet.hadoop.ParquetFileReader;
+import parquet.hadoop.metadata.ParquetMetadata;
+
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.net.URI;
+import java.nio.file.Files;
+import java.nio.file.Path;
+
+public class Main {
+
+ Options cliOptions_;
+ DatasetDescriptor datasetDescr_;
+
+ // The dir to write the flat datasets to. The dir should either not exist or be
+ // empty. The URI can either point to a local dir or an HDFS dir.
+ URI outputDir_;
+ CommandLine commandLine_;
+
+ @SuppressWarnings("static-access")
+ void parseArgs(String[] args) throws ParseException, IOException {
+ cliOptions_ = new Options();
+ cliOptions_.addOption(OptionBuilder.withLongOpt("help").create("h"));
+ cliOptions_.addOption(OptionBuilder
+ .hasArg()
+ .withLongOpt("input-data-format")
+ .withDescription("The format of the input file. Ex, avro")
+ .create("f"));
+ cliOptions_.addOption(OptionBuilder
+ .hasArg()
+ .withLongOpt("input-data-compression")
+ .withDescription("The compression type of the input file. Ex, snappy")
+ .create("c"));
+ cliOptions_.addOption(OptionBuilder
+ .hasArg()
+ .withLongOpt("input-schema-uri")
+ .withDescription("The URI of the input file's schema. Ex, file://foo.avsc")
+ .create("s"));
+ CommandLineParser parser = new PosixParser();
+ commandLine_ = parser.parse(cliOptions_, args);
+
+ if (commandLine_.hasOption("h")) printHelp();
+
+ DatasetDescriptor.Builder datasetDescrBuilder = new DatasetDescriptor.Builder();
+
+ String[] dataArgs = commandLine_.getArgs();
+ if (dataArgs.length != 2) {
+ printHelp("Exactly two arguments are required");
+ }
+
+ URI dataFile = URI.create(dataArgs[0]);
+ outputDir_ = URI.create(dataArgs[1]);
+ datasetDescrBuilder.location(dataFile);
+
+ Format inputFormat;
+ if (commandLine_.hasOption("f")) {
+ inputFormat = Formats.fromString(commandLine_.getOptionValue("f"));
+ } else {
+ String dataFilePath = dataFile.getPath();
+ if (dataFilePath == null || dataFilePath.isEmpty()) {
+ printHelp("Data file URI is missing a path component: " + dataFile.toString());
+ }
+ String ext = FilenameUtils.getExtension(dataFilePath);
+ if (ext.isEmpty()) {
+ printHelp("The file format (-f) must be specified");
+ }
+ inputFormat = Formats.fromString(ext);
+ }
+ datasetDescrBuilder.format(inputFormat);
+
+ if (commandLine_.hasOption("c")) {
+ datasetDescrBuilder.compressionType(
+ CompressionType.forName(commandLine_.getOptionValue("c")));
+ }
+
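+ // Determine the schema: prefer an explicit -s schema URI, otherwise infer it from
+ // the data file itself (Avro files embed their schema; a Parquet footer can be
+ // converted to an Avro schema). Any other format requires -s.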
+ if (commandLine_.hasOption("s")) {
+ datasetDescrBuilder.schemaUri(commandLine_.getOptionValue("s"));
+ } else if (inputFormat == Formats.AVRO) {
+ datasetDescrBuilder.schemaFromAvroDataFile(dataFile);
+ } else if (inputFormat == Formats.PARQUET) {
+ ParquetMetadata parquetMetadata = ParquetFileReader.readFooter(
+ new Configuration(), new org.apache.hadoop.fs.Path(dataFile));
+ datasetDescrBuilder.schema(new AvroSchemaConverter().convert(
+ parquetMetadata.getFileMetaData().getSchema()));
+ } else {
+ printHelp("A schema (-s) is required for data format " + inputFormat.getName());
+ }
+
+ datasetDescr_ = datasetDescrBuilder.build();
+ }
+
+ void printHelp() { printHelp(""); }
+
+ void printHelp(String errorMessage) {
+ PrintWriter printer = new PrintWriter(
+ errorMessage.isEmpty() ? System.out : System.err);
+ if (!errorMessage.isEmpty()) printer.println("Error: " + errorMessage + "\n");
+ printer.println("Usage: [options] <input-file> <output-dir>");