mirror of
https://github.com/apache/impala.git
synced 2025-12-23 21:08:39 -05:00
IMPALA-10529: Fix hit DCHECK in DiskIoMgr::AssignQueue in core-s3 build
For start option "scratch_dirs", it only considers local filesystem as the default filesystem, regardless of the setting of DefaultFS(for a remote scratch dir, it needs to explicitly set it with the remote fs prefix). However, the function AssignQueue() would assign the queue based on not only the path string but also the default filesystem setting. For example, if scratch_dirs is set as "/tmp", the scratch dir is supposed to be in the local filesystem, but the AssignQueue() would consider it as "s3a://xxx/tmp" if a s3 path is set as the default fs. To fix this, the solution is to add a bool variable to AssignQueue() to decide whether or not to check the default fs setting when parsing the file path. For all of the scratch dirs, AssignQueue() won't check the default fs. Tests: Added a unit testcase: TmpFileMgrTest::TestSpillingWithRemoteDefaultFS. Ran and Passed TmpFileMgrTest. Change-Id: Ic07945abe65d90235aa8dea92dd3c3821a4f1f53 Reviewed-on: http://gerrit.cloudera.org:8080/17136 Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com> Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
This commit is contained in:
committed by
Thomas Tauber-Marshall
parent
ca17e307ab
commit
d89c04bf80
@@ -810,22 +810,23 @@ Status DiskIoMgr::WriteRangeHelper(FILE* file_handle, WriteRange* write_range) {
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
int DiskIoMgr::AssignQueue(const char* file, int disk_id, bool expected_local) {
|
||||
int DiskIoMgr::AssignQueue(
|
||||
const char* file, int disk_id, bool expected_local, bool check_default_fs) {
|
||||
// If it's a remote range, check for an appropriate remote disk queue.
|
||||
if (!expected_local) {
|
||||
if (IsHdfsPath(file) && FLAGS_num_remote_hdfs_io_threads > 0) {
|
||||
if (IsHdfsPath(file, check_default_fs) && FLAGS_num_remote_hdfs_io_threads > 0) {
|
||||
return RemoteDfsDiskId();
|
||||
}
|
||||
if (IsS3APath(file)) return RemoteS3DiskId();
|
||||
if (IsABFSPath(file)) return RemoteAbfsDiskId();
|
||||
if (IsADLSPath(file)) return RemoteAdlsDiskId();
|
||||
if (IsOzonePath(file)) return RemoteOzoneDiskId();
|
||||
if (IsS3APath(file, check_default_fs)) return RemoteS3DiskId();
|
||||
if (IsABFSPath(file, check_default_fs)) return RemoteAbfsDiskId();
|
||||
if (IsADLSPath(file, check_default_fs)) return RemoteAdlsDiskId();
|
||||
if (IsOzonePath(file, check_default_fs)) return RemoteOzoneDiskId();
|
||||
}
|
||||
// Assign to a local disk queue.
|
||||
DCHECK(!IsS3APath(file)); // S3 is always remote.
|
||||
DCHECK(!IsABFSPath(file)); // ABFS is always remote.
|
||||
DCHECK(!IsADLSPath(file)); // ADLS is always remote.
|
||||
DCHECK(!IsOzonePath(file)); // Ozone is always remote.
|
||||
DCHECK(!IsS3APath(file, check_default_fs)); // S3 is always remote.
|
||||
DCHECK(!IsABFSPath(file, check_default_fs)); // ABFS is always remote.
|
||||
DCHECK(!IsADLSPath(file, check_default_fs)); // ADLS is always remote.
|
||||
DCHECK(!IsOzonePath(file, check_default_fs)); // Ozone is always remote.
|
||||
if (disk_id == -1) {
|
||||
// disk id is unknown, assign it an arbitrary one.
|
||||
disk_id = next_disk_id_.Add(1);
|
||||
|
||||
@@ -288,8 +288,10 @@ class DiskIoMgr : public CacheLineAligned {
|
||||
/// Determine which disk queue this file should be assigned to. Returns an index into
|
||||
/// disk_queues_. The disk_id is the volume ID for the local disk that holds the
|
||||
/// files, or -1 if unknown. Flag expected_local is true iff this impalad is
|
||||
/// co-located with the datanode for this file.
|
||||
int AssignQueue(const char* file, int disk_id, bool expected_local = false);
|
||||
/// co-located with the datanode for this file. Flag check_default_fs is false iff
|
||||
/// the file is a temporary file.
|
||||
int AssignQueue(
|
||||
const char* file, int disk_id, bool expected_local, bool check_default_fs);
|
||||
|
||||
int64_t min_buffer_size() const { return min_buffer_size_; }
|
||||
int64_t max_buffer_size() const { return max_buffer_size_; }
|
||||
|
||||
@@ -506,8 +506,8 @@ ScanRange* ScanRange::AllocateScanRange(ObjectPool* obj_pool, hdfsFS fs, const c
|
||||
DCHECK_GE(disk_id, -1);
|
||||
DCHECK_GE(offset, 0);
|
||||
DCHECK_GE(len, 0);
|
||||
disk_id =
|
||||
ExecEnv::GetInstance()->disk_io_mgr()->AssignQueue(file, disk_id, expected_local);
|
||||
disk_id = ExecEnv::GetInstance()->disk_io_mgr()->AssignQueue(
|
||||
file, disk_id, expected_local, /* check_default_fs */ true);
|
||||
ScanRange* range = obj_pool->Add(new ScanRange);
|
||||
range->Reset(fs, file, len, offset, disk_id, expected_local, mtime, buffer_opts,
|
||||
move(sub_ranges), metadata);
|
||||
|
||||
@@ -52,6 +52,9 @@ class TestEnv {
|
||||
/// If not called, a process memory tracker with no limit is created.
|
||||
void SetProcessMemTrackerArgs(int64_t bytes_limit, bool use_metrics);
|
||||
|
||||
/// Set the Default FS of ExecEnv.
|
||||
void SetDefaultFS(const string& fs) { exec_env_->default_fs_ = fs; }
|
||||
|
||||
/// Initialize the TestEnv with the specified arguments.
|
||||
Status Init();
|
||||
|
||||
|
||||
@@ -1757,4 +1757,32 @@ TEST_F(TmpFileMgrTest, TestTmpFileBufferPoolOneWriteDone) {
|
||||
TestTmpFileBufferPoolTearDown(tmp_file_mgr);
|
||||
}
|
||||
|
||||
/// Test setting a remote fs for the default fs, but should not affect the spilling.
|
||||
TEST_F(TmpFileMgrTest, TestSpillingWithRemoteDefaultFS) {
|
||||
vector<string> tmp_dirs({"/tmp/tmp-file-mgr-test.1"});
|
||||
TmpFileMgr tmp_file_mgr;
|
||||
RemoveAndCreateDirs(tmp_dirs);
|
||||
string org_default_fs = test_env_->exec_env()->default_fs();
|
||||
string fake_remote_default_fs = "s3a://fake_s3";
|
||||
test_env_->SetDefaultFS(fake_remote_default_fs);
|
||||
|
||||
ASSERT_OK(tmp_file_mgr.InitCustom(tmp_dirs, true, "", false, metrics_.get()));
|
||||
TUniqueId id;
|
||||
TmpFileGroup file_group(&tmp_file_mgr, io_mgr(), profile_, id);
|
||||
string data = "arbitrary data";
|
||||
MemRange data_mem_range(reinterpret_cast<uint8_t*>(&data[0]), data.size());
|
||||
|
||||
unique_ptr<TmpWriteHandle> handle;
|
||||
WriteRange::WriteDoneCallback callback = [this](const Status& status) {
|
||||
EXPECT_TRUE(status.ok());
|
||||
SignalCallback(status);
|
||||
};
|
||||
ASSERT_OK(file_group.Write(data_mem_range, callback, &handle));
|
||||
WaitForWrite(handle.get());
|
||||
WaitForCallbacks(1);
|
||||
file_group.Close();
|
||||
test_env_->SetDefaultFS(org_default_fs);
|
||||
test_env_->TearDownQueries();
|
||||
}
|
||||
|
||||
} // namespace impala
|
||||
|
||||
@@ -657,10 +657,14 @@ TmpFile::TmpFile(
|
||||
blacklisted_(false) {}
|
||||
|
||||
int TmpFile::AssignDiskQueue(bool is_local_buffer) const {
|
||||
// The file paths of TmpFiles are absolute paths, doesn't support default fs.
|
||||
if (is_local_buffer) {
|
||||
return file_group_->io_mgr_->AssignQueue(local_buffer_path_.c_str(), -1, true);
|
||||
// Assign a disk queue for a local buffer, which is associated with a remote file.
|
||||
return file_group_->io_mgr_->AssignQueue(local_buffer_path_.c_str(),
|
||||
/* disk_id */ -1, /* expected_local */ true, /* check_default_fs */ false);
|
||||
}
|
||||
return file_group_->io_mgr_->AssignQueue(path_.c_str(), disk_id_, expected_local_);
|
||||
return file_group_->io_mgr_->AssignQueue(
|
||||
path_.c_str(), disk_id_, expected_local_, /* check_default_fs */ false);
|
||||
}
|
||||
|
||||
bool TmpFile::Blacklist(const ErrorMsg& msg) {
|
||||
@@ -1518,6 +1522,8 @@ Status TmpWriteHandle::Write(RequestContext* io_ctx, MemRange buffer,
|
||||
|
||||
// Set all member variables before calling AddWriteRange(): after it succeeds,
|
||||
// WriteComplete() may be called concurrently with the remainder of this function.
|
||||
// If the TmpFile is not local, the disk queue assigned should be for the
|
||||
// buffer.
|
||||
data_len_ = buffer.len();
|
||||
file_ = tmp_file;
|
||||
write_range_.reset(new WriteRange(tmp_file->path(), file_offset,
|
||||
|
||||
Reference in New Issue
Block a user