Fix InfluxDB v3 handling wrt identical tag and field names (not allowed!)

This commit is contained in:
Göran Sander
2025-12-13 08:43:23 +01:00
parent b4f8baeb26
commit 0b82072a4c
8 changed files with 61 additions and 20 deletions

View File

@@ -24,7 +24,7 @@ import { setupAnonUsageReportTimer } from './lib/telemetry.js';
import { setupPromClient } from './lib/prom-client.js';
import { setupConfigVisServer } from './lib/config-visualise.js';
import { setupUdpEventsStorage } from './lib/udp-event.js';
import { setupUdpQueueMetricsStorage } from './lib/post-to-influxdb.js';
import { setupUdpQueueMetricsStorage } from './lib/influxdb/index.js';
// Suppress experimental warnings
// https://stackoverflow.com/questions/55778283/how-to-disable-warnings-when-node-is-launched-via-a-global-shell-script

View File

@@ -7,7 +7,7 @@ import https from 'https';
import axios from 'axios';
import globals from '../globals.js';
import { postHealthMetricsToInfluxdb } from './post-to-influxdb.js';
import { postHealthMetricsToInfluxdb } from './influxdb/index.js';
import { postHealthMetricsToNewRelic } from './post-to-new-relic.js';
import { postHealthToMQTT } from './post-to-mqtt.js';
import { getServerHeaders } from './serverheaders.js';
@@ -101,6 +101,10 @@ export function getHealthStatsFromSense(serverName, host, tags, headers) {
globals.logger.debug('HEALTH: Calling HEALTH metrics Prometheus method');
saveHealthMetricsToPrometheus(host, response.data, tags);
}
} else {
globals.logger.error(
`HEALTH: Received non-200 response code (${response.status}) from server '${serverName}' (${host})`
);
}
})
.catch((err) => {

View File

@@ -47,7 +47,8 @@ export async function postLogEventToInfluxdbV3(msg) {
// Handle each message type with its specific fields
if (msg.source === 'qseow-engine') {
// Engine fields: message, exception_message, command, result_code, origin, context, session_id, raw_event
// Engine fields: message, exception_message, command, result_code_field, origin, context, session_id, raw_event
// NOTE: result_code uses _field suffix to avoid conflict with result_code tag
point = new Point3('log_event')
.setTag('host', msg.host)
.setTag('level', msg.level)
@@ -57,7 +58,7 @@ export async function postLogEventToInfluxdbV3(msg) {
.setStringField('message', msg.message)
.setStringField('exception_message', msg.exception_message || '')
.setStringField('command', msg.command || '')
.setStringField('result_code', msg.result_code || '')
.setStringField('result_code_field', msg.result_code || '')
.setStringField('origin', msg.origin || '')
.setStringField('context', msg.context || '')
.setStringField('session_id', msg.session_id || '')
@@ -76,7 +77,8 @@ export async function postLogEventToInfluxdbV3(msg) {
if (msg?.engine_exe_version?.length > 0)
point.setTag('engine_exe_version', msg.engine_exe_version);
} else if (msg.source === 'qseow-proxy') {
// Proxy fields: message, exception_message, command, result_code, origin, context, raw_event
// Proxy fields: message, exception_message, command, result_code_field, origin, context, raw_event
// NOTE: result_code uses _field suffix to avoid conflict with result_code tag
point = new Point3('log_event')
.setTag('host', msg.host)
.setTag('level', msg.level)
@@ -86,7 +88,7 @@ export async function postLogEventToInfluxdbV3(msg) {
.setStringField('message', msg.message)
.setStringField('exception_message', msg.exception_message || '')
.setStringField('command', msg.command || '')
.setStringField('result_code', msg.result_code || '')
.setStringField('result_code_field', msg.result_code || '')
.setStringField('origin', msg.origin || '')
.setStringField('context', msg.context || '')
.setStringField('raw_event', JSON.stringify(msg));
@@ -97,7 +99,8 @@ export async function postLogEventToInfluxdbV3(msg) {
if (msg?.user_id?.length > 0) point.setTag('user_id', msg.user_id);
if (msg?.result_code?.length > 0) point.setTag('result_code', msg.result_code);
} else if (msg.source === 'qseow-scheduler') {
// Scheduler fields: message, exception_message, app_name, app_id, execution_id, raw_event
// Scheduler fields: message, exception_message, app_name_field, app_id_field, execution_id, raw_event
// NOTE: app_name and app_id use _field suffix to avoid conflict with conditional tags
point = new Point3('log_event')
.setTag('host', msg.host)
.setTag('level', msg.level)
@@ -106,8 +109,8 @@ export async function postLogEventToInfluxdbV3(msg) {
.setTag('subsystem', msg.subsystem || 'n/a')
.setStringField('message', msg.message)
.setStringField('exception_message', msg.exception_message || '')
.setStringField('app_name', msg.app_name || '')
.setStringField('app_id', msg.app_id || '')
.setStringField('app_name_field', msg.app_name || '')
.setStringField('app_id_field', msg.app_id || '')
.setStringField('execution_id', msg.execution_id || '')
.setStringField('raw_event', JSON.stringify(msg));
@@ -118,7 +121,8 @@ export async function postLogEventToInfluxdbV3(msg) {
if (msg?.task_id?.length > 0) point.setTag('task_id', msg.task_id);
if (msg?.task_name?.length > 0) point.setTag('task_name', msg.task_name);
} else if (msg.source === 'qseow-repository') {
// Repository fields: message, exception_message, command, result_code, origin, context, raw_event
// Repository fields: message, exception_message, command, result_code_field, origin, context, raw_event
// NOTE: result_code uses _field suffix to avoid conflict with result_code tag
point = new Point3('log_event')
.setTag('host', msg.host)
.setTag('level', msg.level)
@@ -128,7 +132,7 @@ export async function postLogEventToInfluxdbV3(msg) {
.setStringField('message', msg.message)
.setStringField('exception_message', msg.exception_message || '')
.setStringField('command', msg.command || '')
.setStringField('result_code', msg.result_code || '')
.setStringField('result_code_field', msg.result_code || '')
.setStringField('origin', msg.origin || '')
.setStringField('context', msg.context || '')
.setStringField('raw_event', JSON.stringify(msg));
@@ -151,7 +155,7 @@ export async function postLogEventToInfluxdbV3(msg) {
.setTag('proxy_session_id', msg.proxy_session_id || '-1')
.setTag('session_id', msg.session_id || '-1')
.setTag('event_activity_source', msg.event_activity_source || '<Unknown>')
.setStringField('app_id', msg.app_id || '')
.setStringField('app_id_field', msg.app_id || '')
.setFloatField('process_time', msg.process_time)
.setFloatField('work_time', msg.work_time)
.setFloatField('lock_time', msg.lock_time)
@@ -192,6 +196,7 @@ export async function postLogEventToInfluxdbV3(msg) {
}
await globals.influx.write(point.toLineProtocol(), database);
globals.logger.debug(`LOG EVENT INFLUXDB V3: Wrote data to InfluxDB v3`);
globals.logger.verbose('LOG EVENT INFLUXDB V3: Sent Butler SOS log event data to InfluxDB');

View File

@@ -2,6 +2,20 @@ import { Point as Point3 } from '@influxdata/influxdb3-client';
import globals from '../../../globals.js';
import { isInfluxDbEnabled } from '../shared/utils.js';
/**
* Sanitize tag values for InfluxDB line protocol.
* Remove or replace characters that cause parsing issues.
*
* @param {string} value - The value to sanitize
* @returns {string} - The sanitized value
*/
function sanitizeTagValue(value) {
if (!value) return value;
return String(value)
.replace(/[<>\\]/g, '')
.replace(/\s+/g, '-');
}
/**
* Posts a user event to InfluxDB v3.
*
@@ -26,7 +40,17 @@ export async function postUserEventToInfluxdbV3(msg) {
const database = globals.config.get('Butler-SOS.influxdbConfig.v3Config.database');
// Validate required fields
if (!msg.host || !msg.command || !msg.user_directory || !msg.user_id || !msg.origin) {
globals.logger.warn(
`USER EVENT INFLUXDB V3: Missing required fields in user event message: ${JSON.stringify(msg)}`
);
return;
}
// Create a new point with the data to be written to InfluxDB v3
// NOTE: InfluxDB v3 does not allow the same name for both tags and fields,
// unlike v1/v2. Fields use different names with _field suffix where needed.
const point = new Point3('user_events')
.setTag('host', msg.host)
.setTag('event_action', msg.command)
@@ -34,17 +58,17 @@ export async function postUserEventToInfluxdbV3(msg) {
.setTag('userDirectory', msg.user_directory)
.setTag('userId', msg.user_id)
.setTag('origin', msg.origin)
.setStringField('userFull', `${msg.user_directory}\\${msg.user_id}`)
.setStringField('userId', msg.user_id);
.setStringField('userFull_field', `${msg.user_directory}\\${msg.user_id}`)
.setStringField('userId_field', msg.user_id);
// Add app id and name to tags and fields if available
if (msg?.appId) {
point.setTag('appId', msg.appId);
point.setStringField('appId', msg.appId);
point.setStringField('appId_field', msg.appId);
}
if (msg?.appName) {
point.setTag('appName', msg.appName);
point.setStringField('appName', msg.appName);
point.setStringField('appName_field', msg.appName);
}
// Add user agent info to tags if available
@@ -75,12 +99,20 @@ export async function postUserEventToInfluxdbV3(msg) {
// Write to InfluxDB
try {
// Convert point to line protocol and write directly
await globals.influx.write(point.toLineProtocol(), database);
globals.logger.debug(`USER EVENT INFLUXDB V3: Wrote data to InfluxDB v3`);
} catch (err) {
globals.logger.error(
`USER EVENT INFLUXDB V3: Error saving user event to InfluxDB v3! ${globals.getErrorMessage(err)}`
);
// Log the line protocol for debugging
try {
const lineProtocol = point.toLineProtocol();
globals.logger.debug(`USER EVENT INFLUXDB V3: Failed line protocol: ${lineProtocol}`);
} catch (e) {
// Ignore errors in debug logging
}
}
globals.logger.verbose('USER EVENT INFLUXDB V3: Sent Butler SOS user event data to InfluxDB');

View File

@@ -2,7 +2,7 @@ import later from '@breejs/later';
import { Duration } from 'luxon';
import globals from '../globals.js';
import { postButlerSOSMemoryUsageToInfluxdb } from './post-to-influxdb.js';
import { postButlerSOSMemoryUsageToInfluxdb } from './influxdb/index.js';
import { postButlerSOSUptimeToNewRelic } from './post-to-new-relic.js';
const fullUnits = ['years', 'months', 'days', 'hours', 'minutes', 'seconds'];

View File

@@ -1,7 +1,7 @@
import { Mutex } from 'async-mutex';
import globals from '../globals.js';
import { storeRejectedEventCountInfluxDB, storeEventCountInfluxDB } from './post-to-influxdb.js';
import { storeRejectedEventCountInfluxDB, storeEventCountInfluxDB } from './influxdb/index.js';
/**
* Class for tracking counts of UDP events received from Qlik Sense.

View File

@@ -1,5 +1,5 @@
import globals from '../../../globals.js';
import { postLogEventToInfluxdb } from '../../post-to-influxdb.js';
import { postLogEventToInfluxdb } from '../../influxdb/index.js';
import { postLogEventToNewRelic } from '../../post-to-new-relic.js';
import { postLogEventToMQTT } from '../../post-to-mqtt.js';
import { categoriseLogEvent } from '../../log-event-categorise.js';

View File

@@ -4,7 +4,7 @@ import { UAParser } from 'ua-parser-js';
// Load global variables and functions
import globals from '../../../globals.js';
import { sanitizeField } from '../../udp-queue-manager.js';
import { postUserEventToInfluxdb } from '../../post-to-influxdb.js';
import { postUserEventToInfluxdb } from '../../influxdb/index.js';
import { postUserEventToNewRelic } from '../../post-to-new-relic.js';
import { postUserEventToMQTT } from '../../post-to-mqtt.js';