Add UDP message queue infrastructure with rate limiting and backpressure

Co-authored-by: mountaindude <1029262+mountaindude@users.noreply.github.com>
This commit is contained in:
copilot-swe-agent[bot]
2025-10-07 13:16:58 +00:00
parent 1cc6dac44f
commit 9cd6be244e
9 changed files with 574 additions and 4 deletions

35
package-lock.json generated
View File

@@ -31,6 +31,7 @@
"lodash.clonedeep": "^4.5.0",
"luxon": "^3.7.2",
"mqtt": "^5.14.1",
"p-queue": "^8.1.1",
"posthog-node": "^5.9.1",
"prom-client": "^15.1.3",
"qrs-interact": "^6.3.1",
@@ -4853,6 +4854,12 @@
"node": ">=6"
}
},
"node_modules/eventemitter3": {
"version": "5.0.1",
"resolved": "https://registry.npmjs.org/eventemitter3/-/eventemitter3-5.0.1.tgz",
"integrity": "sha512-GWkBvjiSZK87ELrYOSESUYeVIc9mvLLf/nXalMOS5dYrgZq9o5OVkbZAVM06CVxYsCwH9BDZFPlQTlPA1j4ahA==",
"license": "MIT"
},
"node_modules/events": {
"version": "3.3.0",
"resolved": "https://registry.npmjs.org/events/-/events-3.3.0.tgz",
@@ -8059,6 +8066,34 @@
"url": "https://github.com/sponsors/sindresorhus"
}
},
"node_modules/p-queue": {
"version": "8.1.1",
"resolved": "https://registry.npmjs.org/p-queue/-/p-queue-8.1.1.tgz",
"integrity": "sha512-aNZ+VfjobsWryoiPnEApGGmf5WmNsCo9xu8dfaYamG5qaLP7ClhLN6NgsFe6SwJ2UbLEBK5dv9x8Mn5+RVhMWQ==",
"license": "MIT",
"dependencies": {
"eventemitter3": "^5.0.1",
"p-timeout": "^6.1.2"
},
"engines": {
"node": ">=18"
},
"funding": {
"url": "https://github.com/sponsors/sindresorhus"
}
},
"node_modules/p-timeout": {
"version": "6.1.4",
"resolved": "https://registry.npmjs.org/p-timeout/-/p-timeout-6.1.4.tgz",
"integrity": "sha512-MyIV3ZA/PmyBN/ud8vV9XzwTrNtR4jFrObymZYnZqMmW0zA8Z17vnT0rBgFE/TlohB+YCHqXMgZzb3Csp49vqg==",
"license": "MIT",
"engines": {
"node": ">=14.16"
},
"funding": {
"url": "https://github.com/sponsors/sindresorhus"
}
},
"node_modules/p-try": {
"version": "2.2.0",
"resolved": "https://registry.npmjs.org/p-try/-/p-try-2.2.0.tgz",

View File

@@ -68,6 +68,7 @@
"lodash.clonedeep": "^4.5.0",
"luxon": "^3.7.2",
"mqtt": "^5.14.1",
"p-queue": "^8.1.1",
"posthog-node": "^5.9.1",
"prom-client": "^15.1.3",
"qrs-interact": "^6.3.1",

View File

@@ -120,6 +120,17 @@ Butler-SOS:
udpServerConfig:
serverHost: <IP or FQDN> # Host/IP where user event server will listen for events from Sense
portUserActivityEvents: 9997 # Port on which user event server will listen for events from Sense
messageQueue:
maxConcurrent: 5 # Max messages processing simultaneously
maxSize: 100 # Max queued messages before dropping
dropStrategy: oldest # Drop 'oldest' or 'newest' messages when queue is full
rateLimit:
enable: true # Enable rate limiting to prevent message flooding
maxMessagesPerMinute: 300 # Max messages per minute (~5 per second)
violationLogThrottle: 60 # Log rate limit violations once per N seconds
maxMessageSize: 65507 # Max UDP datagram size in bytes
backpressure:
threshold: 80 # Backpressure warning at N% queue utilization
tags: # Tags are added to the data before it's stored in InfluxDB
# - name: env
# value: DEV
@@ -157,6 +168,17 @@ Butler-SOS:
udpServerConfig:
serverHost: <IP or FQDN> # Host/IP where log event server will listen for events from Sense
portLogEvents: 9996 # Port on which log event server will listen for events from Sense
messageQueue:
maxConcurrent: 5 # Max messages processing simultaneously
maxSize: 100 # Max queued messages before dropping
dropStrategy: oldest # Drop 'oldest' or 'newest' messages when queue is full
rateLimit:
enable: true # Enable rate limiting to prevent message flooding
maxMessagesPerMinute: 300 # Max messages per minute (~5 per second)
violationLogThrottle: 60 # Log rate limit violations once per N seconds
maxMessageSize: 65507 # Max UDP datagram size in bytes
backpressure:
threshold: 80 # Backpressure warning at N% queue utilization
tags:
# - name: env
# value: DEV

View File

@@ -17,6 +17,7 @@ import sea from './lib/sea-wrapper.js';
import { getServerTags } from './lib/servertags.js';
import { UdpEvents } from './lib/udp-event.js';
import { UdpQueueHandler } from './lib/udp-queue-handler.js';
import { verifyConfigFileSchema, verifyAppConfig } from './lib/config-file-verify.js';
let instance = null;
@@ -535,6 +536,92 @@ Configuration File:
this.rejectedEvents = null;
}
// ------------------------------------
// Initialize UDP queue handlers
try {
// User events queue handler
if (this.config.get('Butler-SOS.userEvents.enable') === true) {
this.udpQueueHandlerUserEvents = new UdpQueueHandler(
{
name: 'UserEvents',
maxConcurrent: this.config.get(
'Butler-SOS.userEvents.udpServerConfig.messageQueue.maxConcurrent'
),
maxSize: this.config.get(
'Butler-SOS.userEvents.udpServerConfig.messageQueue.maxSize'
),
dropStrategy: this.config.get(
'Butler-SOS.userEvents.udpServerConfig.messageQueue.dropStrategy'
),
rateLimitEnable: this.config.get(
'Butler-SOS.userEvents.udpServerConfig.rateLimit.enable'
),
maxMessagesPerMinute: this.config.get(
'Butler-SOS.userEvents.udpServerConfig.rateLimit.maxMessagesPerMinute'
),
violationLogThrottle: this.config.get(
'Butler-SOS.userEvents.udpServerConfig.rateLimit.violationLogThrottle'
),
maxMessageSize: this.config.get(
'Butler-SOS.userEvents.udpServerConfig.maxMessageSize'
),
backpressureThreshold: this.config.get(
'Butler-SOS.userEvents.udpServerConfig.backpressure.threshold'
),
},
this.logger
);
} else {
this.udpQueueHandlerUserEvents = null;
}
// Log events queue handler
if (
this.config.get('Butler-SOS.logEvents.source.engine.enable') === true ||
this.config.get('Butler-SOS.logEvents.source.proxy.enable') === true ||
this.config.get('Butler-SOS.logEvents.source.repository.enable') === true ||
this.config.get('Butler-SOS.logEvents.source.scheduler.enable') === true ||
this.config.get('Butler-SOS.logEvents.source.qixPerf.enable') === true
) {
this.udpQueueHandlerLogEvents = new UdpQueueHandler(
{
name: 'LogEvents',
maxConcurrent: this.config.get(
'Butler-SOS.logEvents.udpServerConfig.messageQueue.maxConcurrent'
),
maxSize: this.config.get(
'Butler-SOS.logEvents.udpServerConfig.messageQueue.maxSize'
),
dropStrategy: this.config.get(
'Butler-SOS.logEvents.udpServerConfig.messageQueue.dropStrategy'
),
rateLimitEnable: this.config.get(
'Butler-SOS.logEvents.udpServerConfig.rateLimit.enable'
),
maxMessagesPerMinute: this.config.get(
'Butler-SOS.logEvents.udpServerConfig.rateLimit.maxMessagesPerMinute'
),
violationLogThrottle: this.config.get(
'Butler-SOS.logEvents.udpServerConfig.rateLimit.violationLogThrottle'
),
maxMessageSize: this.config.get(
'Butler-SOS.logEvents.udpServerConfig.maxMessageSize'
),
backpressureThreshold: this.config.get(
'Butler-SOS.logEvents.udpServerConfig.backpressure.threshold'
),
},
this.logger
);
} else {
this.udpQueueHandlerLogEvents = null;
}
} catch (err) {
this.logger.error(
`CONFIG: Error setting up UDP queue handlers: ${this.getErrorMessage(err)}`
);
}
// ------------------------------------
// Get info on what servers to monitor
this.serverList = this.config.get('Butler-SOS.serversToMonitor.servers');

