Fix v3 log event handling to match v1/v2 data models

- Rewrite postLogEventToInfluxdb v3 to handle each message type distinctly
- Engine: stores session_id, windows_user, engine_exe_version
- Proxy: no session_id (key difference from engine)
- Scheduler: stores execution_id instead of command/result_code/origin/context
- Repository: similar to proxy, no session_id
- QIX-perf: stores performance metrics with float/integer fields
- All Point3 field types now correct (setStringField, setFloatField, setIntegerField)
- Conditional tags match v1/v2 behavior for each source type
- All 349 tests passing
This commit is contained in:
Göran Sander
2025-12-12 19:05:28 +01:00
parent ff2f275ad3
commit 791be201a4
13 changed files with 996 additions and 764 deletions

View File

@@ -89,7 +89,6 @@ Butler-SOS:
host: influxdb-v3
port: 8086
v3Config:
org: butler-sos
database: butler-sos
token: butlersos-token
description: Butler SOS metrics

813
package-lock.json generated

File diff suppressed because it is too large Load Diff

View File

@@ -515,7 +515,6 @@ Butler-SOS:
port: 8086 # Port where InfluxDB is listening, usually 8086
version: 1 # Is the InfluxDB instance version 1.x, 2.x, or 3.x? Valid values are 1, 2, or 3
v3Config: # Settings for InfluxDB v3.x only, i.e. Butler-SOS.influxdbConfig.version=3
org: myorg
database: mydatabase
description: Butler SOS metrics
token: mytoken

View File

