IMPALA-8564: Add table/view create time in the lineage graph

This patch adds table/view create time in the lineage graph. This is
needed for Impala/Atlas integration. See ATLAS-3080.

Below is an example of the updated lineage graph.
{
    "queryText":"create table lineage_test_tbl as select int_col, tinyint_col from functional.alltypes",
    "queryId":"0:0",
    "hash":"407f23b24758ffcb2ac445b9703f5c44",
    "user":"dummy_user",
    "timestamp":1547867921,
    "edges":[
        {
            "sources":[
                1
            ],
            "targets":[
                0
            ],
            "edgeType":"PROJECTION"
        },
        {
            "sources":[
                3
            ],
            "targets":[
                2
            ],
            "edgeType":"PROJECTION"
        }
    ],
    "vertices":[
        {
            "id":0,
            "vertexType":"COLUMN",
            "vertexId":"int_col",
            "metadata":{
                "tableName":"default.lineage_test_tbl",
                "tableCreateTime":1559151337
            }
        },
        {
            "id":1,
            "vertexType":"COLUMN",
            "vertexId":"functional.alltypes.int_col",
            "metadata":{
                "tableName":"functional.alltypes",
                "tableCreateTime":1559151317
            }
        },
        {
            "id":2,
            "vertexType":"COLUMN",
            "vertexId":"tinyint_col",
            "metadata":{
                "tableName":"default.lineage_test_tbl",
                "tableCreateTime":1559151337
            }
        },
        {
            "id":3,
            "vertexType":"COLUMN",
            "vertexId":"functional.alltypes.tinyint_col",
            "metadata":{
                "tableName":"functional.alltypes",
                "tableCreateTime":1559151317
            }
        }
    ]
}

Testing:
- Updated lineage tests in PlannerTest
- Updated test_lineage.py
- Ran all FE tests

Change-Id: If4f578d7b299a76c30323b10a883ba32f8713d82
Reviewed-on: http://gerrit.cloudera.org:8080/13399
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
This commit is contained in:
Fredy Wijaya
2019-05-21 18:01:21 -07:00
committed by Impala Public Jenkins
parent 95a1da2d32
commit d9af99589f
15 changed files with 2191 additions and 627 deletions

View File

@@ -32,14 +32,10 @@ from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
LOG = logging.getLogger(__name__)
class TestLineage(CustomClusterTestSuite):
lineage_log_dir = tempfile.mkdtemp()
query = """
select count(*) from functional.alltypes
"""
@classmethod
def setup_class(cls):
super(TestLineage, cls).setup_class()
@@ -55,7 +51,8 @@ class TestLineage(CustomClusterTestSuite):
UNIX times."""
LOG.info("lineage_event_log_dir is " + self.lineage_log_dir)
before_time = int(time.time())
result = self.execute_query_expect_success(self.client, self.query)
query = "select count(*) from functional.alltypes"
result = self.execute_query_expect_success(self.client, query)
profile_query_id = re.search("Query \(id=(.*)\):", result.runtime_profile).group(1)
after_time = int(time.time())
LOG.info("before_time " + str(before_time) + " after_time " + str(after_time))
@@ -78,3 +75,32 @@ class TestLineage(CustomClusterTestSuite):
assert end_time <= after_time
else:
LOG.info("empty file: " + log_path)
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args("--lineage_event_log_dir={0}".format(lineage_log_dir))
def test_create_table_timestamp(self, vector, unique_database):
"""Test that 'createTableTime' in the lineage graph are populated with valid value
from HMS."""
query = "create table {0}.lineage_test_tbl as select int_col, tinyint_col " \
"from functional.alltypes".format(unique_database)
result = self.execute_query_expect_success(self.client, query)
profile_query_id = re.search("Query \(id=(.*)\):", result.runtime_profile).group(1)
# Wait to flush the lineage log files.
time.sleep(3)
for log_filename in os.listdir(self.lineage_log_dir):
log_path = os.path.join(self.lineage_log_dir, log_filename)
# Only the coordinator's log file will be populated.
if os.path.getsize(log_path) > 0:
with open(log_path) as log_file:
lineage_json = json.load(log_file)
assert lineage_json["queryId"] == profile_query_id
vertices = lineage_json["vertices"]
for vertex in vertices:
if vertex["vertexId"] == "int_col":
assert "metadata" in vertex
table_name = vertex["metadata"]["tableName"]
table_create_time = int(vertex["metadata"]["tableCreateTime"])
assert "{0}.lineage_test_tbl".format(unique_database) == table_name
assert table_create_time != -1