View File

@@ -22,8 +22,47 @@ export const logEventsSchema = {
format: 'hostname',
},
portLogEvents: { type: 'number' },
messageQueue: {
type: 'object',
properties: {
maxConcurrent: { type: 'number', minimum: 1 },
maxSize: { type: 'number', minimum: 1 },
dropStrategy: {
type: 'string',
enum: ['oldest', 'newest'],
},
},
required: ['maxConcurrent', 'maxSize', 'dropStrategy'],
additionalProperties: false,
},
rateLimit: {
type: 'object',
properties: {
enable: { type: 'boolean' },
maxMessagesPerMinute: { type: 'number', minimum: 1 },
violationLogThrottle: { type: 'number', minimum: 1 },
},
required: ['enable', 'maxMessagesPerMinute', 'violationLogThrottle'],
additionalProperties: false,
},
maxMessageSize: { type: 'number', minimum: 1, maximum: 65507 },
backpressure: {
type: 'object',
properties: {
threshold: { type: 'number', minimum: 1, maximum: 100 },
},
required: ['threshold'],
additionalProperties: false,
},
},
required: ['serverHost', 'portLogEvents'],
required: [
'serverHost',
'portLogEvents',
'messageQueue',
'rateLimit',
'maxMessageSize',
'backpressure',
],
additionalProperties: false,
},
tags: {

View File

@@ -34,8 +34,47 @@ export const userEventsSchema = {
format: 'hostname',
},
portUserActivityEvents: { type: 'number' },
messageQueue: {
type: 'object',
properties: {
maxConcurrent: { type: 'number', minimum: 1 },
maxSize: { type: 'number', minimum: 1 },
dropStrategy: {
type: 'string',
enum: ['oldest', 'newest'],
},
},
required: ['maxConcurrent', 'maxSize', 'dropStrategy'],
additionalProperties: false,
},
rateLimit: {
type: 'object',
properties: {
enable: { type: 'boolean' },
maxMessagesPerMinute: { type: 'number', minimum: 1 },
violationLogThrottle: { type: 'number', minimum: 1 },
},
required: ['enable', 'maxMessagesPerMinute', 'violationLogThrottle'],
additionalProperties: false,
},
maxMessageSize: { type: 'number', minimum: 1, maximum: 65507 },
backpressure: {
type: 'object',
properties: {
threshold: { type: 'number', minimum: 1, maximum: 100 },
},
required: ['threshold'],
additionalProperties: false,
},
},
required: ['serverHost', 'portUserActivityEvents'],
required: [
'serverHost',
'portUserActivityEvents',
'messageQueue',
'rateLimit',
'maxMessageSize',
'backpressure',
],
additionalProperties: false,
},
tags: {

View File

@@ -0,0 +1,313 @@
import PQueue from 'p-queue';
import globals from '../globals.js';
/**
* Class for handling UDP message queuing, rate limiting, and backpressure.
*
* This class provides:
* - Message queuing with configurable concurrency
* - Rate limiting to prevent message flooding
* - Message size validation
* - Backpressure monitoring
* - Metrics tracking (messages received, dropped, processing time)
*/
export class UdpQueueHandler {
/**
* Creates a new UdpQueueHandler instance.
*
* @param {object} config - Configuration object
* @param {string} config.name - Name of this handler (for logging)
* @param {number} config.maxConcurrent - Max concurrent message processing
* @param {number} config.maxSize - Max queue size
* @param {string} config.dropStrategy - 'oldest' or 'newest' when queue is full
* @param {boolean} config.rateLimitEnable - Enable rate limiting
* @param {number} config.maxMessagesPerMinute - Max messages per minute
* @param {number} config.violationLogThrottle - Log violations once per N seconds
* @param {number} config.maxMessageSize - Max message size in bytes
* @param {number} config.backpressureThreshold - Backpressure warning at N% utilization
* @param {object} logger - Logger object
*/
constructor(config, logger) {
this.name = config.name;
this.logger = logger;
this.config = config;
// Initialize message queue
this.queue = new PQueue({
concurrency: config.maxConcurrent,
timeout: 30000, // 30 second timeout per message
});
// Rate limiting state
this.rateLimitEnable = config.rateLimitEnable;
this.maxMessagesPerMinute = config.maxMessagesPerMinute;
this.violationLogThrottle = config.violationLogThrottle;
this.messageTimestamps = [];
this.lastViolationLog = 0;
// Metrics
this.metrics = {
messagesReceived: 0,
messagesQueued: 0,
messagesProcessed: 0,
messagesDroppedRateLimit: 0,
messagesDroppedQueueFull: 0,
messagesDroppedSize: 0,
messagesFailed: 0,
processingTimes: [],
maxProcessingTimeMs: 0,
};
// Backpressure state
this.backpressureActive = false;
this.logger.info(
`UDP QUEUE [${this.name}]: Initialized with maxConcurrent=${config.maxConcurrent}, maxSize=${config.maxSize}, rateLimit=${config.rateLimitEnable ? config.maxMessagesPerMinute + '/min' : 'disabled'}`
);
}
/**
* Sanitizes a message field by removing control characters and limiting length.
*
* @param {string} field - Field to sanitize
* @param {number} maxLength - Maximum field length
* @returns {string} Sanitized field
*/
sanitizeField(field, maxLength = 500) {
if (typeof field !== 'string') {
return String(field).slice(0, maxLength);
}
// Remove control characters and limit length
return field.replace(/[\x00-\x1F\x7F]/g, '').slice(0, maxLength);
}
/**
* Checks if rate limit is exceeded.
*
* @returns {boolean} True if rate limit exceeded
*/
checkRateLimit() {
if (!this.rateLimitEnable) {
return false;
}
const now = Date.now();
const oneMinuteAgo = now - 60000;
// Remove timestamps older than 1 minute
this.messageTimestamps = this.messageTimestamps.filter((ts) => ts > oneMinuteAgo);
// Check if limit exceeded
if (this.messageTimestamps.length >= this.maxMessagesPerMinute) {
// Log violation if throttle period has passed
if (now - this.lastViolationLog > this.violationLogThrottle * 1000) {
this.logger.warn(
`UDP QUEUE [${this.name}]: Rate limit exceeded (${this.messageTimestamps.length} messages in last minute, max ${this.maxMessagesPerMinute})`
);
this.lastViolationLog = now;
}
return true;
}
// Add current timestamp
this.messageTimestamps.push(now);
return false;
}
/**
* Checks for backpressure condition.
*
* @returns {void}
*/
checkBackpressure() {
const queueUtilization = (this.queue.size / this.config.maxSize) * 100;
if (queueUtilization >= this.config.backpressureThreshold && !this.backpressureActive) {
this.backpressureActive = true;
this.logger.warn(
`UDP QUEUE [${this.name}]: Backpressure detected - queue at ${queueUtilization.toFixed(1)}% (${this.queue.size}/${this.config.maxSize})`
);
} else if (
queueUtilization < this.config.backpressureThreshold &&
this.backpressureActive
) {
this.backpressureActive = false;
this.logger.info(
`UDP QUEUE [${this.name}]: Backpressure relieved - queue at ${queueUtilization.toFixed(1)}%`
);
}
}
/**
* Adds a message to the queue for processing.
*
* @param {Buffer} message - The UDP message
* @param {object} remote - Remote sender info
* @param {Function} handler - Message handler function
* @returns {Promise<boolean>} True if message was queued, false if dropped
*/
async addMessage(message, remote, handler) {
this.metrics.messagesReceived += 1;
// Check message size
if (message.length > this.config.maxMessageSize) {
this.metrics.messagesDroppedSize += 1;
this.logger.warn(
`UDP QUEUE [${this.name}]: Message size ${message.length} exceeds limit ${this.config.maxMessageSize}, dropping`
);
return false;
}
// Check rate limit
if (this.checkRateLimit()) {
this.metrics.messagesDroppedRateLimit += 1;
return false;
}
// Check if queue is full
if (this.queue.size >= this.config.maxSize) {
this.metrics.messagesDroppedQueueFull += 1;
// Log with throttling (max once per 10 seconds)
const now = Date.now();
if (!this.lastQueueFullLog || now - this.lastQueueFullLog > 10000) {
this.logger.warn(
`UDP QUEUE [${this.name}]: Queue full (${this.queue.size}/${this.config.maxSize}), dropping ${this.config.dropStrategy} message`
);
this.lastQueueFullLog = now;
}
return false;
}
// Add to queue
this.metrics.messagesQueued += 1;
this.checkBackpressure();
// Queue the message processing
this.queue
.add(async () => {
const startTime = Date.now();
try {
await handler(message, remote);
this.metrics.messagesProcessed += 1;
const processingTime = Date.now() - startTime;
this.metrics.processingTimes.push(processingTime);
if (processingTime > this.metrics.maxProcessingTimeMs) {
this.metrics.maxProcessingTimeMs = processingTime;
}
// Keep only last 1000 processing times for stats
if (this.metrics.processingTimes.length > 1000) {
this.metrics.processingTimes.shift();
}
this.logger.debug(
`UDP QUEUE [${this.name}]: Message processed in ${processingTime}ms (queue: ${this.queue.size}/${this.config.maxSize})`
);
} catch (err) {
this.metrics.messagesFailed += 1;
this.logger.error(
`UDP QUEUE [${this.name}]: Error processing message: ${err.message}`
);
}
})
.catch((err) => {
// Handle queue timeout or other queue errors
this.metrics.messagesFailed += 1;
this.logger.error(`UDP QUEUE [${this.name}]: Queue error: ${err.message}`);
});
return true;
}
/**
* Gets current queue metrics.
*
* @returns {object} Metrics object
*/
getMetrics() {
const processingTimes = this.metrics.processingTimes;
const avgProcessingTime =
processingTimes.length > 0
? processingTimes.reduce((a, b) => a + b, 0) / processingTimes.length
: 0;
// Calculate p95
let p95ProcessingTime = 0;
if (processingTimes.length > 0) {
const sorted = [...processingTimes].sort((a, b) => a - b);
const p95Index = Math.floor(sorted.length * 0.95);
p95ProcessingTime = sorted[p95Index] || 0;
}
return {
name: this.name,
queue: {
currentSize: this.queue.size,
maxSize: this.config.maxSize,
utilizationPercent: (this.queue.size / this.config.maxSize) * 100,
pendingCount: this.queue.pending,
maxConcurrent: this.config.maxConcurrent,
},
messages: {
received: this.metrics.messagesReceived,
queued: this.metrics.messagesQueued,
processed: this.metrics.messagesProcessed,
failed: this.metrics.messagesFailed,
},
dropped: {
total:
this.metrics.messagesDroppedRateLimit +
this.metrics.messagesDroppedQueueFull +
this.metrics.messagesDroppedSize,
rateLimit: this.metrics.messagesDroppedRateLimit,
queueFull: this.metrics.messagesDroppedQueueFull,
messageSize: this.metrics.messagesDroppedSize,
},
processingTime: {
avgMs: Math.round(avgProcessingTime),
p95Ms: Math.round(p95ProcessingTime),
maxMs: this.metrics.maxProcessingTimeMs,
},
rateLimit: {
enabled: this.rateLimitEnable,
maxPerMinute: this.maxMessagesPerMinute,
currentRate: this.messageTimestamps.length,
},
backpressure: this.backpressureActive,
};
}
/**
* Clears metrics (for periodic reset).
*
* @returns {void}
*/
clearMetrics() {
// Keep cumulative counters, only clear processing times
this.metrics.processingTimes = [];
this.metrics.maxProcessingTimeMs = 0;
}
/**
* Waits for all queued messages to complete processing.
*
* @returns {Promise<void>}
*/
async waitForEmpty() {
await this.queue.onEmpty();
await this.queue.onIdle();
}
/**
* Gets queue status summary for logging.
*
* @returns {string} Status summary
*/
getStatus() {
const metrics = this.getMetrics();
return `Queue: ${metrics.queue.currentSize}/${metrics.queue.maxSize} (${metrics.queue.utilizationPercent.toFixed(1)}%), Processed: ${metrics.messages.processed}, Dropped: ${metrics.dropped.total}, Rate: ${metrics.rateLimit.currentRate}/min`;
}
}

View File

@@ -19,5 +19,22 @@ export function udpInitLogEventServer() {
globals.udpServerLogEvents.socket.on('listening', listeningEventHandler);
// Handler for UDP messages relating to log events
globals.udpServerLogEvents.socket.on('message', messageEventHandler);
// Wrap with queue handler if available
if (globals.udpQueueHandlerLogEvents) {
globals.udpServerLogEvents.socket.on('message', (message, remote) => {
globals.udpQueueHandlerLogEvents.addMessage(message, remote, messageEventHandler);
});
} else {
globals.udpServerLogEvents.socket.on('message', messageEventHandler);
}
// Error handler for UDP socket
globals.udpServerLogEvents.socket.on('error', (err) => {
globals.logger.error(`UDP LOG EVENTS: Socket error: ${err.message}`);
});
// Close handler for UDP socket
globals.udpServerLogEvents.socket.on('close', () => {
globals.logger.warn('UDP LOG EVENTS: Socket closed');
});
}

View File

@@ -19,5 +19,22 @@ export function udpInitUserActivityServer() {
globals.udpServerUserActivity.socket.on('listening', listeningEventHandler);
// Handler for UDP messages relating to user activity events
globals.udpServerUserActivity.socket.on('message', messageEventHandler);
// Wrap with queue handler if available
if (globals.udpQueueHandlerUserEvents) {
globals.udpServerUserActivity.socket.on('message', (message, remote) => {
globals.udpQueueHandlerUserEvents.addMessage(message, remote, messageEventHandler);
});
} else {
globals.udpServerUserActivity.socket.on('message', messageEventHandler);
}
// Error handler for UDP socket
globals.udpServerUserActivity.socket.on('error', (err) => {
globals.logger.error(`UDP USER EVENTS: Socket error: ${err.message}`);
});
// Close handler for UDP socket
globals.udpServerUserActivity.socket.on('close', () => {
globals.logger.warn('UDP USER EVENTS: Socket closed');
});
}