mirror of
https://github.com/apache/impala.git
synced 2025-12-30 03:01:44 -05:00
S3: Disallow INSERT and LOAD DATA to non-HDFS or tables that span filesystems
We won't immediately support writing to tables that are not HDFS or have partitions across multiple filesystems. Now that we can load metadata for such tables, let's explicitly disallow this and issue errors. Previously, this was not allowed simply because the metadata load for these kinds of tables would have failed. For now, I've manually tested this change. Once we can actually create tables with these properties, I'll come back and add the test cases (we currently can't even create these tables with our current hive configuration). Change-Id: I3d355b8fecc1babc08de31159b1599ab497b8712 Reviewed-on: http://gerrit.sjc.cloudera.com:8080/5141 Reviewed-by: Daniel Hecht <dhecht@cloudera.com> Tested-by: jenkins
This commit is contained in:
@@ -232,6 +232,10 @@ struct THdfsTable {
|
||||
// Used so that each THdfsFileBlock can just reference an index in this list rather
|
||||
// than duplicate the list of network address, which helps reduce memory usage.
|
||||
7: optional list<Types.TNetworkAddress> network_addresses
|
||||
|
||||
// Indicates that this table's partitions reside on more than one filesystem.
|
||||
// TODO: remove once INSERT across filesystems is supported.
|
||||
8: optional bool multiple_filesystems
|
||||
}
|
||||
|
||||
struct THBaseTable {
|
||||
|
||||
@@ -66,15 +66,13 @@ public class HdfsUri {
|
||||
throw new AnalysisException("URI path must be absolute: " + uriPath_);
|
||||
}
|
||||
try {
|
||||
FileSystem fs = uriPath_.getFileSystem(FileSystemUtil.getConfiguration());
|
||||
if (!(fs instanceof DistributedFileSystem)) {
|
||||
if (!FileSystemUtil.isDistributedFileSystem(uriPath_)) {
|
||||
throw new AnalysisException(String.format("URI location '%s' " +
|
||||
"must point to an HDFS file system.", uriPath_));
|
||||
}
|
||||
} catch (IOException e) {
|
||||
throw new AnalysisException(e.getMessage(), e);
|
||||
}
|
||||
|
||||
// Fully-qualify the path
|
||||
uriPath_ = FileSystemUtil.createFullyQualifiedPath(uriPath_);
|
||||
if (registerPrivReq) {
|
||||
|
||||
@@ -14,11 +14,13 @@
|
||||
|
||||
package com.cloudera.impala.analysis;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.EnumSet;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
@@ -31,6 +33,7 @@ import com.cloudera.impala.catalog.Table;
|
||||
import com.cloudera.impala.catalog.Type;
|
||||
import com.cloudera.impala.catalog.View;
|
||||
import com.cloudera.impala.common.AnalysisException;
|
||||
import com.cloudera.impala.common.FileSystemUtil;
|
||||
import com.cloudera.impala.planner.DataSink;
|
||||
import com.cloudera.impala.thrift.THdfsFileFormat;
|
||||
import com.google.common.base.Joiner;
|
||||
@@ -347,7 +350,20 @@ public class InsertStmt extends StatementBase {
|
||||
"(%s) because Impala does not have WRITE access to at least one HDFS path" +
|
||||
": %s", targetTableName_, hdfsTable.getFirstLocationWithoutWriteAccess()));
|
||||
}
|
||||
|
||||
if (hdfsTable.spansMultipleFileSystems()) {
|
||||
throw new AnalysisException(String.format("Unable to INSERT into target table " +
|
||||
"(%s) because the table spans multiple file-systems.", targetTableName_));
|
||||
}
|
||||
try {
|
||||
if (!FileSystemUtil.isDistributedFileSystem(new Path(hdfsTable.getLocation()))) {
|
||||
throw new AnalysisException(String.format("Unable to INSERT into target " +
|
||||
"table (%s) because %s is not an HDFS file-system.", targetTableName_,
|
||||
hdfsTable.getLocation()));
|
||||
}
|
||||
} catch (IOException e) {
|
||||
throw new AnalysisException(String.format("Unable to INSERT into target " +
|
||||
"table (%s): %s.", targetTableName_, e.getMessage()), e);
|
||||
}
|
||||
for (int colIdx = 0; colIdx < numClusteringCols; ++colIdx) {
|
||||
Column col = hdfsTable.getColumns().get(colIdx);
|
||||
// Hive has a number of issues handling BOOLEAN partition columns (see HIVE-6590).
|
||||
|
||||
@@ -132,6 +132,8 @@ public class LoadDataStmt extends StatementBase {
|
||||
try {
|
||||
Path source = sourceDataPath_.getPath();
|
||||
FileSystem fs = source.getFileSystem(FileSystemUtil.getConfiguration());
|
||||
// sourceDataPath_.analyze() ensured that path is on an HDFS filesystem.
|
||||
Preconditions.checkState(fs instanceof DistributedFileSystem);
|
||||
DistributedFileSystem dfs = (DistributedFileSystem) fs;
|
||||
if (!dfs.exists(source)) {
|
||||
throw new AnalysisException(String.format(
|
||||
@@ -184,20 +186,33 @@ public class LoadDataStmt extends StatementBase {
|
||||
"location: ", hdfsTable.getFullName());
|
||||
|
||||
HdfsPartition partition;
|
||||
String location;
|
||||
if (partitionSpec_ != null) {
|
||||
partition = hdfsTable.getPartition(partitionSpec_.getPartitionSpecKeyValues());
|
||||
location = partition.getLocation();
|
||||
if (!TAccessLevelUtil.impliesWriteAccess(partition.getAccessLevel())) {
|
||||
throw new AnalysisException(noWriteAccessErrorMsg + partition.getLocation());
|
||||
}
|
||||
} else {
|
||||
// "default" partition
|
||||
partition = hdfsTable.getPartitions().get(0);
|
||||
location = hdfsTable.getLocation();
|
||||
if (!hdfsTable.hasWriteAccess()) {
|
||||
throw new AnalysisException(noWriteAccessErrorMsg + hdfsTable.getLocation());
|
||||
}
|
||||
}
|
||||
Preconditions.checkNotNull(partition);
|
||||
|
||||
// Until Frontend.loadTableData() can handle cross-filesystem and filesystems
|
||||
// that aren't HDFS, require that source and dest are on the same HDFS.
|
||||
if (!FileSystemUtil.isPathOnFileSystem(new Path(location), fs)) {
|
||||
throw new AnalysisException(String.format(
|
||||
"Unable to LOAD DATA into target table (%s) because source path (%s) and " +
|
||||
"destination %s (%s) are on different file-systems.",
|
||||
hdfsTable.getFullName(),
|
||||
source, partitionSpec_ == null ? "table" : "partition",
|
||||
partition.getLocation()));
|
||||
}
|
||||
// Verify the files being loaded are supported.
|
||||
for (FileStatus fStatus: fs.listStatus(source)) {
|
||||
if (fs.isDirectory(fStatus.getPath())) continue;
|
||||
|
||||
@@ -160,6 +160,9 @@ public class HdfsTable extends Table {
|
||||
// Sum of sizes of all Hdfs files in this table. Set in load().
|
||||
private long totalHdfsBytes_;
|
||||
|
||||
// True iff the table's partitions are located on more than one filesystem.
|
||||
private boolean multipleFileSystems_ = false;
|
||||
|
||||
// Base Hdfs directory where files of this table are stored.
|
||||
// For unpartitioned tables it is simply the path where all files live.
|
||||
// For partitioned tables it is the root directory
|
||||
@@ -236,6 +239,8 @@ public class HdfsTable extends Table {
|
||||
|
||||
public Map<String, List<FileDescriptor>> getFileDescMap() { return fileDescMap_; }
|
||||
|
||||
public boolean spansMultipleFileSystems() { return multipleFileSystems_; }
|
||||
|
||||
/**
|
||||
* Loads the file block metadata for the given collection of FileDescriptors. The
|
||||
* FileDescriptors are passed as a tree, where the first level is indexed by
|
||||
@@ -763,6 +768,8 @@ public class HdfsTable extends Table {
|
||||
try {
|
||||
// Each partition could reside on a different filesystem.
|
||||
FileSystem fs = partDirPath.getFileSystem(CONF);
|
||||
multipleFileSystems_ = multipleFileSystems_ ||
|
||||
!FileSystemUtil.isPathOnFileSystem(new Path(getLocation()), fs);
|
||||
if (fs.exists(partDirPath)) {
|
||||
// FileSystem does not have an API that takes in a timestamp and returns a list
|
||||
// of files that has been added/changed since. Therefore, we are calling
|
||||
@@ -1197,6 +1204,7 @@ public class HdfsTable extends Table {
|
||||
hdfsBaseDir_ = hdfsTable.getHdfsBaseDir();
|
||||
nullColumnValue_ = hdfsTable.nullColumnValue;
|
||||
nullPartitionKeyValue_ = hdfsTable.nullPartitionKeyValue;
|
||||
multipleFileSystems_ = hdfsTable.multiple_filesystems;
|
||||
hostIndex_.populate(hdfsTable.getNetwork_addresses());
|
||||
resetPartitionMd();
|
||||
|
||||
@@ -1255,6 +1263,7 @@ public class HdfsTable extends Table {
|
||||
THdfsTable hdfsTable = new THdfsTable(hdfsBaseDir_, getColumnNames(),
|
||||
nullPartitionKeyValue_, nullColumnValue_, idToPartition);
|
||||
hdfsTable.setAvroSchema(avroSchema_);
|
||||
hdfsTable.setMultiple_filesystems(multipleFileSystems_);
|
||||
if (includeFileDesc) {
|
||||
// Network addresses are used only by THdfsFileBlocks which are inside
|
||||
// THdfsFileDesc, so include network addreses only when including THdfsFileDesc.
|
||||
|
||||
@@ -198,6 +198,14 @@ public class FileSystemUtil {
|
||||
return fileName.startsWith(".") || fileName.startsWith("_");
|
||||
}
|
||||
|
||||
/**
|
||||
* Return true iff path is on a DFS filesystem.
|
||||
*/
|
||||
public static boolean isDistributedFileSystem(Path path) throws IOException {
|
||||
FileSystem fs = path.getFileSystem(CONF);
|
||||
return fs instanceof DistributedFileSystem;
|
||||
}
|
||||
|
||||
public static DistributedFileSystem getDistributedFileSystem(Path path)
|
||||
throws IOException {
|
||||
FileSystem fs = path.getFileSystem(CONF);
|
||||
@@ -218,6 +226,21 @@ public class FileSystemUtil {
|
||||
return location.makeQualified(FileSystem.getDefaultUri(CONF), location);
|
||||
}
|
||||
|
||||
/**
|
||||
* Return true iff the path is on the given filesystem.
|
||||
*/
|
||||
public static Boolean isPathOnFileSystem(Path path, FileSystem fs) {
|
||||
try {
|
||||
// Call makeQualified() for the side-effect of FileSystem.checkPath() which will
|
||||
// throw an exception if path is not on fs.
|
||||
fs.makeQualified(path);
|
||||
return true;
|
||||
} catch (IllegalArgumentException e) {
|
||||
// Path is not on fs.
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the configuration.
|
||||
*/
|
||||
|
||||
Reference in New Issue
Block a user