From 1b468b87a3927aa0d3f17cd4ba291735344796b7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?G=C3=B6ran=20Sander?= Date: Mon, 15 Dec 2025 04:47:46 +0100 Subject: [PATCH] feat: Add retries when writing UDP queue metrics to InfluxDB v3 --- src/lib/influxdb/factory.js | 64 +++++++++++++--------- src/lib/influxdb/index.js | 105 ++++++++++++++++++++++++++++++++++-- 2 files changed, 141 insertions(+), 28 deletions(-) diff --git a/src/lib/influxdb/factory.js b/src/lib/influxdb/factory.js index b1d8083..f99f12c 100644 --- a/src/lib/influxdb/factory.js +++ b/src/lib/influxdb/factory.js @@ -174,20 +174,28 @@ export async function storeRejectedEventCountInfluxDB() { * @returns {Promise} Promise that resolves when data has been posted to InfluxDB */ export async function postUserEventQueueMetricsToInfluxdb() { - const version = getInfluxDbVersion(); + try { + const version = getInfluxDbVersion(); - if (version === 1) { - return storeUserEventQueueMetricsV1(); - } - if (version === 2) { - return storeUserEventQueueMetricsV2(); - } - if (version === 3) { - return postUserEventQueueMetricsToInfluxdbV3(); - } + if (version === 1) { + return storeUserEventQueueMetricsV1(); + } + if (version === 2) { + return storeUserEventQueueMetricsV2(); + } + if (version === 3) { + return postUserEventQueueMetricsToInfluxdbV3(); + } - globals.logger.debug(`INFLUXDB FACTORY: Unknown InfluxDB version: v${version}`); - throw new Error(`InfluxDB v${version} not supported`); + globals.logger.debug(`INFLUXDB FACTORY: Unknown InfluxDB version: v${version}`); + throw new Error(`InfluxDB v${version} not supported`); + } catch (err) { + globals.logger.error( + `INFLUXDB FACTORY: Error in postUserEventQueueMetricsToInfluxdb: ${err.message}` + ); + globals.logger.debug(`INFLUXDB FACTORY: Error stack: ${err.stack}`); + throw err; + } } /** @@ -196,20 +204,28 @@ export async function postUserEventQueueMetricsToInfluxdb() { * @returns {Promise} Promise that resolves when data has been posted to InfluxDB */ export async function postLogEventQueueMetricsToInfluxdb() { - const version = getInfluxDbVersion(); + try { + const version = getInfluxDbVersion(); - if (version === 1) { - return storeLogEventQueueMetricsV1(); - } - if (version === 2) { - return storeLogEventQueueMetricsV2(); - } - if (version === 3) { - return postLogEventQueueMetricsToInfluxdbV3(); - } + if (version === 1) { + return storeLogEventQueueMetricsV1(); + } + if (version === 2) { + return storeLogEventQueueMetricsV2(); + } + if (version === 3) { + return postLogEventQueueMetricsToInfluxdbV3(); + } - globals.logger.debug(`INFLUXDB FACTORY: Unknown InfluxDB version: v${version}`); - throw new Error(`InfluxDB v${version} not supported`); + globals.logger.debug(`INFLUXDB FACTORY: Unknown InfluxDB version: v${version}`); + throw new Error(`InfluxDB v${version} not supported`); + } catch (err) { + globals.logger.error( + `INFLUXDB FACTORY: Error in postLogEventQueueMetricsToInfluxdb: ${err.message}` + ); + globals.logger.debug(`INFLUXDB FACTORY: Error stack: ${err.stack}`); + throw err; + } } /** diff --git a/src/lib/influxdb/index.js b/src/lib/influxdb/index.js index 10dbfa3..ab3302a 100644 --- a/src/lib/influxdb/index.js +++ b/src/lib/influxdb/index.js @@ -1,5 +1,6 @@ import { useRefactoredInfluxDb, getFormattedTime } from './shared/utils.js'; import * as factory from './factory.js'; +import globals from '../../globals.js'; // Import original implementation for fallback import * as original from '../post-to-influxdb.js'; @@ -95,7 +96,8 @@ export async function postUserEventToInfluxdb(msg) { try { return await factory.postUserEventToInfluxdb(msg); } catch (err) { - // If refactored code not yet implemented for this version, fall back to original + // If refactored code not yet implemented for this version, fall back to original globals.logger.error(`INFLUXDB ROUTING: User event - falling back to legacy code due to error: ${err.message}`); + globals.logger.debug(`INFLUXDB ROUTING: User event - error stack: ${err.stack}`); return await original.postUserEventToInfluxdb(msg); } } @@ -115,7 +117,8 @@ export async function postLogEventToInfluxdb(msg) { try { return await factory.postLogEventToInfluxdb(msg); } catch (err) { - // If refactored code not yet implemented for this version, fall back to original + // If refactored code not yet implemented for this version, fall back to original globals.logger.error(`INFLUXDB ROUTING: Log event - falling back to legacy code due to error: ${err.message}`); + globals.logger.debug(`INFLUXDB ROUTING: Log event - error stack: ${err.stack}`); return await original.postLogEventToInfluxdb(msg); } } @@ -181,9 +184,19 @@ export async function postUserEventQueueMetricsToInfluxdb(queueMetrics) { return await factory.postUserEventQueueMetricsToInfluxdb(); } catch (err) { // If refactored code not yet implemented for this version, fall back to original + globals.logger.error( + `INFLUXDB ROUTING: User event queue metrics - falling back to legacy code due to error: ${err.message}` + ); + globals.logger.debug( + `INFLUXDB ROUTING: User event queue metrics - error stack: ${err.stack}` + ); return await original.postUserEventQueueMetricsToInfluxdb(queueMetrics); } } + + globals.logger.verbose( + 'INFLUXDB ROUTING: User event queue metrics - using original implementation' + ); return await original.postUserEventQueueMetricsToInfluxdb(queueMetrics); } @@ -201,6 +214,12 @@ export async function postLogEventQueueMetricsToInfluxdb(queueMetrics) { return await factory.postLogEventQueueMetricsToInfluxdb(); } catch (err) { // If refactored code not yet implemented for this version, fall back to original + globals.logger.error( + `INFLUXDB ROUTING: Log event queue metrics - falling back to legacy code due to error: ${err.message}` + ); + globals.logger.debug( + `INFLUXDB ROUTING: Log event queue metrics - error stack: ${err.stack}` + ); return await original.postLogEventQueueMetricsToInfluxdb(queueMetrics); } } @@ -213,6 +232,84 @@ export async function postLogEventQueueMetricsToInfluxdb(queueMetrics) { * @returns {object} Object containing interval IDs for cleanup */ export function setupUdpQueueMetricsStorage() { - // This is version-agnostic, always use original - return original.setupUdpQueueMetricsStorage(); + const intervalIds = { + userEvents: null, + logEvents: null, + }; + + // Check if InfluxDB is enabled + if (globals.config.get('Butler-SOS.influxdbConfig.enable') !== true) { + globals.logger.info( + 'UDP QUEUE METRICS: InfluxDB is disabled. Skipping setup of queue metrics storage' + ); + return intervalIds; + } + + // Set up user events queue metrics storage + if ( + globals.config.get('Butler-SOS.userEvents.udpServerConfig.queueMetrics.influxdb.enable') === + true + ) { + const writeFrequency = globals.config.get( + 'Butler-SOS.userEvents.udpServerConfig.queueMetrics.influxdb.writeFrequency' + ); + + intervalIds.userEvents = setInterval(async () => { + try { + globals.logger.verbose( + 'UDP QUEUE METRICS: Timer for storing user event queue metrics to InfluxDB triggered' + ); + await postUserEventQueueMetricsToInfluxdb(); + } catch (err) { + globals.logger.error( + `UDP QUEUE METRICS: Error storing user event queue metrics to InfluxDB: ${ + err && err.stack ? err.stack : err + }` + ); + } + }, writeFrequency); + + globals.logger.info( + `UDP QUEUE METRICS: Set up timer for storing user event queue metrics to InfluxDB (interval: ${writeFrequency}ms)` + ); + } else { + globals.logger.info( + 'UDP QUEUE METRICS: User event queue metrics storage to InfluxDB is disabled' + ); + } + + // Set up log events queue metrics storage + if ( + globals.config.get('Butler-SOS.logEvents.udpServerConfig.queueMetrics.influxdb.enable') === + true + ) { + const writeFrequency = globals.config.get( + 'Butler-SOS.logEvents.udpServerConfig.queueMetrics.influxdb.writeFrequency' + ); + + intervalIds.logEvents = setInterval(async () => { + try { + globals.logger.verbose( + 'UDP QUEUE METRICS: Timer for storing log event queue metrics to InfluxDB triggered' + ); + await postLogEventQueueMetricsToInfluxdb(); + } catch (err) { + globals.logger.error( + `UDP QUEUE METRICS: Error storing log event queue metrics to InfluxDB: ${ + err && err.stack ? err.stack : err + }` + ); + } + }, writeFrequency); + + globals.logger.info( + `UDP QUEUE METRICS: Set up timer for storing log event queue metrics to InfluxDB (interval: ${writeFrequency}ms)` + ); + } else { + globals.logger.info( + 'UDP QUEUE METRICS: Log event queue metrics storage to InfluxDB is disabled' + ); + } + + return intervalIds; }