IMPALA-5527: Add nested testdata flattener

The TableFlattener takes a nested dataset and creates an equivalent
unnested dataset. The unnested dataset is saved as Parquet.

When an array or map is encountered in the original table, the flattener
creates a new table and adds an id column to it which references the row
in the parent table. Joining on the id column should produce the
original dataset.
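
For example (table and column names are illustrative), a table t with an
INT column c and an ARRAY<INT> column arr is flattened into a parent
table t(id, c) and a child table t_arr(t_id, idx, value). A query such as
  SELECT * FROM t JOIN t_arr ON t.id = t_arr.t_id
reassembles the rows of the original table.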

The flattened dataset should be loaded into Postgres in order to run the
query generator (in nested types mode) on it. There is a script that
automates generating, flattening, and loading random data into Postgres
and Impala:
  testdata/bin/generate-load-nested.sh -f

Testing:
- ran ./testdata/bin/generate-load-nested.sh -f and random nested data
  was generated and flattened as expected.

Change-Id: I7e7a8e53ada9274759a3e2128b97bec292c129c6
Reviewed-on: http://gerrit.cloudera.org:8080/5787
Reviewed-by: Alex Behm <alex.behm@cloudera.com>
Tested-by: Impala Public Jenkins
Author: Taras Bobrovytsky
Date: 2017-01-24 14:54:03 -08:00
Committer: Impala Public Jenkins
Parent: a07253a814
Commit: bd6d2df730
9 changed files with 863 additions and 0 deletions

testdata/TableFlattener/.gitignore

@@ -0,0 +1,6 @@
# Intellij
.idea
*.iml
# Maven
target

testdata/TableFlattener/README

@@ -0,0 +1,22 @@
This is a tool to convert a nested dataset to an unnested dataset. The source and/or
destination can be the local file system or HDFS.
Structs get converted to a column (with a long name). Arrays and Maps get converted to
a table which can be joined with the parent table on an id column.
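For example (field names are illustrative), a nullable struct column "s" with an int
field "x" becomes columns "s_is_null" and "s_x" in the parent table, while an array
column "arr" becomes a separate table whose rows carry the parent's id, an "idx"
column and a "value" column.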
$ mvn exec:java \
-Dexec.mainClass=org.apache.impala.infra.tableflattener.Main \
-Dexec.arguments="file:///tmp/in.parquet,file:///tmp/out,-sfile:///tmp/in.avsc"
$ mvn exec:java \
-Dexec.mainClass=org.apache.impala.infra.tableflattener.Main \
-Dexec.arguments="hdfs://localhost:20500/nested.avro,file://$PWD/unnested"
There are various options to specify the type of input file but the output is always
parquet/snappy.
For additional help, use the following command:
$ mvn exec:java \
-Dexec.mainClass=org.apache.impala.infra.tableflattener.Main -Dexec.arguments="--help"
This is used by testdata/bin/generate-load-nested.sh.

testdata/TableFlattener/pom.xml

@@ -0,0 +1,62 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.apache.impala</groupId>
<artifactId>nested-table-flattener</artifactId>
<name>Impala Nested Table Flattener</name>
<version>1.0-SNAPSHOT</version>
<packaging>jar</packaging>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.3</version>
<configuration>
<source>1.7</source>
<target>1.7</target>
</configuration>
</plugin>
</plugins>
</build>
<repositories>
<repository>
<id>cloudera-repo-releases</id>
<url>https://repository.cloudera.com/artifactory/repo/</url>
</repository>
</repositories>
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.6.0</version>
</dependency>
<dependency>
<groupId>org.kitesdk</groupId>
<artifactId>kite-data-core</artifactId>
<version>1.0.0-cdh5.4.1</version>
</dependency>
</dependencies>
</project>

testdata/TableFlattener/src/main/java/org/apache/impala/infra/tableflattener/FileMigrator.java

@@ -0,0 +1,141 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.impala.infra.tableflattener;
import com.google.common.base.Preconditions;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Field;
import org.apache.avro.Schema.Type;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericData.Record;
import org.apache.avro.generic.GenericRecord;
import org.kitesdk.data.Dataset;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
public class FileMigrator {
// Migrates data from a nested "src" to a flat "dst".
public void migrate(Dataset<GenericRecord> src, FlattenedSchema dst) {
dst.open();
try {
for (GenericRecord record : src.newReader()) {
writeRecord(record, dst);
}
} finally {
dst.close();
}
}
private void writeRecord(GenericRecord srcRecord, FlattenedSchema dstDataset) {
Record dstRecord = createRecord(null, dstDataset);
writeRecordFields(srcRecord, dstRecord, dstDataset, "");
dstDataset.write(dstRecord);
}
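// Creates a record for dstDataset, filling in the parent id field (if this dataset
// has a parent) and assigning a fresh id (if the dataset's schema has an id field).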
private Record createRecord(Long dstParentId, FlattenedSchema dstDataset) {
Record record = new Record(dstDataset.getDataset().getDescriptor().getSchema());
if (dstDataset.getParentIdField() != null) {
Preconditions.checkNotNull(dstParentId);
record.put(dstDataset.getParentIdField().name(), dstParentId);
}
Field idField = record.getSchema().getField(dstDataset.getIdFieldName());
if (idField != null) record.put(idField.name(), dstDataset.nextId());
return record;
}
private void writeRecordFields(GenericRecord srcRecord, Record dstRecord,
FlattenedSchema dstDataset, String fieldNamePrefix) {
for (Field field : srcRecord.getSchema().getFields()) {
Object value;
if (SchemaUtil.recordHasField(srcRecord, field.name())) {
value = srcRecord.get(field.name());
} else {
Preconditions.checkNotNull(field.defaultValue());
value = GenericData.get().getDefaultValue(field);
}
writeValue(value, field.schema(), field.name(), dstRecord, dstDataset,
fieldNamePrefix);
}
}
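// Writes a single value into dstRecord under a name built from fieldNamePrefix and
// srcFieldName. Scalars are written directly, nullable values also get an _is_null
// column, nested records are flattened into prefixed columns, and arrays/maps are
// written as rows of the corresponding child dataset keyed by this record's id.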
private void writeValue(Object srcValue, Schema srcSchema, String srcFieldName,
Record dstRecord, FlattenedSchema dstDataset, String fieldNamePrefix) {
String dstFieldName = fieldNamePrefix + (srcFieldName == null ?
dstDataset.getCollectionValueFieldName() : srcFieldName);
if (!SchemaUtil.schemaHasNesting(srcSchema)) {
dstRecord.put(dstFieldName, srcValue);
return;
}
if (SchemaUtil.isNullable(srcSchema)) {
dstRecord.put(dstDataset.getIsNullFieldName(dstFieldName), (srcValue == null));
if (srcValue == null) return;
if (srcSchema.getType() == Type.UNION) {
srcSchema = srcSchema.getTypes().get(
GenericData.get().resolveUnion(srcSchema, srcValue));
}
}
if (!SchemaUtil.requiresChildDataset(srcSchema)) {
writeRecordFields((GenericRecord)srcValue, dstRecord, dstDataset,
fieldNamePrefix
+ (srcFieldName == null ?
dstDataset.getCollectionValueFieldName() : srcFieldName)
+ dstDataset.getNameSeparator());
return;
}
Long dstParentId = (Long)dstRecord.get(dstDataset.getIdFieldName());
Preconditions.checkNotNull(dstParentId);
FlattenedSchema childDataset = (srcFieldName == null) ?
dstDataset.getChildOfCollection() : dstDataset.getChildOfRecord(srcFieldName);
if (srcSchema.getType() == Type.ARRAY) {
writeArray((List) srcValue, srcSchema.getElementType(), dstParentId, childDataset);
} else {
Preconditions.checkState(srcSchema.getType() == Type.MAP);
writeMap((Map) srcValue, srcSchema.getValueType(), dstParentId, childDataset);
}
}
private void writeArray(List srcValues, Schema srcSchema, Long dstParentId,
FlattenedSchema dstDataset) {
for (ListIterator it = srcValues.listIterator(); it.hasNext(); ) {
Object value = it.next();
Record record = createRecord(dstParentId, dstDataset);
record.put(dstDataset.getArrayIdxFieldName(), (long)it.previousIndex());
writeValue(value, srcSchema, null, record, dstDataset, "");
dstDataset.write(record);
}
}
@SuppressWarnings("unchecked")
private void writeMap(Map srcValues, Schema srcSchema, Long dstParentId,
FlattenedSchema dstDataset) {
for (Entry<String, Object> entry : (Set<Entry<String, Object>>)srcValues.entrySet()) {
Record record = createRecord(dstParentId, dstDataset);
record.put(dstDataset.getMapKeyFieldName(), entry.getKey());
writeValue(entry.getValue(), srcSchema, null, record, dstDataset, "");
dstDataset.write(record);
}
}
}

testdata/TableFlattener/src/main/java/org/apache/impala/infra/tableflattener/FlattenedSchema.java

@@ -0,0 +1,155 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.impala.infra.tableflattener;
import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import org.apache.avro.Schema.Field;
import org.apache.avro.generic.GenericRecord;
import org.kitesdk.data.Dataset;
import org.kitesdk.data.DatasetWriter;
import java.util.Map;
// This class contains information about how unnested datasets are related.
public class FlattenedSchema {
// If a dataset has nesting, an id field will be created so that records in the child
// dataset can reference the parent.
private long referenceId;
// If this dataset has a parent, the parentIdField_ indicates which field in this
// dataset is the foreign key to the parent dataset. If this dataset does not have a
parent, this field is null.
private Field parentIdField_;
// The name of the data set, mainly used to find a child dataset.
private String name_;
private Map<String, FlattenedSchema> childrenByName_ = Maps.newHashMap();
// The actual dataset object.
private Dataset<GenericRecord> dataset_;
private DatasetWriter<GenericRecord> datasetWriter_;
private final String idFieldName_ = "id";
public FlattenedSchema(String name) {
name_ = name;
referenceId = 0;
}
public FlattenedSchema(String name, FlattenedSchema parent) {
this(name);
parent.childrenByName_.put(name, this);
}
// Opens this dataset and all children for writing.
public void open() {
if (datasetWriter_ != null) return;
datasetWriter_ = dataset_.newWriter();
for (FlattenedSchema child : childrenByName_.values()) {
child.open();
}
}
// Write a record to this dataset.
public void write(GenericRecord record) {
Preconditions.checkNotNull(datasetWriter_, "open() must be called before writing");
datasetWriter_.write(record);
}
// Close this dataset and all children.
public void close() {
if (datasetWriter_ == null) return;
datasetWriter_.close();
for (FlattenedSchema child : childrenByName_.values()) {
child.close();
}
datasetWriter_ = null;
}
// Generates a new id for a new record in this dataset.
public Long nextId() { return ++referenceId; }
// Get the name to use when creating an id field.
public String getIdFieldName() { return idFieldName_; }
// Get the name of the field used to store the values of an array or map.
public String getCollectionValueFieldName() { return "value"; }
// Get the name of the field used to store the index of an array value.
public String getArrayIdxFieldName() { return "idx"; }
// Get the name of the field used to store the key of a map entry.
public String getMapKeyFieldName() { return "key"; }
// Get the name of a child dataset if this dataset corresponds to an array or map.
public String getChildOfCollectionName() {
return name_ + getNameSeparator() + "_values";
}
public String getIsNullFieldName(String fieldName) { return fieldName + "_is_null"; }
// Get the separator when concatenating field or dataset names.
public static String getNameSeparator() { return "_"; }
// Get the child of this dataset if this dataset corresponds to an array or map.
public FlattenedSchema getChildOfCollection() {
FlattenedSchema child = childrenByName_.get(getChildOfCollectionName());
Preconditions.checkNotNull(child);
return child;
}
// Get the name of a child dataset if this dataset corresponds to a record.
public String getChildOfRecordName(String parentFieldName) {
return name_ + getNameSeparator() + parentFieldName;
}
// Get the child of this dataset if this dataset corresponds to a record.
public FlattenedSchema getChildOfRecord(String parentFieldName) {
FlattenedSchema child = childrenByName_.get(getChildOfRecordName(parentFieldName));
Preconditions.checkNotNull(child);
return child;
}
@Override
public String toString() {
StringBuilder builder = new StringBuilder();
builder.append(dataset_.getDescriptor().getSchema().toString(true));
for (FlattenedSchema child : childrenByName_.values()) {
builder.append("\n\nChild: ")
.append(child.name_)
.append("\n")
.append(child.toString());
}
return builder.toString();
}
public String getName() { return name_; }
public Field getParentIdField() { return parentIdField_; }
public void setParentIdField(Field parentIdField) {
parentIdField_ = parentIdField;
}
public Dataset getDataset() { return dataset_; }
public void setDataset(Dataset<GenericRecord> dataset) { dataset_ = dataset; }
}

testdata/TableFlattener/src/main/java/org/apache/impala/infra/tableflattener/Main.java

@@ -0,0 +1,171 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.impala.infra.tableflattener;
import org.apache.avro.generic.GenericRecord;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.OptionBuilder;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.commons.cli.PosixParser;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.FilenameUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.kitesdk.data.CompressionType;
import org.kitesdk.data.Dataset;
import org.kitesdk.data.DatasetDescriptor;
import org.kitesdk.data.Datasets;
import org.kitesdk.data.Format;
import org.kitesdk.data.Formats;
import parquet.avro.AvroSchemaConverter;
import parquet.hadoop.ParquetFileReader;
import parquet.hadoop.metadata.ParquetMetadata;
import java.io.IOException;
import java.io.PrintWriter;
import java.net.URI;
import java.nio.file.Files;
import java.nio.file.Path;
public class Main {
Options cliOptions_;
DatasetDescriptor datasetDescr_;
// The dir to write the flat datasets to. The dir should either not exist or be
// empty. The URI can either point to a local dir or an HDFS dir.
URI outputDir_;
CommandLine commandLine_;
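// Parses the command line: the positional arguments are the input and output URIs,
// the input format is taken from -f or inferred from the file extension, and the
// schema comes from -s, the Avro data file itself, or the Parquet footer.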
@SuppressWarnings("static-access")
void parseArgs(String[] args) throws ParseException, IOException {
cliOptions_ = new Options();
cliOptions_.addOption(OptionBuilder.withLongOpt("help").create("h"));
cliOptions_.addOption(OptionBuilder
.hasArg()
.withLongOpt("input-data-format")
.withDescription("The format of the input file. Ex, avro")
.create("f"));
cliOptions_.addOption(OptionBuilder
.hasArg()
.withLongOpt("input-data-compression")
.withDescription("The compression type of the input file. Ex, snappy")
.create("c"));
cliOptions_.addOption(OptionBuilder
.hasArg()
.withLongOpt("input-schema-uri")
.withDescription("The URI of the input file's schema. Ex, file://foo.avsc")
.create("s"));
CommandLineParser parser = new PosixParser();
commandLine_ = parser.parse(cliOptions_, args);
if (commandLine_.hasOption("h")) printHelp();
DatasetDescriptor.Builder datasetDescrBuilder = new DatasetDescriptor.Builder();
String[] dataArgs = commandLine_.getArgs();
if (dataArgs.length != 2) {
printHelp("Exactly two arguments are required");
}
URI dataFile = URI.create(dataArgs[0]);
outputDir_ = URI.create(dataArgs[1]);
datasetDescrBuilder.location(dataFile);
Format inputFormat;
if (commandLine_.hasOption("f")) {
inputFormat = Formats.fromString(commandLine_.getOptionValue("f"));
} else {
String dataFilePath = dataFile.getPath();
if (dataFilePath == null || dataFilePath.isEmpty()) {
printHelp("Data file URI is missing a path component: " + dataFile.toString());
}
String ext = FilenameUtils.getExtension(dataFilePath);
if (ext.isEmpty()) {
printHelp("The file format (-f) must be specified");
}
inputFormat = Formats.fromString(ext);
}
datasetDescrBuilder.format(inputFormat);
if (commandLine_.hasOption("c")) {
datasetDescrBuilder.compressionType(
CompressionType.forName(commandLine_.getOptionValue("c")));
}
if (commandLine_.hasOption("s")) {
datasetDescrBuilder.schemaUri(commandLine_.getOptionValue("s"));
} else if (inputFormat == Formats.AVRO) {
datasetDescrBuilder.schemaFromAvroDataFile(dataFile);
} else if (inputFormat == Formats.PARQUET) {
ParquetMetadata parquetMetadata = ParquetFileReader.readFooter(
new Configuration(), new org.apache.hadoop.fs.Path(dataFile));
datasetDescrBuilder.schema(new AvroSchemaConverter().convert(
parquetMetadata.getFileMetaData().getSchema()));
} else {
printHelp("A schema (-s) is required for data format " + inputFormat.getName());
}
datasetDescr_ = datasetDescrBuilder.build();
}
void printHelp() { printHelp(""); }
void printHelp(String errorMessage) {
PrintWriter printer = new PrintWriter(
errorMessage.isEmpty() ? System.out : System.err);
if (!errorMessage.isEmpty()) printer.println("Error: " + errorMessage + "\n");
printer.println("Usage: [options] <input uri> <output uri>\n\n" +
"input uri The URI to the input file.\n" +
" Ex, file:///foo.avro or hdfs://localhost:20500/foo.avro\n" +
"output uri The URI to the output directory. The dir must either not\n" +
" exist or it must be empty.\n" +
" Ex, file:///bar or hdfs://localhost:20500/bar\n\n" +
"Options:");
new HelpFormatter().printOptions(printer, 80, cliOptions_, 1, 3);
printer.close();
System.exit(errorMessage.isEmpty() ? 0 : 1);
}
void exec(String[] args) throws ParseException, IOException {
Logger.getRootLogger().setLevel(Level.OFF);
parseArgs(args);
SchemaFlattener schemaFlattener = new SchemaFlattener(outputDir_);
FlattenedSchema rootDataset =
schemaFlattener.flatten(datasetDescr_.getSchema());
Path tempDatasetPath = Files.createTempDirectory(null);
try {
Dataset<GenericRecord> srcDataset = Datasets.create(
"dataset:file:" + tempDatasetPath.toString(), datasetDescr_);
FileMigrator migrator = new FileMigrator();
migrator.migrate(srcDataset, rootDataset);
} finally {
FileUtils.deleteDirectory(tempDatasetPath.toFile());
}
}
public static void main(String[] args) throws Exception {
new Main().exec(args);
}
}

testdata/TableFlattener/src/main/java/org/apache/impala/infra/tableflattener/SchemaFlattener.java

@@ -0,0 +1,170 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.impala.infra.tableflattener;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Field;
import org.apache.avro.Schema.Type;
import org.apache.avro.generic.GenericRecord;
import org.apache.commons.lang.NotImplementedException;
import org.apache.hadoop.conf.Configuration;
import org.kitesdk.data.Dataset;
import org.kitesdk.data.DatasetDescriptor;
import org.kitesdk.data.Datasets;
import org.kitesdk.data.Formats;
import java.io.IOException;
import java.net.URI;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.LinkedList;
import java.util.List;
public class SchemaFlattener {
// The dir to write the flat datasets to. The dir should either not exist or be
// empty. The URI can either point to a local dir or an HDFS dir.
URI outputDir_;
public SchemaFlattener(URI outputDir) { outputDir_ = outputDir; }
// Creates a flattened schema but does not migrate any data.
public FlattenedSchema flatten(Schema srcSchema) {
Preconditions.checkState(srcSchema.getType() == Type.RECORD);
FlattenedSchema dstDataset = new FlattenedSchema(srcSchema.getName());
LinkedList<Field> fields = Lists.newLinkedList();
addRecordFields(srcSchema, dstDataset, fields, "");
finishCreatingDataset(fields, dstDataset);
return dstDataset;
}
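// Flattens the fields of a record schema into dstSchemaFields. Simple fields become
// columns prefixed with fieldNamePrefix, nullable fields get an extra _is_null
// column, nested records are flattened recursively, and array/map fields cause a
// child dataset to be created.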
private void addRecordFields(Schema srcSchema, FlattenedSchema dstDataset,
LinkedList<Field> dstSchemaFields, String fieldNamePrefix) {
Preconditions.checkState(srcSchema.getType() == Type.RECORD);
for (Field field : srcSchema.getFields()) {
Schema fieldSchema = field.schema();
if (SchemaUtil.isSimpleType(fieldSchema)) {
dstSchemaFields.add(SchemaUtil.createField(fieldNamePrefix + field.name(),
fieldSchema, field.doc(), field.defaultValue()));
continue;
}
if (SchemaUtil.isNullable(fieldSchema)) {
dstSchemaFields.add(SchemaUtil.createField(
fieldNamePrefix + dstDataset.getIsNullFieldName(field.name()), Type.BOOLEAN));
fieldSchema = SchemaUtil.reduceUnionToNonNull(fieldSchema);
}
if (SchemaUtil.requiresChildDataset(fieldSchema)) {
createChildDataset(dstDataset.getChildOfRecordName(field.name()), fieldSchema,
dstSchemaFields, dstDataset);
} else {
addRecordFields(fieldSchema, dstDataset, dstSchemaFields,
fieldNamePrefix + field.name() + dstDataset.getNameSeparator());
}
}
}
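// Creates a child dataset for an array or map schema. The child gets a foreign-key
// field referencing the parent's id, an idx field (arrays) or key field (maps), and
// the flattened value fields; nested collections recurse into further child datasets.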
private void createChildDataset(String name, Schema srcSchema,
LinkedList<Field> parentFields, FlattenedSchema parentDataset) {
// Ensure that the parent schema has an id field so the child can reference the
// parent. A single id field is sufficient.
if (parentFields.isEmpty()
|| !parentFields.getFirst().name().equals(parentDataset.getIdFieldName())) {
parentFields.addFirst(SchemaUtil.createField(
parentDataset.getIdFieldName(), Type.LONG));
}
FlattenedSchema childDataset = new FlattenedSchema(name, parentDataset);
LinkedList<Field> fields = Lists.newLinkedList();
String parentIdFieldName = parentDataset.getName() + childDataset.getNameSeparator()
+ childDataset.getIdFieldName();
Field parentIdField = SchemaUtil.createField(parentIdFieldName, Type.LONG);
childDataset.setParentIdField(parentIdField);
fields.add(parentIdField);
Schema valueSchema;
if (srcSchema.getType() == Type.ARRAY) {
fields.add(
SchemaUtil.createField(childDataset.getArrayIdxFieldName(), Type.LONG));
valueSchema = srcSchema.getElementType();
} else {
Preconditions.checkState(srcSchema.getType() == Type.MAP);
fields.add(
SchemaUtil.createField(childDataset.getMapKeyFieldName(), Type.STRING));
valueSchema = srcSchema.getValueType();
}
if (SchemaUtil.isSimpleType(valueSchema)) {
fields.add(SchemaUtil.createField(
childDataset.getCollectionValueFieldName(), valueSchema));
} else {
if (SchemaUtil.isNullable(valueSchema)) {
fields.add(SchemaUtil.createField(childDataset.getIsNullFieldName(
childDataset.getCollectionValueFieldName()), Type.BOOLEAN));
valueSchema = SchemaUtil.reduceUnionToNonNull(valueSchema);
}
if (SchemaUtil.requiresChildDataset(valueSchema)) {
createChildDataset(childDataset.getChildOfCollectionName(), valueSchema, fields,
childDataset);
} else {
addRecordFields(valueSchema, childDataset, fields,
childDataset.getCollectionValueFieldName() + childDataset.getNameSeparator());
}
}
finishCreatingDataset(fields, childDataset);
}
private void finishCreatingDataset(List<Field> fields, FlattenedSchema dataset) {
Schema childSchema = Schema.createRecord(dataset.getName(), null, null, false);
for (Field field : fields) {
Preconditions.checkState(!SchemaUtil.schemaHasNesting(field.schema()));
}
childSchema.setFields(fields);
DatasetDescriptor descriptor = new DatasetDescriptor.Builder()
.format(Formats.PARQUET)
.schema(childSchema)
.build();
dataset.setDataset((Dataset<GenericRecord>)Datasets.create(
"dataset:" + createDir(dataset.getName()), descriptor));
}
private URI createDir(String name) {
try {
switch (outputDir_.getScheme().toUpperCase()) {
case "FILE": {
Path datasetPath = Paths.get(outputDir_).resolve(name);
datasetPath.toFile().mkdirs();
return datasetPath.toUri();
}
case "HDFS": {
org.apache.hadoop.fs.Path outputDirPath
= new org.apache.hadoop.fs.Path(outputDir_);
org.apache.hadoop.fs.Path datasetPath
= new org.apache.hadoop.fs.Path(outputDirPath, name);
outputDirPath.getFileSystem(new Configuration()).mkdirs(datasetPath);
return datasetPath.toUri();
}
default:
throw new NotImplementedException(String.format(
"Unexpected output dir scheme: %s", outputDir_.getScheme()));
}
} catch (IOException ex) {
throw new RuntimeException(ex);
}
}
}

testdata/TableFlattener/src/main/java/org/apache/impala/infra/tableflattener/SchemaUtil.java

@@ -0,0 +1,132 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.impala.infra.tableflattener;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Field;
import org.apache.avro.Schema.Type;
import org.apache.avro.generic.GenericRecord;
import org.codehaus.jackson.JsonNode;
import java.util.Map;
public class SchemaUtil {
// Used to validate unions. This is a substitution map for comparison purposes.
static final Map<Type, Type> BASE_TYPES = ImmutableMap.<Type, Type>builder()
.put(Type.STRING, Type.BYTES)
.put(Type.FIXED, Type.BYTES)
.put(Type.DOUBLE, Type.INT)
.put(Type.FLOAT, Type.INT)
.put(Type.LONG, Type.INT)
.build();
static Field createField(String name, Type type) {
return createField(name, type, null, null);
}
static Field createField(String name, Type type, String doc, JsonNode defaultValue) {
return new Field(name, Schema.createUnion(
Schema.create(Type.NULL), Schema.create(type)), doc, defaultValue);
}
static Field createField(String name, Schema schema) {
return createField(name, schema, null, null);
}
static Field createField(String name, Schema schema, String doc,
JsonNode defaultValue) {
Preconditions.checkState(!schemaHasNesting(schema));
if (schema.getType() == Type.UNION) {
return new Field(name, Schema.createUnion(schema.getTypes()), doc, defaultValue);
}
return createField(name, schema.getType(), doc, defaultValue);
}
static boolean recordHasField(GenericRecord record, String fieldName) {
return record.getSchema().getField(fieldName) != null;
}
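// Returns the first non-null branch of a nullable union, verifying that all non-null
// branches reduce to the same base type (see BASE_TYPES). Throws if the branches are
// incompatible or if the union has no non-null branch.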
static Schema reduceUnionToNonNull(Schema unionSchema) {
Schema reducedSchema = null;
Type reducedBaseType = null;
for (Schema schema : unionSchema.getTypes()) {
if (schema.getType() == Type.NULL) continue;
String logicalType = schema.getProp("logicalType");
Type baseType;
if (logicalType == null) {
baseType = BASE_TYPES.containsKey(schema.getType()) ?
BASE_TYPES.get(schema.getType()) : schema.getType();
} else {
Preconditions.checkState(logicalType.equals("decimal"));
baseType = Type.INT;
}
if (reducedBaseType == null) {
reducedSchema = schema;
reducedBaseType = baseType;
continue;
}
if (reducedBaseType != baseType) {
throw new RuntimeException(String.format(
"Union contains incompatible types: %s",
Joiner.on(" ,").join(unionSchema.getTypes())));
}
}
if (reducedSchema == null) {
throw new RuntimeException(String.format(
"Union schema contains no non-null types: %s",
Joiner.on(" ,").join(unionSchema.getTypes())));
}
return reducedSchema;
}
static boolean isNullable(Schema schema) {
return schema.getType() == Type.NULL
|| (schema.getType() == Type.UNION && unionIsNullable(schema));
}
static boolean unionIsNullable(Schema unionSchema) {
for (Schema schema : unionSchema.getTypes()) {
if (schema.getType() == Type.NULL) return true;
}
return false;
}
static boolean isComplexType(Type type) {
Preconditions.checkState(type != Type.UNION);
return type == Type.ARRAY || type == Type.MAP || type == Type.RECORD;
}
static boolean isSimpleType(Schema schema) {
if (schema.getType() == Type.UNION) schema = reduceUnionToNonNull(schema);
return !isComplexType(schema.getType());
}
static boolean requiresChildDataset(Schema schema) {
if (schema.getType() == Type.UNION) schema = reduceUnionToNonNull(schema);
return schema.getType() == Type.ARRAY || schema.getType() == Type.MAP;
}
static boolean schemaHasNesting(Schema schema) {
if (schema.getType() == Type.UNION) schema = reduceUnionToNonNull(schema);
return isComplexType(schema.getType());
}
}

testdata/bin/generate-load-nested.sh

@@ -53,6 +53,10 @@ while getopts ":n:e:i:f" OPTION; do
esac
done
if $FLATTEN; then
mvn -f "${IMPALA_HOME}/testdata/TableFlattener/pom.xml" package;
fi
RANDOM_SCHEMA_GENERATOR=${IMPALA_HOME}/testdata/bin/random_avro_schema.py;
HDFS_DIR=/test-warehouse/random_nested_data