diff --git a/src/lib/post-to-influxdb.js b/src/lib/post-to-influxdb.js index 4d52b9e..36b11c5 100755 --- a/src/lib/post-to-influxdb.js +++ b/src/lib/post-to-influxdb.js @@ -747,6 +747,55 @@ export async function postProxySessionsToInfluxdb(userSessions) { ); } + globals.logger.verbose( + `PROXY SESSIONS: Sent user session data to InfluxDB for server "${userSessions.host}", virtual proxy "${userSessions.virtualProxy}"` + ); + } else if (globals.config.get('Butler-SOS.influxdbConfig.version') === 3) { + // Only write to InfluxDB if the global influxWriteApi object has been initialized + if (!globals.influxWriteApi) { + globals.logger.warn( + 'PROXY SESSIONS: Influxdb write API object not initialized. Data will not be sent to InfluxDB' + ); + return; + } + + // Find writeApi for the specified server + const writeApi = globals.influxWriteApi.find( + (element) => element.serverName === userSessions.serverName + ); + + // Ensure that the writeApi object was found + if (!writeApi) { + globals.logger.warn( + `PROXY SESSIONS: Influxdb v3 write API object not found for host ${userSessions.host}. Data will not be sent to InfluxDB` + ); + return; + } + + // Create data points + const points = [ + new Point('user_session_summary') + .intField('session_count', userSessions.sessionCount) + .stringField('session_user_id_list', userSessions.uniqueUserList), + ]; + + // Write to InfluxDB + try { + const res = await writeApi.writeAPI.writePoints(points); + globals.logger.debug(`PROXY SESSIONS: Wrote data to InfluxDB v3`); + } catch (err) { + globals.logger.error( + `PROXY SESSIONS: Error saving user session data to InfluxDB v3! ${globals.getErrorMessage(err)}` + ); + } + + globals.logger.debug( + `PROXY SESSIONS: Session count for server "${userSessions.host}", virtual proxy "${userSessions.virtualProxy}"": ${userSessions.sessionCount}` + ); + globals.logger.debug( + `PROXY SESSIONS: User list for server "${userSessions.host}", virtual proxy "${userSessions.virtualProxy}"": ${userSessions.uniqueUserList}` + ); + globals.logger.verbose( `PROXY SESSIONS: Sent user session data to InfluxDB for server "${userSessions.host}", virtual proxy "${userSessions.virtualProxy}"` ); @@ -883,6 +932,46 @@ export async function postButlerSOSMemoryUsageToInfluxdb(memory) { ); } + globals.logger.verbose( + 'MEMORY USAGE INFLUXDB: Sent Butler SOS memory usage data to InfluxDB' + ); + } else if (globals.config.get('Butler-SOS.influxdbConfig.version') === 3) { + // Create new write API object + // advanced write options + const writeOptions = { + /* maximum time in millis to keep points in an unflushed batch, 0 means don't periodically flush */ + flushInterval: 5000, + + /* the count of internally-scheduled retries upon write failure, the delays between write attempts follow an exponential backoff strategy if there is no Retry-After HTTP header */ + maxRetries: 2, // do not retry writes + + // ... there are more write options that can be customized, see + // https://influxdata.github.io/influxdb-client-js/influxdb-client.writeoptions.html and + // https://influxdata.github.io/influxdb-client-js/influxdb-client.writeretryoptions.html + }; + + const org = globals.config.get('Butler-SOS.influxdbConfig.v3Config.org'); + const database = globals.config.get('Butler-SOS.influxdbConfig.v3Config.database'); + + const writeApi = globals.influx.getWriteApi(org, database, 'ns', writeOptions); + + const point = new Point('butlersos_memory_usage') + .tag('butler_sos_instance', memory.instanceTag) + .tag('version', butlerVersion) + .floatField('heap_used', memory.heapUsedMByte) + .floatField('heap_total', memory.heapTotalMByte) + .floatField('external', memory.externalMemoryMByte) + .floatField('process_memory', memory.processMemoryMByte); + + try { + const res = await writeApi.writePoint(point); + globals.logger.debug(`MEMORY USAGE INFLUXDB: Wrote data to InfluxDB v3`); + } catch (err) { + globals.logger.error( + `MEMORY USAGE INFLUXDB: Error saving user session data to InfluxDB v3! ${globals.getErrorMessage(err)}` + ); + } + globals.logger.verbose( 'MEMORY USAGE INFLUXDB: Sent Butler SOS memory usage data to InfluxDB' ); @@ -1093,6 +1182,56 @@ export async function postUserEventToInfluxdb(msg) { ); } + globals.logger.verbose( + 'USER EVENT INFLUXDB: Sent Butler SOS user event data to InfluxDB' + ); + } catch (err) { + globals.logger.error( + `USER EVENT INFLUXDB: Error getting write API: ${globals.getErrorMessage(err)}` + ); + } + } else if (globals.config.get('Butler-SOS.influxdbConfig.version') === 3) { + // Create new write API object + // Advanced write options + const writeOptions = { + /* maximum time in millis to keep points in an unflushed batch, 0 means don't periodically flush */ + flushInterval: 5000, + + /* the count of internally-scheduled retries upon write failure, the delays between write attempts follow an exponential backoff strategy if there is no Retry-After HTTP header */ + maxRetries: 2, // do not retry writes + + // ... there are more write options that can be customized, see + // https://influxdata.github.io/influxdb-client-js/influxdb-client.writeoptions.html and + // https://influxdata.github.io/influxdb-client-js/influxdb-client.writeretryoptions.html + }; + + const org = globals.config.get('Butler-SOS.influxdbConfig.v3Config.org'); + const database = globals.config.get('Butler-SOS.influxdbConfig.v3Config.database'); + + const writeApi = globals.influx.getWriteApi(org, database, 'ns', writeOptions); + + const point = new Point('log_event') + .tag('host', msg.host) + .tag('level', msg.level) + .tag('source', msg.source) + .tag('log_row', msg.log_row) + .tag('subsystem', msg.subsystem ? msg.subsystem : 'n/a') + .stringField('message', msg.message) + .stringField('exception_message', msg.exception_message ? msg.exception_message : '') + .stringField('app_name', msg.appName ? msg.appName : '') + .stringField('app_id', msg.appId ? msg.appId : '') + .stringField('execution_id', msg.executionId ? msg.executionId : '') + .stringField('command', msg.command ? msg.command : '') + .stringField('result_code', msg.resultCode ? msg.resultCode : '') + .stringField('origin', msg.origin ? msg.origin : '') + .stringField('context', msg.context ? msg.context : '') + .stringField('session_id', msg.sessionId ? msg.sessionId : '') + .stringField('raw_event', msg.rawEvent ? msg.rawEvent : ''); + + try { + const res = await writeApi.writePoint(point); + globals.logger.debug(`USER EVENT INFLUXDB: Wrote data to InfluxDB v3`); + globals.logger.verbose( 'USER EVENT INFLUXDB: Sent Butler SOS user event data to InfluxDB' ); @@ -1556,6 +1695,153 @@ export async function postLogEventToInfluxdb(msg) { ); } + globals.logger.verbose( + 'LOG EVENT INFLUXDB: Sent Butler SOS log event data to InfluxDB' + ); + } catch (err) { + globals.logger.error( + `LOG EVENT INFLUXDB: Error getting write API: ${globals.getErrorMessage(err)}` + ); + } + } + } else if (globals.config.get('Butler-SOS.influxdbConfig.version') === 3) { + if ( + msg.source === 'qseow-engine' || + msg.source === 'qseow-proxy' || + msg.source === 'qseow-scheduler' || + msg.source === 'qseow-repository' || + msg.source === 'qseow-qix-perf' + ) { + // Create new write API object + // Advanced write options + const writeOptions = { + /* maximum time in millis to keep points in an unflushed batch, 0 means don't periodically flush */ + flushInterval: 5000, + + /* the count of internally-scheduled retries upon write failure, the delays between write attempts follow an exponential backoff strategy if there is no Retry-After HTTP header */ + maxRetries: 2, // do not retry writes + + // ... there are more write options that can be customized, see + // https://influxdata.github.io/influxdb-client-js/influxdb-client.writeoptions.html and + // https://influxdata.github.io/influxdb-client-js/influxdb-client.writeretryoptions.html + }; + + const org = globals.config.get('Butler-SOS.influxdbConfig.v3Config.org'); + const database = globals.config.get('Butler-SOS.influxdbConfig.v3Config.database'); + + const writeApi = globals.influx.getWriteApi(org, database, 'ns', writeOptions); + + const logLevel = 'log_level'; + + const logLevelValue = msg.level; + + // Determine what tags to use for the log event point + // Tags are are part of the data model that will be used in this call. + // Tags are what make for efficient queries in InfluxDB + let point; + + // Does the message have QIX perf data in the message field? + // I.e. is this a log event with performance data from QIX engine? + if ( + msg.source === 'qseow-qix-perf' && + msg.message.split(' ').length >= 22 && + msg.message.split(' ')[7] !== '(null)' + ) { + const parts = msg.message.split(' '); + const objectType = parts[5]; + const method = parts[6]; + const appId = parts[7]; + + if (isNaN(parts[9]) || isNaN(parts[11]) || isNaN(parts[13])) { + // One or more of the performance metric is not a number, this is not a valid QIX perf log event + globals.logger.debug( + `LOG EVENT INFLUXDB v3: Performance metrics not valid: ${parts[9]}, ${parts[11]}, ${parts[13]}` + ); + + point = new Point('log_event') + .tag('host', msg.host) + .tag('level', msg.level) + .tag('source', msg.source) + .tag('log_row', msg.log_row) + .tag('subsystem', msg.subsystem ? msg.subsystem : 'n/a') + .stringField('message', msg.message) + .stringField( + 'exception_message', + msg.exception_message ? msg.exception_message : '' + ) + .stringField('app_name', msg.appName ? msg.appName : '') + .stringField('app_id', msg.appId ? msg.appId : '') + .stringField('execution_id', msg.executionId ? msg.executionId : '') + .stringField('command', msg.command ? msg.command : '') + .stringField('result_code', msg.resultCode ? msg.resultCode : '') + .stringField('origin', msg.origin ? msg.origin : '') + .stringField('context', msg.context ? msg.context : '') + .stringField('session_id', msg.sessionId ? msg.sessionId : '') + .stringField('raw_event', msg.rawEvent ? msg.rawEvent : ''); + } else { + // We have a valid QIX performance log event + + point = new Point('log_event') + .tag('host', msg.host) + .tag('level', msg.level) + .tag('source', msg.source) + .tag('log_row', msg.log_row) + .tag('subsystem', msg.subsystem ? msg.subsystem : 'n/a') + .tag('object_type', objectType) + .tag('method', method) + .stringField('message', msg.message) + .stringField( + 'exception_message', + msg.exception_message ? msg.exception_message : '' + ) + .stringField('app_name', msg.appName ? msg.appName : '') + .stringField('app_id', appId) + .stringField('execution_id', msg.executionId ? msg.executionId : '') + .stringField('command', msg.command ? msg.command : '') + .stringField('result_code', msg.resultCode ? msg.resultCode : '') + .stringField('origin', msg.origin ? msg.origin : '') + .stringField('context', msg.context ? msg.context : '') + .stringField('session_id', msg.sessionId ? msg.sessionId : '') + .stringField('raw_event', msg.rawEvent ? msg.rawEvent : '') + + // engine performance fields + .floatField('process_time', parseFloat(parts[9])) + .floatField('work_time', parseFloat(parts[11])) + .floatField('lock_time', parseFloat(parts[13])) + .floatField('validate_time', parseFloat(parts[15])) + .floatField('traverse_time', parseFloat(parts[17])) + .intField('handle', parseInt(parts[19], 10)) + .intField('net_ram', parseInt(parts[20], 10)) + .intField('peak_ram', parseInt(parts[21], 10)); + } + } else { + // No QIX perf data, use standard log event format + point = new Point('log_event') + .tag('host', msg.host) + .tag('level', msg.level) + .tag('source', msg.source) + .tag('log_row', msg.log_row) + .tag('subsystem', msg.subsystem ? msg.subsystem : 'n/a') + .stringField('message', msg.message) + .stringField( + 'exception_message', + msg.exception_message ? msg.exception_message : '' + ) + .stringField('app_name', msg.appName ? msg.appName : '') + .stringField('app_id', msg.appId ? msg.appId : '') + .stringField('execution_id', msg.executionId ? msg.executionId : '') + .stringField('command', msg.command ? msg.command : '') + .stringField('result_code', msg.resultCode ? msg.resultCode : '') + .stringField('origin', msg.origin ? msg.origin : '') + .stringField('context', msg.context ? msg.context : '') + .stringField('session_id', msg.sessionId ? msg.sessionId : '') + .stringField('raw_event', msg.rawEvent ? msg.rawEvent : ''); + } + + try { + const res = await writeApi.writePoint(point); + globals.logger.debug(`LOG EVENT INFLUXDB: Wrote data to InfluxDB v3`); + globals.logger.verbose( 'LOG EVENT INFLUXDB: Sent Butler SOS log event data to InfluxDB' ); @@ -1801,6 +2087,99 @@ export async function storeEventCountInfluxDB() { return; } + globals.logger.verbose( + 'EVENT COUNT INFLUXDB: Sent Butler SOS event count data to InfluxDB' + ); + } catch (err) { + globals.logger.error(`EVENT COUNT INFLUXDB: Error getting write API: ${err}`); + } + } else if (globals.config.get('Butler-SOS.influxdbConfig.version') === 3) { + // Create new write API object + // advanced write options + const writeOptions = { + /* maximum time in millis to keep points in an unflushed batch, 0 means don't periodically flush */ + flushInterval: 5000, + + /* the count of internally-scheduled retries upon write failure, the delays between write attempts follow an exponential backoff strategy if there is no Retry-After HTTP header */ + maxRetries: 2, // do not retry writes + + // ... there are more write options that can be customized, see + // https://influxdata.github.io/influxdb-client-js/influxdb-client.writeoptions.html and + // https://influxdata.github.io/influxdb-client-js/influxdb-client.writeretryoptions.html + }; + + const org = globals.config.get('Butler-SOS.influxdbConfig.v3Config.org'); + const database = globals.config.get('Butler-SOS.influxdbConfig.v3Config.database'); + + const writeApi = globals.influx.getWriteApi(org, database, 'ns', writeOptions); + + try { + // Store data for each log event + + for (const logEvent of logEvents) { + const tags = { + butler_sos_instance: globals.options.instanceTag, + }; + + // Add static tags defined in config file, if any + // Add the static tag to the data structure sent to InfluxDB + // Is the array present in the config file? + if ( + globals.config.has( + 'Butler-SOS.qlikSenseEvents.eventCount.influxdb.staticTag' + ) && + Array.isArray( + globals.config.get( + 'Butler-SOS.qlikSenseEvents.eventCount.influxdb.staticTag' + ) + ) + ) { + // Yes, the config tag array exists + const configTags = globals.config.get( + 'Butler-SOS.qlikSenseEvents.eventCount.influxdb.staticTag' + ); + + configTags.forEach((tag) => { + tags[tag.name] = tag.value; + }); + } + + // Add timestamp from when the event was received by Butler SOS + const point = new Point( + globals.config.get( + 'Butler-SOS.qlikSenseEvents.eventCount.influxdb.measurementName' + ) + ) + .tag('host', logEvent.host) + .tag('level', logEvent.level) + .tag('source', logEvent.source) + .tag('log_row', logEvent.log_row) + .tag('subsystem', logEvent.subsystem ? logEvent.subsystem : 'n/a') + .stringField('message', logEvent.message) + .stringField( + 'exception_message', + logEvent.exception_message ? logEvent.exception_message : '' + ) + .stringField('app_name', logEvent.appName ? logEvent.appName : '') + .stringField('app_id', logEvent.appId ? logEvent.appId : '') + .stringField('execution_id', logEvent.executionId ? logEvent.executionId : '') + .stringField('command', logEvent.command ? logEvent.command : '') + .stringField('result_code', logEvent.resultCode ? logEvent.resultCode : '') + .stringField('origin', logEvent.origin ? logEvent.origin : '') + .stringField('context', logEvent.context ? logEvent.context : '') + .stringField('session_id', logEvent.sessionId ? logEvent.sessionId : '') + .stringField('raw_event', logEvent.rawEvent ? logEvent.rawEvent : '') + .timestamp(new Date(logEvent.timestamp)); + + // Add tags to point + Object.keys(tags).forEach((key) => { + point.tag(key, tags[key]); + }); + + const res = await writeApi.writePoint(point); + globals.logger.debug(`EVENT COUNT INFLUXDB: Wrote data to InfluxDB v3`); + } + globals.logger.verbose( 'EVENT COUNT INFLUXDB: Sent Butler SOS event count data to InfluxDB' ); @@ -2043,6 +2422,121 @@ export async function storeRejectedEventCountInfluxDB() { return; } + globals.logger.verbose( + 'REJECT LOG EVENT INFLUXDB: Sent Butler SOS rejected event count data to InfluxDB' + ); + } catch (err) { + globals.logger.error(`REJECTED LOG EVENT INFLUXDB: Error getting write API: ${err}`); + } + } else if (globals.config.get('Butler-SOS.influxdbConfig.version') === 3) { + // Create new write API object + // advanced write options + const writeOptions = { + /* maximum time in millis to keep points in an unflushed batch, 0 means don't periodically flush */ + flushInterval: 5000, + + /* the count of internally-scheduled retries upon write failure, the delays between write attempts follow an exponential backoff strategy if there is no Retry-After HTTP header */ + maxRetries: 2, // do not retry writes + + // ... there are more write options that can be customized, see + // https://influxdata.github.io/influxdb-client-js/influxdb-client.writeoptions.html and + // https://influxdata.github.io/influxdb-client-js/influxdb-client.writeretryoptions.html + }; + + const org = globals.config.get('Butler-SOS.influxdbConfig.v3Config.org'); + const database = globals.config.get('Butler-SOS.influxdbConfig.v3Config.database'); + + const writeApi = globals.influx.getWriteApi(org, database, 'ns', writeOptions); + + try { + const points = []; + const measurementName = globals.config.get( + 'Butler-SOS.qlikSenseEvents.rejectedEventCount.influxdb.measurementName' + ); + + rejectedLogEvents.forEach((event) => { + globals.logger.debug(`REJECTED LOG EVENT INFLUXDB 3: ${JSON.stringify(event)}`); + + if ( + event.source === 'qseow-qix-perf' && + event.message.split(' ').length >= 22 && + event.message.split(' ')[7] !== '(null)' + ) { + const parts = event.message.split(' '); + const objectType = parts[5]; + const method = parts[6]; + + let point = new Point(measurementName) + .tag('source', event.source) + .tag('object_type', objectType) + .tag('method', method) + .tag('level', event.level) + .tag('log_row', event.log_row) + .stringField('message', event.message) + .intField('count', 1); + + // Add static tags defined in config file, if any + if ( + globals.config.has( + 'Butler-SOS.logEvents.enginePerformanceMonitor.trackRejectedEvents.tags' + ) && + Array.isArray( + globals.config.get( + 'Butler-SOS.logEvents.enginePerformanceMonitor.trackRejectedEvents.tags' + ) + ) + ) { + const configTags = globals.config.get( + 'Butler-SOS.logEvents.enginePerformanceMonitor.trackRejectedEvents.tags' + ); + for (const item of configTags) { + point.tag(item.name, item.value); + } + } + + points.push(point); + } else { + let point = new Point(measurementName) + .tag('source', event.source) + .tag('level', event.level) + .tag('log_row', event.log_row) + .stringField('message', event.message) + .intField('count', 1); + + // Add static tags defined in config file, if any + if ( + globals.config.has( + 'Butler-SOS.qlikSenseEvents.rejectedEventCount.influxdb.staticTag' + ) && + Array.isArray( + globals.config.get( + 'Butler-SOS.qlikSenseEvents.rejectedEventCount.influxdb.staticTag' + ) + ) + ) { + const configTags = globals.config.get( + 'Butler-SOS.qlikSenseEvents.rejectedEventCount.influxdb.staticTag' + ); + for (const item of configTags) { + point.tag(item.name, item.value); + } + } + + points.push(point); + } + }); + + // Write to InfluxDB + try { + const res = await writeApi.writePoints(points); + globals.logger.debug(`REJECT LOG EVENT INFLUXDB: Wrote data to InfluxDB v3`); + } catch (err) { + globals.logger.error( + `REJECTED LOG EVENT INFLUXDB: Error saving data to InfluxDB v3! ${err}` + ); + return; + } + globals.logger.verbose( 'REJECT LOG EVENT INFLUXDB: Sent Butler SOS rejected event count data to InfluxDB' );