IMPALA-10581: Implement ds_theta_intersect_f() function

This function receives two strings that are serialized Apache
DataSketches Theta sketches. Computes the intersection of two sketches
of same or different column and returns the resulting sketch of
intersection.

Example:
select ds_theta_estimate(ds_theta_intersect_f(sketch1, sketch2))
from sketch_tbl;
+-----------------------------------------------------------+
| ds_theta_estimate(ds_theta_intersect_f(sketch1, sketch2)) |
+-----------------------------------------------------------+
| 5                                                         |
+-----------------------------------------------------------+

Change-Id: I335eada00730036d5433775cfe673e0e4babaa01
Reviewed-on: http://gerrit.cloudera.org:8080/17186
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
This commit is contained in:
Fucun Chu
2021-03-12 16:59:05 +08:00
committed by Impala Public Jenkins
parent 8f8668aaf0
commit 77d6acd032
6 changed files with 157 additions and 13 deletions

View File

@@ -78,6 +78,33 @@ StringVal StringStreamToStringVal(FunctionContext* ctx, const stringstream& str_
return dst;
}
bool update_sketch_to_theta_union(FunctionContext* ctx,
const StringVal& serialized_sketch, datasketches::theta_union& sketch) {
if (!serialized_sketch.is_null && serialized_sketch.len > 0) {
datasketches::theta_sketch::unique_ptr sketch_ptr;
if (!DeserializeDsSketch(serialized_sketch, &sketch_ptr)) {
LogSketchDeserializationError(ctx);
return false;
}
sketch.update(*sketch_ptr);
}
return true;
}
bool update_sketch_to_theta_intersection(FunctionContext* ctx,
const StringVal& serialized_sketch, datasketches::theta_intersection& sketch) {
if (!serialized_sketch.is_null && serialized_sketch.len > 0) {
datasketches::theta_sketch::unique_ptr sketch_ptr;
if (!DeserializeDsSketch(serialized_sketch, &sketch_ptr)) {
LogSketchDeserializationError(ctx);
return false;
}
sketch.update(*sketch_ptr);
return true;
}
return false;
}
template<class T>
StringVal DsKllVectorResultToStringVal(FunctionContext* ctx,
const vector<T>& kll_result) {

View File

@@ -22,6 +22,8 @@
#include "common/status.h"
#include "thirdparty/datasketches/hll.hpp"
#include "thirdparty/datasketches/theta_union.hpp"
#include "thirdparty/datasketches/theta_intersection.hpp"
#include "udf/udf.h"
namespace impala {
@@ -59,6 +61,19 @@ bool DeserializeDsSketch(const StringVal& serialized_sketch, T* sketch)
StringVal StringStreamToStringVal(FunctionContext* ctx,
const std::stringstream& str_stream);
/// Helper function that receives a serialized DataSketches Theta sketch in
/// 'serialized_sketch', deserializes it and update the deserialized sketch to 'sketch'.
/// Returns false if the deserialization fails (the error log will be written),
/// true otherwise.
bool update_sketch_to_theta_union(FunctionContext* ctx,
const StringVal& serialized_sketch, datasketches::theta_union& sketch);
/// Helper function that receives a serialized DataSketches Theta sketch in
/// 'serialized_sketch', deserializes it and update the deserialized sketch to 'sketch'.
/// Returns false if 'serialized_sketch' is empty or deserialization fails (the error log
/// will be written), true otherwise.
bool update_sketch_to_theta_intersection(FunctionContext* ctx,
const StringVal& serialized_sketch, datasketches::theta_intersection& sketch);
/// Helper function that receives a vector and returns a comma separated StringVal that
/// holds the items from the vector keeping the order.

View File

@@ -22,6 +22,7 @@
#include "thirdparty/datasketches/hll.hpp"
#include "thirdparty/datasketches/theta_sketch.hpp"
#include "thirdparty/datasketches/theta_union.hpp"
#include "thirdparty/datasketches/theta_intersection.hpp"
#include "thirdparty/datasketches/theta_a_not_b.hpp"
#include "thirdparty/datasketches/kll_sketch.hpp"
#include "udf/udf-internal.h"
@@ -160,19 +161,6 @@ StringVal DataSketchesFunctions::DsThetaExclude(FunctionContext* ctx,
return StringVal::null();
}
bool update_sketch_to_theta_union(FunctionContext* ctx,
const StringVal& serialized_sketch, datasketches::theta_union& union_sketch) {
if (!serialized_sketch.is_null && serialized_sketch.len > 0) {
datasketches::theta_sketch::unique_ptr sketch_ptr;
if (!DeserializeDsSketch(serialized_sketch, &sketch_ptr)) {
LogSketchDeserializationError(ctx);
return false;
}
union_sketch.update(*sketch_ptr);
}
return true;
}
StringVal DataSketchesFunctions::DsThetaUnionF(FunctionContext* ctx,
const StringVal& first_serialized_sketch, const StringVal& second_serialized_sketch) {
datasketches::theta_union union_sketch = datasketches::theta_union::builder().build();
@@ -190,6 +178,25 @@ StringVal DataSketchesFunctions::DsThetaUnionF(FunctionContext* ctx,
return StringStreamToStringVal(ctx, serialized_input);
}
StringVal DataSketchesFunctions::DsThetaIntersectF(FunctionContext* ctx,
const StringVal& first_serialized_sketch, const StringVal& second_serialized_sketch) {
datasketches::theta_intersection intersection_sketch;
// Update two sketches to theta_intersection
// Note that if one of the sketches is null, null is returned.
if (!update_sketch_to_theta_intersection(
ctx, first_serialized_sketch, intersection_sketch)) {
return StringVal::null();
}
if (!update_sketch_to_theta_intersection(
ctx, second_serialized_sketch, intersection_sketch)) {
return StringVal::null();
}
datasketches::compact_theta_sketch sketch = intersection_sketch.get_result();
std::stringstream serialized_input;
sketch.serialize(serialized_input);
return StringStreamToStringVal(ctx, serialized_input);
}
FloatVal DataSketchesFunctions::DsKllQuantile(FunctionContext* ctx,
const StringVal& serialized_sketch, const DoubleVal& rank) {
if (serialized_sketch.is_null || serialized_sketch.len == 0) return FloatVal::null();

View File

@@ -85,6 +85,14 @@ public:
const StringVal& first_serialized_sketch,
const StringVal& second_serialized_sketch);
/// 'first_serialized_sketch' and 'second_serialized_sketch' are both expected as
/// serialized Apache DataSketches Theta sketches. If they are not, then the query
/// fails. Intersect two sketches and return the resulting sketch after the
/// intersection.
static StringVal DsThetaIntersectF(FunctionContext* ctx,
const StringVal& first_serialized_sketch,
const StringVal& second_serialized_sketch);
/// 'serialized_sketch' is expected as a serialized Apache DataSketches KLL sketch. If
/// it is not, then the query fails. 'rank' is used to identify which item (estimate)
/// to return from the sketched dataset. E.g. 0.1 means the item where 10% of the

View File

@@ -1009,6 +1009,8 @@ visible_functions = [
'_ZN6impala21DataSketchesFunctions14DsThetaExcludeEPN10impala_udf15FunctionContextERKNS1_9StringValES6_'],
[['ds_theta_union_f'], 'STRING', ['STRING', 'STRING'],
'_ZN6impala21DataSketchesFunctions13DsThetaUnionFEPN10impala_udf15FunctionContextERKNS1_9StringValES6_'],
[['ds_theta_intersect_f'], 'STRING', ['STRING', 'STRING'],
'_ZN6impala21DataSketchesFunctions17DsThetaIntersectFEPN10impala_udf15FunctionContextERKNS1_9StringValES6_'],
[['ds_kll_quantile'], 'FLOAT', ['STRING', 'DOUBLE'],
'_ZN6impala21DataSketchesFunctions13DsKllQuantileEPN10impala_udf15FunctionContextERKNS1_9StringValERKNS1_9DoubleValE'],
[['ds_kll_n'], 'BIGINT', ['STRING'],

View File

@@ -481,4 +481,89 @@ select ds_theta_estimate(ds_theta_union_f(sketch1, sketch2)) from sketch_interme
BIGINT
---- RESULTS
15
====
---- QUERY
# Check if the intersection of a valid sketch with a null value returns an null sketch.
select
ds_theta_estimate(ds_theta_intersect_f(date_sketch, null)),
ds_theta_estimate(ds_theta_intersect_f(null, float_sketch))
from sketch_store;
---- TYPES
BIGINT,BIGINT
---- RESULTS
0,0
0,0
0,0
0,0
====
---- QUERY
# Check that ds_theta_intersect_f() returns an empty sketch for 2 empty sketch.
select ds_theta_estimate(ds_theta_intersect_f(
ds_theta_sketch(cast(f2 as float)), ds_theta_sketch(cast(f2 as float))))
from functional_parquet.emptytable;
---- TYPES
BIGINT
---- RESULTS
0
====
---- QUERY
# Checks that ds_theta_intersect_f() returns an null sketch for 2 null inputs.
select ds_theta_estimate(ds_theta_intersect_f(null_str, some_nulls)) from
functional_parquet.nullrows where id='b';
---- TYPES
BIGINT
---- RESULTS
0
====
---- QUERY
# ds_theta_intersect_f() returns an error if it receives an invalid serialized sketch.
select ds_theta_intersect_f(date_string_col, null) from functional_parquet.alltypestiny
where id=1;
---- CATCH
UDF ERROR: Unable to deserialize sketch.
====
---- QUERY
# ds_theta_intersect_f() returns an error if it receives an invalid serialized sketch.
select ds_theta_intersect_f(sketch1, sketch2) from (
select ds_theta_sketch(float_col) sketch1, max(date_string_col) sketch2 from
functional_parquet.alltypestiny
) t
---- CATCH
UDF ERROR: Unable to deserialize sketch.
====
---- QUERY
# Intersect the sketches from theta_sketches_impala_hive2 and checks if the intersect
# produces the same result as if these sketches were used separately to get the estimates.
select
ds_theta_estimate(ds_theta_intersect_f(i_ti, h_ti)) as ti,
ds_theta_estimate(ds_theta_intersect_f(i_i, h_i)) as i,
ds_theta_estimate(ds_theta_intersect_f(i_bi, h_bi)) as bi,
ds_theta_estimate(ds_theta_intersect_f(i_f, h_f)) as f,
ds_theta_estimate(ds_theta_intersect_f(i_d, h_d)) as d,
ds_theta_estimate(ds_theta_intersect_f(i_s, h_s)) as s,
ds_theta_estimate(ds_theta_intersect_f(i_c, h_c)) as c,
ds_theta_estimate(ds_theta_intersect_f(i_v, h_v)) as v,
ds_theta_estimate(ds_theta_intersect_f(i_nc, h_nc)) as nc
from theta_sketches_impala_hive2;
---- TYPES
BIGINT,BIGINT,BIGINT,BIGINT,BIGINT,BIGINT,BIGINT,BIGINT,BIGINT
---- RESULTS
5,7,6,6,7,4,4,3,0
====
---- QUERY
# Check that the non-empty input sketches are distinct and the result is empty.
select ds_theta_estimate(ds_theta_intersect_f(date_sketch, float_sketch))
from sketch_store where year=2009 and month=1;
---- TYPES
BIGINT
---- RESULTS
0
====
---- QUERY
# Check When the inputs aren't the same but has some (but not all) items common.
select ds_theta_estimate(ds_theta_intersect_f(sketch1, sketch2)) from sketch_intermediate;
---- TYPES
BIGINT
---- RESULTS
5
====