IMPALA-14223: Cleanup subdirectories in INSERT OVERWRITE

If an external table contains data files in subdirectories, and
recursive listing is enabled, Impala considers the files in the
subdirectories as part of the table. However, currently INSERT OVERWRITE
and TRUNCATE do not always delete these files, leading to data
corruption.

This change takes care of INSERT OVERWRITE.

Before this change, for unpartitioned external tables, only top-level
data files were deleted and data files in subdirectories (whether
hidden, ignored or normal) were kept.

After this change, directories are also deleted in addition to
(non-hidden) data files, with the exception of hidden and ignored
directories. (Note: for ignored directories, see
--ignored_dir_prefix_list).

Note that for partitioned tables, INSERT OVERWRITE completely removes
the partition directories that are affected, and this change does not
alter that.

Testing:
 - extended the tests in test_recursive_listing.py::TestRecursiveListing

Change-Id: I1a40a22e18e6a384da982d300422ac8995ed0273
Reviewed-on: http://gerrit.cloudera.org:8080/23165
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Reviewed-by: Daniel Becker <daniel.becker@cloudera.com>
This commit is contained in:
Csaba Ringhofer
2025-07-01 18:23:49 +02:00
committed by Daniel Becker
parent c2705fa480
commit 95a073aa08
3 changed files with 95 additions and 34 deletions

View File

