IMPALA-13661: Support parallelism above JDBC tables for joins/aggregates

Impala's planner generates a single-fragment, single-
threaded scan node for queries on JDBC tables because table
statistics are not properly available from the external
JDBC source. As a result, even large JDBC tables are
executed serially, causing suboptimal performance for joins,
aggregations, and scans over millions of rows.

This patch enables Impala to estimate the number of rows in a JDBC
table by issuing a COUNT(*) query at query preparation time. The
estimation is returned via TPrepareResult.setNum_rows_estimate()
and propagated into DataSourceScanNode. The scan node then uses
this cardinality to drive planner heuristics such as join order,
fragment parallelization, and scanner thread selection.

The design leverages the existing JDBC accessor layer:
- JdbcDataSource.prepare() constructs the configuration and invokes
  GenericJdbcDatabaseAccessor.getTotalNumberOfRecords().
- The accessor wraps the underlying query in:
      SELECT COUNT(*) FROM (<query>) tmptable
  ensuring correctness for both direct table scans and parameterized
  query strings.
- The result is captured as num_rows_estimate, which is then applied
  during computeStats() in DataSourceScanNode.
With accurate (or approximate) row counts, the planner can now:
- Assign multiple scanner threads to JDBC scan nodes instead of
   falling back to a single-thread plan.
- Introduce exchange nodes where beneficial, parallelizing data
   fetches across multiple JDBC connections.
- Produce better join orders by comparing JDBC row cardinalities
   against native Impala tables.
- Avoid severe underestimation that previously defaulted to wrong
   table statistics, leading to degenerate plans.

For a sample join query mentioned in the test file,
these are the improvements:

Before Optimization:
- Cardinality fixed at 1 for all JDBC scans
- Single fragment, single thread per query
- Max per-host resource reservation: ~9.7 MB, 1 thread
- No EXCHANGE or MERGING EXCHANGE operators
- No broadcast distribution; joins executed serially
- Example query runtime: ~77s

SCAN JDBC A
   \
    HASH JOIN
       \
        SCAN JDBC B
           \
            HASH JOIN
               \
                SCAN JDBC C
                   \
                    TOP-N -> ROOT

After Optimization:
- Cardinality derived from COUNT(*) (e.g. 150K, 1.5M rows)
- Multiple fragments per scan, 7 threads per query
- Max per-host resource reservation: ~123 MB, 7 threads
- Plans include EXCHANGE and MERGING EXCHANGE operators
- Broadcast joins on small sides, improving parallelism
- Example query runtime: ~38s (~2x faster)

SCAN JDBC A --> EXCHANGE(SND) --+
                                  \
                                   EXCHANGE(RCV) -> HASH JOIN(BCAST) --+
SCAN JDBC B --> EXCHANGE(SND) ----/                                   \
                                                                         HASH JOIN(BCAST) --+
SCAN JDBC C --> EXCHANGE(SND) ------------------------------------------/                 \
                                                                                             TOP-N
                                                                                               \
                                                                                                MERGING EXCHANGE -> ROOT

Also added a new backend configuration flag
--min_jdbc_scan_cardinality (default: 10) to provide a
lower bound for scan node cardinality estimates
during planning. This flag is propagated from BE
to FE via TBackendGflags and surfaced through
BackendConfig, ensuring the planner never produces
unrealistically low cardinality values.

TODO: Add a query option for this optimization
to avoid extra JDBC round trip for smaller
queries (IMPALA-14417).

Testing: All cases of Planner tests are written in
jdbc-parallel.test. Some basic metrics
are also mentioned in the commit message.

Change-Id: If47d29bdda5b17a1b369440f04d4e209d12133d9
Reviewed-on: http://gerrit.cloudera.org:8080/23112
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Reviewed-by: Wenzhe Zhou <wzhou@cloudera.com>
This commit is contained in:
pranav.lodha
2025-07-02 18:59:59 +05:30
committed by Wenzhe Zhou
parent a2e4463fbc
commit a77fec6391
12 changed files with 2210 additions and 34 deletions

