Add MetricDefs, static definitions of metric metadata generated from json

Adds a static definition of the metric metadata used by Impala. The
metric names, descriptions, and other properties are defined in
common/thrift/metrics.json file, and the generate_metrics.py script
creates a thrift representation. The metric definitions are then
available in a constant map which is used at runtime to instantiate
metrics, looking them up in the map by the metric key.

New metrics should be defined by adding an entry to the list of metrics
in metrics.json with the following properties:

key:         The unique string identifying the metric. If the metric can
             be templated, e.g. rpc call duration, it may be a format
             string (in the format used by strings::Substitute()).
description: A text description of the metric. May also be a format
             string.
label:       A brief title for the metric, not currently used by
             Impala but provided for external tools.
units:       The unit of the metric. Must be a valid value of TUnit.
kind:        The kind of metric, e.g. GAUGE or COUNTER. Must be a valid
             value of TMetricKind.
contexts:    The context in which this metric may be instantiated.
             Usually "IMPALAD", "STATESTORED", "CATALOGD", but may be
             a different kind of 'entity'. Not currently used by
             Impala but provided for modeling purposes for external
             tools.

For example, adding the counter for the total number of queries run over
the lifetime of the impalad process might look like:

  {
    "key": "impala-server.num-queries",
    "description": "The total number of queries processed.",
    "label": "Queries",
    "units": "UNIT",
    "kind": "COUNTER",
    "contexts": [
      "IMPALAD"
    ]
  }

TODO: Incorporate 'label' into the metrics debug page.
TODO: Verify the context at runtime, e.g. verify 'contexts' contains,
      e.g. a DCHECK.

After the metric definition is added, the generate_metrics.py script
will generate the TMetricDefs.thrift that contains a TMetricDef for
the metric definition. At runtime, the metric can be instantiated
using the key defined in metrics.json. Gauges, Counters, and
Properties are instantiated using static methods on MetricGroup. Other
metric types are instantiated using static CreateAndRegister methods
on their associated classes.

TODO: Generate a thrift enum used to lookup metric defs.
TODO: Consolidate the instantiation of metrics that are created
      outside of metrics.h (i.e. collection metrics, memory metrics).
TODO: Need a better way to verify if metric definitions are missing.

Change-Id: Iba7f94144d0c34f273c502ce6b9a2130ea8fedaa
Reviewed-on: http://gerrit.cloudera.org:8080/330
Reviewed-by: Matthew Jacobs <mj@cloudera.com>
Tested-by: Internal Jenkins
This commit is contained in:
Matthew Jacobs
2015-04-23 13:24:29 -07:00
committed by Internal Jenkins
parent cf0b6bc595
commit fe87bb1563
28 changed files with 1773 additions and 266 deletions

3
.gitignore vendored
View File

@@ -37,6 +37,3 @@ tests/test-hive-udfs/target/
cdh-*-hdfs-data/
avro_schemas/
cluster_logs/
# This file is auto-generated in the build process
common/thrift/ErrorCodes.thrift

View File