@@ -717,9 +717,6 @@ Configuration File:
`CONFIG: Influxdb retention policy duration: ${this.config.get('Butler-SOS.influxdbConfig.v2Config.retentionDuration')}`
);
} else if (this.config.get('Butler-SOS.influxdbConfig.version') === 3) {
this.logger.info(
`CONFIG: Influxdb organisation: ${this.config.get('Butler-SOS.influxdbConfig.v3Config.org')}`
);
this.logger.info(
`CONFIG: Influxdb database name: ${this.config.get('Butler-SOS.influxdbConfig.v3Config.database')}`
);
@@ -897,13 +894,45 @@ Configuration File:
}
} else if (this.config.get('Butler-SOS.influxdbConfig.version') === 3) {
// Set up Influxdb v3 client (uses its own client library, NOT same as v2)
const url = `http://${this.config.get('Butler-SOS.influxdbConfig.host')}:${this.config.get(
'Butler-SOS.influxdbConfig.port'
)}`;
const hostName = this.config.get('Butler-SOS.influxdbConfig.host');
const port = this.config.get('Butler-SOS.influxdbConfig.port');
const host = `http://${hostName}:${port}`;
const token = this.config.get('Butler-SOS.influxdbConfig.v3Config.token');
const database = this.config.get('Butler-SOS.influxdbConfig.v3Config.database');
try {
this.influx = new InfluxDBClient3({ url, token });
this.influx = new InfluxDBClient3({ host, token, database });
// Test connection by executing a simple query
this.logger.info(`INFLUXDB3 INIT: Testing connection to InfluxDB v3...`);
try {
// Execute a simple query to test the connection
const testQuery = `SELECT 1 as test LIMIT 1`;
const queryResult = this.influx.query(testQuery, database);
// Try to get first result (this will throw if connection fails)
const iterator = queryResult[Symbol.asyncIterator]();
await iterator.next();
// Connection successful - log details
const tokenPreview = token.substring(0, 4) + '***';
this.logger.info(`INFLUXDB3 INIT: Connection successful!`);
this.logger.info(`INFLUXDB3 INIT: Host: ${hostName}`);
this.logger.info(`INFLUXDB3 INIT: Port: ${port}`);
this.logger.info(`INFLUXDB3 INIT: Database: ${database}`);
this.logger.info(`INFLUXDB3 INIT: Token: ${tokenPreview}`);
} catch (testErr) {
this.logger.warn(
`INFLUXDB3 INIT: Could not test connection (this may be normal): ${this.getErrorMessage(testErr)}`
);
// Still log the configuration
const tokenPreview = token.substring(0, 4) + '***';
this.logger.info(`INFLUXDB3 INIT: Client created with:`);
this.logger.info(`INFLUXDB3 INIT: Host: ${hostName}`);
this.logger.info(`INFLUXDB3 INIT: Port: ${port}`);
this.logger.info(`INFLUXDB3 INIT: Database: ${database}`);
this.logger.info(`INFLUXDB3 INIT: Token: ${tokenPreview}`);
}
} catch (err) {
this.logger.error(
`INFLUXDB3 INIT: Error creating InfluxDB 3 client: ${this.getErrorMessage(err)}`
@@ -1156,7 +1185,6 @@ Configuration File:
}
} else if (this.config.get('Butler-SOS.influxdbConfig.version') === 3) {
// Get config
const org = this.config.get('Butler-SOS.influxdbConfig.v3Config.org');
const databaseName = this.config.get('Butler-SOS.influxdbConfig.v3Config.database');
const description = this.config.get('Butler-SOS.influxdbConfig.v3Config.description');
const token = this.config.get('Butler-SOS.influxdbConfig.v3Config.token');
@@ -1167,7 +1195,6 @@ Configuration File:
if (
this.influx &&
this.config.get('Butler-SOS.influxdbConfig.enable') === true &&
org?.length > 0 &&
databaseName?.length > 0 &&
token?.length > 0 &&
retentionDuration?.length > 0
@@ -1176,52 +1203,23 @@ Configuration File:
}
if (enableInfluxdb) {
// For InfluxDB v3, we use the database directly
this.logger.info(
`INFLUXDB3: Using organization "${org}" with database "${databaseName}"`
);
// For InfluxDB v3, we use client.write() directly (no getWriteApi method in v3)
this.logger.info(`INFLUXDB3: Using database "${databaseName}"`);
// Create array of per-server writeAPI objects for v3
// Each object has properties serverName, writeAPI, database, and defaultTags, where serverName can be used as key later on
// For v3, we store the client itself and call write() directly
// The influxWriteApi array will contain objects with client and database info
this.serverList.forEach((server) => {
// Get per-server tags
const tags = getServerTags(this.logger, server);
// advanced write options for InfluxDB v3
const writeOptions = {
/* default tags to add to every point */
defaultTags: tags,
/* maximum time in millis to keep points in an unflushed batch, 0 means don't periodically flush */
flushInterval: 5000,
/* the count of internally-scheduled retries upon write failure, the delays between write attempts follow an exponential backoff strategy if there is no Retry-After HTTP header */
maxRetries: 2, // do not retry writes
// ... there are more write options that can be customized, see
// https://influxdata.github.io/influxdb-client-js/influxdb-client.writeoptions.html and
// https://influxdata.github.io/influxdb-client-js/influxdb-client.writeretryoptions.html
};
try {
// For InfluxDB v3, we use database instead of bucket
const serverWriteApi = this.influx.getWriteApi(
org,
databaseName,
'ns',
writeOptions
);
// Save to global variable, using serverName as key
this.influxWriteApi.push({
serverName: server.serverName,
writeAPI: serverWriteApi,
});
} catch (err) {
this.logger.error(
`INFLUXDB3: Error getting write API: ${this.getErrorMessage(err)}`
);
}
// Store client info and tags for this server
// v3 uses client.write() directly, not getWriteApi()
this.influxWriteApi.push({
serverName: server.serverName,
writeAPI: this.influx, // Store the client itself
database: databaseName,
defaultTags: tags, // Store tags for later use
});
});
}
}

View File

@@ -41,9 +41,8 @@ const handlebars = (await import('handlebars')).default;
const globals = (await import('../../globals.js')).default;
// Import the module under test
const { prepareFile, compileTemplate, getFileContent, getMimeType } = await import(
'../file-prep.js'
);
const { prepareFile, compileTemplate, getFileContent, getMimeType } =
await import('../file-prep.js');
describe('file-prep', () => {
beforeEach(() => {

View File

@@ -1,6 +1,6 @@
import { jest, describe, test, expect, beforeEach, afterEach } from '@jest/globals';
// Mock the InfluxDB client
// Mock the InfluxDB v2 client
jest.unstable_mockModule('@influxdata/influxdb-client', () => ({
Point: jest.fn().mockImplementation(() => ({
tag: jest.fn().mockReturnThis(),
@@ -13,6 +13,19 @@ jest.unstable_mockModule('@influxdata/influxdb-client', () => ({
})),
}));
// Mock the InfluxDB v3 client
jest.unstable_mockModule('@influxdata/influxdb3-client', () => ({
Point: jest.fn().mockImplementation(() => ({
setTag: jest.fn().mockReturnThis(),
setFloatField: jest.fn().mockReturnThis(),
setIntegerField: jest.fn().mockReturnThis(),
setStringField: jest.fn().mockReturnThis(),
setBooleanField: jest.fn().mockReturnThis(),
timestamp: jest.fn().mockReturnThis(),
toLineProtocol: jest.fn().mockReturnValue('mock-line-protocol'),
})),
}));
// Mock globals
jest.unstable_mockModule('../../globals.js', () => ({
default: {
@@ -239,10 +252,10 @@ describe('post-to-influxdb', () => {
if (key === 'Butler-SOS.qlikSenseEvents.eventCount.influxdb.measurementName') {
return 'events_log';
}
if (key === 'Butler-SOS.influxdbConfig.v3Config.org') return 'test-org';
if (key === 'Butler-SOS.influxdbConfig.v3Config.database') return 'test-database';
return undefined;
});
globals.config.has = jest.fn().mockReturnValue(false);
const mockLogEvents = [
{
source: 'test-source',
@@ -259,7 +272,9 @@ describe('post-to-influxdb', () => {
origin: 'test-origin',
context: 'test-context',
sessionId: 'test-session',
rawEvent: 'test-raw'
rawEvent: 'test-raw',
level: 'INFO',
log_row: '1',
},
];
globals.udpEvents = {
@@ -267,19 +282,14 @@ describe('post-to-influxdb', () => {
getUserEvents: jest.fn().mockResolvedValue([]),
};
globals.options = { instanceTag: 'test-instance' };
// Mock v3 writeApi
globals.influx.getWriteApi = jest.fn().mockReturnValue({
writePoint: jest.fn(),
});
// Mock v3 client write method
globals.influx.write = jest.fn().mockResolvedValue(undefined);
// Execute
await influxdb.storeEventCountInfluxDB();
// Verify
expect(globals.influx.getWriteApi).toHaveBeenCalled();
// The writeApi mock's writePoint should be called
const writeApi = globals.influx.getWriteApi.mock.results[0].value;
expect(writeApi.writePoint).toHaveBeenCalled();
expect(globals.influx.write).toHaveBeenCalled();
expect(globals.logger.verbose).toHaveBeenCalledWith(
expect.stringContaining(
'EVENT COUNT INFLUXDB: Sent Butler SOS event count data to InfluxDB'
@@ -294,10 +304,10 @@ describe('post-to-influxdb', () => {
if (key === 'Butler-SOS.qlikSenseEvents.eventCount.influxdb.measurementName') {
return 'events_user';
}
if (key === 'Butler-SOS.influxdbConfig.v3Config.org') return 'test-org';
if (key === 'Butler-SOS.influxdbConfig.v3Config.database') return 'test-database';
return undefined;
});
globals.config.has = jest.fn().mockReturnValue(false);
const mockUserEvents = [
{
source: 'test-source',
@@ -314,7 +324,7 @@ describe('post-to-influxdb', () => {
origin: 'test-origin',
context: 'test-context',
sessionId: 'test-session',
rawEvent: 'test-raw'
rawEvent: 'test-raw',
},
];
globals.udpEvents = {
@@ -322,19 +332,14 @@ describe('post-to-influxdb', () => {
getUserEvents: jest.fn().mockResolvedValue(mockUserEvents),
};
globals.options = { instanceTag: 'test-instance' };
// Mock v3 writeApi
globals.influx.getWriteApi = jest.fn().mockReturnValue({
writePoint: jest.fn(),
});
// Mock v3 client write method
globals.influx.write = jest.fn().mockResolvedValue(undefined);
// Execute
await influxdb.storeEventCountInfluxDB();
// Verify
expect(globals.influx.getWriteApi).toHaveBeenCalled();
// The writeApi mock's writePoint should be called
const writeApi = globals.influx.getWriteApi.mock.results[0].value;
expect(writeApi.writePoint).toHaveBeenCalled();
expect(globals.influx.write).toHaveBeenCalled();
expect(globals.logger.verbose).toHaveBeenCalledWith(
expect.stringContaining(
'EVENT COUNT INFLUXDB: Sent Butler SOS event count data to InfluxDB'
@@ -719,23 +724,34 @@ describe('post-to-influxdb', () => {
if (key === 'Butler-SOS.influxdbConfig.includeFields.loadedDocs') return false;
if (key === 'Butler-SOS.influxdbConfig.includeFields.inMemoryDocs') return false;
if (key === 'Butler-SOS.appNames.enableAppNameExtract') return false;
if (key === 'Butler-SOS.influxdbConfig.v3Config.database') return 'test-database';
return undefined;
});
// Mock v3 client write method
const mockWrite = jest.fn().mockResolvedValue(undefined);
globals.influxWriteApi = [
{
serverName: 'testserver',
writeAPI: {
writePoints: jest.fn(),
},
writeAPI: mockWrite,
database: 'test-database',
},
];
globals.influx = {
write: mockWrite,
};
const serverName = 'testserver';
const host = 'testhost';
const serverTags = { host: 'testhost', server_name: 'testserver' };
const healthBody = {
version: '1.0.0',
started: '20220801T121212.000Z',
apps: { active_docs: [], loaded_docs: [], in_memory_docs: [], calls: 100, selections: 50 },
apps: {
active_docs: [],
loaded_docs: [],
in_memory_docs: [],
calls: 100,
selections: 50,
},
cache: { added: 0, hits: 10, lookups: 15, replaced: 2, bytes_added: 1000 },
cpu: { total: 25 },
mem: { committed: 1000, allocated: 800, free: 200 },
@@ -745,7 +761,7 @@ describe('post-to-influxdb', () => {
await influxdb.postHealthMetricsToInfluxdb(serverName, host, healthBody, serverTags);
expect(globals.influxWriteApi[0].writeAPI.writePoints).toHaveBeenCalled();
expect(mockWrite).toHaveBeenCalled();
});
});

View File

@@ -20,9 +20,8 @@ jest.unstable_mockModule('../../globals.js', () => ({
const globals = (await import('../../globals.js')).default;
// Import the module under test
const { postHealthToMQTT, postUserSessionsToMQTT, postUserEventToMQTT } = await import(
'../post-to-mqtt.js'
);
const { postHealthToMQTT, postUserSessionsToMQTT, postUserEventToMQTT } =
await import('../post-to-mqtt.js');
describe('post-to-mqtt', () => {
beforeEach(() => {

View File

@@ -116,9 +116,8 @@ jest.unstable_mockModule('../prom-client.js', () => ({
}));
// Import the module under test
const { setupUserSessionsTimer, getProxySessionStatsFromSense } = await import(
'../proxysessionmetrics.js'
);
const { setupUserSessionsTimer, getProxySessionStatsFromSense } =
await import('../proxysessionmetrics.js');
describe('proxysessionmetrics', () => {
let proxysessionmetrics;

View File

@@ -28,9 +28,8 @@ const fs = (await import('fs')).default;
const globals = (await import('../../globals.js')).default;
// Import modules under test
const { getCertificates: getCertificatesUtil, createCertificateOptions } = await import(
'../cert-utils.js'
);
const { getCertificates: getCertificatesUtil, createCertificateOptions } =
await import('../cert-utils.js');
describe('Certificate loading', () => {
const mockCertificateOptions = {

View File

@@ -169,10 +169,10 @@ export async function verifyAppConfig(cfg) {
// Verify values of specific config entries
// If InfluxDB is enabled, check if the version is valid
// Valid values: 1 and 2
// Valid values: 1, 2, and 3
if (cfg.get('Butler-SOS.influxdbConfig.enable') === true) {
const influxdbVersion = cfg.get('Butler-SOS.influxdbConfig.version');
if (influxdbVersion !== 1 && influxdbVersion !== 2) {
if (influxdbVersion !== 1 && influxdbVersion !== 2 && influxdbVersion !== 3) {
console.error(
`VERIFY CONFIG FILE ERROR: Butler-SOS.influxdbConfig.enable (=InfluxDB version) ${influxdbVersion} is invalid. Exiting.`
);

View File

@@ -319,13 +319,12 @@ export const destinationsSchema = {
v3Config: {
type: 'object',
properties: {
org: { type: 'string' },
database: { type: 'string' },
description: { type: 'string' },
token: { type: 'string' },
retentionDuration: { type: 'string' },
},
required: ['org', 'database', 'description', 'token', 'retentionDuration'],
required: ['database', 'description', 'token', 'retentionDuration'],
additionalProperties: false,
},
v2Config: {

View File

@@ -1,4 +1,5 @@
import { Point } from '@influxdata/influxdb-client';
import { Point as Point3 } from '@influxdata/influxdb3-client';
import globals from '../globals.js';
@@ -568,85 +569,113 @@ export async function postHealthMetricsToInfluxdb(serverName, host, body, server
return;
}
// Get database from config
const database = globals.config.get('Butler-SOS.influxdbConfig.v3Config.database');
// Create a new point with the data to be written to InfluxDB v3
const points = [
new Point('sense_server')
.stringField('version', body.version)
.stringField('started', body.started)
.stringField('uptime', formattedTime),
new Point3('sense_server')
.setStringField('version', body.version)
.setStringField('started', body.started)
.setStringField('uptime', formattedTime),
new Point('mem')
.floatField('comitted', body.mem.committed)
.floatField('allocated', body.mem.allocated)
.floatField('free', body.mem.free),
new Point3('mem')
.setFloatField('comitted', body.mem.committed)
.setFloatField('allocated', body.mem.allocated)
.setFloatField('free', body.mem.free),
new Point('apps')
.intField('active_docs_count', body.apps.active_docs.length)
.intField('loaded_docs_count', body.apps.loaded_docs.length)
.intField('in_memory_docs_count', body.apps.in_memory_docs.length)
.stringField(
new Point3('apps')
.setIntegerField('active_docs_count', body.apps.active_docs.length)
.setIntegerField('loaded_docs_count', body.apps.loaded_docs.length)
.setIntegerField('in_memory_docs_count', body.apps.in_memory_docs.length)
.setStringField(
'active_docs',
globals.config.get('Butler-SOS.influxdbConfig.includeFields.activeDocs')
? body.apps.active_docs
: ''
)
.stringField(
.setStringField(
'active_docs_names',
globals.config.get('Butler-SOS.appNames.enableAppNameExtract') &&
globals.config.get('Butler-SOS.influxdbConfig.includeFields.activeDocs')
? activeSessionDocNames
? appNamesActive.toString()
: ''
)
.stringField(
.setStringField(
'active_session_docs_names',
globals.config.get('Butler-SOS.appNames.enableAppNameExtract') &&
globals.config.get('Butler-SOS.influxdbConfig.includeFields.activeDocs')
? sessionAppNamesActive.toString()
: ''
)
.setStringField(
'loaded_docs',
globals.config.get('Butler-SOS.influxdbConfig.includeFields.loadedDocs')
? body.apps.loaded_docs
: ''
)
.stringField(
.setStringField(
'loaded_docs_names',
globals.config.get('Butler-SOS.appNames.enableAppNameExtract') &&
globals.config.get('Butler-SOS.influxdbConfig.includeFields.loadedDocs')
? loadedSessionDocNames
? appNamesLoaded.toString()
: ''
)
.stringField(
.setStringField(
'loaded_session_docs_names',
globals.config.get('Butler-SOS.appNames.enableAppNameExtract') &&
globals.config.get('Butler-SOS.influxdbConfig.includeFields.loadedDocs')
? sessionAppNamesLoaded.toString()
: ''
)
.setStringField(
'in_memory_docs',
globals.config.get('Butler-SOS.influxdbConfig.includeFields.inMemoryDocs')
? body.apps.in_memory_docs
: ''
)
.stringField(
.setStringField(
'in_memory_docs_names',
globals.config.get('Butler-SOS.appNames.enableAppNameExtract') &&
globals.config.get('Butler-SOS.influxdbConfig.includeFields.inMemoryDocs')
? inMemorySessionDocNames
? appNamesInMemory.toString()
: ''
)
.intField('calls', body.apps.calls)
.intField('selections', body.apps.selections),
.setStringField(
'in_memory_session_docs_names',
globals.config.get('Butler-SOS.appNames.enableAppNameExtract') &&
globals.config.get('Butler-SOS.influxdbConfig.includeFields.inMemoryDocs')
? sessionAppNamesInMemory.toString()
: ''
)
.setIntegerField('calls', body.apps.calls)
.setIntegerField('selections', body.apps.selections),
new Point('cpu').intField('total', body.cpu.total),
new Point3('cpu').setIntegerField('total', body.cpu.total),
new Point('session')
.intField('active', body.session.active)
.intField('total', body.session.total),
new Point3('session')
.setIntegerField('active', body.session.active)
.setIntegerField('total', body.session.total),
new Point('users')
.intField('active', body.users.active)
.intField('total', body.users.total),
new Point3('users')
.setIntegerField('active', body.users.active)
.setIntegerField('total', body.users.total),
new Point('cache')
.intField('hits', body.cache.hits)
.intField('lookups', body.cache.lookups)
.intField('added', body.cache.added)
.intField('replaced', body.cache.replaced)
.intField('bytes_added', body.cache.bytes_added),
new Point3('cache')
.setIntegerField('hits', body.cache.hits)
.setIntegerField('lookups', body.cache.lookups)
.setIntegerField('added', body.cache.added)
.setIntegerField('replaced', body.cache.replaced)
.setIntegerField('bytes_added', body.cache.bytes_added),
new Point3('saturated').setBooleanField('saturated', body.saturated),
];
// Write to InfluxDB
try {
const res = await writeApi.writeAPI.writePoints(points);
for (const point of points) {
await globals.influx.write(point.toLineProtocol(), database);
}
globals.logger.debug(`HEALTH METRICS: Wrote data to InfluxDB v3`);
} catch (err) {
globals.logger.error(
@@ -772,16 +801,17 @@ export async function postProxySessionsToInfluxdb(userSessions) {
return;
}
// Get database from config
const database = globals.config.get('Butler-SOS.influxdbConfig.v3Config.database');
// Create data points
const points = [
new Point('user_session_summary')
.intField('session_count', userSessions.sessionCount)
.stringField('session_user_id_list', userSessions.uniqueUserList),
];
const point = new Point3('user_session_summary')
.setIntegerField('session_count', userSessions.sessionCount)
.setStringField('session_user_id_list', userSessions.uniqueUserList);
// Write to InfluxDB
try {
const res = await writeApi.writeAPI.writePoints(points);
await globals.influx.write(point.toLineProtocol(), database);
globals.logger.debug(`PROXY SESSIONS: Wrote data to InfluxDB v3`);
} catch (err) {
globals.logger.error(
@@ -950,21 +980,20 @@ export async function postButlerSOSMemoryUsageToInfluxdb(memory) {
// https://influxdata.github.io/influxdb-client-js/influxdb-client.writeretryoptions.html
};
const org = globals.config.get('Butler-SOS.influxdbConfig.v3Config.org');
const database = globals.config.get('Butler-SOS.influxdbConfig.v3Config.database');
const writeApi = globals.influx.getWriteApi(org, database, 'ns', writeOptions);
const point = new Point('butlersos_memory_usage')
.tag('butler_sos_instance', memory.instanceTag)
.tag('version', butlerVersion)
.floatField('heap_used', memory.heapUsedMByte)
.floatField('heap_total', memory.heapTotalMByte)
.floatField('external', memory.externalMemoryMByte)
.floatField('process_memory', memory.processMemoryMByte);
// v3 uses client.write() directly, not getWriteApi()
const point = new Point3('butlersos_memory_usage')
.setTag('butler_sos_instance', memory.instanceTag)
.setTag('version', butlerVersion)
.setFloatField('heap_used', memory.heapUsedMByte)
.setFloatField('heap_total', memory.heapTotalMByte)
.setFloatField('external', memory.externalMemoryMByte)
.setFloatField('process_memory', memory.processMemoryMByte);
try {
const res = await writeApi.writePoint(point);
// Convert point to line protocol and write directly
await globals.influx.write(point.toLineProtocol(), database);
globals.logger.debug(`MEMORY USAGE INFLUXDB: Wrote data to InfluxDB v3`);
} catch (err) {
globals.logger.error(
@@ -1191,45 +1220,28 @@ export async function postUserEventToInfluxdb(msg) {
);
}
} else if (globals.config.get('Butler-SOS.influxdbConfig.version') === 3) {
// Create new write API object
// Advanced write options
const writeOptions = {
/* maximum time in millis to keep points in an unflushed batch, 0 means don't periodically flush */
flushInterval: 5000,
/* the count of internally-scheduled retries upon write failure, the delays between write attempts follow an exponential backoff strategy if there is no Retry-After HTTP header */
maxRetries: 2, // do not retry writes
// ... there are more write options that can be customized, see
// https://influxdata.github.io/influxdb-client-js/influxdb-client.writeoptions.html and
// https://influxdata.github.io/influxdb-client-js/influxdb-client.writeretryoptions.html
};
const org = globals.config.get('Butler-SOS.influxdbConfig.v3Config.org');
const database = globals.config.get('Butler-SOS.influxdbConfig.v3Config.database');
const writeApi = globals.influx.getWriteApi(org, database, 'ns', writeOptions);
const point = new Point('log_event')
.tag('host', msg.host)
.tag('level', msg.level)
.tag('source', msg.source)
.tag('log_row', msg.log_row)
.tag('subsystem', msg.subsystem ? msg.subsystem : 'n/a')
.stringField('message', msg.message)
.stringField('exception_message', msg.exception_message ? msg.exception_message : '')
.stringField('app_name', msg.appName ? msg.appName : '')
.stringField('app_id', msg.appId ? msg.appId : '')
.stringField('execution_id', msg.executionId ? msg.executionId : '')
.stringField('command', msg.command ? msg.command : '')
.stringField('result_code', msg.resultCode ? msg.resultCode : '')
.stringField('origin', msg.origin ? msg.origin : '')
.stringField('context', msg.context ? msg.context : '')
.stringField('session_id', msg.sessionId ? msg.sessionId : '')
.stringField('raw_event', msg.rawEvent ? msg.rawEvent : '');
const point = new Point3('log_event')
.setTag('host', msg.host)
.setTag('level', msg.level)
.setTag('source', msg.source)
.setTag('log_row', msg.log_row)
.setTag('subsystem', msg.subsystem ? msg.subsystem : 'n/a')
.setStringField('message', msg.message)
.setStringField('exception_message', msg.exception_message ? msg.exception_message : '')
.setStringField('app_name', msg.appName ? msg.appName : '')
.setStringField('app_id', msg.appId ? msg.appId : '')
.setStringField('execution_id', msg.executionId ? msg.executionId : '')
.setStringField('command', msg.command ? msg.command : '')
.setStringField('result_code', msg.resultCode ? msg.resultCode : '')
.setStringField('origin', msg.origin ? msg.origin : '')
.setStringField('context', msg.context ? msg.context : '')
.setStringField('session_id', msg.sessionId ? msg.sessionId : '')
.setStringField('raw_event', msg.rawEvent ? msg.rawEvent : '');
try {
const res = await writeApi.writePoint(point);
await globals.influx.write(point.toLineProtocol(), database);
globals.logger.debug(`USER EVENT INFLUXDB: Wrote data to InfluxDB v3`);
globals.logger.verbose(
@@ -1712,134 +1724,181 @@ export async function postLogEventToInfluxdb(msg) {
msg.source === 'qseow-repository' ||
msg.source === 'qseow-qix-perf'
) {
// Create new write API object
// Advanced write options
const writeOptions = {
/* maximum time in millis to keep points in an unflushed batch, 0 means don't periodically flush */
flushInterval: 5000,
/* the count of internally-scheduled retries upon write failure, the delays between write attempts follow an exponential backoff strategy if there is no Retry-After HTTP header */
maxRetries: 2, // do not retry writes
// ... there are more write options that can be customized, see
// https://influxdata.github.io/influxdb-client-js/influxdb-client.writeoptions.html and
// https://influxdata.github.io/influxdb-client-js/influxdb-client.writeretryoptions.html
};
const org = globals.config.get('Butler-SOS.influxdbConfig.v3Config.org');
const database = globals.config.get('Butler-SOS.influxdbConfig.v3Config.database');
const writeApi = globals.influx.getWriteApi(org, database, 'ns', writeOptions);
const logLevel = 'log_level';
const logLevelValue = msg.level;
// Determine what tags to use for the log event point
// Tags are part of the data model that will be used in this call.
// Tags are what make for efficient queries in InfluxDB
let point;
// Does the message have QIX perf data in the message field?
// I.e. is this a log event with performance data from QIX engine?
if (
msg.source === 'qseow-qix-perf' &&
msg.message.split(' ').length >= 22 &&
msg.message.split(' ')[7] !== '(null)'
) {
const parts = msg.message.split(' ');
const objectType = parts[5];
const method = parts[6];
const appId = parts[7];
if (isNaN(parts[9]) || isNaN(parts[11]) || isNaN(parts[13])) {
// One or more of the performance metric is not a number, this is not a valid QIX perf log event
globals.logger.debug(
`LOG EVENT INFLUXDB v3: Performance metrics not valid: ${parts[9]}, ${parts[11]}, ${parts[13]}`
);
point = new Point('log_event')
.tag('host', msg.host)
.tag('level', msg.level)
.tag('source', msg.source)
.tag('log_row', msg.log_row)
.tag('subsystem', msg.subsystem ? msg.subsystem : 'n/a')
.stringField('message', msg.message)
.stringField(
'exception_message',
msg.exception_message ? msg.exception_message : ''
)
.stringField('app_name', msg.appName ? msg.appName : '')
.stringField('app_id', msg.appId ? msg.appId : '')
.stringField('execution_id', msg.executionId ? msg.executionId : '')
.stringField('command', msg.command ? msg.command : '')
.stringField('result_code', msg.resultCode ? msg.resultCode : '')
.stringField('origin', msg.origin ? msg.origin : '')
.stringField('context', msg.context ? msg.context : '')
.stringField('session_id', msg.sessionId ? msg.sessionId : '')
.stringField('raw_event', msg.rawEvent ? msg.rawEvent : '');
} else {
// We have a valid QIX performance log event
point = new Point('log_event')
.tag('host', msg.host)
.tag('level', msg.level)
.tag('source', msg.source)
.tag('log_row', msg.log_row)
.tag('subsystem', msg.subsystem ? msg.subsystem : 'n/a')
.tag('object_type', objectType)
.tag('method', method)
.stringField('message', msg.message)
.stringField(
'exception_message',
msg.exception_message ? msg.exception_message : ''
)
.stringField('app_name', msg.appName ? msg.appName : '')
.stringField('app_id', appId)
.stringField('execution_id', msg.executionId ? msg.executionId : '')
.stringField('command', msg.command ? msg.command : '')
.stringField('result_code', msg.resultCode ? msg.resultCode : '')
.stringField('origin', msg.origin ? msg.origin : '')
.stringField('context', msg.context ? msg.context : '')
.stringField('session_id', msg.sessionId ? msg.sessionId : '')
.stringField('raw_event', msg.rawEvent ? msg.rawEvent : '')
// engine performance fields
.floatField('process_time', parseFloat(parts[9]))
.floatField('work_time', parseFloat(parts[11]))
.floatField('lock_time', parseFloat(parts[13]))
.floatField('validate_time', parseFloat(parts[15]))
.floatField('traverse_time', parseFloat(parts[17]))
.intField('handle', parseInt(parts[19], 10))
.intField('net_ram', parseInt(parts[20], 10))
.intField('peak_ram', parseInt(parts[21], 10));
}
} else {
// No QIX perf data, use standard log event format
point = new Point('log_event')
.tag('host', msg.host)
.tag('level', msg.level)
.tag('source', msg.source)
.tag('log_row', msg.log_row)
.tag('subsystem', msg.subsystem ? msg.subsystem : 'n/a')
.stringField('message', msg.message)
.stringField(
// Handle each message type with its specific fields
if (msg.source === 'qseow-engine') {
// Engine fields: message, exception_message, command, result_code, origin, context, session_id, raw_event
point = new Point3('log_event')
.setTag('host', msg.host)
.setTag('level', msg.level)
.setTag('source', msg.source)
.setTag('log_row', msg.log_row)
.setTag('subsystem', msg.subsystem ? msg.subsystem : 'n/a')
.setStringField('message', msg.message)
.setStringField(
'exception_message',
msg.exception_message ? msg.exception_message : ''
)
.stringField('app_name', msg.appName ? msg.appName : '')
.stringField('app_id', msg.appId ? msg.appId : '')
.stringField('execution_id', msg.executionId ? msg.executionId : '')
.stringField('command', msg.command ? msg.command : '')
.stringField('result_code', msg.resultCode ? msg.resultCode : '')
.stringField('origin', msg.origin ? msg.origin : '')
.stringField('context', msg.context ? msg.context : '')
.stringField('session_id', msg.sessionId ? msg.sessionId : '')
.stringField('raw_event', msg.rawEvent ? msg.rawEvent : '');
.setStringField('command', msg.command ? msg.command : '')
.setStringField('result_code', msg.result_code ? msg.result_code : '')
.setStringField('origin', msg.origin ? msg.origin : '')
.setStringField('context', msg.context ? msg.context : '')
.setStringField('session_id', msg.session_id ? msg.session_id : '')
.setStringField('raw_event', JSON.stringify(msg));
// Conditional tags
if (msg?.user_full?.length > 0) point.setTag('user_full', msg.user_full);
if (msg?.user_directory?.length > 0)
point.setTag('user_directory', msg.user_directory);
if (msg?.user_id?.length > 0) point.setTag('user_id', msg.user_id);
if (msg?.result_code?.length > 0) point.setTag('result_code', msg.result_code);
if (msg?.windows_user?.length > 0)
point.setTag('windows_user', msg.windows_user);
if (msg?.task_id?.length > 0) point.setTag('task_id', msg.task_id);
if (msg?.task_name?.length > 0) point.setTag('task_name', msg.task_name);
if (msg?.app_id?.length > 0) point.setTag('app_id', msg.app_id);
if (msg?.app_name?.length > 0) point.setTag('app_name', msg.app_name);
if (msg?.engine_exe_version?.length > 0)
point.setTag('engine_exe_version', msg.engine_exe_version);
} else if (msg.source === 'qseow-proxy') {
// Proxy fields: message, exception_message, command, result_code, origin, context, raw_event (NO session_id)
point = new Point3('log_event')
.setTag('host', msg.host)
.setTag('level', msg.level)
.setTag('source', msg.source)
.setTag('log_row', msg.log_row)
.setTag('subsystem', msg.subsystem ? msg.subsystem : 'n/a')
.setStringField('message', msg.message)
.setStringField(
'exception_message',
msg.exception_message ? msg.exception_message : ''
)
.setStringField('command', msg.command ? msg.command : '')
.setStringField('result_code', msg.result_code ? msg.result_code : '')
.setStringField('origin', msg.origin ? msg.origin : '')
.setStringField('context', msg.context ? msg.context : '')
.setStringField('raw_event', JSON.stringify(msg));
// Conditional tags
if (msg?.user_full?.length > 0) point.setTag('user_full', msg.user_full);
if (msg?.user_directory?.length > 0)
point.setTag('user_directory', msg.user_directory);
if (msg?.user_id?.length > 0) point.setTag('user_id', msg.user_id);
if (msg?.result_code?.length > 0) point.setTag('result_code', msg.result_code);
} else if (msg.source === 'qseow-scheduler') {
// Scheduler fields: message, exception_message, app_name, app_id, execution_id, raw_event
point = new Point3('log_event')
.setTag('host', msg.host)
.setTag('level', msg.level)
.setTag('source', msg.source)
.setTag('log_row', msg.log_row)
.setTag('subsystem', msg.subsystem ? msg.subsystem : 'n/a')
.setStringField('message', msg.message)
.setStringField(
'exception_message',
msg.exception_message ? msg.exception_message : ''
)
.setStringField('app_name', msg.app_name ? msg.app_name : '')
.setStringField('app_id', msg.app_id ? msg.app_id : '')
.setStringField('execution_id', msg.execution_id ? msg.execution_id : '')
.setStringField('raw_event', JSON.stringify(msg));
// Conditional tags
if (msg?.user_full?.length > 0) point.setTag('user_full', msg.user_full);
if (msg?.user_directory?.length > 0)
point.setTag('user_directory', msg.user_directory);
if (msg?.user_id?.length > 0) point.setTag('user_id', msg.user_id);
if (msg?.task_id?.length > 0) point.setTag('task_id', msg.task_id);
if (msg?.task_name?.length > 0) point.setTag('task_name', msg.task_name);
} else if (msg.source === 'qseow-repository') {
// Repository fields: message, exception_message, command, result_code, origin, context, raw_event
point = new Point3('log_event')
.setTag('host', msg.host)
.setTag('level', msg.level)
.setTag('source', msg.source)
.setTag('log_row', msg.log_row)
.setTag('subsystem', msg.subsystem ? msg.subsystem : 'n/a')
.setStringField('message', msg.message)
.setStringField(
'exception_message',
msg.exception_message ? msg.exception_message : ''
)
.setStringField('command', msg.command ? msg.command : '')
.setStringField('result_code', msg.result_code ? msg.result_code : '')
.setStringField('origin', msg.origin ? msg.origin : '')
.setStringField('context', msg.context ? msg.context : '')
.setStringField('raw_event', JSON.stringify(msg));
// Conditional tags
if (msg?.user_full?.length > 0) point.setTag('user_full', msg.user_full);
if (msg?.user_directory?.length > 0)
point.setTag('user_directory', msg.user_directory);
if (msg?.user_id?.length > 0) point.setTag('user_id', msg.user_id);
if (msg?.result_code?.length > 0) point.setTag('result_code', msg.result_code);
} else if (msg.source === 'qseow-qix-perf') {
// QIX Performance fields: app_id, process_time, work_time, lock_time, validate_time, traverse_time, handle, net_ram, peak_ram, raw_event
point = new Point3('log_event')
.setTag('host', msg.host ? msg.host : '<Unknown>')
.setTag('level', msg.level ? msg.level : '<Unknown>')
.setTag('source', msg.source ? msg.source : '<Unknown>')
.setTag('log_row', msg.log_row ? msg.log_row : '-1')
.setTag('subsystem', msg.subsystem ? msg.subsystem : '<Unknown>')
.setTag('method', msg.method ? msg.method : '<Unknown>')
.setTag('object_type', msg.object_type ? msg.object_type : '<Unknown>')
.setTag(
'proxy_session_id',
msg.proxy_session_id ? msg.proxy_session_id : '-1'
)
.setTag('session_id', msg.session_id ? msg.session_id : '-1')
.setTag(
'event_activity_source',
msg.event_activity_source ? msg.event_activity_source : '<Unknown>'
)
.setStringField('app_id', msg.app_id ? msg.app_id : '')
.setFloatField('process_time', msg.process_time)
.setFloatField('work_time', msg.work_time)
.setFloatField('lock_time', msg.lock_time)
.setFloatField('validate_time', msg.validate_time)
.setFloatField('traverse_time', msg.traverse_time)
.setIntegerField('handle', msg.handle)
.setIntegerField('net_ram', msg.net_ram)
.setIntegerField('peak_ram', msg.peak_ram)
.setStringField('raw_event', JSON.stringify(msg));
// Conditional tags
if (msg?.user_full?.length > 0) point.setTag('user_full', msg.user_full);
if (msg?.user_directory?.length > 0)
point.setTag('user_directory', msg.user_directory);
if (msg?.user_id?.length > 0) point.setTag('user_id', msg.user_id);
if (msg?.app_id?.length > 0) point.setTag('app_id', msg.app_id);
if (msg?.app_name?.length > 0) point.setTag('app_name', msg.app_name);
if (msg?.object_id?.length > 0) point.setTag('object_id', msg.object_id);
}
// Add log event categories to tags if available
if (msg?.category?.length > 0) {
msg.category.forEach((category) => {
point.setTag(category.name, category.value);
});
}
// Add custom tags from config file
if (
globals.config.has('Butler-SOS.logEvents.tags') &&
globals.config.get('Butler-SOS.logEvents.tags') !== null &&
globals.config.get('Butler-SOS.logEvents.tags').length > 0
) {
const configTags = globals.config.get('Butler-SOS.logEvents.tags');
for (const item of configTags) {
point.setTag(item.name, item.value);
}
}
try {
const res = await writeApi.writePoint(point);
await globals.influx.write(point.toLineProtocol(), database);
globals.logger.debug(`LOG EVENT INFLUXDB: Wrote data to InfluxDB v3`);
globals.logger.verbose(
@@ -2094,49 +2153,28 @@ export async function storeEventCountInfluxDB() {
globals.logger.error(`EVENT COUNT INFLUXDB: Error getting write API: ${err}`);
}
} else if (globals.config.get('Butler-SOS.influxdbConfig.version') === 3) {
// Create new write API object
// advanced write options
const writeOptions = {
/* maximum time in millis to keep points in an unflushed batch, 0 means don't periodically flush */
flushInterval: 5000,
/* the count of internally-scheduled retries upon write failure, the delays between write attempts follow an exponential backoff strategy if there is no Retry-After HTTP header */
maxRetries: 2, // do not retry writes
// ... there are more write options that can be customized, see
// https://influxdata.github.io/influxdb-client-js/influxdb-client.writeoptions.html and
// https://influxdata.github.io/influxdb-client-js/influxdb-client.writeretryoptions.html
};
const org = globals.config.get('Butler-SOS.influxdbConfig.v3Config.org');
const database = globals.config.get('Butler-SOS.influxdbConfig.v3Config.database');
const writeApi = globals.influx.getWriteApi(org, database, 'ns', writeOptions);
try {
// Store data for each log event
for (const logEvent of logEvents) {
const tags = {
butler_sos_instance: globals.options.instanceTag,
event_type: 'log',
source: logEvent.source,
host: logEvent.host,
subsystem: logEvent.subsystem,
};
// Add static tags defined in config file, if any
// Add the static tag to the data structure sent to InfluxDB
// Is the array present in the config file?
if (
globals.config.has(
'Butler-SOS.qlikSenseEvents.eventCount.influxdb.staticTag'
) &&
globals.config.has('Butler-SOS.qlikSenseEvents.eventCount.influxdb.tags') &&
Array.isArray(
globals.config.get(
'Butler-SOS.qlikSenseEvents.eventCount.influxdb.staticTag'
)
globals.config.get('Butler-SOS.qlikSenseEvents.eventCount.influxdb.tags')
)
) {
// Yes, the config tag array exists
const configTags = globals.config.get(
'Butler-SOS.qlikSenseEvents.eventCount.influxdb.staticTag'
'Butler-SOS.qlikSenseEvents.eventCount.influxdb.tags'
);
configTags.forEach((tag) => {
@@ -2144,39 +2182,23 @@ export async function storeEventCountInfluxDB() {
});
}
// Add timestamp from when the event was received by Butler SOS
const point = new Point(
const point = new Point3(
globals.config.get(
'Butler-SOS.qlikSenseEvents.eventCount.influxdb.measurementName'
)
)
.tag('host', logEvent.host)
.tag('level', logEvent.level)
.tag('source', logEvent.source)
.tag('log_row', logEvent.log_row)
.tag('subsystem', logEvent.subsystem ? logEvent.subsystem : 'n/a')
.stringField('message', logEvent.message)
.stringField(
'exception_message',
logEvent.exception_message ? logEvent.exception_message : ''
)
.stringField('app_name', logEvent.appName ? logEvent.appName : '')
.stringField('app_id', logEvent.appId ? logEvent.appId : '')
.stringField('execution_id', logEvent.executionId ? logEvent.executionId : '')
.stringField('command', logEvent.command ? logEvent.command : '')
.stringField('result_code', logEvent.resultCode ? logEvent.resultCode : '')
.stringField('origin', logEvent.origin ? logEvent.origin : '')
.stringField('context', logEvent.context ? logEvent.context : '')
.stringField('session_id', logEvent.sessionId ? logEvent.sessionId : '')
.stringField('raw_event', logEvent.rawEvent ? logEvent.rawEvent : '')
.timestamp(new Date(logEvent.timestamp));
.setTag('event_type', 'log')
.setTag('source', logEvent.source)
.setTag('host', logEvent.host)
.setTag('subsystem', logEvent.subsystem)
.setIntegerField('counter', logEvent.counter);
// Add tags to point
Object.keys(tags).forEach((key) => {
point.tag(key, tags[key]);
point.setTag(key, tags[key]);
});
const res = await writeApi.writePoint(point);
await globals.influx.write(point.toLineProtocol(), database);
globals.logger.debug(`EVENT COUNT INFLUXDB: Wrote data to InfluxDB v3`);
}
@@ -2206,23 +2228,23 @@ export async function storeEventCountInfluxDB() {
});
}
const point = new Point(
const point = new Point3(
globals.config.get(
'Butler-SOS.qlikSenseEvents.eventCount.influxdb.measurementName'
)
)
.tag('event_type', 'user')
.tag('source', event.source)
.tag('host', event.host)
.tag('subsystem', event.subsystem)
.intField('counter', event.counter);
.setTag('event_type', 'user')
.setTag('source', event.source)
.setTag('host', event.host)
.setTag('subsystem', event.subsystem)
.setIntegerField('counter', event.counter);
// Add tags to point
Object.keys(tags).forEach((key) => {
point.tag(key, tags[key]);
point.setTag(key, tags[key]);
});
const res = await writeApi.writePoint(point);
await globals.influx.write(point.toLineProtocol(), database);
globals.logger.debug(`EVENT COUNT INFLUXDB: Wrote user event data to InfluxDB v3`);
}
@@ -2475,25 +2497,8 @@ export async function storeRejectedEventCountInfluxDB() {
globals.logger.error(`REJECTED LOG EVENT INFLUXDB: Error getting write API: ${err}`);
}
} else if (globals.config.get('Butler-SOS.influxdbConfig.version') === 3) {
// Create new write API object
// advanced write options
const writeOptions = {
/* maximum time in millis to keep points in an unflushed batch, 0 means don't periodically flush */
flushInterval: 5000,
/* the count of internally-scheduled retries upon write failure, the delays between write attempts follow an exponential backoff strategy if there is no Retry-After HTTP header */
maxRetries: 2, // do not retry writes
// ... there are more write options that can be customized, see
// https://influxdata.github.io/influxdb-client-js/influxdb-client.writeoptions.html and
// https://influxdata.github.io/influxdb-client-js/influxdb-client.writeretryoptions.html
};
const org = globals.config.get('Butler-SOS.influxdbConfig.v3Config.org');
const database = globals.config.get('Butler-SOS.influxdbConfig.v3Config.database');
const writeApi = globals.influx.getWriteApi(org, database, 'ns', writeOptions);
try {
const points = [];
const measurementName = globals.config.get(
@@ -2503,23 +2508,24 @@ export async function storeRejectedEventCountInfluxDB() {
rejectedLogEvents.forEach((event) => {
globals.logger.debug(`REJECTED LOG EVENT INFLUXDB 3: ${JSON.stringify(event)}`);
if (
event.source === 'qseow-qix-perf' &&
event.message.split(' ').length >= 22 &&
event.message.split(' ')[7] !== '(null)'
) {
const parts = event.message.split(' ');
const objectType = parts[5];
const method = parts[6];
if (event.source === 'qseow-qix-perf') {
let point = new Point3(measurementName)
.setTag('source', event.source)
.setTag('object_type', event.objectType)
.setTag('method', event.method)
.setIntegerField('counter', event.counter)
.setFloatField('process_time', event.processTime);
let point = new Point(measurementName)
.tag('source', event.source)
.tag('object_type', objectType)
.tag('method', method)
.tag('level', event.level)
.tag('log_row', event.log_row)
.stringField('message', event.message)
.intField('count', 1);
// Add app_id and app_name if available
if (event?.appId) {
point.setTag('app_id', event.appId);
}
if (event?.appName?.length > 0) {
point.setTag('app_name', event.appName);
point.setTag('app_name_set', 'true');
} else {
point.setTag('app_name_set', 'false');
}
// Add static tags defined in config file, if any
if (
@@ -2536,37 +2542,15 @@ export async function storeRejectedEventCountInfluxDB() {
'Butler-SOS.logEvents.enginePerformanceMonitor.trackRejectedEvents.tags'
);
for (const item of configTags) {
point.tag(item.name, item.value);
point.setTag(item.name, item.value);
}
}
points.push(point);
} else {
let point = new Point(measurementName)
.tag('source', event.source)
.tag('level', event.level)
.tag('log_row', event.log_row)
.stringField('message', event.message)
.intField('count', 1);
// Add static tags defined in config file, if any
if (
globals.config.has(
'Butler-SOS.qlikSenseEvents.rejectedEventCount.influxdb.staticTag'
) &&
Array.isArray(
globals.config.get(
'Butler-SOS.qlikSenseEvents.rejectedEventCount.influxdb.staticTag'
)
)
) {
const configTags = globals.config.get(
'Butler-SOS.qlikSenseEvents.rejectedEventCount.influxdb.staticTag'
);
for (const item of configTags) {
point.tag(item.name, item.value);
}
}
let point = new Point3(measurementName)
.setTag('source', event.source)
.setIntegerField('counter', event.counter);
points.push(point);
}
@@ -2574,7 +2558,9 @@ export async function storeRejectedEventCountInfluxDB() {
// Write to InfluxDB
try {
const res = await writeApi.writePoints(points);
for (const point of points) {
await globals.influx.write(point.toLineProtocol(), database);
}
globals.logger.debug(`REJECT LOG EVENT INFLUXDB: Wrote data to InfluxDB v3`);
} catch (err) {
globals.logger.error(
@@ -2736,6 +2722,56 @@ export async function postUserEventQueueMetricsToInfluxdb() {
);
return;
}
} else if (globals.config.get('Butler-SOS.influxdbConfig.version') === 3) {
// InfluxDB 3.x
const database = globals.config.get('Butler-SOS.influxdbConfig.v3Config.database');
try {
const point = new Point3(measurementName)
.setTag('queue_type', 'user_events')
.setTag('host', globals.hostInfo.hostname)
.setIntegerField('queue_size', metrics.queueSize)
.setIntegerField('queue_max_size', metrics.queueMaxSize)
.setFloatField('queue_utilization_pct', metrics.queueUtilizationPct)
.setIntegerField('queue_pending', metrics.queuePending)
.setIntegerField('messages_received', metrics.messagesReceived)
.setIntegerField('messages_queued', metrics.messagesQueued)
.setIntegerField('messages_processed', metrics.messagesProcessed)
.setIntegerField('messages_failed', metrics.messagesFailed)
.setIntegerField('messages_dropped_total', metrics.messagesDroppedTotal)
.setIntegerField(
'messages_dropped_rate_limit',
metrics.messagesDroppedRateLimit
)
.setIntegerField(
'messages_dropped_queue_full',
metrics.messagesDroppedQueueFull
)
.setIntegerField('messages_dropped_size', metrics.messagesDroppedSize)
.setFloatField('processing_time_avg_ms', metrics.processingTimeAvgMs)
.setFloatField('processing_time_p95_ms', metrics.processingTimeP95Ms)
.setFloatField('processing_time_max_ms', metrics.processingTimeMaxMs)
.setIntegerField('rate_limit_current', metrics.rateLimitCurrent)
.setIntegerField('backpressure_active', metrics.backpressureActive);
// Add static tags from config file
if (configTags && configTags.length > 0) {
for (const item of configTags) {
point.setTag(item.name, item.value);
}
}
await globals.influx.write(point.toLineProtocol(), database);
globals.logger.verbose(
'USER EVENT QUEUE METRICS INFLUXDB: Sent queue metrics data to InfluxDB v3'
);
} catch (err) {
globals.logger.error(
`USER EVENT QUEUE METRICS INFLUXDB: Error saving data to InfluxDB v3! ${err}`
);
return;
}
}
// Clear metrics after writing
@@ -2889,6 +2925,56 @@ export async function postLogEventQueueMetricsToInfluxdb() {
);
return;
}
} else if (globals.config.get('Butler-SOS.influxdbConfig.version') === 3) {
// InfluxDB 3.x
const database = globals.config.get('Butler-SOS.influxdbConfig.v3Config.database');
try {
const point = new Point3(measurementName)
.setTag('queue_type', 'log_events')
.setTag('host', globals.hostInfo.hostname)
.setIntegerField('queue_size', metrics.queueSize)
.setIntegerField('queue_max_size', metrics.queueMaxSize)
.setFloatField('queue_utilization_pct', metrics.queueUtilizationPct)
.setIntegerField('queue_pending', metrics.queuePending)
.setIntegerField('messages_received', metrics.messagesReceived)
.setIntegerField('messages_queued', metrics.messagesQueued)
.setIntegerField('messages_processed', metrics.messagesProcessed)
.setIntegerField('messages_failed', metrics.messagesFailed)
.setIntegerField('messages_dropped_total', metrics.messagesDroppedTotal)
.setIntegerField(
'messages_dropped_rate_limit',
metrics.messagesDroppedRateLimit
)
.setIntegerField(
'messages_dropped_queue_full',
metrics.messagesDroppedQueueFull
)
.setIntegerField('messages_dropped_size', metrics.messagesDroppedSize)
.setFloatField('processing_time_avg_ms', metrics.processingTimeAvgMs)
.setFloatField('processing_time_p95_ms', metrics.processingTimeP95Ms)
.setFloatField('processing_time_max_ms', metrics.processingTimeMaxMs)
.setIntegerField('rate_limit_current', metrics.rateLimitCurrent)
.setIntegerField('backpressure_active', metrics.backpressureActive);
// Add static tags from config file
if (configTags && configTags.length > 0) {
for (const item of configTags) {
point.setTag(item.name, item.value);
}
}
await globals.influx.write(point.toLineProtocol(), database);
globals.logger.verbose(
'LOG EVENT QUEUE METRICS INFLUXDB: Sent queue metrics data to InfluxDB v3'
);
} catch (err) {
globals.logger.error(
`LOG EVENT QUEUE METRICS INFLUXDB: Error saving data to InfluxDB v3! ${err}`
);
return;
}
}
// Clear metrics after writing

View File

@@ -98,6 +98,10 @@ function prepUserSessionMetrics(serverName, host, virtualProxy, body, tags) {
.uintField('session_count', userProxySessionsData.sessionCount)
.stringField('session_user_id_list', userProxySessionsData.uniqueUserList),
];
} else if (globals.config.get('Butler-SOS.influxdbConfig.version') === 3) {
// Create empty array for InfluxDB v3
// Individual session datapoints will be added later
userProxySessionsData.datapointInfluxdb = [];
}
// Prometheus specific.
@@ -184,9 +188,15 @@ function prepUserSessionMetrics(serverName, host, virtualProxy, body, tags) {
.stringField('session_id', bodyItem.SessionId)
.stringField('user_directory', bodyItem.UserDirectory)
.stringField('user_id', bodyItem.UserId);
} else if (globals.config.get('Butler-SOS.influxdbConfig.version') === 3) {
// For v3, session details are not stored as individual points
// Only summary data is stored, so we skip individual session datapoints
sessionDatapoint = null;
}
userProxySessionsData.datapointInfluxdb.push(sessionDatapoint);
if (sessionDatapoint) {
userProxySessionsData.datapointInfluxdb.push(sessionDatapoint);
}
}
}