diff --git a/common/thrift/CatalogObjects.thrift b/common/thrift/CatalogObjects.thrift
index ec15ff17f..31b1a50c5 100644
--- a/common/thrift/CatalogObjects.thrift
+++ b/common/thrift/CatalogObjects.thrift
@@ -232,6 +232,10 @@ struct THdfsTable {
   // Used so that each THdfsFileBlock can just reference an index in this list rather
   // than duplicate the list of network address, which helps reduce memory usage.
   7: optional list<Types.TNetworkAddress> network_addresses
+
+  // Indicates that this table's partitions reside on more than one filesystem.
+  // TODO: remove once INSERT across filesystems is supported.
+  8: optional bool multiple_filesystems
 }
 
 struct THBaseTable {
diff --git a/fe/src/main/java/com/cloudera/impala/analysis/HdfsUri.java b/fe/src/main/java/com/cloudera/impala/analysis/HdfsUri.java
index 63474ee13..41e73b0bd 100644
--- a/fe/src/main/java/com/cloudera/impala/analysis/HdfsUri.java
+++ b/fe/src/main/java/com/cloudera/impala/analysis/HdfsUri.java
@@ -66,15 +66,13 @@ public class HdfsUri {
       throw new AnalysisException("URI path must be absolute: " + uriPath_);
     }
     try {
-      FileSystem fs = uriPath_.getFileSystem(FileSystemUtil.getConfiguration());
-      if (!(fs instanceof DistributedFileSystem)) {
+      if (!FileSystemUtil.isDistributedFileSystem(uriPath_)) {
         throw new AnalysisException(String.format("URI location '%s' " +
             "must point to an HDFS file system.", uriPath_));
       }
     } catch (IOException e) {
       throw new AnalysisException(e.getMessage(), e);
     }
-    // Fully-qualify the path
     uriPath_ = FileSystemUtil.createFullyQualifiedPath(uriPath_);
 
     if (registerPrivReq) {
diff --git a/fe/src/main/java/com/cloudera/impala/analysis/InsertStmt.java b/fe/src/main/java/com/cloudera/impala/analysis/InsertStmt.java
index 41b134d11..cf8516805 100644
--- a/fe/src/main/java/com/cloudera/impala/analysis/InsertStmt.java
+++ b/fe/src/main/java/com/cloudera/impala/analysis/InsertStmt.java
@@ -14,11 +14,13 @@
 
 package com.cloudera.impala.analysis;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.EnumSet;
 import java.util.List;
 import java.util.Set;
 
+import org.apache.hadoop.fs.Path;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -31,6 +33,7 @@ import com.cloudera.impala.catalog.Table;
 import com.cloudera.impala.catalog.Type;
 import com.cloudera.impala.catalog.View;
 import com.cloudera.impala.common.AnalysisException;
+import com.cloudera.impala.common.FileSystemUtil;
 import com.cloudera.impala.planner.DataSink;
 import com.cloudera.impala.thrift.THdfsFileFormat;
 import com.google.common.base.Joiner;
@@ -347,7 +350,20 @@ public class InsertStmt extends StatementBase {
           "(%s) because Impala does not have WRITE access to at least one HDFS path" +
           ": %s", targetTableName_, hdfsTable.getFirstLocationWithoutWriteAccess()));
     }
-
+    if (hdfsTable.spansMultipleFileSystems()) {
+      throw new AnalysisException(String.format("Unable to INSERT into target table " +
+          "(%s) because the table spans multiple file-systems.", targetTableName_));
+    }
+    try {
+      if (!FileSystemUtil.isDistributedFileSystem(new Path(hdfsTable.getLocation()))) {
+        throw new AnalysisException(String.format("Unable to INSERT into target " +
+            "table (%s) because %s is not an HDFS file-system.", targetTableName_,
+            hdfsTable.getLocation()));
+      }
+    } catch (IOException e) {
+      throw new AnalysisException(String.format("Unable to INSERT into target " +
+          "table (%s): %s.", targetTableName_, e.getMessage()), e);
+    }
     for (int colIdx = 0; colIdx < numClusteringCols; ++colIdx) {
       Column col = hdfsTable.getColumns().get(colIdx);
       // Hive has a number of issues handling BOOLEAN partition columns (see HIVE-6590).
diff --git a/fe/src/main/java/com/cloudera/impala/analysis/LoadDataStmt.java b/fe/src/main/java/com/cloudera/impala/analysis/LoadDataStmt.java
index 93e084532..515a6801d 100644
--- a/fe/src/main/java/com/cloudera/impala/analysis/LoadDataStmt.java
+++ b/fe/src/main/java/com/cloudera/impala/analysis/LoadDataStmt.java
@@ -132,6 +132,8 @@ public class LoadDataStmt extends StatementBase {
     try {
       Path source = sourceDataPath_.getPath();
       FileSystem fs = source.getFileSystem(FileSystemUtil.getConfiguration());
+      // sourceDataPath_.analyze() ensured that path is on an HDFS filesystem.
+      Preconditions.checkState(fs instanceof DistributedFileSystem);
       DistributedFileSystem dfs = (DistributedFileSystem) fs;
       if (!dfs.exists(source)) {
         throw new AnalysisException(String.format(
@@ -184,20 +186,33 @@
         "location: ", hdfsTable.getFullName());
 
       HdfsPartition partition;
+      String location;
       if (partitionSpec_ != null) {
         partition = hdfsTable.getPartition(partitionSpec_.getPartitionSpecKeyValues());
+        location = partition.getLocation();
         if (!TAccessLevelUtil.impliesWriteAccess(partition.getAccessLevel())) {
           throw new AnalysisException(noWriteAccessErrorMsg + partition.getLocation());
         }
       } else {
         // "default" partition
         partition = hdfsTable.getPartitions().get(0);
+        location = hdfsTable.getLocation();
         if (!hdfsTable.hasWriteAccess()) {
           throw new AnalysisException(noWriteAccessErrorMsg + hdfsTable.getLocation());
         }
       }
       Preconditions.checkNotNull(partition);
+      // Until Frontend.loadTableData() can handle cross-filesystem and filesystems
+      // that aren't HDFS, require that source and dest are on the same HDFS.
+      if (!FileSystemUtil.isPathOnFileSystem(new Path(location), fs)) {
+        throw new AnalysisException(String.format(
+            "Unable to LOAD DATA into target table (%s) because source path (%s) and " +
+            "destination %s (%s) are on different file-systems.",
+            hdfsTable.getFullName(),
+            source, partitionSpec_ == null ? "table" : "partition",
+            partition.getLocation()));
+      }
 
       // Verify the files being loaded are supported.
       for (FileStatus fStatus: fs.listStatus(source)) {
         if (fs.isDirectory(fStatus.getPath())) continue;
diff --git a/fe/src/main/java/com/cloudera/impala/catalog/HdfsTable.java b/fe/src/main/java/com/cloudera/impala/catalog/HdfsTable.java
index e1fb424c3..e8eddeb53 100644
--- a/fe/src/main/java/com/cloudera/impala/catalog/HdfsTable.java
+++ b/fe/src/main/java/com/cloudera/impala/catalog/HdfsTable.java
@@ -160,6 +160,9 @@ public class HdfsTable extends Table {
   // Sum of sizes of all Hdfs files in this table. Set in load().
   private long totalHdfsBytes_;
 
+  // True iff the table's partitions are located on more than one filesystem.
+  private boolean multipleFileSystems_ = false;
+
   // Base Hdfs directory where files of this table are stored.
   // For unpartitioned tables it is simply the path where all files live.
   // For partitioned tables it is the root directory
@@ -236,6 +239,8 @@
 
   public Map<String, List<FileDescriptor>> getFileDescMap() { return fileDescMap_; }
 
+  public boolean spansMultipleFileSystems() { return multipleFileSystems_; }
+
   /**
    * Loads the file block metadata for the given collection of FileDescriptors. The
    * FileDescriptors are passed as a tree, where the first level is indexed by
@@ -763,6 +768,8 @@
       try {
         // Each partition could reside on a different filesystem.
         FileSystem fs = partDirPath.getFileSystem(CONF);
+        multipleFileSystems_ = multipleFileSystems_ ||
+            !FileSystemUtil.isPathOnFileSystem(new Path(getLocation()), fs);
         if (fs.exists(partDirPath)) {
           // FileSystem does not have an API that takes in a timestamp and returns a list
           // of files that has been added/changed since. Therefore, we are calling
@@ -1197,6 +1204,7 @@
     hdfsBaseDir_ = hdfsTable.getHdfsBaseDir();
     nullColumnValue_ = hdfsTable.nullColumnValue;
     nullPartitionKeyValue_ = hdfsTable.nullPartitionKeyValue;
+    multipleFileSystems_ = hdfsTable.multiple_filesystems;
     hostIndex_.populate(hdfsTable.getNetwork_addresses());
     resetPartitionMd();
 
@@ -1255,6 +1263,7 @@
     THdfsTable hdfsTable = new THdfsTable(hdfsBaseDir_, getColumnNames(),
         nullPartitionKeyValue_, nullColumnValue_, idToPartition);
     hdfsTable.setAvroSchema(avroSchema_);
+    hdfsTable.setMultiple_filesystems(multipleFileSystems_);
     if (includeFileDesc) {
       // Network addresses are used only by THdfsFileBlocks which are inside
       // THdfsFileDesc, so include network addreses only when including THdfsFileDesc.
diff --git a/fe/src/main/java/com/cloudera/impala/common/FileSystemUtil.java b/fe/src/main/java/com/cloudera/impala/common/FileSystemUtil.java
index 56ac19999..253f32b1e 100644
--- a/fe/src/main/java/com/cloudera/impala/common/FileSystemUtil.java
+++ b/fe/src/main/java/com/cloudera/impala/common/FileSystemUtil.java
@@ -198,6 +198,14 @@ public class FileSystemUtil {
     return fileName.startsWith(".") || fileName.startsWith("_");
   }
 
+  /**
+   * Return true iff path is on a DFS filesystem.
+   */
+  public static boolean isDistributedFileSystem(Path path) throws IOException {
+    FileSystem fs = path.getFileSystem(CONF);
+    return fs instanceof DistributedFileSystem;
+  }
+
   public static DistributedFileSystem getDistributedFileSystem(Path path)
       throws IOException {
     FileSystem fs = path.getFileSystem(CONF);
@@ -218,6 +226,21 @@
     return location.makeQualified(FileSystem.getDefaultUri(CONF), location);
   }
 
+  /**
+   * Return true iff the path is on the given filesystem.
+   */
+  public static Boolean isPathOnFileSystem(Path path, FileSystem fs) {
+    try {
+      // Call makeQualified() for the side-effect of FileSystem.checkPath() which will
+      // throw an exception if path is not on fs.
+      fs.makeQualified(path);
+      return true;
+    } catch (IllegalArgumentException e) {
+      // Path is not on fs.
+      return false;
+    }
+  }
+
   /**
    * Returns the configuration.
    */
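Not part of the patch: a minimal standalone sketch of the trick the new FileSystemUtil.isPathOnFileSystem() relies on. FileSystem.makeQualified() delegates to FileSystem.checkPath(), which throws IllegalArgumentException ("Wrong FS: ...") when the path carries a scheme or authority that does not belong to the given filesystem. The class name and example URIs below are hypothetical; only hadoop-common is needed, since the demo runs against the local filesystem.

```java
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Hypothetical demo class; mirrors the patch's FileSystemUtil.isPathOnFileSystem().
public class PathOnFileSystemDemo {

  // makeQualified() calls FileSystem.checkPath(), which throws
  // IllegalArgumentException when the path is not on the given filesystem.
  public static boolean isPathOnFileSystem(Path path, FileSystem fs) {
    try {
      fs.makeQualified(path);
      return true;
    } catch (IllegalArgumentException e) {
      return false;
    }
  }

  public static void main(String[] args) throws IOException {
    Configuration conf = new Configuration();
    // Use the local filesystem so the demo runs without an HDFS cluster.
    FileSystem localFs = FileSystem.getLocal(conf);

    // A file:// path belongs to the local filesystem.
    System.out.println(
        isPathOnFileSystem(new Path("file:///tmp/data"), localFs));            // true
    // An hdfs:// path (hypothetical namenode) does not, so checkPath() rejects it.
    System.out.println(
        isPathOnFileSystem(new Path("hdfs://namenode:8020/warehouse/t1"), localFs)); // false
  }
}
```

Note that checkPath() accepts scheme-less (relative) paths against any filesystem, so the check only distinguishes paths that carry an explicit scheme or authority.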