@@ -65,6 +65,10 @@ set(SRC_FILES
JniCatalog_constants.cpp
JniCatalog_types.cpp
Logging_types.cpp
Metrics_constants.cpp
Metrics_types.cpp
MetricDefs_constants.cpp
MetricDefs_types.cpp
NetworkTest_constants.cpp
NetworkTest_types.cpp
NetworkTestService.cpp

View File

@@ -150,12 +150,10 @@ CatalogServer::CatalogServer(MetricGroup* metrics)
thrift_serializer_(FLAGS_compact_catalog_topic), metrics_(metrics),
topic_updates_ready_(false), last_sent_catalog_version_(0L),
catalog_objects_min_version_(0L), catalog_objects_max_version_(0L) {
topic_processing_time_metric_ = metrics_->RegisterMetric(
new StatsMetric<double>(CATALOG_SERVER_TOPIC_PROCESSING_TIMES,
TUnit::TIME_S));
topic_processing_time_metric_ = StatsMetric<double>::CreateAndRegister(metrics,
CATALOG_SERVER_TOPIC_PROCESSING_TIMES);
}
Status CatalogServer::Start() {
TNetworkAddress subscriber_address =
MakeNetworkAddress(FLAGS_hostname, FLAGS_state_store_subscriber_port);

View File

@@ -67,8 +67,7 @@ int main(int argc, char** argv) {
StartThreadInstrumentation(metrics.get(), webserver.get());
InitRpcEventTracing(webserver.get());
metrics->AddProperty<string>("catalog.version", GetVersionString(true),
"catalogd build version");
metrics->AddProperty<string>("catalog.version", GetVersionString(true));
CatalogServer catalog_server(metrics.get());
EXIT_IF_ERROR(catalog_server.Start());

View File

@@ -106,77 +106,42 @@ ResourceBroker::ResourceBroker(const vector<TNetworkAddress>& llama_addresses,
active_llama_handle_metric_ = metrics->AddProperty<string>(
"resource-broker.active-llama-handle", "none");
reservation_rpc_time_metric_ = metrics->RegisterMetric(
new StatsMetric<double>("resource-broker.reservation-request-rpc-time",
TUnit::TIME_S, "The time, in seconds, that a Reserve() RPC takes to "
"Llama"));
reservation_response_time_metric_ = metrics->RegisterMetric(
new StatsMetric<double>("resource-broker.reservation-request-response-time",
TUnit::TIME_S, "The time, in seconds, that a reservation request takes "
"to be fulfilled by Llama"));
reservation_rpc_time_metric_ = StatsMetric<double>::CreateAndRegister(metrics,
"resource-broker.reservation-request-rpc-time");
reservation_response_time_metric_ = StatsMetric<double>::CreateAndRegister(metrics,
"resource-broker.reservation-request-response-time");
reservation_requests_total_metric_ = metrics->AddCounter<int64_t>(
"resource-broker.reservation-requests-total", 0, TUnit::UNIT,
"The total number of reservation requests made by this Impala daemon to Llama");
"resource-broker.reservation-requests-total", 0);
reservation_requests_fulfilled_metric_ = metrics->AddCounter<int64_t>(
"resource-broker.reservation-requests-fulfilled", 0, TUnit::UNIT,
"The number of reservation requests made by this Impala daemon to Llama "
"which succeeded");
"resource-broker.reservation-requests-fulfilled", 0);
reservation_requests_failed_metric_ = metrics->AddCounter<int64_t>(
"resource-broker.reservation-requests-failed", 0, TUnit::UNIT,
"The number of reservation requests made by this Impala daemon to Llama which "
"failed");
"resource-broker.reservation-requests-failed", 0);
reservation_requests_rejected_metric_ = metrics->AddCounter<int64_t>(
"resource-broker.reservation-requests-rejected", 0, TUnit::UNIT,
"The number of reservation requests made by this Impala daemon to Llama "
"which were rejected");
"resource-broker.reservation-requests-rejected", 0);
reservation_requests_timedout_metric_ = metrics->AddCounter<int64_t>(
"resource-broker.reservation-requests-timedout", 0, TUnit::UNIT,
"The number of reservation requests made by this Impala daemon to Llama "
"which timed out");
"resource-broker.reservation-requests-timedout", 0);
expansion_rpc_time_metric_ = metrics->RegisterMetric(
new StatsMetric<double>("resource-broker.expansion-request-rpc-time",
TUnit::TIME_S,
"The time, in seconds, that a Reserve() RPC takes to Llama"));
expansion_response_time_metric_ = metrics->RegisterMetric(
new StatsMetric<double>("resource-broker.expansion-request-response-time",
TUnit::TIME_S, "The time, in seconds, that a expansion request takes "
"to be fulfilled by Llama"));
expansion_rpc_time_metric_ = StatsMetric<double>::CreateAndRegister(metrics,
"resource-broker.expansion-request-rpc-time");
expansion_response_time_metric_ = StatsMetric<double>::CreateAndRegister(metrics,
"resource-broker.expansion-request-response-time");
expansion_requests_total_metric_ = metrics->AddCounter<int64_t>(
"resource-broker.expansion-requests-total", 0, TUnit::UNIT,
"The total number of expansion requests made by this Impala daemon to Llama");
"resource-broker.expansion-requests-total", 0);
expansion_requests_fulfilled_metric_ = metrics->AddCounter<int64_t>(
"resource-broker.expansion-requests-fulfilled", 0, TUnit::UNIT,
"The number of expansion requests made by this Impala daemon to Llama "
"which succeeded");
"resource-broker.expansion-requests-fulfilled", 0);
expansion_requests_failed_metric_ = metrics->AddCounter<int64_t>(
"resource-broker.expansion-requests-failed", 0, TUnit::UNIT,
"The number of expansion requests made by this Impala daemon to Llama which "
"failed");
"resource-broker.expansion-requests-failed", 0);
expansion_requests_rejected_metric_ = metrics->AddCounter<int64_t>(
"resource-broker.expansion-requests-rejected", 0, TUnit::UNIT,
"The number of expansion requests made by this Impala daemon to Llama "
"which were rejected");
"resource-broker.expansion-requests-rejected", 0);
expansion_requests_timedout_metric_ = metrics->AddCounter<int64_t>(
"resource-broker.expansion-requests-timedout", 0, TUnit::UNIT,
"The number of expansion requests made by this Impala daemon to Llama "
"which timed out");
"resource-broker.expansion-requests-timedout", 0);
requests_released_metric_ = metrics->AddCounter<int64_t>(
"resource-broker.requests-released", 0, TUnit::UNIT,
"The number of resource-release requests received from Llama");
"resource-broker.requests-released", 0);
allocated_memory_metric_ = metrics->AddGauge<uint64_t>(
"resource-broker.memory-resources-in-use", 0L, TUnit::BYTES, "The total"
" number of bytes currently allocated to this Impala daemon by Llama");
"resource-broker.memory-resources-in-use", 0L);
allocated_vcpus_metric_ = metrics->AddGauge<uint64_t>(
"resource-broker.vcpu-resources-in-use", 0, TUnit::UNIT, "The total number "
"of vcpus currently allocated to this Impala daemon by Llama");
requests_released_metric_ = metrics->AddCounter<int64_t>(
"resource-broker.requests-released", 0, TUnit::UNIT, "The total number of "
"resource allocations released by this Impala daemon");
"resource-broker.vcpu-resources-in-use", 0);
}
Status ResourceBroker::Init() {

View File

@@ -30,6 +30,9 @@ using namespace impala;
using namespace rapidjson;
using namespace strings;
// Metric key format for rpc call duration metrics.
const string RPC_TIME_STATS_METRIC_KEY = "rpc-method.$0.call_duration";
// Singleton class to keep track of all RpcEventHandlers, and to render them to a
// web-based summary page.
class RpcEventHandlerManager {
@@ -165,10 +168,9 @@ void* RpcEventHandler::getContext(const char* fn_name, void* server_context) {
if (it == method_map_.end()) {
MethodDescriptor* descriptor = new MethodDescriptor();
descriptor->name = fn_name;
const string& time_metric_name =
Substitute("rpc-method.$0.$1.call_duration", server_name_, descriptor->name);
descriptor->time_stats = metrics_->RegisterMetric(
new StatsMetric<double>(time_metric_name, TUnit::TIME_MS));
const string& rpc_name = Substitute("$0.$1", server_name_, descriptor->name);
descriptor->time_stats = StatsMetric<double>::CreateAndRegister(metrics_,
RPC_TIME_STATS_METRIC_KEY, rpc_name);
it = method_map_.insert(make_pair(descriptor->name, descriptor)).first;
}
}

View File

@@ -392,7 +392,14 @@ Status Coordinator::Exec(QuerySchedule& schedule,
query_events_->MarkEvent("Ready to start remote fragments");
int backend_num = 0;
StatsMetric<double> latencies("fragment-latencies", TUnit::TIME_NS);
// TODO: Add a runtime-profile stats mechanism so this doesn't need to create a
// non-registered TMetricDef.
TMetricDef md;
md.__set_key("fragment-latencies");
md.__set_units(TUnit::TIME_NS);
md.__set_kind(TMetricKind::STATS);
StatsMetric<double> latencies(md);
for (int fragment_idx = (has_coordinator_fragment ? 1 : 0);
fragment_idx < request.fragments.size(); ++fragment_idx) {
const FragmentExecParams& params = (*fragment_exec_params)[fragment_idx];

View File

@@ -49,7 +49,11 @@ TEST(MemTestTest, SingleTrackerWithLimit) {
}
TEST(MemTestTest, ConsumptionMetric) {
UIntGauge metric("test", TUnit::BYTES, 0);
TMetricDef md;
md.__set_key("test");
md.__set_units(TUnit::BYTES);
md.__set_kind(TMetricKind::GAUGE);
UIntGauge metric(md, 0);
EXPECT_EQ(metric.value(), 0);
MemTracker t(&metric, 100, -1, "");

View File

@@ -201,15 +201,14 @@ MemTracker::~MemTracker() {
}
void MemTracker::RegisterMetrics(MetricGroup* metrics, const string& prefix) {
num_gcs_metric_ = metrics->AddCounter(
Substitute("$0.num-gcs", prefix), 0L, TUnit::UNIT);
num_gcs_metric_ = metrics->AddCounter(Substitute("$0.num-gcs", prefix), 0L);
// TODO: Consider a total amount of bytes freed counter
bytes_freed_by_last_gc_metric_ = metrics->AddGauge<int64_t>(
Substitute("$0.bytes-freed-by-last-gc", prefix), -1, TUnit::BYTES);
Substitute("$0.bytes-freed-by-last-gc", prefix), -1);
bytes_over_limit_metric_ = metrics->AddGauge<int64_t>(
Substitute("$0.bytes-over-limit", prefix), -1, TUnit::BYTES);
Substitute("$0.bytes-over-limit", prefix), -1);
}
// Calling this on the query tracker results in output like:

View File

@@ -690,40 +690,40 @@ AdmissionController::GetPoolMetrics(const string& pool_name) {
PoolMetrics* pool_metrics = &pool_metrics_map_[pool_name];
pool_metrics->local_admitted = metrics_->AddCounter(
Substitute(LOCAL_ADMITTED_METRIC_KEY_FORMAT, pool_name), 0L);
LOCAL_ADMITTED_METRIC_KEY_FORMAT, 0L, pool_name);
pool_metrics->local_queued = metrics_->AddCounter(
Substitute(LOCAL_QUEUED_METRIC_KEY_FORMAT, pool_name), 0L);
LOCAL_QUEUED_METRIC_KEY_FORMAT, 0L, pool_name);
pool_metrics->local_dequeued = metrics_->AddCounter(
Substitute(LOCAL_DEQUEUED_METRIC_KEY_FORMAT, pool_name), 0L);
LOCAL_DEQUEUED_METRIC_KEY_FORMAT, 0L, pool_name);
pool_metrics->local_rejected = metrics_->AddCounter(
Substitute(LOCAL_REJECTED_METRIC_KEY_FORMAT, pool_name), 0L);
LOCAL_REJECTED_METRIC_KEY_FORMAT, 0L, pool_name);
pool_metrics->local_timed_out = metrics_->AddCounter(
Substitute(LOCAL_TIMED_OUT_METRIC_KEY_FORMAT, pool_name), 0L);
LOCAL_TIMED_OUT_METRIC_KEY_FORMAT, 0L, pool_name);
pool_metrics->local_completed = metrics_->AddCounter(
Substitute(LOCAL_COMPLETED_METRIC_KEY_FORMAT, pool_name), 0L);
LOCAL_COMPLETED_METRIC_KEY_FORMAT, 0L, pool_name);
pool_metrics->local_time_in_queue_ms = metrics_->AddCounter(
Substitute(LOCAL_TIME_IN_QUEUE_METRIC_KEY_FORMAT, pool_name), 0L);
LOCAL_TIME_IN_QUEUE_METRIC_KEY_FORMAT, 0L, pool_name);
pool_metrics->cluster_num_running = metrics_->AddGauge(
Substitute(CLUSTER_NUM_RUNNING_METRIC_KEY_FORMAT, pool_name), 0L);
CLUSTER_NUM_RUNNING_METRIC_KEY_FORMAT, 0L, pool_name);
pool_metrics->cluster_in_queue = metrics_->AddGauge(
Substitute(CLUSTER_IN_QUEUE_METRIC_KEY_FORMAT, pool_name), 0L);
CLUSTER_IN_QUEUE_METRIC_KEY_FORMAT, 0L, pool_name);
pool_metrics->cluster_mem_usage = metrics_->AddGauge(
Substitute(CLUSTER_MEM_USAGE_METRIC_KEY_FORMAT, pool_name), 0L);
CLUSTER_MEM_USAGE_METRIC_KEY_FORMAT, 0L, pool_name);
pool_metrics->cluster_mem_estimate = metrics_->AddGauge(
Substitute(CLUSTER_MEM_ESTIMATE_METRIC_KEY_FORMAT, pool_name), 0L);
CLUSTER_MEM_ESTIMATE_METRIC_KEY_FORMAT, 0L, pool_name);
pool_metrics->local_num_running = metrics_->AddGauge(
Substitute(LOCAL_NUM_RUNNING_METRIC_KEY_FORMAT, pool_name), 0L);
LOCAL_NUM_RUNNING_METRIC_KEY_FORMAT, 0L, pool_name);
pool_metrics->local_in_queue = metrics_->AddGauge(
Substitute(LOCAL_IN_QUEUE_METRIC_KEY_FORMAT, pool_name), 0L);
LOCAL_IN_QUEUE_METRIC_KEY_FORMAT, 0L, pool_name);
pool_metrics->local_mem_usage = metrics_->AddGauge(
Substitute(LOCAL_MEM_USAGE_METRIC_KEY_FORMAT, pool_name), 0L);
LOCAL_MEM_USAGE_METRIC_KEY_FORMAT, 0L, pool_name);
pool_metrics->local_mem_estimate = metrics_->AddGauge(
Substitute(LOCAL_MEM_ESTIMATE_METRIC_KEY_FORMAT, pool_name), 0L);
LOCAL_MEM_ESTIMATE_METRIC_KEY_FORMAT, 0L, pool_name);
return pool_metrics;
}
}

