mirror of
https://github.com/apache/impala.git
synced 2026-01-08 12:02:54 -05:00
Result spooling has been relatively stable since it was introduced, and it has several benefits described in IMPALA-8656. This patch enables the result spooling (SPOOL_QUERY_RESULTS) query option by default. Furthermore, some tests need to be adjusted to account for result spooling being enabled by default. The following are the adjustment categories and the list of tests that fall under each category. Change in assertions: PlannerTest#testAcidTableScans PlannerTest#testBloomFilterAssignment PlannerTest#testConstantFolding PlannerTest#testFkPkJoinDetection PlannerTest#testFkPkJoinDetectionWithHDFSNumRowsEstDisabled PlannerTest#testKuduSelectivity PlannerTest#testMaxRowSize PlannerTest#testMinMaxRuntimeFilters PlannerTest#testMinMaxRuntimeFiltersWithHDFSNumRowsEstDisabled PlannerTest#testMtDopValidation PlannerTest#testParquetFiltering PlannerTest#testParquetFilteringDisabled PlannerTest#testPartitionPruning PlannerTest#testPreaggBytesLimit PlannerTest#testResourceRequirements PlannerTest#testRuntimeFilterQueryOptions PlannerTest#testSortExprMaterialization PlannerTest#testSpillableBufferSizing PlannerTest#testTableSample PlannerTest#testTpch PlannerTest#testKuduTpch PlannerTest#testTpchNested PlannerTest#testUnion TpcdsPlannerTest custom_cluster/test_admission_controller.py::TestAdmissionController::test_dedicated_coordinator_planner_estimates custom_cluster/test_admission_controller.py::TestAdmissionController::test_memory_rejection custom_cluster/test_admission_controller.py::TestAdmissionController::test_pool_mem_limit_configs metadata/test_explain.py::TestExplain::test_explain_level2 metadata/test_explain.py::TestExplain::test_explain_level3 metadata/test_stats_extrapolation.py::TestStatsExtrapolation::test_stats_extrapolation Increase BUFFER_POOL_LIMIT: query_test/test_queries.py::TestQueries::test_analytic_fns query_test/test_runtime_filters.py::TestRuntimeRowFilters::test_row_filter_reservation query_test/test_sort.py::TestQueryFullSort::test_multiple_mem_limits_full_output 
query_test/test_spilling.py::TestSpillingBroadcastJoins::test_spilling_broadcast_joins query_test/test_spilling.py::TestSpillingDebugActionDimensions::test_spilling_aggs query_test/test_spilling.py::TestSpillingDebugActionDimensions::test_spilling_regression_exhaustive query_test/test_udfs.py::TestUdfExecution::test_mem_limits Increase MEM_LIMIT: query_test/test_mem_usage_scaling.py::TestExchangeMemUsage::test_exchange_mem_usage_scaling query_test/test_mem_usage_scaling.py::TestScanMemLimit::test_hdfs_scanner_thread_mem_scaling Increase MAX_ROW_SIZE: custom_cluster/test_parquet_max_page_header.py::TestParquetMaxPageHeader::test_large_page_header_config query_test/test_insert.py::TestInsertQueries::test_insert_large_string query_test/test_query_mem_limit.py::TestQueryMemLimit::test_mem_limit query_test/test_scanners.py::TestTextSplitDelimiters::test_text_split_across_buffers_delimiter query_test/test_scanners.py::TestWideRow::test_wide_row Disable result spooling to maintain assertion: custom_cluster/test_admission_controller.py::TestAdmissionController::test_set_request_pool custom_cluster/test_admission_controller.py::TestAdmissionController::test_timeout_reason_host_memory custom_cluster/test_admission_controller.py::TestAdmissionController::test_timeout_reason_pool_memory custom_cluster/test_admission_controller.py::TestAdmissionController::test_queue_reasons_memory custom_cluster/test_admission_controller.py::TestAdmissionController::test_pool_config_change_while_queued custom_cluster/test_query_retries.py::TestQueryRetries::test_retry_fetched_rows custom_cluster/test_query_retries.py::TestQueryRetries::test_retry_finished_query custom_cluster/test_scratch_disk.py::TestScratchDir::test_no_dirs custom_cluster/test_scratch_disk.py::TestScratchDir::test_non_existing_dirs custom_cluster/test_scratch_disk.py::TestScratchDir::test_non_writable_dirs query_test/test_insert.py::TestInsertQueries::test_insert_large_string (the last query only) 
query_test/test_kudu.py::TestKuduMemLimits::test_low_mem_limit_low_selectivity_scan query_test/test_mem_usage_scaling.py::TestScanMemLimit::test_kudu_scan_mem_usage query_test/test_queries.py::TestQueriesParquetTables::test_very_large_strings query_test/test_query_mem_limit.py::TestCodegenMemLimit::test_codegen_mem_limit shell/test_shell_client.py::TestShellClient::test_fetch_size Testing: - Pass exhaustive tests. Change-Id: I9e360c1428676d8f3fab5d95efee18aca085eba4 Reviewed-on: http://gerrit.cloudera.org:8080/16755 Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com> Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
97 lines
3.9 KiB
Python
97 lines
3.9 KiB
Python
#!/usr/bin/env impala-python
|
|
# -*- coding: utf-8 -*-
|
|
#
|
|
# Licensed to the Apache Software Foundation (ASF) under one
|
|
# or more contributor license agreements. See the NOTICE file
|
|
# distributed with this work for additional information
|
|
# regarding copyright ownership. The ASF licenses this file
|
|
# to you under the Apache License, Version 2.0 (the
|
|
# "License"); you may not use this file except in compliance
|
|
# with the License. You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing,
|
|
# software distributed under the License is distributed on an
|
|
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
|
# KIND, either express or implied. See the License for the
|
|
# specific language governing permissions and limitations
|
|
# under the License.
|
|
|
|
from shell.impala_client import ImpalaBeeswaxClient, ImpalaHS2Client
|
|
from tests.common.impala_test_suite import ImpalaTestSuite
|
|
from tests.common.test_dimensions import create_client_protocol_dimension
|
|
from util import get_impalad_host_port
|
|
|
|
|
|
class TestShellClient(ImpalaTestSuite):
  """Tests for the Impala Shell clients: ImpalaBeeswaxClient and ImpalaHS2Client."""

  @classmethod
  def get_workload(cls):
    """Returns the name of the workload used by this suite."""
    return 'functional-query'

  @classmethod
  def add_test_dimensions(cls):
    """Adds the client protocol dimension to the test matrix."""
    cls.ImpalaTestMatrix.add_dimension(create_client_protocol_dimension())

  def test_fetch_size(self, vector):
    """Tests that when result spooling is disabled, setting a small batch_size causes
    the shell to fetch a single batch at a time, even when the configured fetch size is
    larger than the batch_size."""
    num_rows = 100
    batch_size = 10
    query_options = {'batch_size': str(batch_size), 'spool_query_results': 'false'}
    client = self.__get_shell_client(vector)
    # Every fetch is expected to return exactly batch_size rows, so the expected
    # number of batches is derived from it rather than passing batch_size itself.
    self.__execute_and_verify(client, query_options, num_rows // batch_size, num_rows)

  def test_fetch_size_result_spooling(self, vector):
    """Tests that when result spooling is enabled, that the exact fetch_size is honored
    even if a small batch_size is configured."""
    fetch_size = 20
    num_rows = 100
    query_options = {'batch_size': '10', 'spool_query_results': 'true'}
    client = self.__get_shell_client(vector, fetch_size)
    # Integer division keeps the expected batch count an int on Python 2 and 3.
    self.__execute_and_verify(client, query_options, num_rows // fetch_size, num_rows)

  def __execute_and_verify(self, client, query_options, num_batches, num_rows):
    """Connects the given client, runs a 'num_rows'-row query against
    functional.alltypes with the given query options and verifies the fetched batches
    via __fetch_rows(). The query (if one was started) and the connection are always
    closed, even when an assertion fails."""
    handle = None
    try:
      client.connect()
      handle = client.execute_query(
          "select * from functional.alltypes limit {0}".format(num_rows), query_options)
      self.__fetch_rows(client.fetch(handle), num_batches, num_rows)
    finally:
      # Nested so that a failure while closing the query cannot skip closing the
      # connection.
      try:
        if handle is not None: client.close_query(handle)
      finally:
        client.close_connection()

  def __fetch_rows(self, fetch_batches, num_batches, num_rows):
    """Fetches all rows using the given fetch_batches generator. Asserts that num_batches
    batches are produced by the generator, that every batch has the same size
    (num_rows / num_batches) and that num_rows rows are returned in total. 'num_rows'
    must be a positive multiple of 'num_batches'."""
    assert num_batches > 0 and num_rows % num_batches == 0, \
        "num_rows ({0}) must be a multiple of num_batches ({1})".format(
            num_rows, num_batches)
    rows_per_batch = num_rows // num_batches
    num_batches_count = 0
    num_rows_count = 0
    for fetch_batch in fetch_batches:
      assert len(fetch_batch) == rows_per_batch
      num_rows_count += len(fetch_batch)
      num_batches_count += 1
      # Stop once the expected number of batches has been seen; anything the
      # generator would yield afterwards is not consumed.
      if num_batches_count == num_batches: break
    assert num_batches_count == num_batches
    assert num_rows_count == num_rows

  def __get_shell_client(self, vector, fetch_size=1024):
    """Returns the client specified by the protocol in the given vector. Supported
    protocols are 'hs2', 'hs2-http' and 'beeswax'; any other value raises ValueError
    instead of silently returning None."""
    impalad = get_impalad_host_port(vector).split(":")
    protocol = vector.get_value("protocol")
    if protocol == 'hs2':
      return ImpalaHS2Client(impalad, fetch_size, None)
    elif protocol == 'hs2-http':
      return ImpalaHS2Client(impalad, fetch_size, None,
                             use_http_base_transport=True, http_path='cliservice')
    elif protocol == 'beeswax':
      return ImpalaBeeswaxClient(impalad, fetch_size, None)
    raise ValueError("Unsupported client protocol: {0}".format(protocol))
|