feat: Add retries when writing UDP queue metrics to InfluxDB v3

This commit is contained in:
Göran Sander
2025-12-15 04:47:46 +01:00
parent 2c4ad6ec46
commit 1b468b87a3
2 changed files with 141 additions and 28 deletions

View File

@@ -174,20 +174,28 @@ export async function storeRejectedEventCountInfluxDB() {
* @returns {Promise<void>} Promise that resolves when data has been posted to InfluxDB
*/
export async function postUserEventQueueMetricsToInfluxdb() {
const version = getInfluxDbVersion();
try {
const version = getInfluxDbVersion();
if (version === 1) {
return storeUserEventQueueMetricsV1();
}
if (version === 2) {
return storeUserEventQueueMetricsV2();
}
if (version === 3) {
return postUserEventQueueMetricsToInfluxdbV3();
}
if (version === 1) {
return storeUserEventQueueMetricsV1();
}
if (version === 2) {
return storeUserEventQueueMetricsV2();
}
if (version === 3) {
return postUserEventQueueMetricsToInfluxdbV3();
}
globals.logger.debug(`INFLUXDB FACTORY: Unknown InfluxDB version: v${version}`);
throw new Error(`InfluxDB v${version} not supported`);
globals.logger.debug(`INFLUXDB FACTORY: Unknown InfluxDB version: v${version}`);
throw new Error(`InfluxDB v${version} not supported`);
} catch (err) {
globals.logger.error(
`INFLUXDB FACTORY: Error in postUserEventQueueMetricsToInfluxdb: ${err.message}`
);
globals.logger.debug(`INFLUXDB FACTORY: Error stack: ${err.stack}`);
throw err;
}
}
/**
@@ -196,20 +204,28 @@ export async function postUserEventQueueMetricsToInfluxdb() {
* @returns {Promise<void>} Promise that resolves when data has been posted to InfluxDB
*/
export async function postLogEventQueueMetricsToInfluxdb() {
const version = getInfluxDbVersion();
try {
const version = getInfluxDbVersion();
if (version === 1) {
return storeLogEventQueueMetricsV1();
}
if (version === 2) {
return storeLogEventQueueMetricsV2();
}
if (version === 3) {
return postLogEventQueueMetricsToInfluxdbV3();
}
if (version === 1) {
return storeLogEventQueueMetricsV1();
}
if (version === 2) {
return storeLogEventQueueMetricsV2();
}
if (version === 3) {
return postLogEventQueueMetricsToInfluxdbV3();
}
globals.logger.debug(`INFLUXDB FACTORY: Unknown InfluxDB version: v${version}`);
throw new Error(`InfluxDB v${version} not supported`);
globals.logger.debug(`INFLUXDB FACTORY: Unknown InfluxDB version: v${version}`);
throw new Error(`InfluxDB v${version} not supported`);
} catch (err) {
globals.logger.error(
`INFLUXDB FACTORY: Error in postLogEventQueueMetricsToInfluxdb: ${err.message}`
);
globals.logger.debug(`INFLUXDB FACTORY: Error stack: ${err.stack}`);
throw err;
}
}
/**

View File

@@ -1,5 +1,6 @@
import { useRefactoredInfluxDb, getFormattedTime } from './shared/utils.js';
import * as factory from './factory.js';
import globals from '../../globals.js';
// Import original implementation for fallback
import * as original from '../post-to-influxdb.js';
@@ -95,7 +96,8 @@ export async function postUserEventToInfluxdb(msg) {
try {
return await factory.postUserEventToInfluxdb(msg);
} catch (err) {
// If refactored code not yet implemented for this version, fall back to original
// If refactored code not yet implemented for this version, fall back to original globals.logger.error(`INFLUXDB ROUTING: User event - falling back to legacy code due to error: ${err.message}`);
globals.logger.debug(`INFLUXDB ROUTING: User event - error stack: ${err.stack}`);
return await original.postUserEventToInfluxdb(msg);
}
}
@@ -115,7 +117,8 @@ export async function postLogEventToInfluxdb(msg) {
try {
return await factory.postLogEventToInfluxdb(msg);
} catch (err) {
// If refactored code not yet implemented for this version, fall back to original
// If refactored code not yet implemented for this version, fall back to original globals.logger.error(`INFLUXDB ROUTING: Log event - falling back to legacy code due to error: ${err.message}`);
globals.logger.debug(`INFLUXDB ROUTING: Log event - error stack: ${err.stack}`);
return await original.postLogEventToInfluxdb(msg);
}
}
@@ -181,9 +184,19 @@ export async function postUserEventQueueMetricsToInfluxdb(queueMetrics) {
return await factory.postUserEventQueueMetricsToInfluxdb();
} catch (err) {
// If refactored code not yet implemented for this version, fall back to original
globals.logger.error(
`INFLUXDB ROUTING: User event queue metrics - falling back to legacy code due to error: ${err.message}`
);
globals.logger.debug(
`INFLUXDB ROUTING: User event queue metrics - error stack: ${err.stack}`
);
return await original.postUserEventQueueMetricsToInfluxdb(queueMetrics);
}
}
globals.logger.verbose(
'INFLUXDB ROUTING: User event queue metrics - using original implementation'
);
return await original.postUserEventQueueMetricsToInfluxdb(queueMetrics);
}
@@ -201,6 +214,12 @@ export async function postLogEventQueueMetricsToInfluxdb(queueMetrics) {
return await factory.postLogEventQueueMetricsToInfluxdb();
} catch (err) {
// If refactored code not yet implemented for this version, fall back to original
globals.logger.error(
`INFLUXDB ROUTING: Log event queue metrics - falling back to legacy code due to error: ${err.message}`
);
globals.logger.debug(
`INFLUXDB ROUTING: Log event queue metrics - error stack: ${err.stack}`
);
return await original.postLogEventQueueMetricsToInfluxdb(queueMetrics);
}
}
@@ -213,6 +232,84 @@ export async function postLogEventQueueMetricsToInfluxdb(queueMetrics) {
* @returns {object} Object containing interval IDs for cleanup
*/
export function setupUdpQueueMetricsStorage() {
// This is version-agnostic, always use original
return original.setupUdpQueueMetricsStorage();
const intervalIds = {
userEvents: null,
logEvents: null,
};
// Check if InfluxDB is enabled
if (globals.config.get('Butler-SOS.influxdbConfig.enable') !== true) {
globals.logger.info(
'UDP QUEUE METRICS: InfluxDB is disabled. Skipping setup of queue metrics storage'
);
return intervalIds;
}
// Set up user events queue metrics storage
if (
globals.config.get('Butler-SOS.userEvents.udpServerConfig.queueMetrics.influxdb.enable') ===
true
) {
const writeFrequency = globals.config.get(
'Butler-SOS.userEvents.udpServerConfig.queueMetrics.influxdb.writeFrequency'
);
intervalIds.userEvents = setInterval(async () => {
try {
globals.logger.verbose(
'UDP QUEUE METRICS: Timer for storing user event queue metrics to InfluxDB triggered'
);
await postUserEventQueueMetricsToInfluxdb();
} catch (err) {
globals.logger.error(
`UDP QUEUE METRICS: Error storing user event queue metrics to InfluxDB: ${
err && err.stack ? err.stack : err
}`
);
}
}, writeFrequency);
globals.logger.info(
`UDP QUEUE METRICS: Set up timer for storing user event queue metrics to InfluxDB (interval: ${writeFrequency}ms)`
);
} else {
globals.logger.info(
'UDP QUEUE METRICS: User event queue metrics storage to InfluxDB is disabled'
);
}
// Set up log events queue metrics storage
if (
globals.config.get('Butler-SOS.logEvents.udpServerConfig.queueMetrics.influxdb.enable') ===
true
) {
const writeFrequency = globals.config.get(
'Butler-SOS.logEvents.udpServerConfig.queueMetrics.influxdb.writeFrequency'
);
intervalIds.logEvents = setInterval(async () => {
try {
globals.logger.verbose(
'UDP QUEUE METRICS: Timer for storing log event queue metrics to InfluxDB triggered'
);
await postLogEventQueueMetricsToInfluxdb();
} catch (err) {
globals.logger.error(
`UDP QUEUE METRICS: Error storing log event queue metrics to InfluxDB: ${
err && err.stack ? err.stack : err
}`
);
}
}, writeFrequency);
globals.logger.info(
`UDP QUEUE METRICS: Set up timer for storing log event queue metrics to InfluxDB (interval: ${writeFrequency}ms)`
);
} else {
globals.logger.info(
'UDP QUEUE METRICS: Log event queue metrics storage to InfluxDB is disabled'
);
}
return intervalIds;
}