View File

@@ -130,8 +130,27 @@ public class JdbcDataSource implements ExternalDataSource {
Lists.newArrayList("Invalid init_string value")));
}
List<Integer> acceptedPredicates = acceptedPredicates(params.getPredicates());
long numRecords = 0;
try {
dbAccessor_ = DatabaseAccessorFactory.getAccessor(tableConfig_);
numRecords = dbAccessor_.getTotalNumberOfRecords(tableConfig_);
LOG.info(String.format("Estimated number of records: %d", numRecords));
} catch (JdbcDatabaseAccessException e) {
return new TPrepareResult(
new TStatus(TErrorCode.RUNTIME_ERROR,
Lists.newArrayList("Failed to retrieve total number of records: "
+ e.getMessage())));
}
if (dbAccessor_ != null) {
if (params.isSetClean_dbcp_ds_cache()) {
cleanDbcpDSCache_ = params.isClean_dbcp_ds_cache();
}
dbAccessor_.close(null, cleanDbcpDSCache_);
dbAccessor_ = null;
}
return new TPrepareResult(STATUS_OK)
.setAccepted_conjuncts(acceptedPredicates);
.setAccepted_conjuncts(acceptedPredicates)
.setNum_rows_estimate(numRecords);
}
@Override

View File

@@ -116,33 +116,6 @@ public class JdbcStorageConfigManager {
return config.get(key.getPropertyName());
}
public static String getOrigQueryToExecute(Configuration config) {
String query;
String tableName = config.get(JdbcStorageConfig.TABLE.getPropertyName());
if (tableName != null) {
// We generate query as 'select * from tbl'
query = "select * from " + tableName;
} else {
query = config.get(JdbcStorageConfig.QUERY.getPropertyName());
}
return query;
}
public static String getQueryToExecute(Configuration config) {
String query = config.get(JdbcStorageConfig.QUERY.getPropertyName());
if (query != null) {
// Query has been defined, return it
return query;
}
// We generate query as 'select * from tbl'
String tableName = config.get(JdbcStorageConfig.TABLE.getPropertyName());
query = "select * from " + tableName;
return query;
}
private static boolean isEmptyString(String value) {
return ((value == null) || (value.trim().isEmpty()));
}

View File

