From fb1c9c46d64004307cdac556724cedf6ef87ff4e Mon Sep 17 00:00:00 2001
From: Dan Hecht
Date: Wed, 5 Nov 2014 13:10:05 -0800
Subject: [PATCH] S3: Disallow INSERT and LOAD DATA to non-HDFS tables or
 tables that span filesystems

We won't immediately support writing to tables that are not backed by
HDFS, or whose partitions span multiple filesystems. Now that we can
load metadata for such tables, explicitly disallow these operations and
issue clear errors at analysis time. Previously they were rejected only
as a side effect of the metadata load failing for these kinds of
tables.

For now, I've tested this change manually. Once we can actually create
tables with these properties, I'll come back and add test cases (our
current Hive configuration doesn't even let us create such tables).

Change-Id: I3d355b8fecc1babc08de31159b1599ab497b8712
Reviewed-on: http://gerrit.sjc.cloudera.com:8080/5141
Reviewed-by: Daniel Hecht
Tested-by: jenkins
---
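
Illustrative example (table and path names are hypothetical): after this
change, an INSERT into, or LOAD DATA into, a table whose location is not
on HDFS (say, a table whose LOCATION is an s3n:// bucket), or whose
partitions live on more than one filesystem, fails at analysis time with
the "Unable to INSERT into target table ..." / "Unable to LOAD DATA into
target table ..." errors added below, rather than in a less obvious way
later in the insert/load path.
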
 common/thrift/CatalogObjects.thrift                |  4 ++++
 .../com/cloudera/impala/analysis/HdfsUri.java      |  4 +---
 .../cloudera/impala/analysis/InsertStmt.java       | 18 ++++++++++++++-
 .../impala/analysis/LoadDataStmt.java              | 15 ++++++++++++
 .../cloudera/impala/catalog/HdfsTable.java         |  9 ++++++++
 .../impala/common/FileSystemUtil.java              | 23 +++++++++++++++++++
 6 files changed, 69 insertions(+), 4 deletions(-)

diff --git a/common/thrift/CatalogObjects.thrift b/common/thrift/CatalogObjects.thrift
index ec15ff17f..31b1a50c5 100644
--- a/common/thrift/CatalogObjects.thrift
+++ b/common/thrift/CatalogObjects.thrift
@@ -232,6 +232,10 @@ struct THdfsTable {
   // Used so that each THdfsFileBlock can just reference an index in this list rather
   // than duplicate the list of network address, which helps reduce memory usage.
   7: optional list<Types.TNetworkAddress> network_addresses
+
+  // Indicates that this table's partitions reside on more than one filesystem.
+  // TODO: remove once INSERT across filesystems is supported.
+  8: optional bool multiple_filesystems
 }
 
 struct THBaseTable {
diff --git a/fe/src/main/java/com/cloudera/impala/analysis/HdfsUri.java b/fe/src/main/java/com/cloudera/impala/analysis/HdfsUri.java
index 63474ee13..41e73b0bd 100644
--- a/fe/src/main/java/com/cloudera/impala/analysis/HdfsUri.java
+++ b/fe/src/main/java/com/cloudera/impala/analysis/HdfsUri.java
@@ -66,15 +66,13 @@ public class HdfsUri {
       throw new AnalysisException("URI path must be absolute: " + uriPath_);
     }
     try {
-      FileSystem fs = uriPath_.getFileSystem(FileSystemUtil.getConfiguration());
-      if (!(fs instanceof DistributedFileSystem)) {
+      if (!FileSystemUtil.isDistributedFileSystem(uriPath_)) {
         throw new AnalysisException(String.format("URI location '%s' " +
             "must point to an HDFS file system.", uriPath_));
       }
     } catch (IOException e) {
       throw new AnalysisException(e.getMessage(), e);
     }
-
     // Fully-qualify the path
     uriPath_ = FileSystemUtil.createFullyQualifiedPath(uriPath_);
     if (registerPrivReq) {
diff --git a/fe/src/main/java/com/cloudera/impala/analysis/InsertStmt.java b/fe/src/main/java/com/cloudera/impala/analysis/InsertStmt.java
index 41b134d11..cf8516805 100644
--- a/fe/src/main/java/com/cloudera/impala/analysis/InsertStmt.java
+++ b/fe/src/main/java/com/cloudera/impala/analysis/InsertStmt.java
@@ -14,11 +14,13 @@
 
 package com.cloudera.impala.analysis;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.EnumSet;
 import java.util.List;
 import java.util.Set;
 
+import org.apache.hadoop.fs.Path;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -31,6 +33,7 @@ import com.cloudera.impala.catalog.Table;
 import com.cloudera.impala.catalog.Type;
 import com.cloudera.impala.catalog.View;
 import com.cloudera.impala.common.AnalysisException;
+import com.cloudera.impala.common.FileSystemUtil;
 import com.cloudera.impala.planner.DataSink;
 import com.cloudera.impala.thrift.THdfsFileFormat;
 import com.google.common.base.Joiner;
@@ -347,7 +350,20 @@ public class InsertStmt extends StatementBase {
           "(%s) because Impala does not have WRITE access to at least one HDFS path" +
           ": %s", targetTableName_, hdfsTable.getFirstLocationWithoutWriteAccess()));
     }
-
+    if (hdfsTable.spansMultipleFileSystems()) {
+      throw new AnalysisException(String.format("Unable to INSERT into target table " +
+          "(%s) because the table spans multiple file-systems.", targetTableName_));
+    }
+    try {
+      if (!FileSystemUtil.isDistributedFileSystem(new Path(hdfsTable.getLocation()))) {
+        throw new AnalysisException(String.format("Unable to INSERT into target " +
+            "table (%s) because %s is not an HDFS file-system.", targetTableName_,
+            hdfsTable.getLocation()));
+      }
+    } catch (IOException e) {
+      throw new AnalysisException(String.format("Unable to INSERT into target " +
+          "table (%s): %s.", targetTableName_, e.getMessage()), e);
+    }
     for (int colIdx = 0; colIdx < numClusteringCols; ++colIdx) {
       Column col = hdfsTable.getColumns().get(colIdx);
       // Hive has a number of issues handling BOOLEAN partition columns (see HIVE-6590).
diff --git a/fe/src/main/java/com/cloudera/impala/analysis/LoadDataStmt.java b/fe/src/main/java/com/cloudera/impala/analysis/LoadDataStmt.java
index 93e084532..515a6801d 100644
--- a/fe/src/main/java/com/cloudera/impala/analysis/LoadDataStmt.java
+++ b/fe/src/main/java/com/cloudera/impala/analysis/LoadDataStmt.java
@@ -132,6 +132,8 @@
     try {
       Path source = sourceDataPath_.getPath();
       FileSystem fs = source.getFileSystem(FileSystemUtil.getConfiguration());
+      // sourceDataPath_.analyze() ensured that path is on an HDFS filesystem.
+      Preconditions.checkState(fs instanceof DistributedFileSystem);
       DistributedFileSystem dfs = (DistributedFileSystem) fs;
       if (!dfs.exists(source)) {
         throw new AnalysisException(String.format(
@@ -184,20 +186,33 @@
         "location: ", hdfsTable.getFullName());
 
     HdfsPartition partition;
+    String location;
     if (partitionSpec_ != null) {
       partition = hdfsTable.getPartition(partitionSpec_.getPartitionSpecKeyValues());
+      location = partition.getLocation();
       if (!TAccessLevelUtil.impliesWriteAccess(partition.getAccessLevel())) {
         throw new AnalysisException(noWriteAccessErrorMsg + partition.getLocation());
       }
     } else {
       // "default" partition
       partition = hdfsTable.getPartitions().get(0);
+      location = hdfsTable.getLocation();
       if (!hdfsTable.hasWriteAccess()) {
         throw new AnalysisException(noWriteAccessErrorMsg + hdfsTable.getLocation());
       }
     }
     Preconditions.checkNotNull(partition);
 
+    // Until Frontend.loadTableData() can handle cross-filesystem and filesystems
+    // that aren't HDFS, require that source and dest are on the same HDFS.
+    if (!FileSystemUtil.isPathOnFileSystem(new Path(location), fs)) {
+      throw new AnalysisException(String.format(
+          "Unable to LOAD DATA into target table (%s) because source path (%s) and " +
+          "destination %s (%s) are on different file-systems.",
+          hdfsTable.getFullName(),
+          source, partitionSpec_ == null ? "table" : "partition",
+          partition.getLocation()));
+    }
     // Verify the files being loaded are supported.
     for (FileStatus fStatus: fs.listStatus(source)) {
       if (fs.isDirectory(fStatus.getPath())) continue;
diff --git a/fe/src/main/java/com/cloudera/impala/catalog/HdfsTable.java b/fe/src/main/java/com/cloudera/impala/catalog/HdfsTable.java
index e1fb424c3..e8eddeb53 100644
--- a/fe/src/main/java/com/cloudera/impala/catalog/HdfsTable.java
+++ b/fe/src/main/java/com/cloudera/impala/catalog/HdfsTable.java
@@ -160,6 +160,9 @@ public class HdfsTable extends Table {
   // Sum of sizes of all Hdfs files in this table. Set in load().
   private long totalHdfsBytes_;
 
+  // True iff the table's partitions are located on more than one filesystem.
+  private boolean multipleFileSystems_ = false;
+
   // Base Hdfs directory where files of this table are stored.
   // For unpartitioned tables it is simply the path where all files live.
   // For partitioned tables it is the root directory
@@ -236,6 +239,8 @@ public class HdfsTable extends Table {
 
   public Map<String, List<FileDescriptor>> getFileDescMap() { return fileDescMap_; }
 
+  public boolean spansMultipleFileSystems() { return multipleFileSystems_; }
+
   /**
    * Loads the file block metadata for the given collection of FileDescriptors. The
    * FileDescriptors are passed as a tree, where the first level is indexed by
@@ -763,6 +768,8 @@ public class HdfsTable extends Table {
     try {
       // Each partition could reside on a different filesystem.
       FileSystem fs = partDirPath.getFileSystem(CONF);
+      multipleFileSystems_ = multipleFileSystems_ ||
+          !FileSystemUtil.isPathOnFileSystem(new Path(getLocation()), fs);
       if (fs.exists(partDirPath)) {
         // FileSystem does not have an API that takes in a timestamp and returns a list
         // of files that has been added/changed since. Therefore, we are calling
@@ -1197,6 +1204,7 @@ public class HdfsTable extends Table {
     hdfsBaseDir_ = hdfsTable.getHdfsBaseDir();
     nullColumnValue_ = hdfsTable.nullColumnValue;
     nullPartitionKeyValue_ = hdfsTable.nullPartitionKeyValue;
+    multipleFileSystems_ = hdfsTable.multiple_filesystems;
     hostIndex_.populate(hdfsTable.getNetwork_addresses());
     resetPartitionMd();
 
@@ -1255,6 +1263,7 @@ public class HdfsTable extends Table {
     THdfsTable hdfsTable = new THdfsTable(hdfsBaseDir_, getColumnNames(),
         nullPartitionKeyValue_, nullColumnValue_, idToPartition);
     hdfsTable.setAvroSchema(avroSchema_);
+    hdfsTable.setMultiple_filesystems(multipleFileSystems_);
     if (includeFileDesc) {
       // Network addresses are used only by THdfsFileBlocks which are inside
       // THdfsFileDesc, so include network addreses only when including THdfsFileDesc.
diff --git a/fe/src/main/java/com/cloudera/impala/common/FileSystemUtil.java b/fe/src/main/java/com/cloudera/impala/common/FileSystemUtil.java
index 56ac19999..253f32b1e 100644
--- a/fe/src/main/java/com/cloudera/impala/common/FileSystemUtil.java
+++ b/fe/src/main/java/com/cloudera/impala/common/FileSystemUtil.java
@@ -198,6 +198,14 @@ public class FileSystemUtil {
     return fileName.startsWith(".") || fileName.startsWith("_");
   }
 
+  /**
+   * Return true iff path is on a DFS filesystem.
+   */
+  public static boolean isDistributedFileSystem(Path path) throws IOException {
+    FileSystem fs = path.getFileSystem(CONF);
+    return fs instanceof DistributedFileSystem;
+  }
+
   public static DistributedFileSystem getDistributedFileSystem(Path path)
       throws IOException {
     FileSystem fs = path.getFileSystem(CONF);
@@ -218,6 +226,21 @@ public class FileSystemUtil {
     return location.makeQualified(FileSystem.getDefaultUri(CONF), location);
   }
 
+  /**
+   * Return true iff the path is on the given filesystem.
+   */
+  public static Boolean isPathOnFileSystem(Path path, FileSystem fs) {
+    try {
+      // Call makeQualified() for the side-effect of FileSystem.checkPath() which will
+      // throw an exception if path is not on fs.
+      fs.makeQualified(path);
+      return true;
+    } catch (IllegalArgumentException e) {
+      // Path is not on fs.
+      return false;
+    }
+  }
+
   /**
    * Returns the configuration.
    */
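
Side note on the new helper: isPathOnFileSystem() leans on
FileSystem.makeQualified(Path), which calls FileSystem.checkPath()
internally and throws IllegalArgumentException for a path that belongs
to a different filesystem. A minimal standalone sketch of the same
technique, for anyone wanting to try it out of tree (the class name is
made up, and the local filesystem stands in for HDFS so the sketch runs
anywhere with hadoop-common on the classpath):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class CheckPathSketch {
      // Same trick as FileSystemUtil.isPathOnFileSystem(): makeQualified()
      // invokes FileSystem.checkPath(), which throws IllegalArgumentException
      // when the path is not on the given filesystem.
      static boolean isPathOnFileSystem(Path path, FileSystem fs) {
        try {
          fs.makeQualified(path);
          return true;
        } catch (IllegalArgumentException e) {
          return false;
        }
      }

      public static void main(String[] args) throws Exception {
        FileSystem local = FileSystem.getLocal(new Configuration());
        System.out.println(isPathOnFileSystem(new Path("file:///tmp/t"), local));  // true
        System.out.println(isPathOnFileSystem(new Path("s3n://bucket/t"), local)); // false
      }
    }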