mirror of
https://github.com/apache/impala.git
synced 2025-12-20 02:20:11 -05:00
IMPALA-12782: Show info of the event processing in /events webUI
The /events page of catalogd shows the metrics and status of the event-processor. This patch adds more info to this page, including: - lag info - the current event batch that's being processed. See the screenshot attached in the JIRA for how it looks. Also moves the error message to the top to highlight the error status. Fixes the issue of not updating the latest event id when the event processor is stopped. Also fixes the issue of the error message not being cleared after a global INVALIDATE METADATA. Adds a debug action, catalogd_event_processing_delay, to inject a sleep while processing an event, so that the web page can be captured more easily. Also adds a missing test for showing the error message of event-processing in the /events page. Tests: - Add e2e test to verify the content of the page. Change-Id: I2e7d4952c7fd04ae89b6751204499bf9dd99f57c Reviewed-on: http://gerrit.cloudera.org:8080/20986 Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com> Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
This commit is contained in:
committed by
Impala Public Jenkins
parent
63f52807f0
commit
effc9df933
@@ -30,6 +30,7 @@
|
|||||||
#include "util/collection-metrics.h"
|
#include "util/collection-metrics.h"
|
||||||
#include "util/debug-util.h"
|
#include "util/debug-util.h"
|
||||||
#include "util/event-metrics.h"
|
#include "util/event-metrics.h"
|
||||||
|
#include "util/json-util.h"
|
||||||
#include "util/logging-support.h"
|
#include "util/logging-support.h"
|
||||||
#include "util/metrics.h"
|
#include "util/metrics.h"
|
||||||
#include "util/pretty-printer.h"
|
#include "util/pretty-printer.h"
|
||||||
@@ -957,24 +958,84 @@ void CatalogServer::GetCatalogUsage(Document* document) {
|
|||||||
|
|
||||||
void CatalogServer::EventMetricsUrlCallback(
|
void CatalogServer::EventMetricsUrlCallback(
|
||||||
const Webserver::WebRequest& req, Document* document) {
|
const Webserver::WebRequest& req, Document* document) {
|
||||||
|
auto& allocator = document->GetAllocator();
|
||||||
TEventProcessorMetricsSummaryResponse event_processor_summary_response;
|
TEventProcessorMetricsSummaryResponse event_processor_summary_response;
|
||||||
Status status = catalog_->GetEventProcessorSummary(&event_processor_summary_response);
|
Status status = catalog_->GetEventProcessorSummary(&event_processor_summary_response);
|
||||||
if (!status.ok()) {
|
if (!status.ok()) {
|
||||||
Value error(status.GetDetail().c_str(), document->GetAllocator());
|
Value error(status.GetDetail().c_str(), allocator);
|
||||||
document->AddMember("error", error, document->GetAllocator());
|
document->AddMember("error", error, allocator);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
Value event_processor_summary(
|
Value event_processor_summary(
|
||||||
event_processor_summary_response.summary.c_str(), document->GetAllocator());
|
event_processor_summary_response.summary.c_str(), allocator);
|
||||||
document->AddMember(
|
document->AddMember("event_processor_metrics", event_processor_summary, allocator);
|
||||||
"event_processor_metrics", event_processor_summary, document->GetAllocator());
|
|
||||||
if (event_processor_summary_response.__isset.error_msg) {
|
if (event_processor_summary_response.__isset.error_msg) {
|
||||||
Value error_msg(
|
Value error_msg(event_processor_summary_response.error_msg.c_str(), allocator);
|
||||||
event_processor_summary_response.error_msg.c_str(), document->GetAllocator());
|
document->AddMember("event_processor_error_msg", error_msg, allocator);
|
||||||
document->AddMember(
|
|
||||||
"event_processor_error_msg", error_msg, document->GetAllocator());
|
|
||||||
}
|
}
|
||||||
|
const TEventBatchProgressInfo& progress_info =
|
||||||
|
event_processor_summary_response.progress;
|
||||||
|
JsonObjWrapper progress_info_obj(allocator);
|
||||||
|
// Add lag info
|
||||||
|
progress_info_obj.AddMember("last_synced_event_id", progress_info.last_synced_event_id);
|
||||||
|
progress_info_obj.AddMember("last_synced_event_time_s",
|
||||||
|
progress_info.last_synced_event_time_s);
|
||||||
|
progress_info_obj.AddMember("latest_event_id", progress_info.latest_event_id);
|
||||||
|
progress_info_obj.AddMember("latest_event_time_s", progress_info.latest_event_time_s);
|
||||||
|
int64_t lag_time = max(0L,
|
||||||
|
progress_info.latest_event_time_s - progress_info.last_synced_event_time_s);
|
||||||
|
progress_info_obj.AddMember("lag_time",
|
||||||
|
PrettyPrinter::Print(lag_time, TUnit::TIME_S));
|
||||||
|
progress_info_obj.AddMember("last_synced_event_time",
|
||||||
|
ToStringFromUnix(progress_info.last_synced_event_time_s));
|
||||||
|
progress_info_obj.AddMember("latest_event_time",
|
||||||
|
ToStringFromUnix(progress_info.latest_event_time_s));
|
||||||
|
// Add current batch info
|
||||||
|
if (progress_info.num_hms_events > 0) {
|
||||||
|
int progress = 0;
|
||||||
|
if (progress_info.num_filtered_events > 0) {
|
||||||
|
progress =
|
||||||
|
100 * progress_info.current_event_index / progress_info.num_filtered_events;
|
||||||
|
}
|
||||||
|
int64_t now_ms = UnixMillis();
|
||||||
|
int64_t elapsed_ms = max(0L, now_ms - progress_info.current_batch_start_time_ms);
|
||||||
|
progress_info_obj.AddMember("num_hms_events", progress_info.num_hms_events);
|
||||||
|
progress_info_obj.AddMember("num_filtered_events", progress_info.num_filtered_events);
|
||||||
|
progress_info_obj.AddMember("num_synced_events", progress_info.current_event_index);
|
||||||
|
progress_info_obj.AddMember("synced_percent", progress);
|
||||||
|
progress_info_obj.AddMember("min_event_id", progress_info.min_event_id);
|
||||||
|
progress_info_obj.AddMember("max_event_id", progress_info.max_event_id);
|
||||||
|
progress_info_obj.AddMember("min_event_time",
|
||||||
|
ToStringFromUnix(progress_info.min_event_time_s));
|
||||||
|
progress_info_obj.AddMember("max_event_time",
|
||||||
|
ToStringFromUnix(progress_info.max_event_time_s));
|
||||||
|
progress_info_obj.AddMember("start_time",
|
||||||
|
ToStringFromUnixMillis(progress_info.current_batch_start_time_ms));
|
||||||
|
progress_info_obj.AddMember("elapsed_time",
|
||||||
|
PrettyPrinter::Print(elapsed_ms, TUnit::TIME_MS));
|
||||||
|
progress_info_obj.AddMember("start_time_of_event",
|
||||||
|
ToStringFromUnixMillis(progress_info.current_event_start_time_ms));
|
||||||
|
progress_info_obj.AddMember("elapsed_time_current_event",
|
||||||
|
PrettyPrinter::Print(max(0L,
|
||||||
|
now_ms - progress_info.current_event_start_time_ms), TUnit::TIME_MS));
|
||||||
|
if (progress_info.__isset.current_event) {
|
||||||
|
JsonObjWrapper current_event(allocator);
|
||||||
|
current_event.AddMember("event_id", progress_info.current_event.eventId);
|
||||||
|
current_event.AddMember("event_time",
|
||||||
|
ToStringFromUnix(progress_info.current_event.eventTime));
|
||||||
|
current_event.AddMember("event_type", progress_info.current_event.eventType);
|
||||||
|
current_event.AddMember("cat_name", progress_info.current_event.catName);
|
||||||
|
current_event.AddMember("db_name", progress_info.current_event.dbName);
|
||||||
|
current_event.AddMember("tbl_name", progress_info.current_event.tableName);
|
||||||
|
progress_info_obj.value.AddMember("current_event", current_event.value, allocator);
|
||||||
|
}
|
||||||
|
if (progress_info.current_event_batch_size > 1) {
|
||||||
|
progress_info_obj.AddMember("current_event_batch_size",
|
||||||
|
progress_info.current_event_batch_size);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
document->AddMember("progress-info", progress_info_obj.value, allocator);
|
||||||
}
|
}
|
||||||
|
|
||||||
void CatalogServer::CatalogObjectsUrlCallback(const Webserver::WebRequest& req,
|
void CatalogServer::CatalogObjectsUrlCallback(const Webserver::WebRequest& req,
|
||||||
|
|||||||
@@ -74,4 +74,25 @@ void ProtobufToJson(const google::protobuf::Message& pb, rapidjson::Document* do
|
|||||||
rapidjson::Value* obj);
|
rapidjson::Value* obj);
|
||||||
} // namespace impala
|
} // namespace impala
|
||||||
|
|
||||||
|
/// A wrapper for creating a rapidjson::Value with new fields.
|
||||||
|
struct JsonObjWrapper {
|
||||||
|
JsonObjWrapper(rapidjson::MemoryPoolAllocator<>& alloc):
|
||||||
|
value(rapidjson::kObjectType), allocator(alloc) {}
|
||||||
|
|
||||||
|
template<typename T>
|
||||||
|
void AddMember(const char* name, const T& val) {
|
||||||
|
rapidjson::Value field(val);
|
||||||
|
value.AddMember(rapidjson::StringRef(name), field, allocator);
|
||||||
|
}
|
||||||
|
|
||||||
|
rapidjson::Value value;
|
||||||
|
rapidjson::MemoryPoolAllocator<>& allocator;
|
||||||
|
};
|
||||||
|
|
||||||
|
template<>
|
||||||
|
inline void JsonObjWrapper::AddMember(const char* name, const std::string& val) {
|
||||||
|
rapidjson::Value field(val.c_str(), allocator);
|
||||||
|
value.AddMember(rapidjson::StringRef(name), field, allocator);
|
||||||
|
}
|
||||||
|
|
||||||
#endif
|
#endif
|
||||||
|
|||||||
@@ -1065,10 +1065,35 @@ struct TCopyTestCaseReq {
|
|||||||
1: required string input_path
|
1: required string input_path
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Snapshot of how far the event processor has got: ids and times of the last synced
// event and of the latest event, plus details of the event batch being processed.
struct TEventBatchProgressInfo {
  // Number of original HMS events received in the current batch.
  1: required i32 num_hms_events

  // Number of filtered MetastoreEvents generated from the original HMS events.
  2: required i32 num_filtered_events

  // Position within the filtered events of the current batch.
  3: required i32 current_event_index

  // Number of HMS events represented by this filtered event. For most events this is 1.
  // In case of BatchPartitionEvent this could be more than 1.
  4: required i32 current_event_batch_size

  // Smallest event id and event time (in seconds) of the current batch.
  5: required i64 min_event_id
  6: required i64 min_event_time_s

  // Largest event id and event time (in seconds) of the current batch.
  7: required i64 max_event_id
  8: required i64 max_event_time_s

  // Timestamp when we start to process the current event batch
  9: required i64 current_batch_start_time_ms

  // Timestamp when we start to process the current event
  10: required i64 current_event_start_time_ms

  // Id and event time (in seconds) of the last event that has been synced.
  11: required i64 last_synced_event_id
  12: required i64 last_synced_event_time_s

  // Id and event time (in seconds) of the latest event.
  13: required i64 latest_event_id
  14: required i64 latest_event_time_s

  // The HMS notification event that is currently being processed, if any.
  15: optional hive_metastore.NotificationEvent current_event
}
|
||||||
|
|
||||||
// Response carrying the event processor summary.
struct TEventProcessorMetricsSummaryResponse {
  // summary view of the events processor which can include status,
  // metrics and other details
  1: required string summary

  // Error messages if the events processor goes into ERROR/NEEDS_INVALIDATE states
  2: optional string error_msg

  // Progress of the event processing, see TEventBatchProgressInfo.
  3: optional TEventBatchProgressInfo progress
}
|
||||||
|
|||||||
@@ -693,6 +693,8 @@ public class MetastoreEvents {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
DebugUtils.executeDebugAction(
|
||||||
|
BackendConfig.INSTANCE.debugActions(), DebugUtils.EVENT_PROCESSING_DELAY);
|
||||||
process();
|
process();
|
||||||
injectErrorIfNeeded();
|
injectErrorIfNeeded();
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -61,6 +61,7 @@ import org.apache.impala.common.PrintUtils;
|
|||||||
import org.apache.impala.compat.MetastoreShim;
|
import org.apache.impala.compat.MetastoreShim;
|
||||||
import org.apache.impala.service.BackendConfig;
|
import org.apache.impala.service.BackendConfig;
|
||||||
import org.apache.impala.service.CatalogOpExecutor;
|
import org.apache.impala.service.CatalogOpExecutor;
|
||||||
|
import org.apache.impala.thrift.TEventBatchProgressInfo;
|
||||||
import org.apache.impala.thrift.TEventProcessorMetrics;
|
import org.apache.impala.thrift.TEventProcessorMetrics;
|
||||||
import org.apache.impala.thrift.TEventProcessorMetricsSummaryResponse;
|
import org.apache.impala.thrift.TEventProcessorMetricsSummaryResponse;
|
||||||
import org.apache.impala.util.MetaStoreUtil;
|
import org.apache.impala.util.MetaStoreUtil;
|
||||||
@@ -525,6 +526,12 @@ public class MetastoreEventsProcessor implements ExternalEventsProcessor {
|
|||||||
|
|
||||||
// keeps track of the current event that we are processing
|
// keeps track of the current event that we are processing
|
||||||
private NotificationEvent currentEvent_;
|
private NotificationEvent currentEvent_;
|
||||||
|
private List<NotificationEvent> currentEventBatch_;
|
||||||
|
private MetastoreEvent currentFilteredEvent_;
|
||||||
|
private List<MetastoreEvent> currentFilteredEvents_;
|
||||||
|
private long currentBatchStartTimeMs_ = 0;
|
||||||
|
private long currentEventStartTimeMs_ = 0;
|
||||||
|
private int currentEventIndex_ = 0;
|
||||||
|
|
||||||
// keeps track of the last event id which we have synced to
|
// keeps track of the last event id which we have synced to
|
||||||
private final AtomicLong lastSyncedEventId_ = new AtomicLong(-1);
|
private final AtomicLong lastSyncedEventId_ = new AtomicLong(-1);
|
||||||
@@ -667,6 +674,7 @@ public class MetastoreEventsProcessor implements ExternalEventsProcessor {
|
|||||||
@Override
|
@Override
|
||||||
public synchronized void start() {
|
public synchronized void start() {
|
||||||
Preconditions.checkState(eventProcessorStatus_ != EventProcessorStatus.ACTIVE);
|
Preconditions.checkState(eventProcessorStatus_ != EventProcessorStatus.ACTIVE);
|
||||||
|
resetProgressInfo();
|
||||||
startScheduler();
|
startScheduler();
|
||||||
updateStatus(EventProcessorStatus.ACTIVE);
|
updateStatus(EventProcessorStatus.ACTIVE);
|
||||||
LOG.info(String.format("Successfully started metastore event processing."
|
LOG.info(String.format("Successfully started metastore event processing."
|
||||||
@@ -822,6 +830,9 @@ public class MetastoreEventsProcessor implements ExternalEventsProcessor {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
// Clear the error message of the last failure and reset the progress info
|
||||||
|
eventProcessorErrorMsg_ = null;
|
||||||
|
resetProgressInfo();
|
||||||
lastSyncedEventId_.set(fromEventId);
|
lastSyncedEventId_.set(fromEventId);
|
||||||
lastSyncedEventTimeSecs_.set(getEventTimeFromHMS(fromEventId));
|
lastSyncedEventTimeSecs_.set(getEventTimeFromHMS(fromEventId));
|
||||||
updateStatus(EventProcessorStatus.ACTIVE);
|
updateStatus(EventProcessorStatus.ACTIVE);
|
||||||
@@ -955,7 +966,6 @@ public class MetastoreEventsProcessor implements ExternalEventsProcessor {
|
|||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public void processEvents() {
|
public void processEvents() {
|
||||||
currentEvent_ = null;
|
|
||||||
try {
|
try {
|
||||||
EventProcessorStatus currentStatus = eventProcessorStatus_;
|
EventProcessorStatus currentStatus = eventProcessorStatus_;
|
||||||
if (currentStatus != EventProcessorStatus.ACTIVE) {
|
if (currentStatus != EventProcessorStatus.ACTIVE) {
|
||||||
@@ -1025,7 +1035,7 @@ public class MetastoreEventsProcessor implements ExternalEventsProcessor {
|
|||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
public void updateLatestEventId() {
|
public void updateLatestEventId() {
|
||||||
EventProcessorStatus currentStatus = eventProcessorStatus_;
|
EventProcessorStatus currentStatus = eventProcessorStatus_;
|
||||||
if (currentStatus != EventProcessorStatus.ACTIVE) {
|
if (currentStatus == EventProcessorStatus.DISABLED) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
|
try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
|
||||||
@@ -1130,6 +1140,35 @@ public class MetastoreEventsProcessor implements ExternalEventsProcessor {
|
|||||||
if (eventProcessorErrorMsg_ != null) {
|
if (eventProcessorErrorMsg_ != null) {
|
||||||
summaryResponse.setError_msg(eventProcessorErrorMsg_);
|
summaryResponse.setError_msg(eventProcessorErrorMsg_);
|
||||||
}
|
}
|
||||||
|
TEventBatchProgressInfo progressInfo = new TEventBatchProgressInfo();
|
||||||
|
progressInfo.last_synced_event_id = lastSyncedEventId_.get();
|
||||||
|
progressInfo.last_synced_event_time_s = lastSyncedEventTimeSecs_.get();
|
||||||
|
progressInfo.latest_event_id = latestEventId_.get();
|
||||||
|
progressInfo.latest_event_time_s = latestEventTimeSecs_.get();
|
||||||
|
// Assign these lists to local variables in case they are replaced concurrently.
|
||||||
|
// It's best effort to make the members in 'progressInfo' consistent but we can't
|
||||||
|
// guarantee it.
|
||||||
|
List<NotificationEvent> eventBatch = currentEventBatch_;
|
||||||
|
List<MetastoreEvent> filteredEvents = currentFilteredEvents_;
|
||||||
|
if (eventBatch != null && !eventBatch.isEmpty()) {
|
||||||
|
int numHmsEvents = eventBatch.size();
|
||||||
|
progressInfo.num_hms_events = numHmsEvents;
|
||||||
|
progressInfo.min_event_id = eventBatch.get(0).getEventId();
|
||||||
|
progressInfo.min_event_time_s = eventBatch.get(0).getEventTime();
|
||||||
|
NotificationEvent lastEvent = eventBatch.get(numHmsEvents - 1);
|
||||||
|
progressInfo.max_event_id = lastEvent.getEventId();
|
||||||
|
progressInfo.max_event_time_s = lastEvent.getEventTime();
|
||||||
|
progressInfo.current_batch_start_time_ms = currentBatchStartTimeMs_;
|
||||||
|
progressInfo.current_event_start_time_ms = currentEventStartTimeMs_;
|
||||||
|
if (filteredEvents != null) {
|
||||||
|
progressInfo.num_filtered_events = filteredEvents.size();
|
||||||
|
}
|
||||||
|
progressInfo.current_event_index = currentEventIndex_;
|
||||||
|
progressInfo.current_event_batch_size = currentFilteredEvent_ != null ?
|
||||||
|
currentFilteredEvent_.getNumberOfEvents() : 0;
|
||||||
|
progressInfo.current_event = currentEvent_;
|
||||||
|
}
|
||||||
|
summaryResponse.setProgress(progressInfo);
|
||||||
return summaryResponse;
|
return summaryResponse;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -1138,6 +1177,16 @@ public class MetastoreEventsProcessor implements ExternalEventsProcessor {
|
|||||||
return metrics_;
|
return metrics_;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void resetProgressInfo() {
|
||||||
|
currentEvent_ = null;
|
||||||
|
currentEventBatch_ = null;
|
||||||
|
currentFilteredEvent_ = null;
|
||||||
|
currentFilteredEvents_ = null;
|
||||||
|
currentBatchStartTimeMs_ = 0;
|
||||||
|
currentEventStartTimeMs_ = 0;
|
||||||
|
currentEventIndex_ = 0;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Process the given list of notification events. Useful for tests which provide a list
|
* Process the given list of notification events. Useful for tests which provide a list
|
||||||
* of events
|
* of events
|
||||||
@@ -1148,7 +1197,7 @@ public class MetastoreEventsProcessor implements ExternalEventsProcessor {
|
|||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
protected void processEvents(long currentEventId, List<NotificationEvent> events)
|
protected void processEvents(long currentEventId, List<NotificationEvent> events)
|
||||||
throws MetastoreNotificationException {
|
throws MetastoreNotificationException {
|
||||||
currentEvent_ = null;
|
currentEventBatch_ = events;
|
||||||
// update the events received metric before returning
|
// update the events received metric before returning
|
||||||
metrics_.getMeter(EVENTS_RECEIVED_METRIC).mark(events.size());
|
metrics_.getMeter(EVENTS_RECEIVED_METRIC).mark(events.size());
|
||||||
if (events.isEmpty()) {
|
if (events.isEmpty()) {
|
||||||
@@ -1162,17 +1211,19 @@ public class MetastoreEventsProcessor implements ExternalEventsProcessor {
|
|||||||
}
|
}
|
||||||
final Timer.Context context =
|
final Timer.Context context =
|
||||||
metrics_.getTimer(EVENTS_PROCESS_DURATION_METRIC).time();
|
metrics_.getTimer(EVENTS_PROCESS_DURATION_METRIC).time();
|
||||||
|
currentBatchStartTimeMs_ = System.currentTimeMillis();
|
||||||
Map<MetastoreEvent, Long> eventProcessingTime = new HashMap<>();
|
Map<MetastoreEvent, Long> eventProcessingTime = new HashMap<>();
|
||||||
try {
|
try {
|
||||||
List<MetastoreEvent> filteredEvents =
|
currentFilteredEvents_ =
|
||||||
metastoreEventFactory_.getFilteredEvents(events, metrics_);
|
metastoreEventFactory_.getFilteredEvents(events, metrics_);
|
||||||
if (filteredEvents.isEmpty()) {
|
if (currentFilteredEvents_.isEmpty()) {
|
||||||
NotificationEvent e = events.get(events.size() - 1);
|
NotificationEvent e = events.get(events.size() - 1);
|
||||||
lastSyncedEventId_.set(e.getEventId());
|
lastSyncedEventId_.set(e.getEventId());
|
||||||
lastSyncedEventTimeSecs_.set(e.getEventTime());
|
lastSyncedEventTimeSecs_.set(e.getEventTime());
|
||||||
|
resetProgressInfo();
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
for (MetastoreEvent event : filteredEvents) {
|
for (MetastoreEvent event : currentFilteredEvents_) {
|
||||||
// synchronizing each event processing reduces the scope of the lock so the a
|
// synchronizing each event processing reduces the scope of the lock so the a
|
||||||
// potential reset() during event processing is not blocked for longer than
|
// potential reset() during event processing is not blocked for longer than
|
||||||
// necessary
|
// necessary
|
||||||
@@ -1181,13 +1232,14 @@ public class MetastoreEventsProcessor implements ExternalEventsProcessor {
|
|||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
currentEvent_ = event.metastoreNotificationEvent_;
|
currentEvent_ = event.metastoreNotificationEvent_;
|
||||||
|
currentFilteredEvent_ = event;
|
||||||
String targetName = event.getTargetName();
|
String targetName = event.getTargetName();
|
||||||
String desc = String.format("Processing %s on %s, eventId=%d",
|
String desc = String.format("Processing %s on %s, eventId=%d",
|
||||||
event.getEventType(), targetName, event.getEventId());
|
event.getEventType(), targetName, event.getEventId());
|
||||||
try (ThreadNameAnnotator tna = new ThreadNameAnnotator(desc)) {
|
try (ThreadNameAnnotator tna = new ThreadNameAnnotator(desc)) {
|
||||||
long startMs = System.currentTimeMillis();
|
currentEventStartTimeMs_ = System.currentTimeMillis();
|
||||||
event.processIfEnabled();
|
event.processIfEnabled();
|
||||||
long elapsedTimeMs = System.currentTimeMillis() - startMs;
|
long elapsedTimeMs = System.currentTimeMillis() - currentEventStartTimeMs_;
|
||||||
eventProcessingTime.put(event, elapsedTimeMs);
|
eventProcessingTime.put(event, elapsedTimeMs);
|
||||||
} catch (Exception processingEx) {
|
} catch (Exception processingEx) {
|
||||||
try {
|
try {
|
||||||
@@ -1200,6 +1252,7 @@ public class MetastoreEventsProcessor implements ExternalEventsProcessor {
|
|||||||
throw processingEx;
|
throw processingEx;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
currentEventIndex_++;
|
||||||
deleteEventLog_.garbageCollect(event.getEventId());
|
deleteEventLog_.garbageCollect(event.getEventId());
|
||||||
lastSyncedEventId_.set(event.getEventId());
|
lastSyncedEventId_.set(event.getEventId());
|
||||||
lastSyncedEventTimeSecs_.set(event.getEventTime());
|
lastSyncedEventTimeSecs_.set(event.getEventTime());
|
||||||
@@ -1208,6 +1261,7 @@ public class MetastoreEventsProcessor implements ExternalEventsProcessor {
|
|||||||
TimeUnit.SECONDS);
|
TimeUnit.SECONDS);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
resetProgressInfo();
|
||||||
} catch (CatalogException e) {
|
} catch (CatalogException e) {
|
||||||
throw new MetastoreNotificationException(String.format(
|
throw new MetastoreNotificationException(String.format(
|
||||||
"Unable to process event %d of type %s. Event processing will be stopped.",
|
"Unable to process event %d of type %s. Event processing will be stopped.",
|
||||||
|
|||||||
@@ -71,6 +71,9 @@ public class DebugUtils {
|
|||||||
public static final String GET_FILTERED_EVENTS_DELAY =
|
public static final String GET_FILTERED_EVENTS_DELAY =
|
||||||
"catalogd_get_filtered_events_delay";
|
"catalogd_get_filtered_events_delay";
|
||||||
|
|
||||||
|
// debug action label to inject a delay in processing each HMS event
|
||||||
|
public static final String EVENT_PROCESSING_DELAY = "catalogd_event_processing_delay";
|
||||||
|
|
||||||
// debug action label for introducing delay in loading table metadata.
|
// debug action label for introducing delay in loading table metadata.
|
||||||
public static final String LOAD_TABLES_DELAY = "impalad_load_tables_delay";
|
public static final String LOAD_TABLES_DELAY = "impalad_load_tables_delay";
|
||||||
|
|
||||||
|
|||||||
@@ -28,6 +28,7 @@ from tests.beeswax.impala_beeswax import ImpalaBeeswaxException
|
|||||||
from tests.common.custom_cluster_test_suite import (
|
from tests.common.custom_cluster_test_suite import (
|
||||||
DEFAULT_CLUSTER_SIZE,
|
DEFAULT_CLUSTER_SIZE,
|
||||||
CustomClusterTestSuite)
|
CustomClusterTestSuite)
|
||||||
|
from tests.common.skip import SkipIfFS
|
||||||
from tests.shell.util import run_impala_shell_cmd
|
from tests.shell.util import run_impala_shell_cmd
|
||||||
|
|
||||||
SMALL_QUERY_LOG_SIZE_IN_BYTES = 40 * 1024
|
SMALL_QUERY_LOG_SIZE_IN_BYTES = 40 * 1024
|
||||||
@@ -436,3 +437,68 @@ class TestWebPage(CustomClusterTestSuite):
|
|||||||
self.cluster.statestored.service.wait_for_live_subscribers(DEFAULT_CLUSTER_SIZE)
|
self.cluster.statestored.service.wait_for_live_subscribers(DEFAULT_CLUSTER_SIZE)
|
||||||
# Verify the topic metrics again
|
# Verify the topic metrics again
|
||||||
self._verify_topic_size_metrics()
|
self._verify_topic_size_metrics()
|
||||||
|
|
||||||
|
@SkipIfFS.hive
@CustomClusterTestSuite.with_args(
  catalogd_args="--hms_event_polling_interval_s=1 "
                "--debug_actions=catalogd_event_processing_delay:SLEEP@2000")
def test_event_processor_status(self, unique_database):
  """Verify the /events page by using a long delay in event processing.

  Creates a partitioned table, populates it from Impala and then runs the same
  INSERT in Hive so that non-self events are generated. The test then polls the
  /events page until a batched event shows up and checks the expected sections."""
  events_url = "http://localhost:25020/events"
  # Upper bound for the polling below so that the test fails with the page content
  # instead of hanging forever if the batched events never show up.
  timeout_s = 120
  self.execute_query("create table {}.part (i int) partitioned by (p int)".format(
    unique_database))
  insert_stmt = "insert into {}.part partition(p) select id, month from "\
                "functional.alltypes".format(unique_database)
  self.execute_query(insert_stmt)
  # Run the same INSERT statement in Hive to get non-self events.
  self.run_stmt_in_hive("set hive.exec.dynamic.partition.mode=nonstrict;" + insert_stmt)
  page = requests.get(events_url).text
  # Wait until the batched events are being processed
  deadline = time.time() + timeout_s
  while "a batch of" not in page:
    assert time.time() < deadline, \
        "Timed out waiting for batched events in events page:\n%s" % page
    time.sleep(1)
    page = requests.get(events_url).text
  expected_lines = [
    "Lag Info", "Lag time:", "Current Event Batch", "Metastore Event Batch:",
    "Event ID starts from", "Event time starts from",
    "Started processing the current batch at",
    "Started processing the current event at",
    "Current Metastore event being processed",
    "(a batch of ", " events on the same table)",
  ]
  for expected in expected_lines:
    assert expected in page, "Missing '%s' in events page:\n%s" % (expected, page)
|
||||||
|
|
||||||
|
@SkipIfFS.hive
@CustomClusterTestSuite.with_args(
  catalogd_args="--hms_event_polling_interval_s=1 "
                "--invalidate_metadata_on_event_processing_failure=false "
                "--inject_process_event_failure_event_types=CREATE_TABLE")
def test_event_processor_error_message(self, unique_database):
  """Verify the /events page show the error of event processing

  Also checks that the latest event id keeps being updated after the failure, that
  the failed event is still reported as the current event, and that the error
  message is gone after a global INVALIDATE METADATA."""
  events_url = "http://localhost:25020/events"
  json_url = events_url + "?json"
  # Upper bound for each polling loop. Polling replaces fixed sleeps, which can be
  # too short on a slow machine given the 1s event polling interval.
  timeout_s = 30
  error_marker = "Unexpected exception received while processing event"

  self.run_stmt_in_hive("create table {}.tbl(i int)".format(unique_database))
  # Wait until the failure is reported (or the timeout expires; the assertions
  # below then fail with the page content).
  deadline = time.time() + timeout_s
  page = requests.get(events_url).text
  while error_marker not in page and time.time() < deadline:
    time.sleep(1)
    page = requests.get(events_url).text
  expected_lines = [
    error_marker,
    "Event id:", "Event Type: CREATE_TABLE", "Event message:",
  ]
  for expected in expected_lines:
    assert expected in page, "Missing '%s' in events page:\n%s" % (expected, page)

  # Verify the latest event id still get updated
  json_res = json.loads(requests.get(json_url).text)
  old_latest_event_id = json_res["progress-info"]["latest_event_id"]
  # Generate new events
  self.run_stmt_in_hive("create table {}.tbl2(i int)".format(unique_database))
  # Wait until the new event is polled
  deadline = time.time() + timeout_s
  while True:
    json_res = json.loads(requests.get(json_url).text)
    new_latest_event_id = json_res["progress-info"]["latest_event_id"]
    if new_latest_event_id > old_latest_event_id or time.time() >= deadline:
      break
    time.sleep(1)
  assert new_latest_event_id > old_latest_event_id
  # Current event (the failed one) should not be cleared
  assert "current_event" in json_res["progress-info"]

  # Verify the error message disappears after a global INVALIDATE METADATA
  self.execute_query("invalidate metadata")
  page = requests.get(events_url).text
  assert "Unexpected exception" not in page, "Still see error message:\n" + page
|
||||||
|
|||||||
@@ -19,12 +19,71 @@ under the License.
|
|||||||
{{> www/common-header.tmpl }}
{{^error}}

{{?event_processor_error_msg}}
<h3>Error Message</h3>
<pre>{{event_processor_error_msg}}</pre>
{{/event_processor_error_msg}}

<!-- Last synced event compared with the newest event in the metastore. -->
<h3>Lag Info:</h3>
Lag time: {{progress-info.lag_time}}
<table class="table table-hover table-bordered">
  <tr>
    <th></th>
    <th>Event ID</th>
    <th>Event Timestamp (s)</th>
    <th>Event Time</th>
  </tr>
  <tr>
    <td>Last Synced Event</td>
    <td>{{progress-info.last_synced_event_id}}</td>
    <td>{{progress-info.last_synced_event_time_s}}</td>
    <td>{{progress-info.last_synced_event_time}}</td>
  </tr>
  <tr>
    <td>Latest Event in Metastore</td>
    <td>{{progress-info.latest_event_id}}</td>
    <td>{{progress-info.latest_event_time_s}}</td>
    <td>{{progress-info.latest_event_time}}</td>
  </tr>
</table>

{{?progress-info.num_hms_events}}
<h3>Current Event Batch</h3>
<p>
Metastore Event Batch: {{progress-info.num_hms_events}} events.<br>
Event ID starts from {{progress-info.min_event_id}} to {{progress-info.max_event_id}}.<br>
Event time starts from {{progress-info.min_event_time}} to {{progress-info.max_event_time}}.<br>
After batching and filtering, there are {{progress-info.num_filtered_events}} events to be processed.
{{progress-info.num_synced_events}} ({{progress-info.synced_percent}}%) have been processed.<br>
Started processing the current batch at {{progress-info.start_time}} ({{progress-info.elapsed_time}} ago).<br>
Started processing the current event at {{progress-info.start_time_of_event}} ({{progress-info.elapsed_time_current_event}} ago).
</p>
<p>Current Metastore event being processed
{{?progress-info.current_event_batch_size}}
 (a batch of {{progress-info.current_event_batch_size}} events on the same table)
{{/progress-info.current_event_batch_size}}
:</p>
<table class="table table-hover table-bordered">
  <tr>
    <th>Event ID</th>
    <th>Event Time</th>
    <th>Event Type</th>
    <th>Catalog Name</th>
    <th>Database Name</th>
    <th>Table Name</th>
  </tr>
  <tr>
    <td>{{progress-info.current_event.event_id}}</td>
    <td>{{progress-info.current_event.event_time}}</td>
    <td>{{progress-info.current_event.event_type}}</td>
    <td>{{progress-info.current_event.cat_name}}</td>
    <td>{{progress-info.current_event.db_name}}</td>
    <td>{{progress-info.current_event.tbl_name}}</td>
  </tr>
</table>
{{/progress-info.num_hms_events}}

<h3>Event Processor Summary</h3>
<pre>{{event_processor_metrics}}</pre>

{{> www/common-footer.tmpl }}
|
||||||
|
|||||||
Reference in New Issue
Block a user