mirror of
https://github.com/apache/impala.git
synced 2025-12-19 18:12:08 -05:00
Python 3 changed some object model methods: - __nonzero__ was removed in favor of __bool__ - func_dict / func_name were removed in favor of __dict__ / __name__ - The next() function was deprecated in favor of __next__ (Code locations should use next(iter) rather than iter.next()) - metaclasses are specified a different way - Locations that specify __eq__ should also specify __hash__ Python 3 also moved some packages around (urllib2, Queue, httplib, etc), and this adapts the code to use the new locations (usually handled on Python 2 via future). This also fixes the code to avoid referencing exception variables outside the exception block and variables outside of a comprehension. Several of these seem like false positives, but it is better to avoid the warning. This fixes these pylint warnings: bad-python3-import eq-without-hash metaclass-assignment next-method-called nonzero-method exception-escape comprehension-escape Testing: - Ran core tests - Ran release exhaustive tests Change-Id: I988ae6c139142678b0d40f1f4170b892eabf25ee Reviewed-on: http://gerrit.cloudera.org:8080/19592 Reviewed-by: Joe McDonnell <joemcdonnell@cloudera.com> Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
673 lines
20 KiB
Python
673 lines
20 KiB
Python
# Licensed to the Apache Software Foundation (ASF) under one
|
|
# or more contributor license agreements. See the NOTICE file
|
|
# distributed with this work for additional information
|
|
# regarding copyright ownership. The ASF licenses this file
|
|
# to you under the Apache License, Version 2.0 (the
|
|
# "License"); you may not use this file except in compliance
|
|
# with the License. You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing,
|
|
# software distributed under the License is distributed on an
|
|
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
|
# KIND, either express or implied. See the License for the
|
|
# specific language governing permissions and limitations
|
|
# under the License.
|
|
|
|
from __future__ import absolute_import, division, print_function
|
|
import json
|
|
|
|
from collections import defaultdict
|
|
from copy import deepcopy
|
|
|
|
# The model related modules (types.py, query.py, etc) are interconnected by circular
|
|
# imports which causes problems for the python import system. This module is intended to
|
|
# be the first of the circular modules imported. To be importable, no direct references
|
|
# are made to the other modules from this modules namespace. Instead, other modules are
|
|
# lazyily imported using the following function. Keep in mind that python "globals" are
|
|
# module local, there is no such thing as a cross-module global.
|
|
__ALREADY_IMPORTED = False
|
|
|
|
|
|
def get_import(name):
|
|
# noqa below tells flake8 to not warn when it thinks imports are not used
|
|
global __ALREADY_IMPORTED
|
|
if not __ALREADY_IMPORTED:
|
|
from tests.comparison.db_types import ( # noqa
|
|
BigInt,
|
|
Boolean,
|
|
Char,
|
|
DataType,
|
|
Decimal,
|
|
Float,
|
|
Int,
|
|
JOINABLE_TYPES,
|
|
Number,
|
|
Timestamp)
|
|
from tests.comparison.funcs import AggFunc, AnalyticFunc, Func # noqa
|
|
from tests.comparison.query import InlineView, Subquery, WithClauseInlineView # noqa
|
|
for key, value in locals().items():
|
|
globals()[key] = value
|
|
__ALREADY_IMPORTED = True
|
|
return globals()[name]
|
|
|
|
|
|
class ValExpr(object):
|
|
'''This is class that represents a generic expr that results in a scalar.'''
|
|
|
|
@property
|
|
def type(self):
|
|
'''Returns the type that this expr evaluates to. The type may be Int or Char but
|
|
never BigInt or String. Valid return value are the set defined in types.TYPES.
|
|
'''
|
|
return self.exact_type.get_generic_type()
|
|
|
|
@property
|
|
def exact_type(self):
|
|
'''Return the actual type of the val expr. For example "type" could return Int and
|
|
"exact_type" could return TinyInt.
|
|
'''
|
|
pass
|
|
|
|
@property
|
|
def base_type(self):
|
|
'''Returns the lowest type in the type heirarchy that is not DataType. For non-
|
|
numeric types, the return value will be the same is self.type. Numeric types
|
|
will return Number whereas self.type may be Decimal, Int, or Float.
|
|
'''
|
|
return self.type.get_base_type()
|
|
|
|
@property
|
|
def is_func(self):
|
|
'''Evaluates to True if this expr is an instance of a function.'''
|
|
return isinstance(self, get_import('Func'))
|
|
|
|
@property
|
|
def is_agg(self):
|
|
'''Evaluates to True if this expr is an instance of an aggregate function.'''
|
|
return isinstance(self, get_import('AggFunc'))
|
|
|
|
@property
|
|
def is_analytic(self):
|
|
'''Evaluates to True if this expr is an instance of an analytic function.'''
|
|
return isinstance(self, get_import('AnalyticFunc'))
|
|
|
|
@property
|
|
def contains_agg(self):
|
|
'''Evaluates to True if this expression is an aggregate function or contains an
|
|
aggregate function.
|
|
'''
|
|
return self.is_agg or self.is_func and any(
|
|
isinstance(arg, ValExpr) and arg.contains_agg for arg in self.args)
|
|
|
|
@property
|
|
def contains_analytic(self):
|
|
'''Evaluates to True if this expression is an analytic function or contains an
|
|
analytic function.
|
|
'''
|
|
return self.is_analytic or self.is_func and any(
|
|
isinstance(arg, ValExpr) and arg.contains_analytic for arg in self.args)
|
|
|
|
@property
|
|
def contains_subquery(self):
|
|
'''Evaluates to True if this expression is a subquery or contains a subquery.'''
|
|
return self.is_subquery or self.is_func and any(
|
|
isinstance(arg, ValExpr) and arg.contains_subquery for arg in self.args)
|
|
|
|
@property
|
|
def is_col(self):
|
|
return isinstance(self, Column)
|
|
|
|
@property
|
|
def is_constant(self):
|
|
return isinstance(self, get_import('DataType'))
|
|
|
|
@property
|
|
def is_subquery(self):
|
|
return isinstance(self, get_import('Subquery'))
|
|
|
|
@property
|
|
def returns_boolean(self):
|
|
return issubclass(self.type, get_import('Boolean'))
|
|
|
|
@property
|
|
def returns_int(self):
|
|
return issubclass(self.type, get_import('Int'))
|
|
|
|
@property
|
|
def returns_float(self):
|
|
return issubclass(self.type, get_import('Float'))
|
|
|
|
@property
|
|
def returns_char(self):
|
|
return issubclass(self.type, get_import('Char'))
|
|
|
|
@property
|
|
def returns_timestamp(self):
|
|
return issubclass(self.type, get_import('Timestamp'))
|
|
|
|
def iter_exprs(self, filter=None):
|
|
'''Return an iterator over all exprs that this expr contains as a function argument
|
|
including this expr itself.
|
|
'''
|
|
if not filter or filter(self):
|
|
yield self
|
|
|
|
def count_col_refs(self):
|
|
'''Return a dict with Columns as keys and the number of times the column was used
|
|
in this expr as values.
|
|
'''
|
|
col_ref_counts = defaultdict(int)
|
|
if self.is_func:
|
|
for arg in self.args:
|
|
if isinstance(arg, ValExpr):
|
|
for col, count in arg.count_col_refs().items():
|
|
col_ref_counts[col] += count
|
|
elif self.is_col:
|
|
col_ref_counts[self] += 1
|
|
return col_ref_counts
|
|
|
|
|
|
class StructColumn(object):
|
|
'''The methods in this class are similar to TableExpr.
|
|
|
|
In Impala it's not possible to select from a struct column. To mirror this behavior
|
|
here, StructColumn does not inherit from TableExpr.
|
|
|
|
TODO: Maybe make a parent class that both StructColumn and TableExpr will inhert
|
|
from.
|
|
'''
|
|
|
|
def __init__(self, owner, name):
|
|
self.owner = owner
|
|
self.name = name
|
|
self._cols = []
|
|
self.alias = None
|
|
|
|
@property
|
|
def identifier(self):
|
|
return self.alias or self.name
|
|
|
|
@property
|
|
def cols(self):
|
|
'''Returns a ValExprList containing all scalar cols in this StructColumn. '''
|
|
# TODO: Since Impala now supports nested types, this method can be renamed to
|
|
# scalar_cols
|
|
result = ValExprList()
|
|
for col in self._cols:
|
|
if not isinstance(col, CollectionColumn):
|
|
result.extend(col.cols)
|
|
return result
|
|
|
|
@property
|
|
def collections(self):
|
|
result = []
|
|
for col in self._cols:
|
|
if isinstance(col, CollectionColumn):
|
|
result.append(col)
|
|
result.extend(col.collections)
|
|
elif isinstance(col, StructColumn):
|
|
result.extend(col.collections)
|
|
return result
|
|
|
|
def get_col_by_name(self, col_name):
|
|
for col in self._cols:
|
|
if col.name == col_name:
|
|
return col
|
|
return None
|
|
|
|
def add_col(self, col):
|
|
col.owner = self
|
|
self._cols.append(col)
|
|
|
|
def __eq__(self, other):
|
|
if not isinstance(other, StructColumn):
|
|
return False
|
|
if self is other:
|
|
return True
|
|
return self.name == other.name and self.owner.identifier == other.owner.identifier
|
|
|
|
def __hash__(self):
|
|
return hash(self.name)
|
|
|
|
def __repr__(self):
|
|
cols_str = ', '.join(str(f) for f in self._cols)
|
|
return '%s<name: %s, cols: [%s]>' % (type(self).__name__, self.name, cols_str)
|
|
|
|
def __deepcopy__(self, memo):
|
|
other = StructColumn(self.owner, self.name)
|
|
other.alias = self.alias
|
|
for col in self._cols:
|
|
other.add_col(deepcopy(col, memo))
|
|
return other
|
|
|
|
|
|
class Column(ValExpr):
|
|
'''A representation of a column. All TableExprs will have Columns. So a Column
|
|
may belong to an InlineView as well as a standard Table.
|
|
|
|
This class is used in two ways:
|
|
|
|
1) As a piece of metadata in a table definiton. In this usage the col isn't
|
|
intended to represent an val.
|
|
|
|
2) As an expr in a query, for example an item being selected or as part of
|
|
a JOIN condition. In this usage the col is more like a val, which is why
|
|
it implements/extends ValExpr.
|
|
|
|
This class can also be used to represent Map keys, Map values, Array pos,
|
|
scalar struct field, and scalar array item.
|
|
'''
|
|
|
|
def __init__(self, owner, name, exact_type):
|
|
self.owner = owner
|
|
self.name = name
|
|
self._exact_type = exact_type
|
|
self.is_primary_key = False
|
|
|
|
@property
|
|
def exact_type(self):
|
|
return self._exact_type
|
|
|
|
@exact_type.setter
|
|
def exact_type(self, exact_type):
|
|
self._exact_type = exact_type
|
|
|
|
def __hash__(self):
|
|
return hash(self.name)
|
|
|
|
def __eq__(self, other):
|
|
if not isinstance(other, Column):
|
|
return False
|
|
if self is other:
|
|
return True
|
|
return self.name == other.name and self.owner.identifier == other.owner.identifier
|
|
|
|
@property
|
|
def cols(self):
|
|
return ValExprList([self])
|
|
|
|
def __repr__(self):
|
|
return '%s<name: %s, type: %s>' % (
|
|
type(self).__name__, self.name, self.exact_type.__name__)
|
|
|
|
def __deepcopy__(self, memo):
|
|
# Don't return a deep copy of owner, since that is a circular reference
|
|
return Column(self.owner, self.name, self.exact_type)
|
|
|
|
|
|
class ValExprList(list):
|
|
'''A list of ValExprs.'''
|
|
|
|
@property
|
|
def by_type(self):
|
|
return get_import('DataType').group_by_type(self)
|
|
|
|
def __repr__(self):
|
|
return 'ValExprList: ' + ', '.join(str(x) for x in self)
|
|
|
|
|
|
class TableExpr(object):
|
|
'''This class represents something that a query may use to SELECT from or JOIN on.'''
|
|
|
|
@property
|
|
def identifier(self):
|
|
'''Returns either a table name or alias if one has been declared.'''
|
|
pass
|
|
|
|
@property
|
|
def unique_cols(self):
|
|
'''Returns a list of lists of Cols that in combination define a unique set of values
|
|
within the table. The returned list could be thought of as a list of uniqueness
|
|
constraints (though there may be no actual constraints or any other type of
|
|
enforcement).
|
|
'''
|
|
return ValExprList()
|
|
|
|
@property
|
|
def joinable_cols(self):
|
|
'''Returns a list of Cols that are of a type that is allowed in a JOIN. This is
|
|
mostly an Impala specific thing since Impala requires at least one equality based
|
|
join and not all types are allowed in equality comparisons. Also Boolean is
|
|
excluded because of low cardinality.
|
|
'''
|
|
joinable_types = tuple(get_import('JOINABLE_TYPES'))
|
|
return ValExprList(col for col in self.cols if issubclass(col.type, joinable_types))
|
|
|
|
@property
|
|
def col_types(self):
|
|
'''Returns a Set containing the various column types that this TableExpr contains.'''
|
|
return set(self.cols_by_type)
|
|
|
|
@property
|
|
def collections(self):
|
|
'''Returns all nested collections that can be accessed from this TableExpr.'''
|
|
result = []
|
|
for col in self._cols:
|
|
if isinstance(col, CollectionColumn):
|
|
result.append(col)
|
|
result.extend(col.collections)
|
|
elif isinstance(col, StructColumn):
|
|
result.extend(col.collections)
|
|
return result
|
|
|
|
def add_col(self, col):
|
|
col.owner = self
|
|
self._cols.append(col)
|
|
|
|
def is_visible(self):
|
|
'''If False is returned, columns from this TableExpr may only be used in JOIN
|
|
conditions. This is intended to be used to identify ANTI and SEMI joined table
|
|
exprs.
|
|
'''
|
|
pass
|
|
|
|
@property
|
|
def cols_by_type(self):
|
|
'''Group cols of the same type into lists and return a dict of the results.'''
|
|
return get_import('DataType').group_by_type(self.cols)
|
|
|
|
@property
|
|
def joinable_cols_by_type(self):
|
|
return get_import('DataType').group_by_type(self.joinable_cols)
|
|
|
|
@property
|
|
def is_table(self):
|
|
return isinstance(self, Table)
|
|
|
|
@property
|
|
def is_inline_view(self):
|
|
return isinstance(self, get_import('InlineView'))
|
|
|
|
@property
|
|
def is_with_clause_inline_view(self):
|
|
return isinstance(self, get_import('WithClauseInlineView'))
|
|
|
|
def __hash__(self):
|
|
return hash(self.identifier)
|
|
|
|
def __eq__(self, other):
|
|
if not isinstance(other, type(self)):
|
|
return False
|
|
return self.identifier == other.identifier
|
|
|
|
|
|
class CollectionColumn(TableExpr):
|
|
'''Used for representing Map or Array columns.'''
|
|
|
|
def __init__(self, owner, name):
|
|
self.name = name
|
|
# Owner can be one of: Table, ArrayColumn or StructColumn.
|
|
self.owner = owner
|
|
self.is_visible = True
|
|
self.alias = None
|
|
self._cols = []
|
|
|
|
@property
|
|
def identifier(self):
|
|
return self.alias or self.name
|
|
|
|
@property
|
|
def cols(self):
|
|
result = ValExprList()
|
|
for col in self._cols:
|
|
if not isinstance(col, CollectionColumn):
|
|
result.extend(col.cols)
|
|
return result
|
|
|
|
def get_col_by_name(self, col_name):
|
|
for col in self._cols:
|
|
if col.name == col_name:
|
|
return col
|
|
return None
|
|
|
|
def __hash__(self):
|
|
return hash(self.name)
|
|
|
|
def __repr__(self):
|
|
cols_str = ', '.join(str(f) for f in self._cols)
|
|
return '%s<name: %s, cols: [%s]>' % (type(self).__name__, self.name, cols_str)
|
|
|
|
|
|
class ArrayColumn(CollectionColumn):
|
|
|
|
def __init__(self, owner, name, item):
|
|
'''Item represents the type of array. For example if array type is Int, item should be
|
|
Column of type Int.
|
|
'''
|
|
super(ArrayColumn, self).__init__(owner, name)
|
|
item.owner = self
|
|
item.name = 'item'
|
|
self._cols.append(item)
|
|
# Arrays have 2 fields: pos and item. Pos is automatically set to BigInt.
|
|
self._cols.append(Column(
|
|
owner=self, name='pos', exact_type=get_import('BigInt')))
|
|
|
|
def __eq__(self, other):
|
|
if not isinstance(other, ArrayColumn):
|
|
return False
|
|
if self is other:
|
|
return True
|
|
return self.name == other.name and self.owner.identifier == other.owner.identifier
|
|
|
|
def __hash__(self):
|
|
return hash((self.name, self.owner.identifier))
|
|
|
|
def __deepcopy__(self, memo):
|
|
other = ArrayColumn(
|
|
owner=self.owner,
|
|
name=self.name,
|
|
item=deepcopy(self.get_col_by_name('item')))
|
|
other.alias = self.alias
|
|
return other
|
|
|
|
|
|
class MapColumn(CollectionColumn):
|
|
|
|
def __init__(self, owner, name, key, value):
|
|
super(MapColumn, self).__init__(owner, name)
|
|
# Set key
|
|
key.owner = self
|
|
key.name = 'key'
|
|
self._cols.append(key)
|
|
# Set value
|
|
value.owner = self
|
|
value.name = 'value'
|
|
self._cols.append(value)
|
|
|
|
def __eq__(self, other):
|
|
if not isinstance(other, MapColumn):
|
|
return False
|
|
if self is other:
|
|
return True
|
|
return self.name == other.name and self.owner.identifier == other.owner.identifier
|
|
|
|
def __hash__(self):
|
|
return hash((self.name, self.owner.identifier))
|
|
|
|
def __deepcopy__(self, memo):
|
|
other = MapColumn(
|
|
owner=self.owner,
|
|
name=self.name,
|
|
key=deepcopy(self.get_col_by_name('key')),
|
|
value=deepcopy(self.get_col_by_name('value')))
|
|
other.alias = self.alias
|
|
return other
|
|
|
|
|
|
class Table(TableExpr):
|
|
'''Represents a standard database table.'''
|
|
|
|
def __init__(self, name):
|
|
self.name = name
|
|
self._cols = [] # can include CollectionColumns and StructColumns
|
|
self._unique_cols = []
|
|
self.alias = None
|
|
self.is_visible = True # Tables used in SEMI or ANTI JOINs are invisible
|
|
|
|
# Only used for data loading. Always stored in upper-case. If set, values will be
|
|
# something like 'PARQUET' or 'TEXT'. See cli_options.py for a full list of
|
|
# possible values.
|
|
self._storage_format = None
|
|
|
|
# Only used for data loading. For Impala and Hive, this is the path to the directory
|
|
# in the storage system, such as an HDFS URL.
|
|
self.storage_location = None
|
|
|
|
# Only used for data loading. Avro tables may require a separate schema definition,
|
|
# this is the path to the schema file in the storage system, such as an HDFS URL.
|
|
self.schema_location = None
|
|
|
|
@property
|
|
def identifier(self):
|
|
return self.alias or self.name
|
|
|
|
@property
|
|
def primary_keys(self):
|
|
"""
|
|
Return immutable sequence of primary keys.
|
|
"""
|
|
return tuple(col for col in self._cols if col.is_primary_key)
|
|
|
|
@property
|
|
def primary_key_names(self):
|
|
"""
|
|
Return immutable sequence for primary key names.
|
|
"""
|
|
return tuple(col.name for col in self.primary_keys)
|
|
|
|
@property
|
|
def updatable_columns(self):
|
|
"""
|
|
Return immutable sequence of columns that may be updated (i.e., not primary keys).
|
|
|
|
If the table doesn't have primary keys, no columns are updatable.
|
|
"""
|
|
if self.primary_keys:
|
|
return tuple(col for col in self._cols if not col.is_primary_key)
|
|
else:
|
|
return ()
|
|
|
|
@property
|
|
def updatable_column_names(self):
|
|
"""
|
|
Return immutable sequence of column names that may be updated
|
|
"""
|
|
return tuple(col.name for col in self.updatable_columns)
|
|
|
|
@property
|
|
def cols(self):
|
|
result = ValExprList()
|
|
for col in self._cols:
|
|
if not isinstance(col, CollectionColumn):
|
|
result.extend(col.cols)
|
|
return result
|
|
|
|
@cols.setter
|
|
def cols(self, cols):
|
|
self._cols = cols
|
|
|
|
@property
|
|
def unique_cols(self):
|
|
return self._unique_cols
|
|
|
|
@unique_cols.setter
|
|
def unique_cols(self, unique_cols):
|
|
self._unique_cols = unique_cols
|
|
|
|
@property
|
|
def storage_format(self):
|
|
return self._storage_format
|
|
|
|
@storage_format.setter
|
|
def storage_format(self, storage_format):
|
|
self._storage_format = storage_format and storage_format.upper()
|
|
|
|
def get_avro_schema(self):
|
|
avro_schema = {'name': 'my_record', 'type': 'record', 'fields': []}
|
|
for col in self.cols:
|
|
if issubclass(col.type, get_import('Int')):
|
|
avro_type = 'int'
|
|
elif issubclass(col.type, get_import('Char')):
|
|
avro_type = 'string'
|
|
elif issubclass(col.type, get_import('Decimal')):
|
|
avro_type = {
|
|
"type": "bytes",
|
|
"logicalType": "decimal",
|
|
"precision": col.exact_type.MAX_DIGITS,
|
|
"scale": col.exact_type.MAX_FRACTIONAL_DIGITS}
|
|
else:
|
|
avro_type = col.type.__name__.lower()
|
|
avro_schema['fields'].append({'name': col.name, 'type': ['null', avro_type]})
|
|
return json.dumps(avro_schema)
|
|
|
|
def __repr__(self):
|
|
return 'Table<name: %s, cols: %s>' \
|
|
% (self.name, ', '.join([str(col) for col in self._cols]))
|
|
|
|
def __deepcopy__(self, memo):
|
|
other = Table(self.name)
|
|
other.alias = self.alias
|
|
other.is_visible = self.is_visible
|
|
cols_by_name = dict()
|
|
for col in self._cols:
|
|
result_col = deepcopy(col, memo)
|
|
other.add_col(result_col)
|
|
cols_by_name[result_col.name] = result_col
|
|
|
|
other._unique_cols = []
|
|
for col_combo in self._unique_cols:
|
|
other_col_combo = set()
|
|
for col in col_combo:
|
|
other_col_combo.add(cols_by_name[col.name])
|
|
other.unique_cols.append(other_col_combo)
|
|
return other
|
|
|
|
|
|
class TableExprList(list):
|
|
'''A list of TableExprs.'''
|
|
|
|
@property
|
|
def cols(self):
|
|
'''Return a list of all the Columns containd in all the TableExprs.'''
|
|
return ValExprList(col for table_expr in self for col in table_expr.cols)
|
|
|
|
@property
|
|
def joinable_cols_by_type(self):
|
|
cols_by_type = defaultdict(ValExprList)
|
|
for table_expr in self:
|
|
for type_, cols in table_expr.joinable_cols_by_type.items():
|
|
cols_by_type[type_].extend(cols)
|
|
return cols_by_type
|
|
|
|
@property
|
|
def cols_by_type(self):
|
|
cols_by_type = defaultdict(ValExprList)
|
|
for table_expr in self:
|
|
for type_, cols in table_expr.cols_by_type.items():
|
|
cols_by_type[type_].extend(cols)
|
|
return cols_by_type
|
|
|
|
@property
|
|
def col_types(self):
|
|
return tuple(self.cols_by_type)
|
|
|
|
@property
|
|
def collections(self):
|
|
result = []
|
|
for table_expr in self:
|
|
result.extend(table_expr.collections)
|
|
return result
|
|
|
|
@property
|
|
def by_col_type(self):
|
|
'''Return a dict with keys being column types and values being lists of TableExprs
|
|
that have at least one Column of that type.
|
|
'''
|
|
table_exprs_by_type = defaultdict(TableExprList)
|
|
for table_expr in self:
|
|
for col_type in table_expr.col_types:
|
|
table_exprs_by_type[col_type].append(table_expr)
|
|
return table_exprs_by_type
|