IMPALA-2835: introduce PARQUET_FALLBACK_SCHEMA_RESOLUTION query option

This patch introduces a new query option,
PARQUET_FALLBACK_SCHEMA_RESOLUTION, which allows Parquet files' schemas
to be resolved by either name or position. It's "fallback" because
eventually field IDs will be the primary schema resolution scheme, and
we don't want to create an option whose name we'd have to change
later. The default is still resolution by position. I chose a query
option because it makes testing easier and also makes it easier to
diagnose resolution problems quickly in the field. If users want to
switch the default behavior to by-name resolution (like Hive), they
can use the --default_query_options flag.
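
For example (an illustrative sketch; option values are case-insensitive,
and the numeric enum values are also accepted, per the parsing code below):

    -- per session, in impala-shell:
    set parquet_fallback_schema_resolution=name;

    # process-wide, when starting impalad:
    impalad --default_query_options='parquet_fallback_schema_resolution=name'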

This patch also introduces a new test section, SHELL, which can be
used to execute shell commands in a .test file. This is useful for
copying files into test tables.
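
A minimal sketch of the new section (hypothetical paths; $FILESYSTEM_PREFIX,
$IMPALA_HOME, and $DATABASE are substituted by the test framework before the
command runs):

    ---- SHELL
    hadoop fs -copyFromLocal $IMPALA_HOME/testdata/some_dir/some_file.parq \
    $FILESYSTEM_PREFIX/test-warehouse/$DATABASE.db/some_table/
    ====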

Change-Id: Id0c715ea23792b2a6872610839a40532aabbb5a6
Reviewed-on: http://gerrit.cloudera.org:8080/2384
Reviewed-by: Skye Wanderman-Milne <skye@cloudera.com>
Tested-by: Internal Jenkins

be/src/exec/hdfs-parquet-scanner.cc

@@ -1976,7 +1976,7 @@ Status HdfsParquetScanner::ResolvePathHelper(ArrayEncoding array_encoding,
   for (int i = 0; i < path.size(); ++i) {
     // Advance '*node' if necessary
     if (i == 0 || col_type->type != TYPE_ARRAY || array_encoding == THREE_LEVEL) {
-      *node = NextSchemaNode(path, i, *node, missing_field);
+      *node = NextSchemaNode(col_type, path, i, *node, missing_field);
       if (*missing_field) return Status::OK();
     } else {
       // We just resolved an array, meaning *node is set to the repeated field of the
@@ -2017,22 +2017,79 @@ Status HdfsParquetScanner::ResolvePathHelper(ArrayEncoding array_encoding,
   return Status::OK();
 }
 
-HdfsParquetScanner::SchemaNode* HdfsParquetScanner::NextSchemaNode(const SchemaPath& path,
-    int next_idx, SchemaNode* node, bool* missing_field) {
+HdfsParquetScanner::SchemaNode* HdfsParquetScanner::NextSchemaNode(
+    const ColumnType* col_type, const SchemaPath& path, int next_idx, SchemaNode* node,
+    bool* missing_field) {
   DCHECK_LT(next_idx, path.size());
-  // The first index in a path includes the table's partition keys
-  int file_idx =
-      next_idx == 0 ? path[next_idx] - scan_node_->num_partition_keys() : path[next_idx];
-  if (node->children.size() <= file_idx) {
-    // The selected field is not in the file
+  if (next_idx != 0) DCHECK(col_type != NULL);
+
+  int file_idx;
+  int table_idx = path[next_idx];
+  bool resolve_by_name = state_->query_options().parquet_fallback_schema_resolution ==
+      TParquetFallbackSchemaResolution::NAME;
+  if (resolve_by_name) {
+    if (next_idx == 0) {
+      // Resolve top-level table column by name.
+      DCHECK_LT(table_idx, scan_node_->hdfs_table()->col_descs().size());
+      const string& name = scan_node_->hdfs_table()->col_descs()[table_idx].name();
+      file_idx = FindChildWithName(node, name);
+    } else if (col_type->type == TYPE_STRUCT) {
+      // Resolve struct field by name.
+      DCHECK_LT(table_idx, col_type->field_names.size());
+      const string& name = col_type->field_names[table_idx];
+      file_idx = FindChildWithName(node, name);
+    } else if (col_type->type == TYPE_ARRAY) {
+      // Arrays have only one child in the file.
+      DCHECK_EQ(table_idx, SchemaPathConstants::ARRAY_ITEM);
+      file_idx = table_idx;
+    } else {
+      DCHECK_EQ(col_type->type, TYPE_MAP);
+      // Maps have two values, "key" and "value". These are supposed to be ordered and may
+      // not have the right field names, but try to resolve by name in case they're
+      // switched and otherwise use the order. See
+      // https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#maps for
+      // more details.
+      DCHECK(table_idx == SchemaPathConstants::MAP_KEY ||
+          table_idx == SchemaPathConstants::MAP_VALUE);
+      const string& name = table_idx == SchemaPathConstants::MAP_KEY ? "key" : "value";
+      file_idx = FindChildWithName(node, name);
+      if (file_idx >= node->children.size()) {
+        // Couldn't resolve by name, fall back to resolution by position.
+        file_idx = table_idx;
+      }
+    }
+  } else {
+    // Resolution by position.
+    DCHECK_EQ(state_->query_options().parquet_fallback_schema_resolution,
+        TParquetFallbackSchemaResolution::POSITION);
+    if (next_idx == 0) {
+      // For top-level columns, the first index in a path includes the table's partition
+      // keys.
+      file_idx = table_idx - scan_node_->num_partition_keys();
+    } else {
+      file_idx = table_idx;
+    }
+  }
+
+  if (file_idx >= node->children.size()) {
     VLOG_FILE << Substitute(
-        "File '$0' does not contain path '$1'", filename(), PrintPath(path));
+        "File '$0' does not contain path '$1' (resolving by $2)", filename(),
+        PrintPath(path), resolve_by_name ? "name" : "position");
     *missing_field = true;
     return NULL;
   }
   return &node->children[file_idx];
 }
 
+int HdfsParquetScanner::FindChildWithName(HdfsParquetScanner::SchemaNode* node,
+    const string& name) {
+  int idx;
+  for (idx = 0; idx < node->children.size(); ++idx) {
+    if (node->children[idx].element->name == name) break;
+  }
+  return idx;
+}
+
 // There are three types of array encodings:
 //
 // 1. One-level encoding

be/src/exec/hdfs-parquet-scanner.h

@@ -418,7 +418,7 @@ class HdfsParquetScanner : public HdfsScanner {
   /// Version of the application that wrote this file.
   FileVersion file_version_;
 
-  /// The root schema node for this file
+  /// The root schema node for this file.
   SchemaNode schema_;
 
   /// Scan range for the metadata.
@@ -590,10 +590,15 @@ class HdfsParquetScanner : public HdfsScanner {
   /// Helper functions for ResolvePathHelper().
 
-  /// Advances 'node' to one of its children based on path[next_idx]. Returns the child
-  /// node or sets 'missing_field' to true.
-  SchemaNode* NextSchemaNode(const SchemaPath& path, int next_idx, SchemaNode* node,
-      bool* missing_field);
+  /// Advances 'node' to one of its children based on path[next_idx] and
+  /// 'col_type'. 'col_type' is NULL if 'node' is the root node, otherwise it's the type
+  /// associated with 'node'. Returns the child node or sets 'missing_field' to true.
+  SchemaNode* NextSchemaNode(const ColumnType* col_type, const SchemaPath& path,
+      int next_idx, SchemaNode* node, bool* missing_field);
+
+  /// Returns the index of 'node's child with 'name', or the number of children if not
+  /// found.
+  int FindChildWithName(SchemaNode* node, const string& name);
 
   /// The ResolvePathHelper() logic for arrays.
   Status ResolveArray(ArrayEncoding array_encoding, const SchemaPath& path, int idx,

be/src/service/query-options.cc

@@ -33,6 +33,7 @@ using boost::algorithm::is_any_of;
 using boost::algorithm::token_compress_on;
 using boost::algorithm::split;
 using boost::algorithm::trim;
+using std::to_string;
 
 using namespace impala;
 using namespace strings;
@@ -100,6 +101,9 @@ int GetQueryOptionForKey(const string& key) {
   return -1;
 }
 
+// Note that we allow numerical values for boolean and enum options. This is because
+// TQueryOptionsToMap() will output the numerical values, and we need to parse its
+// output configuration.
 Status impala::SetQueryOption(const string& key, const string& value,
     TQueryOptions* query_options, QueryOptionsMask* set_query_options_mask) {
   int option = GetQueryOptionForKey(key);
@@ -367,6 +371,21 @@ Status impala::SetQueryOption(const string& key, const string& value,
iequals(value, "true") || iequals(value, "1"));
break;
}
case TImpalaQueryOptions::PARQUET_FALLBACK_SCHEMA_RESOLUTION: {
if (iequals(value, "position") ||
iequals(value, to_string(TParquetFallbackSchemaResolution::POSITION))) {
query_options->__set_parquet_fallback_schema_resolution(
TParquetFallbackSchemaResolution::POSITION);
} else if (iequals(value, "name") ||
iequals(value, to_string(TParquetFallbackSchemaResolution::NAME))) {
query_options->__set_parquet_fallback_schema_resolution(
TParquetFallbackSchemaResolution::NAME);
} else {
return Status(Substitute("Invalid PARQUET_FALLBACK_SCHEMA_RESOLUTION option: "
"'$0'. Valid options are 'POSITION' and 'NAME'.", value));
}
break;
}
default:
// We hit this DCHECK(false) if we forgot to add the corresponding entry here
// when we add a new query option.

be/src/service/query-options.h

@@ -32,7 +32,7 @@ class TQueryOptions;
 // the DCHECK.
 #define QUERY_OPTS_TABLE\
   DCHECK_EQ(_TImpalaQueryOptions_VALUES_TO_NAMES.size(),\
-      TImpalaQueryOptions::PARQUET_ANNOTATE_STRINGS_UTF8 + 1);\
+      TImpalaQueryOptions::PARQUET_FALLBACK_SCHEMA_RESOLUTION + 1);\
   QUERY_OPT_FN(abort_on_default_limit_exceeded, ABORT_ON_DEFAULT_LIMIT_EXCEEDED)\
   QUERY_OPT_FN(abort_on_error, ABORT_ON_ERROR)\
   QUERY_OPT_FN(allow_unsupported_formats, ALLOW_UNSUPPORTED_FORMATS)\
@@ -74,7 +74,8 @@ class TQueryOptions;
   QUERY_OPT_FN(runtime_filter_wait_time_ms, RUNTIME_FILTER_WAIT_TIME_MS)\
   QUERY_OPT_FN(disable_row_runtime_filtering, DISABLE_ROW_RUNTIME_FILTERING)\
   QUERY_OPT_FN(max_num_runtime_filters, MAX_NUM_RUNTIME_FILTERS)\
-  QUERY_OPT_FN(parquet_annotate_strings_utf8, PARQUET_ANNOTATE_STRINGS_UTF8);
+  QUERY_OPT_FN(parquet_annotate_strings_utf8, PARQUET_ANNOTATE_STRINGS_UTF8)\
+  QUERY_OPT_FN(parquet_fallback_schema_resolution, PARQUET_FALLBACK_SCHEMA_RESOLUTION);
 
 /// Converts a TQueryOptions struct into a map of key, value pairs.
 void TQueryOptionsToMap(const TQueryOptions& query_options,

common/thrift/ImpalaInternalService.thrift

@@ -42,6 +42,11 @@ const i32 INVALID_PLAN_NODE_ID = -1
 // Constant default partition ID, must be < 0 to avoid collisions
 const i64 DEFAULT_PARTITION_ID = -1;
 
+enum TParquetFallbackSchemaResolution {
+  POSITION,
+  NAME
+}
+
 // Query options that correspond to ImpalaService.ImpalaQueryOptions, with their
 // respective defaults. Query options can be set in the following ways:
 //
@@ -170,6 +175,11 @@ struct TQueryOptions {
   // This is disabled by default in order to preserve the existing behavior of legacy
   // workloads. In addition, Impala strings are not necessarily UTF8-encoded.
   42: optional bool parquet_annotate_strings_utf8 = false
+
+  // Determines how to resolve Parquet files' schemas in the absence of field IDs (which
+  // is always, since field IDs are NYI). Valid values are "position" (default) and
+  // "name".
+  43: optional TParquetFallbackSchemaResolution parquet_fallback_schema_resolution = 0
 }
 
 // Impala currently has two types of sessions: Beeswax and HiveServer2

common/thrift/ImpalaService.thrift

@@ -205,6 +205,10 @@ enum TImpalaQueryOptions {
   // If true, use UTF-8 annotation for string columns. Note that char and varchar columns
   // always use the annotation.
   PARQUET_ANNOTATE_STRINGS_UTF8
+
+  // Determines how to resolve Parquet files' schemas in the absence of field IDs (which
+  // is always, since field IDs are NYI). Valid values are "position" and "name".
+  PARQUET_FALLBACK_SCHEMA_RESOLUTION
 }
 
 // The summary of an insert.

testdata/parquet_schema_resolution/README

@@ -0,0 +1,12 @@
switched_map.parq was generated by modifying parquet-mr to switch the key and value
fields of maps, and then converting switched_map.json to Parquet using switched_map.avsc
as the schema. switched_map.parq has the following schema according to parquet-tools:
message com.cloudera.impala.switched_map {
  required group int_map (MAP) {
    repeated group map (MAP_KEY_VALUE) {
      required int32 value;
      required binary key (UTF8);
    }
  }
}

testdata/parquet_schema_resolution/switched_map.avsc

@@ -0,0 +1,8 @@
{"type": "record",
"namespace": "com.cloudera.impala",
"name": "switched_map",
"fields": [
{"name": "int_map", "type": {"type": "map", "values": "int"}}
]
}

testdata/parquet_schema_resolution/switched_map.json

@@ -0,0 +1,4 @@
[
{"int_map": {"a": 1, "b": 2}},
{"int_map": {"c": 3}}
]

testdata/parquet_schema_resolution/switched_map.parq (binary file, not shown)

testdata/workloads/functional-query/queries/QueryTest/parquet-resolution-by-name.test

@@ -0,0 +1,234 @@
====
---- QUERY
# Create a table and populate with data file
drop table if exists resolution_by_name_test;
create table resolution_by_name_test stored as parquet
as select * from functional_parquet.tinytable;
select a, b from resolution_by_name_test;
---- TYPES
string,string
---- RESULTS
'aaaaaaa','bbbbbbb'
'ccccc','dddd'
'eeeeeeee','f'
====
---- QUERY
# Rearrange the columns and make sure we can still resolve by name
alter table resolution_by_name_test replace columns (b string, a string);
set parquet_fallback_schema_resolution="NAME";
select a, b from resolution_by_name_test;
---- TYPES
string,string
---- RESULTS
'aaaaaaa','bbbbbbb'
'ccccc','dddd'
'eeeeeeee','f'
====
---- QUERY
# Renaming a column will cause the column to not be resolved
alter table resolution_by_name_test change a new_a string;
select new_a from resolution_by_name_test;
---- TYPES
string
---- RESULTS
'NULL'
'NULL'
'NULL'
====
---- QUERY
# Can still resolve by ordinal
set parquet_fallback_schema_resolution="POSITION";
select b, new_a from resolution_by_name_test;
---- TYPES
string,string
---- RESULTS
'aaaaaaa','bbbbbbb'
'ccccc','dddd'
'eeeeeeee','f'
====
---- QUERY
# Check that we can parse the integer enum value as well
set parquet_fallback_schema_resolution=1;
select new_a from resolution_by_name_test;
---- TYPES
string
---- RESULTS
'NULL'
'NULL'
'NULL'
====
---- QUERY
set parquet_fallback_schema_resolution=0;
select b, new_a from resolution_by_name_test;
---- TYPES
string,string
---- RESULTS
'aaaaaaa','bbbbbbb'
'ccccc','dddd'
'eeeeeeee','f'
====
---- QUERY
drop table resolution_by_name_test;
====
---- QUERY
# Test nested types resolution
drop table if exists nested_resolution_by_name_test;
create table nested_resolution_by_name_test like functional_parquet.complextypestbl;
====
---- SHELL
hadoop fs -cp $FILESYSTEM_PREFIX/test-warehouse/complextypestbl_parquet/nullable.parq \
$FILESYSTEM_PREFIX/test-warehouse/$DATABASE.db/nested_resolution_by_name_test/
hadoop fs -cp $FILESYSTEM_PREFIX/test-warehouse/complextypestbl_parquet/nonnullable.parq \
$FILESYSTEM_PREFIX/test-warehouse/$DATABASE.db/nested_resolution_by_name_test/
====
---- QUERY
select id, nested_struct.a, b.item
from nested_resolution_by_name_test t, t.nested_struct.b
---- TYPES
bigint,int,int
---- RESULTS
1,1,1
2,NULL,NULL
7,7,2
7,7,3
7,7,NULL
8,-1,-1
====
---- QUERY
# Can safely ignore extra fields in nested_struct
alter table nested_resolution_by_name_test change nested_struct nested_struct
struct<a:int, b: array<int>>;
select id, nested_struct.a, b.item
from nested_resolution_by_name_test t, t.nested_struct.b
---- TYPES
bigint,int,int
---- RESULTS
1,1,1
2,NULL,NULL
7,7,2
7,7,3
7,7,NULL
8,-1,-1
====
---- QUERY
# Rearrange nested_struct's fields and make sure we can still resolve by name
alter table nested_resolution_by_name_test change nested_struct nested_struct
struct<b: array<int>, a: int>;
set parquet_fallback_schema_resolution="name";
select id, nested_struct.a, b.item
from nested_resolution_by_name_test t, t.nested_struct.b
---- TYPES
bigint,int,int
---- RESULTS
1,1,1
2,NULL,NULL
7,7,2
7,7,3
7,7,NULL
8,-1,-1
====
---- QUERY
# Can add back a single field
alter table nested_resolution_by_name_test change nested_struct nested_struct
struct<b: array<int>, a: int, g: map<string, struct<h: struct<i: array<float>>>>>;
select id, g.key
from nested_resolution_by_name_test t, t.nested_struct.g
---- TYPES
bigint,string
---- RESULTS
1,'foo'
2,'g1'
2,'g2'
2,'g3'
2,'g4'
2,'g5'
5,'foo'
====
---- QUERY
# Add back a single, more deeply nested field (and remove the 'g' field)
alter table nested_resolution_by_name_test change nested_struct nested_struct
struct<b: array<int>, a: int, c: struct<d: array<array<struct<f: string>>>>>;
select tmp.f from nested_resolution_by_name_test.nested_struct.c.d.item tmp;
---- TYPES
string
---- RESULTS
'aaa'
'bbb'
'c'
'NULL'
'aaa'
'NULL'
'bbb'
'NULL'
'c'
'NULL'
'NULL'
'nonnullable'
====
---- QUERY
# Can't rename nested field
alter table nested_resolution_by_name_test change nested_struct nested_struct
struct<b: array<int>, a: int, c: struct<d: array<array<struct<renamed: string>>>>>;
select tmp.renamed from nested_resolution_by_name_test.nested_struct.c.d.item tmp;
---- TYPES
string
---- RESULTS
'NULL'
'NULL'
'NULL'
'NULL'
'NULL'
'NULL'
'NULL'
'NULL'
'NULL'
'NULL'
'NULL'
'NULL'
====
---- QUERY
drop table nested_resolution_by_name_test;
====
---- QUERY
# Test switched key/value map fields
drop table if exists switched_map_fields_resolution_test;
create table switched_map_fields_resolution_test (int_map map<string,int>)
stored as parquet;
====
---- SHELL
hadoop fs -copyFromLocal \
$IMPALA_HOME/testdata/parquet_schema_resolution/switched_map.parq \
$FILESYSTEM_PREFIX/test-warehouse/$DATABASE.db/switched_map_fields_resolution_test/
====
---- QUERY
# Switched map fields should be resolvable by name.
set parquet_fallback_schema_resolution="name";
select key, value from switched_map_fields_resolution_test.int_map
---- TYPES
string,int
---- RESULTS
'a',1
'b',2
'c',3
====
---- QUERY
# Can't resolve switched map fields by position since types are switched.
set parquet_fallback_schema_resolution="position";
select key, value from switched_map_fields_resolution_test.int_map
---- CATCH
File '$NAMENODE/test-warehouse/$DATABASE.db/switched_map_fields_resolution_test/
switched_map.parq' has an incompatible Parquet schema for column
'$DATABASE.switched_map_fields_resolution_test.int_map.key'.
Column type: STRING, Parquet schema:
required int32 value [i:0 d:1 r:1]
====
---- QUERY
drop table switched_map_fields_resolution_test
====
---- QUERY
# Check that we handle bad options gracefully
set parquet_fallback_schema_resolution="FOO"
---- CATCH
Invalid PARQUET_FALLBACK_SCHEMA_RESOLUTION option: 'FOO'.
Valid options are 'POSITION' and 'NAME'.
====

tests/common/impala_test_suite.py

@@ -27,6 +27,7 @@ from getpass import getuser
 from functools import wraps
 from impala._thrift_gen.ImpalaService.ttypes import TImpalaQueryOptions
 from random import choice
+from subprocess import check_call
 from tests.common.impala_service import ImpaladService
 from tests.common.impala_connection import ImpalaConnection, create_connection
 from tests.common.test_dimensions import *
@@ -221,6 +222,17 @@ class ImpalaTestSuite(BaseTestSuite):
     sections = self.load_query_test_file(self.get_workload(), test_file_name,
         encoding=encoding)
     for test_section in sections:
+      if 'SHELL' in test_section:
+        assert len(test_section) == 1, \
+            "SHELL test sections can't contain other sections"
+        cmd = test_section['SHELL']\
+            .replace('$FILESYSTEM_PREFIX', FILESYSTEM_PREFIX)\
+            .replace('$IMPALA_HOME', IMPALA_HOME)
+        if use_db: cmd = cmd.replace('$DATABASE', use_db)
+        LOG.info("Shell command: " + cmd)
+        check_call(cmd, shell=True)
+        continue
+
       if 'QUERY' not in test_section:
         assert 0, 'Error in test file %s. Test cases require a -- QUERY section.\n%s' %\
             (test_file_name, pprint.pformat(test_section))
@@ -265,7 +277,11 @@ class ImpalaTestSuite(BaseTestSuite):
           .replace('$FILESYSTEM_PREFIX', FILESYSTEM_PREFIX) \
           .replace('$NAMENODE', NAMENODE) \
           .replace('$IMPALA_HOME', IMPALA_HOME)
-        assert expected_str in str(e)
+        if use_db: expected_str = expected_str.replace('$DATABASE', use_db)
+        # Strip newlines so the expected error message can be split across
+        # multiple lines in the test file.
+        expected_str = expected_str.replace('\n', '')
+        actual_str = str(e).replace('\n', '')
+        assert expected_str in actual_str
         continue
       raise
     finally:

tests/conftest.py

@@ -219,6 +219,8 @@ def unique_database(request, testid_checksum):
                  'characters.'.format(db_name))
 
   def cleanup():
+    # Make sure we don't try to drop the current session database
+    request.instance.execute_query_expect_success(request.instance.client, "use default")
     request.instance.execute_query_expect_success(
         request.instance.client, 'DROP DATABASE `{0}` CASCADE'.format(db_name))
     LOG.info('Dropped database "{0}" for test ID "{1}"'.format(db_name,

tests/query_test/test_scanners.py

@@ -336,6 +336,11 @@ class TestParquet(ImpalaTestSuite):
     assert c_schema_elt.converted_type == ConvertedType.UTF8
     assert d_schema_elt.converted_type == None
 
+  @SkipIfS3.insert
+  def test_resolution_by_name(self, unique_database, vector):
+    self.run_test_case('QueryTest/parquet-resolution-by-name', vector,
+        use_db=unique_database)
+
   # We use various scan range lengths to exercise corner cases in the HDFS scanner more
   # thoroughly. In particular, it will exercise:
   # 1. default scan range

tests/util/test_file_parser.py

@@ -78,7 +78,7 @@ def parse_query_test_file(file_name, valid_section_names=None, encoding=None):
   section_names = valid_section_names
   if section_names is None:
     section_names = ['QUERY', 'RESULTS', 'TYPES', 'LABELS', 'SETUP', 'CATCH', 'ERRORS',
-                     'USER', 'RUNTIME_PROFILE']
+                     'USER', 'RUNTIME_PROFILE', 'SHELL']
   return parse_test_file(file_name, section_names, encoding=encoding,
       skip_unknown_sections=False)