Fix volume id for partitioned table

Alan Choi
2013-04-09 13:15:28 -07:00
committed by Henry Robinson
parent 612e1b22dc
commit 0b0367d82e

@@ -21,9 +21,6 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
 import org.apache.commons.codec.binary.Base64;
 import org.apache.hadoop.conf.Configuration;
@@ -65,9 +62,6 @@ import com.cloudera.impala.thrift.TTableDescriptor;
 import com.cloudera.impala.thrift.TTableType;
 import com.google.common.base.Objects;
 import com.google.common.base.Preconditions;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
@@ -117,7 +111,10 @@ public class HdfsTable extends Table {
     public long getLength() { return length; }
     public String[] getHostPorts() { return hostPorts; }
-    public void setDiskIds(int[] diskIds) { this.diskIds = diskIds; }
+    public void setDiskIds(int[] diskIds) {
+      Preconditions.checkArgument(diskIds.length == hostPorts.length);
+      this.diskIds = diskIds;
+    }
     /**
      * Return the disk id of the block in BlockLocation.getNames()[hostIndex]; -1 if
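The hardened setter above guards the invariant that disk ids are parallel to the block's replica hosts. A minimal self-contained sketch of the idea (class and field names are illustrative, not the actual inner-class internals):

import com.google.common.base.Preconditions;

// Each entry of diskIds names the disk that stores the replica on the
// host at the same index of hostPorts, so the arrays must line up.
class BlockSketch {
  private final String[] hostPorts = {"host1:50010", "host2:50010"};
  private int[] diskIds;

  void setDiskIds(int[] diskIds) {
    // Throws IllegalArgumentException on a length mismatch instead of
    // silently storing a misaligned array.
    Preconditions.checkArgument(diskIds.length == hostPorts.length);
    this.diskIds = diskIds;
  }
}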
@@ -379,8 +376,8 @@ public class HdfsTable extends Table {
       // Get the BlockStorageLocations for all the blocks
       locations = DFS.getFileBlockStorageLocations(blocks);
     } catch (IOException e) {
-      throw new RuntimeException("couldn't determine block storage locations:\n"
-          + e.getMessage(), e);
+      LOG.error("Couldn't determine block storage locations:\n" + e.getMessage());
+      return;
     }
     if (locations == null || locations.length == 0) {
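With this hunk, a failure to fetch block storage locations no longer aborts table loading: disk ids only feed scheduling hints, so the method logs the error and returns with the ids left unset. A sketch of the pattern under assumed surroundings (a DistributedFileSystem handle, an slf4j logger, and a flat list of block locations; the real method's signature may differ):

import java.io.IOException;
import java.util.List;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.BlockStorageLocation;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class DiskIdLoaderSketch {
  private static final Logger LOG = LoggerFactory.getLogger(DiskIdLoaderSketch.class);

  void loadDiskIds(DistributedFileSystem dfs, List<BlockLocation> blocks) {
    BlockStorageLocation[] locations;
    try {
      locations = dfs.getFileBlockStorageLocations(blocks);
    } catch (IOException e) {
      // Degrade instead of throwing: the table stays usable without disk ids.
      LOG.error("Couldn't determine block storage locations:\n" + e.getMessage());
      return;
    }
    if (locations == null || locations.length == 0) return;
    // ... map each location's VolumeIds to per-host disk indices ...
  }
}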
@@ -403,21 +400,8 @@ public class HdfsTable extends Table {
       // to turn them into indices.
       // TODO: the diskId should be eventually retrievable from Hdfs when
       // the community agrees this API is useful.
-      // For each host, this is a mapping of the VolumeId object to a 0 based index.
-      Map<String, Map<VolumeId, Integer>> hostDiskIds = Maps.newHashMap();
       for (int i = 0; i < pair.second; ++i, ++locationsIdx) {
-        String[] hosts = null;
-        try {
-          hosts = locations[locationsIdx].getHosts();
-        } catch (IOException e) {
-          LOG.error("getHosts() failed: " + e.getMessage());
-          continue;
-        }
-        VolumeId[] volumeIds = locations[i].getVolumeIds();
-        Preconditions.checkState(hosts.length == volumeIds.length);
+        VolumeId[] volumeIds = locations[locationsIdx].getVolumeIds();
         // For each block replica, the disk id for the block on that host
         int[] diskIds = new int[volumeIds.length];
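This hunk is the volume-id fix itself: locations is a single flat array spanning every file's blocks, locationsIdx is the running cursor into it, and i restarts at zero for each file. The old locations[i] therefore read volume ids from the wrong entries once the cursor moved past the first file, which, per the commit title, is how partitioned tables were bitten. A self-contained toy showing the mismatch (all names and data are illustrative):

public class IndexBugSketch {
  public static void main(String[] args) {
    // Flat locations array: 2 files x 2 blocks each.
    String[] locations = {"f0-b0", "f0-b1", "f1-b0", "f1-b1"};
    int[] blocksPerFile = {2, 2};
    int locationsIdx = 0;
    for (int f = 0; f < blocksPerFile.length; ++f) {
      for (int i = 0; i < blocksPerFile[f]; ++i, ++locationsIdx) {
        // locations[locationsIdx] is what the fix reads;
        // locations[i] is what the old code read (wrong past file 0).
        System.out.println("file " + f + " block " + i
            + ": fixed=" + locations[locationsIdx]
            + " buggy=" + locations[i]);
      }
    }
  }
}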
@@ -435,7 +419,7 @@ public class HdfsTable extends Table {
         }
         pair.first.setBlockDiskIds(i, diskIds);
       }
-      }
+    }
     LOG.info("loaded disk ids for table " + getFullName());
     LOG.info(Integer.toString(getNumNodes()));
   }
@@ -504,22 +488,22 @@ public class HdfsTable extends Table {
     }
     // Make sure the number of values match up and that some values were found.
-    if (targetValues.size() == 0 || 
+    if (targetValues.size() == 0 ||
         (targetValues.size() != getMetaStoreTable().getPartitionKeysSize())) {
       return null;
-    } 
+    }
     // Now search through all the partitions and check if their partition key values match
-    // the values being searched for. 
+    // the values being searched for.
     for (HdfsPartition partition: getPartitions()) {
-      if (partition.getId() == ImpalaInternalServiceConstants.DEFAULT_PARTITION_ID) { 
+      if (partition.getId() == ImpalaInternalServiceConstants.DEFAULT_PARTITION_ID) {
         continue;
       }
       List<LiteralExpr> partitionValues = partition.getPartitionValues();
       Preconditions.checkState(partitionValues.size() == targetValues.size());
       boolean matchFound = true;
       for (int i = 0; i < targetValues.size(); ++i) {
-        String value = partitionValues.get(i) instanceof NullLiteral ? 
+        String value = partitionValues.get(i) instanceof NullLiteral ?
             getNullPartitionKeyValue() : partitionValues.get(i).getStringValue();
         if (!targetValues.get(i).equals(value.toLowerCase())) {
           matchFound = false;
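This hunk is whitespace cleanup around the partition-lookup logic: each partition's key values are compared against the lowercased target values, with a NULL partition key standing in as the table's null partition key value. A compact sketch of that comparison (the method shape and default null value are assumptions; Hive's default is __HIVE_DEFAULT_PARTITION__):

import java.util.List;

class PartitionMatchSketch {
  // targetValues are assumed already lowercased by the caller, matching the
  // surrounding code's value.toLowerCase() comparison.
  static boolean matches(List<String> targetValues, List<String> partitionValues,
      String nullPartitionKeyValue) {
    if (partitionValues.size() != targetValues.size()) return false;
    for (int i = 0; i < targetValues.size(); ++i) {
      String value = (partitionValues.get(i) == null)
          ? nullPartitionKeyValue : partitionValues.get(i);
      if (!targetValues.get(i).equals(value.toLowerCase())) return false;
    }
    return true;
  }
}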
@@ -783,6 +767,7 @@ public class HdfsTable extends Table {
     return hdfsBaseDir;
   }

+  @Override
   public int getNumNodes() {
     return uniqueHostPorts.size();
   }