mirror of
https://github.com/ptarmiganlabs/butler-sos.git
synced 2025-12-19 09:47:53 -05:00
Remove old InfluxDB v3 code and tests
@@ -504,7 +504,9 @@ Butler-SOS:
enable: true
# Feature flag to enable refactored InfluxDB code (recommended for better maintainability)
# Set to true to use the new modular implementation, false for legacy code
useRefactoredCode: false
# Note: v3 always uses refactored code (legacy v3 code has been removed)
# This flag only affects v1 and v2 implementations
useRefactoredCode: true
# Items below are mandatory if influxdbConfig.enable=true
host: influxdb.mycompany.com # InfluxDB host, hostname, FQDN or IP address
port: 8086 # Port where InfluxDB is listening, usually 8086
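For context, a minimal sketch (not part of the diff; the same logic lives in the useRefactoredInfluxDb() helper further down) of how the two settings combine at runtime:

    // Sketch only: v3 ignores the flag, while v1/v2 honour it and default to the legacy path.
    const version = globals.config.get('Butler-SOS.influxdbConfig.version');
    const flagEnabled = globals.config.get('Butler-SOS.influxdbConfig.useRefactoredCode') === true;
    const useRefactored = version === 3 ? true : flagEnabled;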
@@ -245,108 +245,6 @@ describe('post-to-influxdb', () => {
        );
    });

    test('should store log events to InfluxDB (InfluxDB v3)', async () => {
        // Setup
        globals.config.get = jest.fn((key) => {
            if (key === 'Butler-SOS.influxdbConfig.version') return 3;
            if (key === 'Butler-SOS.qlikSenseEvents.eventCount.influxdb.measurementName') {
                return 'events_log';
            }
            if (key === 'Butler-SOS.influxdbConfig.v3Config.database') return 'test-database';
            return undefined;
        });
        globals.config.has = jest.fn().mockReturnValue(false);
        const mockLogEvents = [
            {
                source: 'test-source',
                host: 'test-host',
                subsystem: 'test-subsystem',
                counter: 5,
                timestamp: '2023-01-01T00:00:00.000Z',
                message: 'test message',
                appName: 'test-app',
                appId: 'test-app-id',
                executionId: 'test-exec',
                command: 'test-cmd',
                resultCode: '200',
                origin: 'test-origin',
                context: 'test-context',
                sessionId: 'test-session',
                rawEvent: 'test-raw',
                level: 'INFO',
                log_row: '1',
            },
        ];
        globals.udpEvents = {
            getLogEvents: jest.fn().mockResolvedValue(mockLogEvents),
            getUserEvents: jest.fn().mockResolvedValue([]),
        };
        globals.options = { instanceTag: 'test-instance' };
        // Mock v3 client write method
        globals.influx.write = jest.fn().mockResolvedValue(undefined);

        // Execute
        await influxdb.storeEventCountInfluxDB();

        // Verify
        expect(globals.influx.write).toHaveBeenCalled();
        expect(globals.logger.verbose).toHaveBeenCalledWith(
            expect.stringContaining(
                'EVENT COUNT INFLUXDB: Sent Butler SOS event count data to InfluxDB'
            )
        );
    });

    test('should store user events to InfluxDB (InfluxDB v3)', async () => {
        // Setup
        globals.config.get = jest.fn((key) => {
            if (key === 'Butler-SOS.influxdbConfig.version') return 3;
            if (key === 'Butler-SOS.qlikSenseEvents.eventCount.influxdb.measurementName') {
                return 'events_user';
            }
            if (key === 'Butler-SOS.influxdbConfig.v3Config.database') return 'test-database';
            return undefined;
        });
        globals.config.has = jest.fn().mockReturnValue(false);
        const mockUserEvents = [
            {
                source: 'test-source',
                host: 'test-host',
                subsystem: 'test-subsystem',
                counter: 3,
                timestamp: '2023-01-01T00:00:00.000Z',
                message: 'test message',
                appName: 'test-app',
                appId: 'test-app-id',
                executionId: 'test-exec',
                command: 'test-cmd',
                resultCode: '200',
                origin: 'test-origin',
                context: 'test-context',
                sessionId: 'test-session',
                rawEvent: 'test-raw',
            },
        ];
        globals.udpEvents = {
            getLogEvents: jest.fn().mockResolvedValue([]),
            getUserEvents: jest.fn().mockResolvedValue(mockUserEvents),
        };
        globals.options = { instanceTag: 'test-instance' };
        // Mock v3 client write method
        globals.influx.write = jest.fn().mockResolvedValue(undefined);

        // Execute
        await influxdb.storeEventCountInfluxDB();

        // Verify
        expect(globals.influx.write).toHaveBeenCalled();
        expect(globals.logger.verbose).toHaveBeenCalledWith(
            expect.stringContaining(
                'EVENT COUNT INFLUXDB: Sent Butler SOS event count data to InfluxDB'
            )
        );
    });

    test('should handle errors gracefully (InfluxDB v1)', async () => {
        // Setup
        globals.config.get = jest.fn((key) => {
@@ -719,53 +617,6 @@ describe('post-to-influxdb', () => {

        expect(globals.influxWriteApi[0].writeAPI.writePoints).toHaveBeenCalled();
    });

    test('should post health metrics to InfluxDB v3', async () => {
        globals.config.get = jest.fn((key) => {
            if (key === 'Butler-SOS.influxdbConfig.version') return 3;
            if (key === 'Butler-SOS.influxdbConfig.includeFields.activeDocs') return false;
            if (key === 'Butler-SOS.influxdbConfig.includeFields.loadedDocs') return false;
            if (key === 'Butler-SOS.influxdbConfig.includeFields.inMemoryDocs') return false;
            if (key === 'Butler-SOS.appNames.enableAppNameExtract') return false;
            if (key === 'Butler-SOS.influxdbConfig.v3Config.database') return 'test-database';
            return undefined;
        });
        // Mock v3 client write method
        const mockWrite = jest.fn().mockResolvedValue(undefined);
        globals.influxWriteApi = [
            {
                serverName: 'testserver',
                writeAPI: mockWrite,
                database: 'test-database',
            },
        ];
        globals.influx = {
            write: mockWrite,
        };
        const serverName = 'testserver';
        const host = 'testhost';
        const serverTags = { host: 'testhost', server_name: 'testserver' };
        const healthBody = {
            version: '1.0.0',
            started: '20220801T121212.000Z',
            apps: {
                active_docs: [],
                loaded_docs: [],
                in_memory_docs: [],
                calls: 100,
                selections: 50,
            },
            cache: { added: 0, hits: 10, lookups: 15, replaced: 2, bytes_added: 1000 },
            cpu: { total: 25 },
            mem: { committed: 1000, allocated: 800, free: 200 },
            session: { active: 5, total: 10 },
            users: { active: 3, total: 8 },
        };

        await influxdb.postHealthMetricsToInfluxdb(serverName, host, healthBody, serverTags);

        expect(mockWrite).toHaveBeenCalled();
    });
});

describe('postProxySessionsToInfluxdb', () => {

@@ -59,27 +59,48 @@ describe('InfluxDB v3 Shared Utils', () => {
    });

    describe('useRefactoredInfluxDb', () => {
        test('should return true when feature flag is enabled', () => {
            globals.config.get.mockReturnValue(true);
        test('should always return true for InfluxDB v3 (legacy code removed)', () => {
            globals.config.get.mockImplementation((key) => {
                if (key === 'Butler-SOS.influxdbConfig.version') return 3;
                if (key === 'Butler-SOS.influxdbConfig.useRefactoredCode') return false;
                return undefined;
            });

            const result = utils.useRefactoredInfluxDb();

            expect(result).toBe(true);
            expect(globals.config.get).toHaveBeenCalledWith(
                'Butler-SOS.influxdbConfig.useRefactoredCode'
            );
        });

        test('should return false when feature flag is disabled', () => {
            globals.config.get.mockReturnValue(false);
        test('should return true when feature flag is enabled for v1/v2', () => {
            globals.config.get.mockImplementation((key) => {
                if (key === 'Butler-SOS.influxdbConfig.version') return 1;
                if (key === 'Butler-SOS.influxdbConfig.useRefactoredCode') return true;
                return undefined;
            });

            const result = utils.useRefactoredInfluxDb();

            expect(result).toBe(true);
        });

        test('should return false when feature flag is disabled for v1/v2', () => {
            globals.config.get.mockImplementation((key) => {
                if (key === 'Butler-SOS.influxdbConfig.version') return 2;
                if (key === 'Butler-SOS.influxdbConfig.useRefactoredCode') return false;
                return undefined;
            });

            const result = utils.useRefactoredInfluxDb();

            expect(result).toBe(false);
        });

        test('should return false when feature flag is undefined', () => {
            globals.config.get.mockReturnValue(undefined);
        test('should return false when feature flag is undefined for v1/v2', () => {
            globals.config.get.mockImplementation((key) => {
                if (key === 'Butler-SOS.influxdbConfig.version') return 1;
                if (key === 'Butler-SOS.influxdbConfig.useRefactoredCode') return undefined;
                return undefined;
            });

            const result = utils.useRefactoredInfluxDb();

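The tests above all stub globals.config.get with a mockImplementation keyed on config paths. A hypothetical helper (not part of the repo, shown only to illustrate the pattern) could build such a stub from a plain object:

    // Hypothetical Jest helper: return the mapped value for known config paths, undefined otherwise.
    const mockConfigGet = (values) => jest.fn((key) => values[key]);

    // Usage mirroring the v3 test above:
    globals.config.get = mockConfigGet({
        'Butler-SOS.influxdbConfig.version': 3,
        'Butler-SOS.influxdbConfig.useRefactoredCode': false,
    });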
@@ -159,10 +159,20 @@ export function getInfluxDbVersion() {
/**
 * Checks if the refactored InfluxDB code path should be used.
 *
 * For v3: Always returns true (legacy code removed)
 * For v1/v2: Uses feature flag for gradual migration
 *
 * @returns {boolean} True if refactored code should be used
 */
export function useRefactoredInfluxDb() {
    // Feature flag to enable/disable refactored code path
    const version = getInfluxDbVersion();

    // v3 always uses refactored code (legacy implementation removed)
    if (version === 3) {
        return true;
    }

    // v1/v2 use feature flag for gradual migration
    // Default to false for backward compatibility
    return globals.config.get('Butler-SOS.influxdbConfig.useRefactoredCode') === true;
}

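A hypothetical call site (the function names below are illustrative, not taken from the repo) showing how v1/v2 code can branch on this helper during the gradual migration:

    // Sketch only: dispatch between the refactored and legacy v1/v2 implementations.
    if (useRefactoredInfluxDb()) {
        await postHealthMetricsRefactored(serverName, host, body, serverTags);
    } else {
        await postHealthMetricsLegacy(serverName, host, body, serverTags);
    }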
@@ -1,5 +1,4 @@
import { Point } from '@influxdata/influxdb-client';
import { Point as Point3 } from '@influxdata/influxdb3-client';

import globals from '../globals.js';
import { logError } from './log-error.js';
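The removed v3 code paths below all follow the same basic pattern: build a Point3, serialise it to line protocol, and write it to the database named in the v3Config section. A condensed sketch (measurement, tag, and field names are examples only):

    // Condensed sketch of the v3 write pattern used throughout the removed code below.
    const database = globals.config.get('Butler-SOS.influxdbConfig.v3Config.database');

    const point = new Point3('example_measurement')
        .setTag('butler_sos_instance', 'example-instance')
        .setFloatField('value', 42);

    try {
        await globals.influx.write(point.toLineProtocol(), database);
    } catch (err) {
        globals.logger.error(`Error writing to InfluxDB v3! ${globals.getErrorMessage(err)}`);
    }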
@@ -548,141 +547,6 @@ export async function postHealthMetricsToInfluxdb(serverName, host, body, server
                `HEALTH METRICS: Error saving health data to InfluxDB v2! ${globals.getErrorMessage(err)}`
            );
        }
    } else if (globals.config.get('Butler-SOS.influxdbConfig.version') === 3) {
        // Only write to InfluxDB if the global influxWriteApi object has been initialized
        if (!globals.influxWriteApi) {
            globals.logger.warn(
                'HEALTH METRICS: Influxdb write API object not initialized. Data will not be sent to InfluxDB'
            );
            return;
        }

        // Find writeApi for the server specified by serverName
        const writeApi = globals.influxWriteApi.find(
            (element) => element.serverName === serverName
        );

        // Ensure that the writeApi object was found
        if (!writeApi) {
            globals.logger.warn(
                `HEALTH METRICS: Influxdb write API object not found for host ${host}. Data will not be sent to InfluxDB`
            );
            return;
        }

        // Get database from config
        const database = globals.config.get('Butler-SOS.influxdbConfig.v3Config.database');

        // Create a new point with the data to be written to InfluxDB v3
        const points = [
            new Point3('sense_server')
                .setStringField('version', body.version)
                .setStringField('started', body.started)
                .setStringField('uptime', formattedTime),

            new Point3('mem')
                .setFloatField('comitted', body.mem.committed)
                .setFloatField('allocated', body.mem.allocated)
                .setFloatField('free', body.mem.free),

            new Point3('apps')
                .setIntegerField('active_docs_count', body.apps.active_docs.length)
                .setIntegerField('loaded_docs_count', body.apps.loaded_docs.length)
                .setIntegerField('in_memory_docs_count', body.apps.in_memory_docs.length)
                .setStringField(
                    'active_docs',
                    globals.config.get('Butler-SOS.influxdbConfig.includeFields.activeDocs')
                        ? body.apps.active_docs
                        : ''
                )
                .setStringField(
                    'active_docs_names',
                    globals.config.get('Butler-SOS.appNames.enableAppNameExtract') &&
                        globals.config.get('Butler-SOS.influxdbConfig.includeFields.activeDocs')
                        ? appNamesActive.toString()
                        : ''
                )
                .setStringField(
                    'active_session_docs_names',
                    globals.config.get('Butler-SOS.appNames.enableAppNameExtract') &&
                        globals.config.get('Butler-SOS.influxdbConfig.includeFields.activeDocs')
                        ? sessionAppNamesActive.toString()
                        : ''
                )
                .setStringField(
                    'loaded_docs',
                    globals.config.get('Butler-SOS.influxdbConfig.includeFields.loadedDocs')
                        ? body.apps.loaded_docs
                        : ''
                )
                .setStringField(
                    'loaded_docs_names',
                    globals.config.get('Butler-SOS.appNames.enableAppNameExtract') &&
                        globals.config.get('Butler-SOS.influxdbConfig.includeFields.loadedDocs')
                        ? appNamesLoaded.toString()
                        : ''
                )
                .setStringField(
                    'loaded_session_docs_names',
                    globals.config.get('Butler-SOS.appNames.enableAppNameExtract') &&
                        globals.config.get('Butler-SOS.influxdbConfig.includeFields.loadedDocs')
                        ? sessionAppNamesLoaded.toString()
                        : ''
                )
                .setStringField(
                    'in_memory_docs',
                    globals.config.get('Butler-SOS.influxdbConfig.includeFields.inMemoryDocs')
                        ? body.apps.in_memory_docs
                        : ''
                )
                .setStringField(
                    'in_memory_docs_names',
                    globals.config.get('Butler-SOS.appNames.enableAppNameExtract') &&
                        globals.config.get('Butler-SOS.influxdbConfig.includeFields.inMemoryDocs')
                        ? appNamesInMemory.toString()
                        : ''
                )
                .setStringField(
                    'in_memory_session_docs_names',
                    globals.config.get('Butler-SOS.appNames.enableAppNameExtract') &&
                        globals.config.get('Butler-SOS.influxdbConfig.includeFields.inMemoryDocs')
                        ? sessionAppNamesInMemory.toString()
                        : ''
                )
                .setIntegerField('calls', body.apps.calls)
                .setIntegerField('selections', body.apps.selections),

            new Point3('cpu').setIntegerField('total', body.cpu.total),

            new Point3('session')
                .setIntegerField('active', body.session.active)
                .setIntegerField('total', body.session.total),

            new Point3('users')
                .setIntegerField('active', body.users.active)
                .setIntegerField('total', body.users.total),

            new Point3('cache')
                .setIntegerField('hits', body.cache.hits)
                .setIntegerField('lookups', body.cache.lookups)
                .setIntegerField('added', body.cache.added)
                .setIntegerField('replaced', body.cache.replaced)
                .setIntegerField('bytes_added', body.cache.bytes_added),

            new Point3('saturated').setBooleanField('saturated', body.saturated),
        ];

        // Write to InfluxDB
        try {
            for (const point of points) {
                await globals.influx.write(point.toLineProtocol(), database);
            }
            globals.logger.debug(`HEALTH METRICS: Wrote data to InfluxDB v3`);
        } catch (err) {
            globals.logger.error(
                `HEALTH METRICS: Error saving health data to InfluxDB v3! ${globals.getErrorMessage(err)}`
            );
        }
    }
}

@@ -777,56 +641,6 @@ export async function postProxySessionsToInfluxdb(userSessions) {
            );
        }

        globals.logger.verbose(
            `PROXY SESSIONS: Sent user session data to InfluxDB for server "${userSessions.host}", virtual proxy "${userSessions.virtualProxy}"`
        );
    } else if (globals.config.get('Butler-SOS.influxdbConfig.version') === 3) {
        // Only write to InfluxDB if the global influxWriteApi object has been initialized
        if (!globals.influxWriteApi) {
            globals.logger.warn(
                'PROXY SESSIONS: Influxdb write API object not initialized. Data will not be sent to InfluxDB'
            );
            return;
        }

        // Find writeApi for the specified server
        const writeApi = globals.influxWriteApi.find(
            (element) => element.serverName === userSessions.serverName
        );

        // Ensure that the writeApi object was found
        if (!writeApi) {
            globals.logger.warn(
                `PROXY SESSIONS: Influxdb v3 write API object not found for host ${userSessions.host}. Data will not be sent to InfluxDB`
            );
            return;
        }

        // Get database from config
        const database = globals.config.get('Butler-SOS.influxdbConfig.v3Config.database');

        // Create data points
        const point = new Point3('user_session_summary')
            .setIntegerField('session_count', userSessions.sessionCount)
            .setStringField('session_user_id_list', userSessions.uniqueUserList);

        // Write to InfluxDB
        try {
            await globals.influx.write(point.toLineProtocol(), database);
            globals.logger.debug(`PROXY SESSIONS: Wrote data to InfluxDB v3`);
        } catch (err) {
            globals.logger.error(
                `PROXY SESSIONS: Error saving user session data to InfluxDB v3! ${globals.getErrorMessage(err)}`
            );
        }

        globals.logger.debug(
            `PROXY SESSIONS: Session count for server "${userSessions.host}", virtual proxy "${userSessions.virtualProxy}"": ${userSessions.sessionCount}`
        );
        globals.logger.debug(
            `PROXY SESSIONS: User list for server "${userSessions.host}", virtual proxy "${userSessions.virtualProxy}"": ${userSessions.uniqueUserList}`
        );

        globals.logger.verbose(
            `PROXY SESSIONS: Sent user session data to InfluxDB for server "${userSessions.host}", virtual proxy "${userSessions.virtualProxy}"`
        );
@@ -963,45 +777,6 @@ export async function postButlerSOSMemoryUsageToInfluxdb(memory) {
            );
        }

        globals.logger.verbose(
            'MEMORY USAGE INFLUXDB: Sent Butler SOS memory usage data to InfluxDB'
        );
    } else if (globals.config.get('Butler-SOS.influxdbConfig.version') === 3) {
        // Create new write API object
        // advanced write options
        const writeOptions = {
            /* maximum time in millis to keep points in an unflushed batch, 0 means don't periodically flush */
            flushInterval: 5000,

            /* the count of internally-scheduled retries upon write failure, the delays between write attempts follow an exponential backoff strategy if there is no Retry-After HTTP header */
            maxRetries: 2, // do not retry writes

            // ... there are more write options that can be customized, see
            // https://influxdata.github.io/influxdb-client-js/influxdb-client.writeoptions.html and
            // https://influxdata.github.io/influxdb-client-js/influxdb-client.writeretryoptions.html
        };

        const database = globals.config.get('Butler-SOS.influxdbConfig.v3Config.database');

        // v3 uses client.write() directly, not getWriteApi()
        const point = new Point3('butlersos_memory_usage')
            .setTag('butler_sos_instance', memory.instanceTag)
            .setTag('version', butlerVersion)
            .setFloatField('heap_used', memory.heapUsedMByte)
            .setFloatField('heap_total', memory.heapTotalMByte)
            .setFloatField('external', memory.externalMemoryMByte)
            .setFloatField('process_memory', memory.processMemoryMByte);

        try {
            // Convert point to line protocol and write directly
            await globals.influx.write(point.toLineProtocol(), database);
            globals.logger.debug(`MEMORY USAGE INFLUXDB: Wrote data to InfluxDB v3`);
        } catch (err) {
            globals.logger.error(
                `MEMORY USAGE INFLUXDB: Error saving user session data to InfluxDB v3! ${globals.getErrorMessage(err)}`
            );
        }

        globals.logger.verbose(
            'MEMORY USAGE INFLUXDB: Sent Butler SOS memory usage data to InfluxDB'
        );
@@ -1212,39 +987,6 @@ export async function postUserEventToInfluxdb(msg) {
                );
            }

            globals.logger.verbose(
                'USER EVENT INFLUXDB: Sent Butler SOS user event data to InfluxDB'
            );
        } catch (err) {
            globals.logger.error(
                `USER EVENT INFLUXDB: Error getting write API: ${globals.getErrorMessage(err)}`
            );
        }
    } else if (globals.config.get('Butler-SOS.influxdbConfig.version') === 3) {
        const database = globals.config.get('Butler-SOS.influxdbConfig.v3Config.database');

        const point = new Point3('log_event')
            .setTag('host', msg.host)
            .setTag('level', msg.level)
            .setTag('source', msg.source)
            .setTag('log_row', msg.log_row)
            .setTag('subsystem', msg.subsystem ? msg.subsystem : 'n/a')
            .setStringField('message', msg.message)
            .setStringField('exception_message', msg.exception_message ? msg.exception_message : '')
            .setStringField('app_name', msg.appName ? msg.appName : '')
            .setStringField('app_id', msg.appId ? msg.appId : '')
            .setStringField('execution_id', msg.executionId ? msg.executionId : '')
            .setStringField('command', msg.command ? msg.command : '')
            .setStringField('result_code', msg.resultCode ? msg.resultCode : '')
            .setStringField('origin', msg.origin ? msg.origin : '')
            .setStringField('context', msg.context ? msg.context : '')
            .setStringField('session_id', msg.sessionId ? msg.sessionId : '')
            .setStringField('raw_event', msg.rawEvent ? msg.rawEvent : '');

        try {
            await globals.influx.write(point.toLineProtocol(), database);
            globals.logger.debug(`USER EVENT INFLUXDB: Wrote data to InfluxDB v3`);

            globals.logger.verbose(
                'USER EVENT INFLUXDB: Sent Butler SOS user event data to InfluxDB'
            );
@@ -1708,200 +1450,6 @@ export async function postLogEventToInfluxdb(msg) {
                    );
                }

                globals.logger.verbose(
                    'LOG EVENT INFLUXDB: Sent Butler SOS log event data to InfluxDB'
                );
            } catch (err) {
                globals.logger.error(
                    `LOG EVENT INFLUXDB: Error getting write API: ${globals.getErrorMessage(err)}`
                );
            }
        }
    } else if (globals.config.get('Butler-SOS.influxdbConfig.version') === 3) {
        if (
            msg.source === 'qseow-engine' ||
            msg.source === 'qseow-proxy' ||
            msg.source === 'qseow-scheduler' ||
            msg.source === 'qseow-repository' ||
            msg.source === 'qseow-qix-perf'
        ) {
            const database = globals.config.get('Butler-SOS.influxdbConfig.v3Config.database');

            let point;

            // Handle each message type with its specific fields
            if (msg.source === 'qseow-engine') {
                // Engine fields: message, exception_message, command, result_code, origin, context, session_id, raw_event
                point = new Point3('log_event')
                    .setTag('host', msg.host)
                    .setTag('level', msg.level)
                    .setTag('source', msg.source)
                    .setTag('log_row', msg.log_row)
                    .setTag('subsystem', msg.subsystem ? msg.subsystem : 'n/a')
                    .setStringField('message', msg.message)
                    .setStringField(
                        'exception_message',
                        msg.exception_message ? msg.exception_message : ''
                    )
                    .setStringField('command', msg.command ? msg.command : '')
                    .setStringField('result_code', msg.result_code ? msg.result_code : '')
                    .setStringField('origin', msg.origin ? msg.origin : '')
                    .setStringField('context', msg.context ? msg.context : '')
                    .setStringField('session_id', msg.session_id ? msg.session_id : '')
                    .setStringField('raw_event', JSON.stringify(msg));

                // Conditional tags
                if (msg?.user_full?.length > 0) point.setTag('user_full', msg.user_full);
                if (msg?.user_directory?.length > 0)
                    point.setTag('user_directory', msg.user_directory);
                if (msg?.user_id?.length > 0) point.setTag('user_id', msg.user_id);
                if (msg?.result_code?.length > 0) point.setTag('result_code', msg.result_code);
                if (msg?.windows_user?.length > 0)
                    point.setTag('windows_user', msg.windows_user);
                if (msg?.task_id?.length > 0) point.setTag('task_id', msg.task_id);
                if (msg?.task_name?.length > 0) point.setTag('task_name', msg.task_name);
                if (msg?.app_id?.length > 0) point.setTag('app_id', msg.app_id);
                if (msg?.app_name?.length > 0) point.setTag('app_name', msg.app_name);
                if (msg?.engine_exe_version?.length > 0)
                    point.setTag('engine_exe_version', msg.engine_exe_version);
            } else if (msg.source === 'qseow-proxy') {
                // Proxy fields: message, exception_message, command, result_code, origin, context, raw_event (NO session_id)
                point = new Point3('log_event')
                    .setTag('host', msg.host)
                    .setTag('level', msg.level)
                    .setTag('source', msg.source)
                    .setTag('log_row', msg.log_row)
                    .setTag('subsystem', msg.subsystem ? msg.subsystem : 'n/a')
                    .setStringField('message', msg.message)
                    .setStringField(
                        'exception_message',
                        msg.exception_message ? msg.exception_message : ''
                    )
                    .setStringField('command', msg.command ? msg.command : '')
                    .setStringField('result_code', msg.result_code ? msg.result_code : '')
                    .setStringField('origin', msg.origin ? msg.origin : '')
                    .setStringField('context', msg.context ? msg.context : '')
                    .setStringField('raw_event', JSON.stringify(msg));

                // Conditional tags
                if (msg?.user_full?.length > 0) point.setTag('user_full', msg.user_full);
                if (msg?.user_directory?.length > 0)
                    point.setTag('user_directory', msg.user_directory);
                if (msg?.user_id?.length > 0) point.setTag('user_id', msg.user_id);
                if (msg?.result_code?.length > 0) point.setTag('result_code', msg.result_code);
            } else if (msg.source === 'qseow-scheduler') {
                // Scheduler fields: message, exception_message, app_name, app_id, execution_id, raw_event
                point = new Point3('log_event')
                    .setTag('host', msg.host)
                    .setTag('level', msg.level)
                    .setTag('source', msg.source)
                    .setTag('log_row', msg.log_row)
                    .setTag('subsystem', msg.subsystem ? msg.subsystem : 'n/a')
                    .setStringField('message', msg.message)
                    .setStringField(
                        'exception_message',
                        msg.exception_message ? msg.exception_message : ''
                    )
                    .setStringField('app_name', msg.app_name ? msg.app_name : '')
                    .setStringField('app_id', msg.app_id ? msg.app_id : '')
                    .setStringField('execution_id', msg.execution_id ? msg.execution_id : '')
                    .setStringField('raw_event', JSON.stringify(msg));

                // Conditional tags
                if (msg?.user_full?.length > 0) point.setTag('user_full', msg.user_full);
                if (msg?.user_directory?.length > 0)
                    point.setTag('user_directory', msg.user_directory);
                if (msg?.user_id?.length > 0) point.setTag('user_id', msg.user_id);
                if (msg?.task_id?.length > 0) point.setTag('task_id', msg.task_id);
                if (msg?.task_name?.length > 0) point.setTag('task_name', msg.task_name);
            } else if (msg.source === 'qseow-repository') {
                // Repository fields: message, exception_message, command, result_code, origin, context, raw_event
                point = new Point3('log_event')
                    .setTag('host', msg.host)
                    .setTag('level', msg.level)
                    .setTag('source', msg.source)
                    .setTag('log_row', msg.log_row)
                    .setTag('subsystem', msg.subsystem ? msg.subsystem : 'n/a')
                    .setStringField('message', msg.message)
                    .setStringField(
                        'exception_message',
                        msg.exception_message ? msg.exception_message : ''
                    )
                    .setStringField('command', msg.command ? msg.command : '')
                    .setStringField('result_code', msg.result_code ? msg.result_code : '')
                    .setStringField('origin', msg.origin ? msg.origin : '')
                    .setStringField('context', msg.context ? msg.context : '')
                    .setStringField('raw_event', JSON.stringify(msg));

                // Conditional tags
                if (msg?.user_full?.length > 0) point.setTag('user_full', msg.user_full);
                if (msg?.user_directory?.length > 0)
                    point.setTag('user_directory', msg.user_directory);
                if (msg?.user_id?.length > 0) point.setTag('user_id', msg.user_id);
                if (msg?.result_code?.length > 0) point.setTag('result_code', msg.result_code);
            } else if (msg.source === 'qseow-qix-perf') {
                // QIX Performance fields: app_id, process_time, work_time, lock_time, validate_time, traverse_time, handle, net_ram, peak_ram, raw_event
                point = new Point3('log_event')
                    .setTag('host', msg.host ? msg.host : '<Unknown>')
                    .setTag('level', msg.level ? msg.level : '<Unknown>')
                    .setTag('source', msg.source ? msg.source : '<Unknown>')
                    .setTag('log_row', msg.log_row ? msg.log_row : '-1')
                    .setTag('subsystem', msg.subsystem ? msg.subsystem : '<Unknown>')
                    .setTag('method', msg.method ? msg.method : '<Unknown>')
                    .setTag('object_type', msg.object_type ? msg.object_type : '<Unknown>')
                    .setTag(
                        'proxy_session_id',
                        msg.proxy_session_id ? msg.proxy_session_id : '-1'
                    )
                    .setTag('session_id', msg.session_id ? msg.session_id : '-1')
                    .setTag(
                        'event_activity_source',
                        msg.event_activity_source ? msg.event_activity_source : '<Unknown>'
                    )
                    .setStringField('app_id', msg.app_id ? msg.app_id : '')
                    .setFloatField('process_time', msg.process_time)
                    .setFloatField('work_time', msg.work_time)
                    .setFloatField('lock_time', msg.lock_time)
                    .setFloatField('validate_time', msg.validate_time)
                    .setFloatField('traverse_time', msg.traverse_time)
                    .setIntegerField('handle', msg.handle)
                    .setIntegerField('net_ram', msg.net_ram)
                    .setIntegerField('peak_ram', msg.peak_ram)
                    .setStringField('raw_event', JSON.stringify(msg));

                // Conditional tags
                if (msg?.user_full?.length > 0) point.setTag('user_full', msg.user_full);
                if (msg?.user_directory?.length > 0)
                    point.setTag('user_directory', msg.user_directory);
                if (msg?.user_id?.length > 0) point.setTag('user_id', msg.user_id);
                if (msg?.app_id?.length > 0) point.setTag('app_id', msg.app_id);
                if (msg?.app_name?.length > 0) point.setTag('app_name', msg.app_name);
                if (msg?.object_id?.length > 0) point.setTag('object_id', msg.object_id);
            }

            // Add log event categories to tags if available
            if (msg?.category?.length > 0) {
                msg.category.forEach((category) => {
                    point.setTag(category.name, category.value);
                });
            }

            // Add custom tags from config file
            if (
                globals.config.has('Butler-SOS.logEvents.tags') &&
                globals.config.get('Butler-SOS.logEvents.tags') !== null &&
                globals.config.get('Butler-SOS.logEvents.tags').length > 0
            ) {
                const configTags = globals.config.get('Butler-SOS.logEvents.tags');
                for (const item of configTags) {
                    point.setTag(item.name, item.value);
                }
            }

            try {
                await globals.influx.write(point.toLineProtocol(), database);
                globals.logger.debug(`LOG EVENT INFLUXDB: Wrote data to InfluxDB v3`);

                globals.logger.verbose(
                    'LOG EVENT INFLUXDB: Sent Butler SOS log event data to InfluxDB'
                );
@@ -2147,108 +1695,6 @@ export async function storeEventCountInfluxDB() {
                return;
            }

            globals.logger.verbose(
                'EVENT COUNT INFLUXDB: Sent Butler SOS event count data to InfluxDB'
            );
        } catch (err) {
            logError('EVENT COUNT INFLUXDB: Error getting write API', err);
        }
    } else if (globals.config.get('Butler-SOS.influxdbConfig.version') === 3) {
        const database = globals.config.get('Butler-SOS.influxdbConfig.v3Config.database');

        try {
            // Store data for each log event
            for (const logEvent of logEvents) {
                const tags = {
                    butler_sos_instance: globals.options.instanceTag,
                    event_type: 'log',
                    source: logEvent.source,
                    host: logEvent.host,
                    subsystem: logEvent.subsystem,
                };

                // Add static tags defined in config file, if any
                if (
                    globals.config.has('Butler-SOS.qlikSenseEvents.eventCount.influxdb.tags') &&
                    Array.isArray(
                        globals.config.get('Butler-SOS.qlikSenseEvents.eventCount.influxdb.tags')
                    )
                ) {
                    const configTags = globals.config.get(
                        'Butler-SOS.qlikSenseEvents.eventCount.influxdb.tags'
                    );

                    configTags.forEach((tag) => {
                        tags[tag.name] = tag.value;
                    });
                }

                const point = new Point3(
                    globals.config.get(
                        'Butler-SOS.qlikSenseEvents.eventCount.influxdb.measurementName'
                    )
                )
                    .setTag('event_type', 'log')
                    .setTag('source', logEvent.source)
                    .setTag('host', logEvent.host)
                    .setTag('subsystem', logEvent.subsystem)
                    .setIntegerField('counter', logEvent.counter);

                // Add tags to point
                Object.keys(tags).forEach((key) => {
                    point.setTag(key, tags[key]);
                });

                await globals.influx.write(point.toLineProtocol(), database);
                globals.logger.debug(`EVENT COUNT INFLUXDB: Wrote data to InfluxDB v3`);
            }

            // Loop through data in user events and create datapoints.
            for (const event of userEvents) {
                const tags = {
                    butler_sos_instance: globals.options.instanceTag,
                    event_type: 'user',
                    source: event.source,
                    host: event.host,
                    subsystem: event.subsystem,
                };

                // Add static tags defined in config file, if any
                if (
                    globals.config.has('Butler-SOS.qlikSenseEvents.eventCount.influxdb.tags') &&
                    Array.isArray(
                        globals.config.get('Butler-SOS.qlikSenseEvents.eventCount.influxdb.tags')
                    )
                ) {
                    const configTags = globals.config.get(
                        'Butler-SOS.qlikSenseEvents.eventCount.influxdb.tags'
                    );

                    configTags.forEach((tag) => {
                        tags[tag.name] = tag.value;
                    });
                }

                const point = new Point3(
                    globals.config.get(
                        'Butler-SOS.qlikSenseEvents.eventCount.influxdb.measurementName'
                    )
                )
                    .setTag('event_type', 'user')
                    .setTag('source', event.source)
                    .setTag('host', event.host)
                    .setTag('subsystem', event.subsystem)
                    .setIntegerField('counter', event.counter);

                // Add tags to point
                Object.keys(tags).forEach((key) => {
                    point.setTag(key, tags[key]);
                });

                await globals.influx.write(point.toLineProtocol(), database);
                globals.logger.debug(`EVENT COUNT INFLUXDB: Wrote user event data to InfluxDB v3`);
            }

            globals.logger.verbose(
                'EVENT COUNT INFLUXDB: Sent Butler SOS event count data to InfluxDB'
            );
@@ -2491,85 +1937,6 @@ export async function storeRejectedEventCountInfluxDB() {
                return;
            }

            globals.logger.verbose(
                'REJECT LOG EVENT INFLUXDB: Sent Butler SOS rejected event count data to InfluxDB'
            );
        } catch (err) {
            logError('REJECTED LOG EVENT INFLUXDB: Error getting write API', err);
        }
    } else if (globals.config.get('Butler-SOS.influxdbConfig.version') === 3) {
        const database = globals.config.get('Butler-SOS.influxdbConfig.v3Config.database');

        try {
            const points = [];
            const measurementName = globals.config.get(
                'Butler-SOS.qlikSenseEvents.rejectedEventCount.influxdb.measurementName'
            );

            rejectedLogEvents.forEach((event) => {
                globals.logger.debug(`REJECTED LOG EVENT INFLUXDB 3: ${JSON.stringify(event)}`);

                if (event.source === 'qseow-qix-perf') {
                    let point = new Point3(measurementName)
                        .setTag('source', event.source)
                        .setTag('object_type', event.objectType)
                        .setTag('method', event.method)
                        .setIntegerField('counter', event.counter)
                        .setFloatField('process_time', event.processTime);

                    // Add app_id and app_name if available
                    if (event?.appId) {
                        point.setTag('app_id', event.appId);
                    }
                    if (event?.appName?.length > 0) {
                        point.setTag('app_name', event.appName);
                        point.setTag('app_name_set', 'true');
                    } else {
                        point.setTag('app_name_set', 'false');
                    }

                    // Add static tags defined in config file, if any
                    if (
                        globals.config.has(
                            'Butler-SOS.logEvents.enginePerformanceMonitor.trackRejectedEvents.tags'
                        ) &&
                        Array.isArray(
                            globals.config.get(
                                'Butler-SOS.logEvents.enginePerformanceMonitor.trackRejectedEvents.tags'
                            )
                        )
                    ) {
                        const configTags = globals.config.get(
                            'Butler-SOS.logEvents.enginePerformanceMonitor.trackRejectedEvents.tags'
                        );
                        for (const item of configTags) {
                            point.setTag(item.name, item.value);
                        }
                    }

                    points.push(point);
                } else {
                    let point = new Point3(measurementName)
                        .setTag('source', event.source)
                        .setIntegerField('counter', event.counter);

                    points.push(point);
                }
            });

            // Write to InfluxDB
            try {
                for (const point of points) {
                    await globals.influx.write(point.toLineProtocol(), database);
                }
                globals.logger.debug(`REJECT LOG EVENT INFLUXDB: Wrote data to InfluxDB v3`);
            } catch (err) {
                globals.logger.error(
                    `REJECTED LOG EVENT INFLUXDB: Error saving data to InfluxDB v3! ${err}`
                );
                return;
            }

            globals.logger.verbose(
                'REJECT LOG EVENT INFLUXDB: Sent Butler SOS rejected event count data to InfluxDB'
            );
@@ -2723,56 +2090,6 @@ export async function postUserEventQueueMetricsToInfluxdb() {
            );
            return;
        }
    } else if (globals.config.get('Butler-SOS.influxdbConfig.version') === 3) {
        // InfluxDB 3.x
        const database = globals.config.get('Butler-SOS.influxdbConfig.v3Config.database');

        try {
            const point = new Point3(measurementName)
                .setTag('queue_type', 'user_events')
                .setTag('host', globals.hostInfo.hostname)
                .setIntegerField('queue_size', metrics.queueSize)
                .setIntegerField('queue_max_size', metrics.queueMaxSize)
                .setFloatField('queue_utilization_pct', metrics.queueUtilizationPct)
                .setIntegerField('queue_pending', metrics.queuePending)
                .setIntegerField('messages_received', metrics.messagesReceived)
                .setIntegerField('messages_queued', metrics.messagesQueued)
                .setIntegerField('messages_processed', metrics.messagesProcessed)
                .setIntegerField('messages_failed', metrics.messagesFailed)
                .setIntegerField('messages_dropped_total', metrics.messagesDroppedTotal)
                .setIntegerField(
                    'messages_dropped_rate_limit',
                    metrics.messagesDroppedRateLimit
                )
                .setIntegerField(
                    'messages_dropped_queue_full',
                    metrics.messagesDroppedQueueFull
                )
                .setIntegerField('messages_dropped_size', metrics.messagesDroppedSize)
                .setFloatField('processing_time_avg_ms', metrics.processingTimeAvgMs)
                .setFloatField('processing_time_p95_ms', metrics.processingTimeP95Ms)
                .setFloatField('processing_time_max_ms', metrics.processingTimeMaxMs)
                .setIntegerField('rate_limit_current', metrics.rateLimitCurrent)
                .setIntegerField('backpressure_active', metrics.backpressureActive);

            // Add static tags from config file
            if (configTags && configTags.length > 0) {
                for (const item of configTags) {
                    point.setTag(item.name, item.value);
                }
            }

            await globals.influx.write(point.toLineProtocol(), database);

            globals.logger.verbose(
                'USER EVENT QUEUE METRICS INFLUXDB: Sent queue metrics data to InfluxDB v3'
            );
        } catch (err) {
            globals.logger.error(
                `USER EVENT QUEUE METRICS INFLUXDB: Error saving data to InfluxDB v3! ${err}`
            );
            return;
        }
    }

    // Clear metrics after writing
@@ -2926,56 +2243,6 @@ export async function postLogEventQueueMetricsToInfluxdb() {
            );
            return;
        }
    } else if (globals.config.get('Butler-SOS.influxdbConfig.version') === 3) {
        // InfluxDB 3.x
        const database = globals.config.get('Butler-SOS.influxdbConfig.v3Config.database');

        try {
            const point = new Point3(measurementName)
                .setTag('queue_type', 'log_events')
                .setTag('host', globals.hostInfo.hostname)
                .setIntegerField('queue_size', metrics.queueSize)
                .setIntegerField('queue_max_size', metrics.queueMaxSize)
                .setFloatField('queue_utilization_pct', metrics.queueUtilizationPct)
                .setIntegerField('queue_pending', metrics.queuePending)
                .setIntegerField('messages_received', metrics.messagesReceived)
                .setIntegerField('messages_queued', metrics.messagesQueued)
                .setIntegerField('messages_processed', metrics.messagesProcessed)
                .setIntegerField('messages_failed', metrics.messagesFailed)
                .setIntegerField('messages_dropped_total', metrics.messagesDroppedTotal)
                .setIntegerField(
                    'messages_dropped_rate_limit',
                    metrics.messagesDroppedRateLimit
                )
                .setIntegerField(
                    'messages_dropped_queue_full',
                    metrics.messagesDroppedQueueFull
                )
                .setIntegerField('messages_dropped_size', metrics.messagesDroppedSize)
                .setFloatField('processing_time_avg_ms', metrics.processingTimeAvgMs)
                .setFloatField('processing_time_p95_ms', metrics.processingTimeP95Ms)
                .setFloatField('processing_time_max_ms', metrics.processingTimeMaxMs)
                .setIntegerField('rate_limit_current', metrics.rateLimitCurrent)
                .setIntegerField('backpressure_active', metrics.backpressureActive);

            // Add static tags from config file
            if (configTags && configTags.length > 0) {
                for (const item of configTags) {
                    point.setTag(item.name, item.value);
                }
            }

            await globals.influx.write(point.toLineProtocol(), database);

            globals.logger.verbose(
                'LOG EVENT QUEUE METRICS INFLUXDB: Sent queue metrics data to InfluxDB v3'
            );
        } catch (err) {
            globals.logger.error(
                `LOG EVENT QUEUE METRICS INFLUXDB: Error saving data to InfluxDB v3! ${err}`
            );
            return;
        }
    }

    // Clear metrics after writing