View File

@@ -67,8 +67,8 @@ const string RESOLVE_POOL_METRIC_NAME = "request-pool-service.resolve-pool-durat
RequestPoolService::RequestPoolService(MetricGroup* metrics) :
metrics_(metrics), resolve_pool_ms_metric_(NULL) {
DCHECK(metrics_ != NULL);
resolve_pool_ms_metric_ = metrics_->RegisterMetric(
new StatsMetric<double>(RESOLVE_POOL_METRIC_NAME, TUnit::TIME_MS));
resolve_pool_ms_metric_ =
StatsMetric<double>::CreateAndRegister(metrics_, RESOLVE_POOL_METRIC_NAME);
if (FLAGS_fair_scheduler_allocation_path.empty() &&
FLAGS_llama_site_path.empty()) {

View File

@@ -105,20 +105,15 @@ StatestoreSubscriber::StatestoreSubscriber(const std::string& subscriber_id,
"statestore-subscriber.last-recovery-duration", 0.0);
last_recovery_time_metric_ = metrics_->AddProperty<string>(
"statestore-subscriber.last-recovery-time", "N/A");
topic_update_interval_metric_ = metrics->RegisterMetric(
new StatsMetric<double>("statestore-subscriber.topic-update-interval-time",
TUnit::TIME_S));
topic_update_duration_metric_ = metrics->RegisterMetric(
new StatsMetric<double>("statestore-subscriber.topic-update-duration",
TUnit::TIME_S));
heartbeat_interval_metric_ = metrics->RegisterMetric(
new StatsMetric<double>("statestore-subscriber.heartbeat-interval-time",
TUnit::TIME_S));
topic_update_interval_metric_ = StatsMetric<double>::CreateAndRegister(metrics,
"statestore-subscriber.topic-update-interval-time");
topic_update_duration_metric_ = StatsMetric<double>::CreateAndRegister(metrics,
"statestore-subscriber.topic-update-duration");
heartbeat_interval_metric_ = StatsMetric<double>::CreateAndRegister(metrics,
"statestore-subscriber.heartbeat-interval-time");
registration_id_metric_ = metrics->AddProperty<string>(
"statestore-subscriber.registration-id", "N/A",
"The most recent registration ID for this subscriber with the statestore. Set to "
"'N/A' if no registration has been completed");
"statestore-subscriber.registration-id", "N/A");
client_cache_->InitMetrics(metrics, "statestore-subscriber.statestore");
}
@@ -130,9 +125,8 @@ Status StatestoreSubscriber::AddTopic(const Statestore::TopicId& topic_id,
Callbacks* cb = &(update_callbacks_[topic_id]);
cb->callbacks.push_back(callback);
if (cb->processing_time_metric == NULL) {
const string& metric_name = Substitute(CALLBACK_METRIC_PATTERN, topic_id);
cb->processing_time_metric = metrics_->RegisterMetric(
new StatsMetric<double>(metric_name, TUnit::TIME_S));
cb->processing_time_metric = StatsMetric<double>::CreateAndRegister(metrics_,
CALLBACK_METRIC_PATTERN, topic_id);
}
topic_registrations_[topic_id] = is_transient;
return Status::OK;

View File

@@ -234,17 +234,16 @@ Statestore::Statestore(MetricGroup* metrics)
DCHECK(metrics != NULL);
num_subscribers_metric_ =
metrics->AddGauge(STATESTORE_LIVE_SUBSCRIBERS, 0L);
subscriber_set_metric_ =
metrics->RegisterMetric(new SetMetric<string>(STATESTORE_LIVE_SUBSCRIBERS_LIST,
set<string>()));
subscriber_set_metric_ = SetMetric<string>::CreateAndRegister(metrics,
STATESTORE_LIVE_SUBSCRIBERS_LIST, set<string>());
key_size_metric_ = metrics->AddGauge(STATESTORE_TOTAL_KEY_SIZE_BYTES, 0L);
value_size_metric_ = metrics->AddGauge(STATESTORE_TOTAL_VALUE_SIZE_BYTES, 0L);
topic_size_metric_ = metrics->AddGauge(STATESTORE_TOTAL_TOPIC_SIZE_BYTES, 0L);
topic_update_duration_metric_ = metrics->RegisterMetric(
new StatsMetric<double>(STATESTORE_UPDATE_DURATION, TUnit::TIME_S));
heartbeat_duration_metric_ = metrics->RegisterMetric(
new StatsMetric<double>(STATESTORE_HEARTBEAT_DURATION, TUnit::TIME_S));
topic_update_duration_metric_ =
StatsMetric<double>::CreateAndRegister(metrics, STATESTORE_UPDATE_DURATION);
heartbeat_duration_metric_ =
StatsMetric<double>::CreateAndRegister(metrics, STATESTORE_HEARTBEAT_DURATION);
update_state_client_cache_->InitMetrics(metrics, "subscriber-update-state");
heartbeat_client_cache_->InitMetrics(metrics, "subscriber-heartbeat");

View File

@@ -38,8 +38,7 @@ const std::string IMPALA_CGROUP_SUFFIX = "_impala";
const int32_t CPU_DEFAULT_WEIGHT = 1024;
CgroupsMgr::CgroupsMgr(MetricGroup* metrics) {
active_cgroups_metric_ =
metrics->AddCounter<int64_t>("cgroups-mgr.active-cgroups", 0);
active_cgroups_metric_ = metrics->AddGauge<int64_t>("cgroups-mgr.active-cgroups", 0);
}
Status CgroupsMgr::Init(const string& cgroups_hierarchy_path,

View File

@@ -149,7 +149,7 @@ class CgroupsMgr {
std::string* cgroup_path, std::string* tasks_path) const;
/// Number of currently active Impala-managed cgroups.
IntCounter* active_cgroups_metric_;
IntGauge* active_cgroups_metric_;
/// Root of the CPU cgroup hierarchy. Created cgroups are placed directly under it.
std::string cgroups_hierarchy_path_;

View File

@@ -42,8 +42,15 @@ namespace impala {
template <typename T>
class SetMetric : public Metric {
public:
SetMetric(const std::string& key, const std::set<T>& value,
const std::string& description = "") : Metric(key, description), value_(value) { }
static SetMetric* CreateAndRegister(MetricGroup* metrics, const std::string& key,
const std::set<T>& value) {
return metrics->RegisterMetric(new SetMetric(MetricDefs::Get(key), value));
}
SetMetric(const TMetricDef& def, const std::set<T>& value)
: Metric(def), value_(value) {
DCHECK_EQ(def.kind, TMetricKind::SET);
}
/// Put an item in this set.
void Add(const T& item) {
@@ -106,8 +113,14 @@ class SetMetric : public Metric {
template <typename T>
class StatsMetric : public Metric {
public:
StatsMetric(const std::string& key, const TUnit::type unit,
const std::string& description = "") : Metric(key, description), unit_(unit) { }
static StatsMetric* CreateAndRegister(MetricGroup* metrics, const std::string& key,
const std::string& arg = "") {
return metrics->RegisterMetric(new StatsMetric(MetricDefs::Get(key, arg)));
}
StatsMetric(const TMetricDef& def) : Metric(def), unit_(def.units) {
DCHECK_EQ(def.kind, TMetricKind::STATS);
}
void Update(const T& value) {
boost::lock_guard<boost::mutex> l(lock_);

View File

@@ -154,9 +154,9 @@ void ImpaladMetrics::CreateMetrics(MetricGroup* m) {
// Initialize memory usage metrics
MEM_POOL_TOTAL_BYTES = m->AddGauge<int64_t>(
ImpaladMetricKeys::MEM_POOL_TOTAL_BYTES, 0L, TUnit::BYTES);
ImpaladMetricKeys::MEM_POOL_TOTAL_BYTES, 0L);
HASH_TABLE_TOTAL_BYTES = m->AddGauge(
ImpaladMetricKeys::HASH_TABLE_TOTAL_BYTES, 0L, TUnit::BYTES);
ImpaladMetricKeys::HASH_TABLE_TOTAL_BYTES, 0L);
// Initialize insert metrics
NUM_FILES_OPEN_FOR_INSERT = m->AddGauge<int64_t>(
@@ -165,31 +165,25 @@ void ImpaladMetrics::CreateMetrics(MetricGroup* m) {
// Initialize IO mgr metrics
IO_MGR_NUM_OPEN_FILES = m->AddGauge<int64_t>(
ImpaladMetricKeys::IO_MGR_NUM_OPEN_FILES, 0L);
IO_MGR_NUM_BUFFERS = m->AddGauge<int64_t>(
ImpaladMetricKeys::IO_MGR_NUM_BUFFERS, 0L);
IO_MGR_TOTAL_BYTES = m->AddGauge<int64_t>(
ImpaladMetricKeys::IO_MGR_TOTAL_BYTES, 0L, TUnit::BYTES);
IO_MGR_NUM_BUFFERS = m->AddGauge<int64_t>(ImpaladMetricKeys::IO_MGR_NUM_BUFFERS, 0L);
IO_MGR_TOTAL_BYTES = m->AddGauge<int64_t>(ImpaladMetricKeys::IO_MGR_TOTAL_BYTES, 0L);
IO_MGR_NUM_UNUSED_BUFFERS = m->AddGauge<int64_t>(
ImpaladMetricKeys::IO_MGR_NUM_UNUSED_BUFFERS, 0L);
IO_MGR_BYTES_READ = m->AddGauge(
ImpaladMetricKeys::IO_MGR_BYTES_READ, 0L, TUnit::BYTES);
IO_MGR_BYTES_READ = m->AddGauge(ImpaladMetricKeys::IO_MGR_BYTES_READ, 0L);
IO_MGR_LOCAL_BYTES_READ = m->AddGauge(
ImpaladMetricKeys::IO_MGR_LOCAL_BYTES_READ, 0L, TUnit::BYTES);
ImpaladMetricKeys::IO_MGR_LOCAL_BYTES_READ, 0L);
IO_MGR_CACHED_BYTES_READ = m->AddGauge<int64_t>(
ImpaladMetricKeys::IO_MGR_CACHED_BYTES_READ, 0L, TUnit::BYTES);
ImpaladMetricKeys::IO_MGR_CACHED_BYTES_READ, 0L);
IO_MGR_SHORT_CIRCUIT_BYTES_READ = m->AddGauge<int64_t>(
ImpaladMetricKeys::IO_MGR_SHORT_CIRCUIT_BYTES_READ, 0L, TUnit::BYTES);
ImpaladMetricKeys::IO_MGR_SHORT_CIRCUIT_BYTES_READ, 0L);
IO_MGR_BYTES_WRITTEN = m->AddGauge<int64_t>(
ImpaladMetricKeys::IO_MGR_BYTES_WRITTEN, 0L, TUnit::BYTES);
ImpaladMetricKeys::IO_MGR_BYTES_WRITTEN, 0L);
// Initialize catalog metrics
CATALOG_NUM_DBS = m->AddGauge<int64_t>(
ImpaladMetricKeys::CATALOG_NUM_DBS, 0L);
CATALOG_NUM_TABLES = m->AddGauge<int64_t>(
ImpaladMetricKeys::CATALOG_NUM_TABLES, 0L);
CATALOG_READY = m->AddProperty<bool>(
ImpaladMetricKeys::CATALOG_READY, false);
CATALOG_NUM_DBS = m->AddGauge<int64_t>(ImpaladMetricKeys::CATALOG_NUM_DBS, 0L);
CATALOG_NUM_TABLES = m->AddGauge<int64_t>(ImpaladMetricKeys::CATALOG_NUM_TABLES, 0L);
CATALOG_READY = m->AddProperty<bool>(ImpaladMetricKeys::CATALOG_READY, false);
}
}

View File

@@ -31,22 +31,29 @@ TcmallocMetric* TcmallocMetric::TOTAL_BYTES_RESERVED = NULL;
TcmallocMetric* TcmallocMetric::PAGEHEAP_UNMAPPED_BYTES = NULL;
TcmallocMetric::PhysicalBytesMetric* TcmallocMetric::PHYSICAL_BYTES_RESERVED = NULL;
TcmallocMetric* TcmallocMetric::CreateAndRegister(MetricGroup* metrics, const string& key,
const string& tcmalloc_var) {
return metrics->RegisterMetric(
new TcmallocMetric(MetricDefs::Get(key), tcmalloc_var));
}
Status impala::RegisterMemoryMetrics(MetricGroup* metrics, bool register_jvm_metrics) {
#ifndef ADDRESS_SANITIZER
TcmallocMetric::BYTES_IN_USE = metrics->RegisterMetric(new TcmallocMetric(
"tcmalloc.bytes-in-use", "generic.current_allocated_bytes"));
TcmallocMetric::BYTES_IN_USE = TcmallocMetric::CreateAndRegister(metrics,
"tcmalloc.bytes-in-use", "generic.current_allocated_bytes");
TcmallocMetric::TOTAL_BYTES_RESERVED = metrics->RegisterMetric(new TcmallocMetric(
"tcmalloc.total-bytes-reserved", "generic.heap_size"));
TcmallocMetric::TOTAL_BYTES_RESERVED = TcmallocMetric::CreateAndRegister(metrics,
"tcmalloc.total-bytes-reserved", "generic.heap_size");
TcmallocMetric::PAGEHEAP_FREE_BYTES = metrics->RegisterMetric(new TcmallocMetric(
"tcmalloc.pageheap-free-bytes", "tcmalloc.pageheap_free_bytes"));
TcmallocMetric::PAGEHEAP_FREE_BYTES = TcmallocMetric::CreateAndRegister(metrics,
"tcmalloc.pageheap-free-bytes", "tcmalloc.pageheap_free_bytes");
TcmallocMetric::PAGEHEAP_UNMAPPED_BYTES = metrics->RegisterMetric(new TcmallocMetric(
"tcmalloc.pageheap-unmapped-bytes", "tcmalloc.pageheap_unmapped_bytes"));
TcmallocMetric::PAGEHEAP_UNMAPPED_BYTES = TcmallocMetric::CreateAndRegister(metrics,
"tcmalloc.pageheap-unmapped-bytes", "tcmalloc.pageheap_unmapped_bytes");
TcmallocMetric::PHYSICAL_BYTES_RESERVED = metrics->RegisterMetric(
new TcmallocMetric::PhysicalBytesMetric("tcmalloc.physical-bytes-reserved"));
new TcmallocMetric::PhysicalBytesMetric(
MetricDefs::Get("tcmalloc.physical-bytes-reserved")));
#endif
if (register_jvm_metrics) {
@@ -55,11 +62,19 @@ Status impala::RegisterMemoryMetrics(MetricGroup* metrics, bool register_jvm_met
return Status::OK;
}
JvmMetric::JvmMetric(const string& key, const string& mempool_name, JvmMetricType type)
: IntGauge(key, TUnit::BYTES) {
JvmMetric* JvmMetric::CreateAndRegister(MetricGroup* metrics, const string& key,
const string& pool_name, JvmMetric::JvmMetricType type) {
string pool_name_for_key = pool_name;
to_lower(pool_name_for_key);
replace(pool_name_for_key.begin(), pool_name_for_key.end(), ' ', '-');
return metrics->RegisterMetric(new JvmMetric(MetricDefs::Get(key, pool_name_for_key),
pool_name, type));
}
JvmMetric::JvmMetric(const TMetricDef& def, const string& mempool_name,
JvmMetricType type) : IntGauge(def, 0) {
mempool_name_ = mempool_name;
metric_type_ = type;
}
Status JvmMetric::InitMetrics(MetricGroup* metrics) {
@@ -69,31 +84,20 @@ Status JvmMetric::InitMetrics(MetricGroup* metrics) {
TGetJvmMetricsResponse response;
RETURN_IF_ERROR(JniUtil::GetJvmMetrics(request, &response));
BOOST_FOREACH(const TJvmMemoryPool& usage, response.memory_pools) {
string name = usage.name;
to_lower(name);
replace(name.begin(), name.end(), ' ', '-');
metrics->RegisterMetric(
new JvmMetric(Substitute("jvm.$0.max-usage-bytes", name), usage.name, MAX));
metrics->RegisterMetric(
new JvmMetric(Substitute("jvm.$0.current-usage-bytes", name),
usage.name, CURRENT));
metrics->RegisterMetric(
new JvmMetric(Substitute("jvm.$0.committed-usage-bytes", name), usage.name,
COMMITTED));
metrics->RegisterMetric(
new JvmMetric(Substitute("jvm.$0.init-usage-bytes", name), usage.name, INIT));
metrics->RegisterMetric(
new JvmMetric(Substitute("jvm.$0.peak-max-usage-bytes", name), usage.name,
PEAK_MAX));
metrics->RegisterMetric(
new JvmMetric(Substitute("jvm.$0.peak-current-usage-bytes", name),
usage.name, PEAK_CURRENT));
metrics->RegisterMetric(
new JvmMetric(Substitute("jvm.$0.peak-committed-usage-bytes", name), usage.name,
PEAK_COMMITTED));
metrics->RegisterMetric(
new JvmMetric(Substitute("jvm.$0.peak-init-usage-bytes", name), usage.name,
PEAK_INIT));
JvmMetric::CreateAndRegister(metrics, "jvm.$0.max-usage-bytes", usage.name, MAX);
JvmMetric::CreateAndRegister(metrics, "jvm.$0.current-usage-bytes", usage.name,
CURRENT);
JvmMetric::CreateAndRegister(metrics, "jvm.$0.committed-usage-bytes", usage.name,
COMMITTED);
JvmMetric::CreateAndRegister(metrics, "jvm.$0.init-usage-bytes", usage.name, INIT);
JvmMetric::CreateAndRegister(metrics, "jvm.$0.peak-max-usage-bytes", usage.name,
PEAK_MAX);
JvmMetric::CreateAndRegister(metrics, "jvm.$0.peak-current-usage-bytes", usage.name,
PEAK_CURRENT);
JvmMetric::CreateAndRegister(metrics, "jvm.$0.peak-committed-usage-bytes", usage.name,
PEAK_COMMITTED);
JvmMetric::CreateAndRegister(metrics, "jvm.$0.peak-init-usage-bytes", usage.name,
PEAK_INIT);
}
return Status::OK;

View File

@@ -54,8 +54,7 @@ class TcmallocMetric : public UIntGauge {
/// include the tcmalloc metadata.
class PhysicalBytesMetric : public UIntGauge {
public:
PhysicalBytesMetric(const std::string& key)
: UIntGauge(key, TUnit::BYTES) { }
PhysicalBytesMetric(const TMetricDef& def) : UIntGauge(def, 0) { }
private:
virtual void CalculateValue() {
@@ -65,13 +64,16 @@ class TcmallocMetric : public UIntGauge {
static PhysicalBytesMetric* PHYSICAL_BYTES_RESERVED;
TcmallocMetric(const std::string& key, const std::string& tcmalloc_var)
: UIntGauge(key, TUnit::BYTES), tcmalloc_var_(tcmalloc_var) { }
static TcmallocMetric* CreateAndRegister(MetricGroup* metrics, const std::string& key,
const std::string& tcmalloc_var);
private:
/// Name of the tcmalloc property this metric should fetch.
const std::string tcmalloc_var_;
TcmallocMetric(const TMetricDef& def, const std::string& tcmalloc_var)
: UIntGauge(def, 0), tcmalloc_var_(tcmalloc_var) { }
virtual void CalculateValue() {
MallocExtension::instance()->GetNumericProperty(tcmalloc_var_.c_str(), &value_);
}
@@ -105,8 +107,11 @@ class JvmMetric : public IntGauge {
PEAK_CURRENT
};
static JvmMetric* CreateAndRegister(MetricGroup* metrics, const std::string& key,
const std::string& pool_name, JvmMetric::JvmMetricType type);
/// Private constructor to ensure only InitMetrics() can create JvmMetrics.
JvmMetric(const std::string& key, const std::string& mempool_name, JvmMetricType type);
JvmMetric(const TMetricDef& def, const std::string& mempool_name, JvmMetricType type);
/// The name of the memory pool, defined by the Jvm.
std::string mempool_name_;

View File

@@ -19,6 +19,7 @@
#include <gtest/gtest.h>
#include <boost/scoped_ptr.hpp>
#include <limits>
#include <map>
#include "util/jni-util.h"
#include "util/thread.h"
@@ -38,8 +39,37 @@ void AssertValue(M* metric, const T& value,
}
}
TEST(MetricsTest, CounterMetrics) {
class MetricsTest : public testing::Test {
public:
void ResetMetricDefs() {
MetricDefs::GetInstance()->metric_defs_ = g_MetricDefs_constants;
}
virtual void SetUp() {
ResetMetricDefs();
}
virtual void TearDown() {
ResetMetricDefs();
}
void AddMetricDef(const string& key, const TMetricKind::type kind,
const TUnit::type units, const string& desc = "") {
map<string, TMetricDef>& defs = MetricDefs::GetInstance()->metric_defs_.TMetricDefs;
EXPECT_EQ(defs.end(), defs.find(key));
TMetricDef def;
def.__set_key(key);
def.__set_kind(kind);
def.__set_units(units);
def.__set_description(desc);
defs.insert(pair<string, TMetricDef>(key, def));
}
};
TEST_F(MetricsTest, CounterMetrics) {
MetricGroup metrics("CounterMetrics");
AddMetricDef("counter", TMetricKind::COUNTER, TUnit::UNIT);
IntCounter* int_counter = metrics.AddCounter("counter", 0L);
AssertValue(int_counter, 0, "0");
int_counter->Increment(1);
@@ -49,13 +79,15 @@ TEST(MetricsTest, CounterMetrics) {
int_counter->set_value(3456);
AssertValue(int_counter, 3456, "3.46K");
AddMetricDef("counter_with_units", TMetricKind::COUNTER, TUnit::BYTES);
IntCounter* int_counter_with_units =
metrics.AddCounter("counter_with_units", 10L, TUnit::BYTES);
metrics.AddCounter("counter_with_units", 10L);
AssertValue(int_counter_with_units, 10, "10.00 B");
}
TEST(MetricsTest, GaugeMetrics) {
TEST_F(MetricsTest, GaugeMetrics) {
MetricGroup metrics("GaugeMetrics");
AddMetricDef("gauge", TMetricKind::GAUGE, TUnit::NONE);
IntGauge* int_gauge = metrics.AddGauge("gauge", 0L);
AssertValue(int_gauge, 0, "0");
int_gauge->Increment(-1);
@@ -65,18 +97,21 @@ TEST(MetricsTest, GaugeMetrics) {
int_gauge->set_value(3456);
AssertValue(int_gauge, 3456, "3456");
AddMetricDef("gauge_with_units", TMetricKind::GAUGE, TUnit::TIME_S);
IntGauge* int_gauge_with_units =
metrics.AddGauge("gauge_with_units", 10L, TUnit::TIME_S);
metrics.AddGauge("gauge_with_units", 10L);
AssertValue(int_gauge_with_units, 10, "10s000ms");
}
TEST(MetricsTest, PropertyMetrics) {
TEST_F(MetricsTest, PropertyMetrics) {
MetricGroup metrics("PropertyMetrics");
AddMetricDef("bool_property", TMetricKind::PROPERTY, TUnit::NONE);
BooleanProperty* bool_property = metrics.AddProperty("bool_property", false);
AssertValue(bool_property, false, "false");
bool_property->set_value(true);
AssertValue(bool_property, true, "true");
AddMetricDef("string_property", TMetricKind::PROPERTY, TUnit::NONE);
StringProperty* string_property = metrics.AddProperty("string_property",
string("string1"));
AssertValue(string_property, "string1", "string1");
@@ -84,8 +119,9 @@ TEST(MetricsTest, PropertyMetrics) {
AssertValue(string_property, "string2", "string2");
}
TEST(MetricsTest, NonFiniteValues) {
TEST_F(MetricsTest, NonFiniteValues) {
MetricGroup metrics("NanValues");
AddMetricDef("inf_value", TMetricKind::GAUGE, TUnit::NONE);
double inf = numeric_limits<double>::infinity();
DoubleGauge* gauge = metrics.AddGauge("inf_value", inf);
AssertValue(gauge, inf, "inf");
@@ -95,12 +131,13 @@ TEST(MetricsTest, NonFiniteValues) {
EXPECT_TRUE(gauge->ToHumanReadable() == "nan");
}
TEST(MetricsTest, SetMetrics) {
TEST_F(MetricsTest, SetMetrics) {
MetricGroup metrics("SetMetrics");
set<int> item_set;
item_set.insert(4); item_set.insert(5); item_set.insert(6);
SetMetric<int>* set_metric =
metrics.RegisterMetric(new SetMetric<int>("set", item_set));
AddMetricDef("set", TMetricKind::SET, TUnit::NONE);
SetMetric<int>* set_metric = SetMetric<int>::CreateAndRegister(&metrics, "set",
item_set);
EXPECT_EQ(set_metric->ToHumanReadable(), "[4, 5, 6]");
set_metric->Add(7);
@@ -111,11 +148,12 @@ TEST(MetricsTest, SetMetrics) {
EXPECT_EQ(set_metric->ToHumanReadable(), "[5, 6, 7]");
}
TEST(MetricsTest, StatsMetrics) {
TEST_F(MetricsTest, StatsMetrics) {
// Uninitialised stats metrics don't print anything other than the count
MetricGroup metrics("StatsMetrics");
StatsMetric<double>* stats_metric = metrics.RegisterMetric(
new StatsMetric<double>("stats", TUnit::UNIT));
AddMetricDef("stats", TMetricKind::STATS, TUnit::NONE);
StatsMetric<double>* stats_metric = StatsMetric<double>::CreateAndRegister(&metrics,
"stats");
EXPECT_EQ(stats_metric->ToHumanReadable(), "count: 0");
stats_metric->Update(0.0);
@@ -125,8 +163,9 @@ TEST(MetricsTest, StatsMetrics) {
EXPECT_EQ(stats_metric->ToHumanReadable(), "count: 3, last: 2.000000, min: 0.000000, "
"max: 2.000000, mean: 1.000000, stddev: 0.816497");
StatsMetric<double>* stats_metric_with_units = metrics.RegisterMetric(
new StatsMetric<double>("stats_units", TUnit::BYTES));
AddMetricDef("stats_units", TMetricKind::STATS, TUnit::BYTES);
StatsMetric<double>* stats_metric_with_units =
StatsMetric<double>::CreateAndRegister(&metrics, "stats_units");
EXPECT_EQ(stats_metric_with_units->ToHumanReadable(), "count: 0");
stats_metric_with_units->Update(0.0);
@@ -137,7 +176,7 @@ TEST(MetricsTest, StatsMetrics) {
"max: 2.00 B, mean: 1.00 B, stddev: 0.82 B");
}
TEST(MetricsTest, MemMetric) {
TEST_F(MetricsTest, MemMetric) {
#ifndef ADDRESS_SANITIZER
MetricGroup metrics("MemMetrics");
RegisterMemoryMetrics(&metrics, false);
@@ -170,7 +209,7 @@ TEST(MetricsTest, MemMetric) {
#endif
}
TEST(MetricsTest, JvmMetrics) {
TEST_F(MetricsTest, JvmMetrics) {
MetricGroup metrics("JvmMetrics");
RegisterMemoryMetrics(&metrics, true);
UIntGauge* jvm_total_used =
@@ -194,19 +233,21 @@ void AssertJson(const Value& val, const string& name, const string& value,
if (!units.empty()) EXPECT_EQ(val["units"].GetString(), units);
}
TEST(MetricsJsonTest, Counters) {
TEST_F(MetricsTest, CountersJson) {
MetricGroup metrics("CounterMetrics");
AddMetricDef("counter", TMetricKind::COUNTER, TUnit::UNIT, "description");
metrics.AddCounter("counter", 0L);
Document document;
Value val;
metrics.ToJson(true, &document, &val);
const Value& counter_val = val["metrics"][0u];
AssertJson(counter_val, "counter", "0", "", "COUNTER", "UNIT");
AssertJson(counter_val, "counter", "0", "description", "COUNTER", "UNIT");
EXPECT_EQ(counter_val["value"].GetInt(), 0);
}
TEST(MetricsJsonTest, Gauges) {
TEST_F(MetricsTest, GaugesJson) {
MetricGroup metrics("GaugeMetrics");
AddMetricDef("gauge", TMetricKind::GAUGE, TUnit::NONE);
metrics.AddGauge("gauge", 10L);
Document document;
Value val;
@@ -215,8 +256,9 @@ TEST(MetricsJsonTest, Gauges) {
EXPECT_EQ(val["metrics"][0u]["value"].GetInt(), 10);
}
TEST(MetricsJsonTest, Properties) {
TEST_F(MetricsTest, PropertiesJson) {
MetricGroup metrics("Properties");
AddMetricDef("property", TMetricKind::PROPERTY, TUnit::NONE);
metrics.AddProperty("property", string("my value"));
Document document;
Value val;
@@ -226,11 +268,12 @@ TEST(MetricsJsonTest, Properties) {
EXPECT_EQ(string(val["metrics"][0u]["value"].GetString()), "my value");
}
TEST(MetricsJsonTest, SetMetrics) {
TEST_F(MetricsTest, SetMetricsJson) {
MetricGroup metrics("SetMetrics");
set<int> item_set;
item_set.insert(4); item_set.insert(5); item_set.insert(6);
metrics.RegisterMetric(new SetMetric<int>("set", item_set));
AddMetricDef("set", TMetricKind::SET, TUnit::NONE);
SetMetric<int>::CreateAndRegister(&metrics, "set", item_set);
Document document;
Value val;
@@ -245,10 +288,11 @@ TEST(MetricsJsonTest, SetMetrics) {
AssertJson(set_val, "set", "[4, 5, 6]", "");
}
TEST(MetricsJsonTest, StatsMetrics) {
TEST_F(MetricsTest, StatsMetricsJson) {
MetricGroup metrics("StatsMetrics");
StatsMetric<double>* metric =
metrics.RegisterMetric(new StatsMetric<double>("stats_metric", TUnit::UNIT));
AddMetricDef("stats_metric", TMetricKind::STATS, TUnit::UNIT);
StatsMetric<double>* metric = StatsMetric<double>::CreateAndRegister(&metrics,
"stats_metric");
metric->Update(10.0);
metric->Update(20.0);
Document document;
@@ -266,9 +310,10 @@ TEST(MetricsJsonTest, StatsMetrics) {
EXPECT_EQ(stats_val["stddev"].GetDouble(), 5.0);
}
TEST(MetricsJsonTest, UnitsAndDescription) {
TEST_F(MetricsTest, UnitsAndDescriptionJson) {
MetricGroup metrics("Units");
metrics.AddCounter("counter", 2048, TUnit::BYTES, "description");
AddMetricDef("counter", TMetricKind::COUNTER, TUnit::BYTES, "description");
metrics.AddCounter("counter", 2048);
Document document;
Value val;
metrics.ToJson(true, &document, &val);
@@ -277,12 +322,15 @@ TEST(MetricsJsonTest, UnitsAndDescription) {
EXPECT_EQ(counter_val["value"].GetInt(), 2048);
}
TEST(MetricGroupTest, JsonTest) {
TEST_F(MetricsTest, MetricGroupJson) {
MetricGroup metrics("JsonTest");
metrics.AddCounter("counter1", 2048, TUnit::BYTES, "description");
metrics.AddCounter("counter2", 2048, TUnit::BYTES, "description");
AddMetricDef("counter1", TMetricKind::COUNTER, TUnit::BYTES, "description");
AddMetricDef("counter2", TMetricKind::COUNTER, TUnit::BYTES, "description");
metrics.AddCounter("counter1", 2048);
metrics.AddCounter("counter2", 2048);
metrics.GetChildGroup("child1");
AddMetricDef("child_counter", TMetricKind::COUNTER, TUnit::BYTES, "description");
metrics.GetChildGroup("child2")->AddCounter("child_counter", 0);
IntCounter* counter = metrics.FindMetricForTesting<IntCounter>(string("child_counter"));

View File

@@ -52,7 +52,30 @@ void Metric::AddStandardFields(Document* document, Value* val) {
val->AddMember("human_readable", metric_value, document->GetAllocator());
}
MetricGroup::MetricGroup(const std::string& name)
MetricDefs* MetricDefs::GetInstance() {
// Note that this is not thread-safe in C++03 (but will be in C++11 see
// http://stackoverflow.com/a/19907903/132034). We don't bother with the double-check
// locking pattern because it introduces complexity whereas a race is very unlikely
// and it doesn't matter if we construct two instances since MetricDefsConstants is
// just a constant map.
static MetricDefs instance;
return &instance;
}
TMetricDef MetricDefs::Get(const string& key, const string& arg) {
MetricDefs* inst = GetInstance();
map<string, TMetricDef>::iterator it = inst->metric_defs_.TMetricDefs.find(key);
if (it == inst->metric_defs_.TMetricDefs.end()) {
DCHECK(false) << "Could not find metric definition for key=" << key << " arg=" << arg;
return TMetricDef();
}
TMetricDef md = it->second;
md.__set_key(Substitute(md.key, arg));
md.__set_description(Substitute(md.description, arg));
return md;
}
MetricGroup::MetricGroup(const string& name)
: obj_pool_(new ObjectPool()), name_(name) { }
Status MetricGroup::Init(Webserver* webserver) {
@@ -166,7 +189,7 @@ void MetricGroup::ToJson(bool include_children, Document* document, Value* out_v
*out_val = container;
}
MetricGroup* MetricGroup::GetChildGroup(const std::string& name) {
MetricGroup* MetricGroup::GetChildGroup(const string& name) {
lock_guard<mutex> l(lock_);
ChildGroupMap::iterator it = children_.find(name);
if (it != children_.end()) return it->second;

View File

@@ -33,8 +33,35 @@
#include "util/pretty-printer.h"
#include "util/webserver.h"
#include "gen-cpp/MetricDefs_types.h"
#include "gen-cpp/MetricDefs_constants.h"
namespace impala {
/// Singleton that provides metric definitions. Metrics are defined in metrics.json
/// and generate_metrics.py produces MetricDefs.thrift. This singleton wraps an instance
/// of the thrift definitions.
class MetricDefs {
public:
/// Gets the TMetricDef for the metric key. 'arg' is an optional argument to the
/// TMetricDef for metrics defined by a format string. The key must exist or a DCHECK
/// will fail.
/// TODO: Support multiple arguments.
static TMetricDef Get(const std::string& key, const std::string& arg = "");
private:
friend class MetricsTest;
/// Gets the MetricDefs singleton.
static MetricDefs* GetInstance();
/// Contains the map of all TMetricDefs, non-const for testing
MetricDefsConstants metric_defs_;
MetricDefs() { };
DISALLOW_COPY_AND_ASSIGN(MetricDefs);
};
/// A metric is a container for some value, identified by a string key. Most metrics are
/// numeric, but this metric base-class is general enough such that metrics may be lists,
/// maps, histograms or other arbitrary structures.
@@ -81,8 +108,7 @@ class Metric {
friend class MetricGroup;
Metric(const std::string& key, const std::string& description) :
key_(key), description_(description) { }
Metric(const TMetricDef& def) : key_(def.key), description_(def.description) { }
/// Convenience method to add standard fields (name, description, human readable string)
/// to 'val'.
@@ -103,14 +129,10 @@ class Metric {
template<typename T, TMetricKind::type metric_kind=TMetricKind::GAUGE>
class SimpleMetric : public Metric {
public:
SimpleMetric(const std::string& key, const TUnit::type unit,
const T& initial_value, const std::string& description = "")
: Metric(key, description), unit_(unit), value_(initial_value)
{ }
SimpleMetric(const std::string& key, const TUnit::type unit,
const std::string& description = "")
: Metric(key, description), unit_(unit) { }
SimpleMetric(const TMetricDef& metric_def, const T& initial_value)
: Metric(metric_def), unit_(metric_def.units), value_(initial_value) {
DCHECK_EQ(metric_kind, metric_def.kind);
}
virtual ~SimpleMetric() { }
@@ -220,27 +242,24 @@ class MetricGroup {
/// Create a gauge metric object with given key and initial value (owned by this object)
template<typename T>
SimpleMetric<T>* AddGauge(const std::string& key,
const T& value, const TUnit::type unit = TUnit::NONE,
const std::string& description = "") {
return RegisterMetric(new SimpleMetric<T, TMetricKind::GAUGE>
(key, unit, value, description));
SimpleMetric<T>* AddGauge(const std::string& key, const T& value,
const std::string& metric_def_arg = "") {
return RegisterMetric(new SimpleMetric<T, TMetricKind::GAUGE>(
MetricDefs::Get(key, metric_def_arg), value));
}
template<typename T>
SimpleMetric<T, TMetricKind::PROPERTY>* AddProperty(
const std::string& key, const T& value,
const std::string& description = "") {
return RegisterMetric(new SimpleMetric<T, TMetricKind::PROPERTY> (key,
TUnit::NONE, value, description));
SimpleMetric<T, TMetricKind::PROPERTY>* AddProperty(const std::string& key,
const T& value, const std::string& metric_def_arg = "") {
return RegisterMetric(new SimpleMetric<T, TMetricKind::PROPERTY>(
MetricDefs::Get(key, metric_def_arg), value));
}
template<typename T>
SimpleMetric<T, TMetricKind::COUNTER>* AddCounter(const std::string& key,
const T& value, const TUnit::type unit = TUnit::UNIT,
const std::string& description = "") {
return RegisterMetric(
new SimpleMetric<T, TMetricKind::COUNTER>(key, unit, value, description));
const T& value, const std::string& metric_def_arg = "") {
return RegisterMetric(new SimpleMetric<T, TMetricKind::COUNTER>(
MetricDefs::Get(key, metric_def_arg), value));
}
/// Returns a metric by key. All MetricGroups reachable from this group are searched in

View File

@@ -1 +1,3 @@
Opcodes.thrift
ErrorCodes.thrift
MetricDefs.thrift

View File

@@ -161,6 +161,8 @@ set (SRC_FILES
Llama.thrift
Logging.thrift
NetworkTest.thrift
MetricDefs.thrift
Metrics.thrift
PlanNodes.thrift
Planner.thrift
Partitions.thrift
@@ -176,6 +178,10 @@ add_custom_command(OUTPUT ErrorCodes.thrift
COMMAND python generate_error_codes.py
DEPENDS generate_error_codes.py)
add_custom_command(OUTPUT MetricDefs.thrift
COMMAND python generate_metrics.py
DEPENDS generate_metrics.py metrics.json)
# Create a build command for each of the thrift src files and generate
# a list of files they produce
THRIFT_GEN(THRIFT_ALL_FILES ${SRC_FILES})

View File

@@ -0,0 +1,45 @@
// Copyright 2015 Cloudera Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
namespace cpp impala
namespace java com.cloudera.impala.thrift
// Metric and counter data types.
enum TUnit {
// A dimensionless numerical quantity
UNIT,
// Rate of a dimensionless numerical quantity
UNIT_PER_SECOND,
CPU_TICKS,
BYTES
BYTES_PER_SECOND,
TIME_NS,
DOUBLE_VALUE,
// No units at all, may not be a numerical quantity
NONE,
TIME_MS,
TIME_S
}
// The kind of value that a metric represents.
enum TMetricKind {
// May go up or down over time
GAUGE,
// A strictly increasing value
COUNTER,
// Fixed; will never change
PROPERTY,
STATS,
SET
}

View File

@@ -15,38 +15,12 @@
namespace cpp impala
namespace java com.cloudera.impala.thrift
// Metric and counter data types.
enum TUnit {
// A dimensionless numerical quantity
UNIT,
// Rate of a dimensionless numerical quantity
UNIT_PER_SECOND,
CPU_TICKS,
BYTES
BYTES_PER_SECOND,
TIME_NS,
DOUBLE_VALUE,
// No units at all, may not be a numerical quantity
NONE,
TIME_MS,
TIME_S
}
// The kind of value that a metric represents.
enum TMetricKind {
// May go up or down over time
GAUGE,
// A strictly increasing value
COUNTER,
// Fixed; will never change
PROPERTY
// TODO: Stats metrics etc. should be here when we implement Metric::ToThrift()
}
include "Metrics.thrift"
// Counter data
struct TCounter {
1: required string name
2: required TUnit unit
2: required Metrics.TUnit unit
3: required i64 value
}
@@ -66,7 +40,7 @@ struct TEventSequence {
// This can be used to reconstruct a time line for a particular counter.
struct TTimeSeriesCounter {
1: required string name
2: required TUnit unit
2: required Metrics.TUnit unit
// Period of intervals in ms
3: required i32 period_ms

View File

@@ -0,0 +1,91 @@
#!/usr/bin/env python
# Copyright 2015 Cloudera Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import sys
import os
import re
import json
def load_metrics(source_file):
"""Reads the json file of metric definitions and returns a map of metric names to
metric definitions"""
raw_metrics = json.loads(open(source_file).read())
metrics = { }
for m in raw_metrics:
if m['key'] in metrics:
assert False, "Metric key %s already used, check definition of %s" % (m['key'], m)
m['kind'] = "Metrics.TMetricKind.%s" % m['kind']
m['units'] = "Metrics.TUnit.%s" % m['units']
metrics[m['key']] = m
return metrics
PREAMBLE = """
// Copyright 2015 Cloudera Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
//
// THIS FILE IS AUTO GENERATED BY generate_metrics.py DO NOT MODIFY IT BY HAND.
//
namespace cpp impala
namespace java com.cloudera.impala.thrift
include "Metrics.thrift"
// All metadata associated with a metric. Used to instanciate metrics.
struct TMetricDef {
1: optional string key
2: optional Metrics.TMetricKind kind
3: optional Metrics.TUnit units
4: optional list<string> contexts
5: optional string label
6: optional string description
}
"""
if __name__ == "__main__":
thrift_path = os.path.join(os.getenv('IMPALA_HOME'), 'common/thrift')
metrics = load_metrics(os.path.join(thrift_path, "metrics.json"))
metrics_json = json.dumps(metrics, sort_keys=True, indent=2)
# dumps writes the TMetricKind and TUnit as quoted strings which is not
# interpreted by the thrift compiler correctly. Need to remove the quotes around
# the enum values.
metrics_json = re.sub(r'"(Metrics.TMetricKind.\S+)"', r'\1', metrics_json)
metrics_json = re.sub(r'"(Metrics.TUnit.\S+)"', r'\1', metrics_json)
# The script will always generate the file, CMake will take care of running it only if
# necessary.
target_file = os.path.join(thrift_path, "MetricDefs.thrift")
fid = open(target_file, "w")
try:
fid.write(PREAMBLE)
fid.write("const map<string,TMetricDef> TMetricDefs =\n")
fid.write(metrics_json)
finally:
fid.close()
print("%s created." % target_file)

1316
common/thrift/metrics.json Normal file

File diff suppressed because it is too large Load Diff