low-code: Alias stream_interval and stream_partition to stream_slice in interpolation context (#25373)
* add aliases
* Raise error if the alias is found in the context
* format
* Comment
* Automated Commit - Formatting Changes
* rename to stream partition in greenhouse manifest
* Revert "rename to stream partition in greenhouse manifest"
This reverts commit d513ef418f.
* Clean up test
* Other test
* last test
---------
Co-authored-by: girarda <girarda@users.noreply.github.com>
This commit is contained in:
@@ -31,6 +31,12 @@ class JinjaInterpolation(Interpolation):
|
||||
Additional information on jinja templating can be found at https://jinja.palletsprojects.com/en/3.1.x/templates/#
|
||||
"""
|
||||
|
||||
# These aliases are used to deprecate existing keywords without breaking all existing connectors.
|
||||
ALIASES = {
|
||||
"stream_interval": "stream_slice", # Use stream_interval to access incremental_sync values
|
||||
"stream_partition": "stream_slice", # Use stream_partition to access partition router's values
|
||||
}
|
||||
|
||||
def __init__(self):
|
||||
self._environment = Environment()
|
||||
self._environment.filters.update(**filters)
|
||||
@@ -38,6 +44,16 @@ class JinjaInterpolation(Interpolation):
|
||||
|
||||
def eval(self, input_str: str, config: Config, default: Optional[str] = None, **additional_parameters):
|
||||
context = {"config": config, **additional_parameters}
|
||||
|
||||
for alias, equivalent in self.ALIASES.items():
|
||||
if alias in context:
|
||||
# This is unexpected. We could ignore or log a warning, but failing loudly should result in fewer surprises
|
||||
raise ValueError(
|
||||
f"Found reserved keyword {alias} in interpolation context. This is unexpected and indicative of a bug in the CDK."
|
||||
)
|
||||
elif equivalent in context:
|
||||
context[alias] = context[equivalent]
|
||||
|
||||
try:
|
||||
if isinstance(input_str, str):
|
||||
result = self._eval(input_str, context)
|
||||
|
||||
@@ -35,19 +35,73 @@ def test_get_value_from_a_list_of_mappings():
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"test_name, s, value",
|
||||
"s, value",
|
||||
[
|
||||
("test_number", "{{1}}", 1),
|
||||
("test_list", "{{[1,2]}}", [1, 2]),
|
||||
("test_dict", "{{ {1:2} }}", {1: 2}),
|
||||
("test_addition", "{{ 1+2 }}", 3),
|
||||
pytest.param("{{1}}", 1, id="test_number"),
|
||||
pytest.param("{{1}}", 1, id="test_number"),
|
||||
pytest.param("{{[1,2]}}", [1, 2], id="test_list"),
|
||||
pytest.param("{{ {1:2} }}", {1: 2}, id="test_dict"),
|
||||
pytest.param("{{ 1+2 }}", 3, id="test_addition"),
|
||||
],
|
||||
)
|
||||
def test_literals(test_name, s, value):
|
||||
def test_literals(s, value):
|
||||
val = interpolation.eval(s, None)
|
||||
assert val == value
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"context, input_string, expected_value",
|
||||
[
|
||||
pytest.param(
|
||||
{"stream_slice": {"stream_slice_key": "hello"}},
|
||||
"{{ stream_slice['stream_slice_key'] }}",
|
||||
"hello",
|
||||
id="test_get_value_from_stream_slice"),
|
||||
pytest.param(
|
||||
{},
|
||||
"{{ stream_slice['stream_slice_key'] }}",
|
||||
None,
|
||||
id="test_get_value_from_stream_slice_no_slice"
|
||||
),
|
||||
pytest.param(
|
||||
{"stream_slice": {"stream_slice_key": "hello"}},
|
||||
"{{ stream_partition['stream_slice_key'] }}",
|
||||
"hello",
|
||||
id="test_get_value_from_stream_slicer"
|
||||
),
|
||||
pytest.param(
|
||||
{},
|
||||
"{{ stream_partition['stream_slice_key'] }}",
|
||||
None,
|
||||
id="test_get_value_from_stream_partition_no_stream_slice"
|
||||
),
|
||||
pytest.param(
|
||||
{"stream_slice": {"stream_slice_key": "hello"}},
|
||||
"{{ stream_interval['stream_slice_key'] }}",
|
||||
"hello",
|
||||
id="test_get_value_from_stream_interval"
|
||||
)
|
||||
],
|
||||
)
|
||||
def test_stream_slice_alias(context, input_string, expected_value):
|
||||
config = {}
|
||||
val = interpolation.eval(input_string, config, **context)
|
||||
assert val == expected_value
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"alias", [
|
||||
pytest.param("stream_interval", id="test_error_is_raised_if_stream_interval_in_context"),
|
||||
pytest.param("stream_partition", id="test_error_is_raised_if_stream_partition_in_context"),
|
||||
]
|
||||
)
|
||||
def test_error_is_raised_if_alias_is_already_in_context(alias):
|
||||
config = {}
|
||||
context = {alias: "a_value"}
|
||||
with pytest.raises(ValueError):
|
||||
interpolation.eval("a_key", config, **context)
|
||||
|
||||
|
||||
def test_positive_day_delta():
|
||||
delta_template = "{{ day_delta(25) }}"
|
||||
interpolation = JinjaInterpolation()
|
||||
@@ -75,15 +129,15 @@ def test_negative_day_delta():
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"test_name, s, expected_value",
|
||||
"s, expected_value",
|
||||
[
|
||||
("test_timestamp_from_timestamp", "{{ timestamp(1621439283) }}", 1621439283),
|
||||
("test_timestamp_from_string", "{{ timestamp('2021-05-19') }}", 1621382400),
|
||||
("test_timestamp_from_rfc3339", "{{ timestamp('2017-01-01T00:00:00.0Z') }}", 1483228800),
|
||||
("test_max", "{{ max(1,2) }}", 2),
|
||||
pytest.param("{{ timestamp(1621439283) }}", 1621439283, id="test_timestamp_from_timestamp"),
|
||||
pytest.param("{{ timestamp('2021-05-19') }}", 1621382400, id="test_timestamp_from_string"),
|
||||
pytest.param("{{ timestamp('2017-01-01T00:00:00.0Z') }}", 1483228800, id="test_timestamp_from_rfc3339"),
|
||||
pytest.param("{{ max(1,2) }}", 2, id="test_max"),
|
||||
],
|
||||
)
|
||||
def test_macros(test_name, s, expected_value):
|
||||
def test_macros(s, expected_value):
|
||||
interpolation = JinjaInterpolation()
|
||||
config = {}
|
||||
val = interpolation.eval(s, config)
|
||||
|
||||
Reference in New Issue
Block a user