@@ -17,6 +17,7 @@
#include "runtime/dml-exec-state.h"
#include <algorithm>
#include <mutex>
#include <boost/algorithm/string.hpp>
@@ -47,6 +48,8 @@
DEFINE_bool(insert_inherit_permissions, false, "If true, new directories created by "
"INSERTs will inherit the permissions of their parent directories");
DECLARE_string(ignored_dir_prefix_list);
using namespace impala;
using boost::algorithm::is_any_of;
using boost::algorithm::split;
@@ -179,6 +182,61 @@ bool DmlExecState::PrepareCatalogUpdate(TUpdateCatalogRequest* catalog_update,
return catalog_update->updated_partitions.size() != 0;
}
// Deletes data files and directories in 'path'. Keeps top-level
// - hidden files
// - hidden directories
// - ignored directories.
Status DeleteUnpartitionedDirData(const hdfsFS& fs_connection,
const string& path, HdfsOperationSet* hdfs_ops) {
int num_files = 0;
// hfdsListDirectory() only sets errno if there is an error, but it doesn't set
// it to 0 if the call succeed. When there is no error, errno could be any
// value. So need to clear errno before calling it.
// Once HDFS-8407 is fixed, the errno reset won't be needed.
errno = 0;
hdfsFileInfo* existing_files_and_dirs =
hdfsListDirectory(fs_connection, path.c_str(), &num_files);
if (existing_files_and_dirs == nullptr && errno == EAGAIN) {
errno = 0;
existing_files_and_dirs =
hdfsListDirectory(fs_connection, path.c_str(), &num_files);
}
// hdfsListDirectory() returns nullptr not only when there is an error but also
// when the directory is empty(HDFS-8407). Need to check errno to make sure
// the call fails.
if (existing_files_and_dirs == nullptr && errno != 0) {
return Status(GetHdfsErrorMsg("Could not list directory: ", path));
}
vector<string> ignored_prefixes;
boost::split(ignored_prefixes, FLAGS_ignored_dir_prefix_list,
[](char ch) { return ch == ','; });
for (int i = 0; i < num_files; ++i) {
const string file_or_dir_name =
boost::filesystem::path(existing_files_and_dirs[i].mName).filename().string();
if (!IsHiddenFile(file_or_dir_name)) {
if (existing_files_and_dirs[i].mKind == kObjectKindFile) {
hdfs_ops->Add(DELETE, existing_files_and_dirs[i].mName);
} else if (existing_files_and_dirs[i].mKind == kObjectKindDirectory) {
auto file_or_dir_name_starts_with_non_empty_prefix =
[&file_or_dir_name](const string& prefix) {
return !prefix.empty() && file_or_dir_name.find(prefix) == 0;
};
bool dir_ignored = std::any_of(ignored_prefixes.begin(), ignored_prefixes.end(),
file_or_dir_name_starts_with_non_empty_prefix);
if (!dir_ignored) {
hdfs_ops->Add(DELETE, existing_files_and_dirs[i].mName);
}
}
}
}
hdfsFreeFileInfo(existing_files_and_dirs, num_files);
return Status::OK();
}
Status DmlExecState::FinalizeHdfsInsert(const TFinalizeParams& params,
bool s3_skip_insert_staging, HdfsTableDescriptor* hdfs_table,
RuntimeProfile* profile) {
@@ -225,38 +283,9 @@ Status DmlExecState::FinalizeHdfsInsert(const TFinalizeParams& params,
if (partition.first.empty()) {
// If the root directory is written to, then the table must not be partitioned
DCHECK(per_partition_status_.size() == 1);
// We need to be a little more careful, and only delete data files in the root
// because the tmp directories the sink(s) wrote are there also.
// So only delete files in the table directory - all files are treated as data
// files by Hive and Impala, but directories are ignored (and may legitimately
// be used to store permanent non-table data by other applications).
int num_files = 0;
// hfdsListDirectory() only sets errno if there is an error, but it doesn't set
// it to 0 if the call succeed. When there is no error, errno could be any
// value. So need to clear errno before calling it.
// Once HDFS-8407 is fixed, the errno reset won't be needed.
errno = 0;
hdfsFileInfo* existing_files =
hdfsListDirectory(partition_fs_connection, part_path.c_str(), &num_files);
if (existing_files == nullptr && errno == EAGAIN) {
errno = 0;
existing_files =
hdfsListDirectory(partition_fs_connection, part_path.c_str(), &num_files);
}
// hdfsListDirectory() returns nullptr not only when there is an error but also
// when the directory is empty(HDFS-8407). Need to check errno to make sure
// the call fails.
if (existing_files == nullptr && errno != 0) {
return Status(GetHdfsErrorMsg("Could not list directory: ", part_path));
}
for (int i = 0; i < num_files; ++i) {
const string filename =
boost::filesystem::path(existing_files[i].mName).filename().string();
if (existing_files[i].mKind == kObjectKindFile && !IsHiddenFile(filename)) {
partition_create_ops.Add(DELETE, existing_files[i].mName);
}
}
hdfsFreeFileInfo(existing_files, num_files);
RETURN_IF_ERROR(DeleteUnpartitionedDirData(
partition_fs_connection, part_path, &partition_create_ops));
} else {
// This is a partition directory, not the root directory; we can delete
// recursively with abandon, after checking that it ever existed.

View File

@@ -151,6 +151,37 @@ class TestRecursiveListing(ImpalaTestSuite):
assert len(self._show_files(fq_tbl_name)) == 0
assert len(self._get_rows(fq_tbl_name)) == 0
# Verify that INSERT OVERWRITE removes data files in subdirectories too.
# Regression test for IMPALA-13778.
self.filesystem_client.create_file("{0}/file1.txt".format(part_path), "file1")
self.filesystem_client.make_dir("{0}/dir1".format(part_path))
self.filesystem_client.create_file("{0}/dir1/file1.txt".format(part_path), "file1")
# Also add a hidden and an ignored dir.
self.filesystem_client.make_dir("{0}/_tmp.hiddendir".format(part_path))
self.filesystem_client.create_file(
"{0}/_tmp.hiddendir/file1.txt".format(part_path), "file1")
self.filesystem_client.make_dir("{0}/-tmp.ignoreddir".format(part_path))
self.filesystem_client.create_file(
"{0}/-tmp.ignoreddir/file1.txt".format(part_path), "file1")
self.execute_query_expect_success(self.client,
"insert overwrite {tbl} {part} select 'str'".format(
tbl=fq_tbl_name, part="partition(p=1)" if partitioned else ""))
assert len(self._show_files(fq_tbl_name)) == 1
assert len(self._get_rows(fq_tbl_name)) == 1
assert ((not partitioned)
== self.filesystem_client.exists("{0}/_tmp.hiddendir".format(part_path)))
assert ((not partitioned)
== self.filesystem_client.exists(
"{0}/_tmp.hiddendir/file1.txt".format(part_path)))
assert ((not partitioned)
== self.filesystem_client.exists("{0}/-tmp.ignoreddir".format(part_path)))
assert ((not partitioned)
== self.filesystem_client.exists(
"{0}/-tmp.ignoreddir/file1.txt".format(part_path)))
@SkipIfFS.no_partial_listing
@pytest.mark.execute_serially
def test_large_staging_dirs(self, unique_database):

View File

@@ -67,7 +67,8 @@ class TestInsertBehaviour(ImpalaTestSuite):
TBL_NAME = "insert_overwrite_nopart"
table_dir = "%s/functional.db/%s/" % (WAREHOUSE, TBL_NAME)
hidden_file_locations = [".hidden", "_hidden"]
dir_locations = ["dir", ".hidden_dir"]
hidden_dir_locations = [".hidden_dir", "_hidden_dir"]
dir_locations = ["dir"] + hidden_dir_locations
for dir_ in dir_locations:
self.filesystem_client.make_dir(table_dir + dir_)
@@ -87,7 +88,7 @@ class TestInsertBehaviour(ImpalaTestSuite):
assert self.filesystem_client.exists(table_dir + file_), "Hidden file {0} was " \
"unexpectedly deleted by INSERT OVERWRITE".format(table_dir + file_)
for dir_ in dir_locations:
for dir_ in hidden_dir_locations:
assert self.filesystem_client.exists(table_dir + dir_), "Directory {0} was " \
"unexpectedly deleted by INSERT OVERWRITE".format(table_dir + dir_)