@@ -79,7 +79,7 @@ public class GenericJdbcDatabaseAccessor implements DatabaseAccessor {
try {
initializeDatabaseSource(conf);
String sql = JdbcStorageConfigManager.getQueryToExecute(conf);
String sql = getQueryToExecute(conf);
// TODO: If a target database cannot flatten this view query, try to text
// replace the generated "select *".
String countQuery = "SELECT COUNT(*) FROM (" + sql + ") tmptable";
@@ -116,7 +116,7 @@ public class GenericJdbcDatabaseAccessor implements DatabaseAccessor {
try {
initializeDatabaseSource(conf);
String sql = JdbcStorageConfigManager.getQueryToExecute(conf);
String sql = getQueryToExecute(conf);
String partitionQuery = addLimitAndOffsetToQuery(sql, limit, offset);
LOG.info("Query to execute is [{}]", partitionQuery);
@@ -350,4 +350,22 @@ public class GenericJdbcDatabaseAccessor implements DatabaseAccessor {
.getInt(JdbcStorageConfig.JDBC_FETCH_SIZE.getPropertyName(), DEFAULT_FETCH_SIZE);
}
protected String getQueryToExecute(Configuration config) {
String query = config.get(JdbcStorageConfig.QUERY.getPropertyName());
if (query != null) {
// Query has been defined, return it
return query;
}
// We generate query as 'select * from tbl'
String tableName = config.get(JdbcStorageConfig.TABLE.getPropertyName());
// Make jdbc table name to be quoted with double quotes if columnMapping is not empty
String columnMapping = config.get(JdbcStorageConfig.COLUMN_MAPPING.getPropertyName());
if (!Strings.isNullOrEmpty(columnMapping)) {
tableName = getCaseSensitiveName(tableName);
}
query = "select * from " + tableName;
return query;
}
}

View File

@@ -49,6 +49,7 @@ import org.apache.impala.extdatasource.thrift.TColumnDesc;
import org.apache.impala.extdatasource.thrift.TComparisonOp;
import org.apache.impala.extdatasource.thrift.TPrepareParams;
import org.apache.impala.extdatasource.thrift.TPrepareResult;
import org.apache.impala.service.BackendConfig;
import org.apache.impala.service.FeSupport;
import org.apache.impala.thrift.TCacheJarResult;
import org.apache.impala.thrift.TColumnValue;
@@ -102,7 +103,7 @@ public class DataSourceScanNode extends ScanNode {
@Override
public void init(Analyzer analyzer) throws ImpalaException {
checkForSupportedFileFormats();
prepareDataSource();
prepareDataSource(analyzer.getQueryOptions());
conjuncts_ = orderConjunctsByCost(conjuncts_);
computeStats(analyzer);
// materialize slots in remaining conjuncts_
@@ -179,7 +180,7 @@ public class DataSourceScanNode extends ScanNode {
* stats. The accepted predicates are moved from conjuncts_ into acceptedConjuncts_
* and the associated TBinaryPredicates are set in acceptedPredicates_.
*/
private void prepareDataSource() throws InternalException {
private void prepareDataSource(TQueryOptions queryOptions) throws InternalException {
// Binary predicates that will be offered to the data source.
List<List<TBinaryPredicate>> offeredPredicates = new ArrayList<>();
// The index into conjuncts_ for each element in offeredPredicates.
@@ -223,6 +224,7 @@ public class DataSourceScanNode extends ScanNode {
TPrepareParams prepareParams = new TPrepareParams();
prepareParams.setInit_string(table_.getInitString());
prepareParams.setPredicates(offeredPredicates);
prepareParams.setClean_dbcp_ds_cache(queryOptions.isClean_dbcp_ds_cache());
// TODO: Include DB (i.e. getFullName())?
prepareParams.setTable_name(table_.getName());
prepareResult = executor.prepare(prepareParams);
@@ -332,8 +334,15 @@ public class DataSourceScanNode extends ScanNode {
super.computeStats(analyzer);
inputCardinality_ = numRowsEstimate_;
cardinality_ = numRowsEstimate_;
// Use estimate from the data source if present
if (numRowsEstimate_ > 0) {
cardinality_ = numRowsEstimate_;
} else {
cardinality_ = table_.getNumRows(); // fallback
}
cardinality_ = applyConjunctsSelectivity(cardinality_);
cardinality_ = Math.max(1, cardinality_);
int minCard = BackendConfig.INSTANCE.getMinJdbcScanCardinality();
cardinality_ = Math.max(minCard, cardinality_);
cardinality_ = capCardinalityAtLimit(cardinality_);
if (LOG.isTraceEnabled()) {

View File

@@ -192,6 +192,13 @@ public class BackendConfig {
return backendCfg_.blacklisted_dbs;
}
public int getMinJdbcScanCardinality() {
if (backendCfg_.isSetMin_jdbc_scan_cardinality()) {
return backendCfg_.getMin_jdbc_scan_cardinality();
}
return 10;
}
public String getBlacklistedTables() {
return backendCfg_.blacklisted_tables;
}

View File

@@ -956,6 +956,6 @@ public class LdapHS2Test {
"Table has been dropped.");
// Two successful authentications for each ExecAndFetch().
verifyMetrics(25, 0);
verifyMetrics(23, 0);
}
}

View File

@@ -245,6 +245,11 @@ public class PlannerTest extends PlannerTestBase {
runPlannerTestFile("hbase");
}
@Test
public void testJdbcParallel() {
runPlannerTestFile("jdbc-parallel");
}
/**
* Test of HBase in the case of disabling the key scan.
* Normally the HBase scan node goes out to HBase to query the