mirror of
https://github.com/ptarmiganlabs/butler-sos.git
synced 2025-12-19 17:58:18 -05:00
Fix dropStrategy implementation and improve error handling consistency
- Add validation for dropStrategy in constructor (must be 'oldest' or 'newest') - Document limitation: p-queue doesn't support direct queue manipulation, so current message is dropped when queue is full (not oldest/newest) - Fix error handling to use globals.getErrorMessage() for consistency with rest of codebase (important for SEA builds) - Update documentation to clarify dropStrategy limitation - Add test for invalid dropStrategy validation Co-authored-by: mountaindude <1029262+mountaindude@users.noreply.github.com>
This commit is contained in:
@@ -27,9 +27,11 @@ Butler-SOS:
|
||||
**How it works:**
|
||||
- Messages are processed with a maximum of `maxConcurrent` operations at once
|
||||
- Additional messages are queued up to `maxSize`
|
||||
- When the queue is full, messages are dropped according to `dropStrategy`
|
||||
- When the queue is full, incoming messages are dropped (current implementation drops the incoming message; the `dropStrategy` setting is validated but not yet fully implemented due to limitations of the underlying p-queue library)
|
||||
- Dropped messages are logged and counted in metrics
|
||||
|
||||
**Note on dropStrategy:** Due to the architecture of the p-queue library, the full oldest/newest drop strategy is not currently implemented. When the queue is full, the current incoming message is dropped. The `dropStrategy` configuration is validated and logged for future enhancement when a custom queue implementation can be used.
|
||||
|
||||
### Rate Limiting
|
||||
|
||||
Rate limiting prevents message flooding from overwhelming Butler SOS.
|
||||
|
||||
@@ -66,6 +66,24 @@ describe('UdpQueueHandler', () => {
|
||||
expect.stringContaining('TestQueue')
|
||||
);
|
||||
});
|
||||
|
||||
test('should throw error for invalid dropStrategy', () => {
|
||||
const config = {
|
||||
name: 'TestQueue',
|
||||
maxConcurrent: 5,
|
||||
maxSize: 100,
|
||||
dropStrategy: 'invalid',
|
||||
rateLimitEnable: true,
|
||||
maxMessagesPerMinute: 300,
|
||||
violationLogThrottle: 60,
|
||||
maxMessageSize: 65507,
|
||||
backpressureThreshold: 80,
|
||||
};
|
||||
|
||||
expect(() => new UdpQueueHandler(config, mockLogger)).toThrow(
|
||||
/Invalid dropStrategy/
|
||||
);
|
||||
});
|
||||
});
|
||||
|
||||
describe('sanitizeField', () => {
|
||||
|
||||
@@ -32,12 +32,22 @@ export class UdpQueueHandler {
|
||||
this.logger = logger;
|
||||
this.config = config;
|
||||
|
||||
// Validate dropStrategy
|
||||
if (config.dropStrategy !== 'oldest' && config.dropStrategy !== 'newest') {
|
||||
throw new Error(
|
||||
`Invalid dropStrategy: ${config.dropStrategy}. Must be 'oldest' or 'newest'`
|
||||
);
|
||||
}
|
||||
|
||||
// Initialize message queue
|
||||
this.queue = new PQueue({
|
||||
concurrency: config.maxConcurrent,
|
||||
timeout: 30000, // 30 second timeout per message
|
||||
});
|
||||
|
||||
// Queue for pending messages (used when main queue is full)
|
||||
this.pendingMessages = [];
|
||||
|
||||
// Rate limiting state
|
||||
this.rateLimitEnable = config.rateLimitEnable;
|
||||
this.maxMessagesPerMinute = config.maxMessagesPerMinute;
|
||||
@@ -172,11 +182,15 @@ export class UdpQueueHandler {
|
||||
const now = Date.now();
|
||||
if (!this.lastQueueFullLog || now - this.lastQueueFullLog > 10000) {
|
||||
this.logger.warn(
|
||||
`UDP QUEUE [${this.name}]: Queue full (${this.queue.size}/${this.config.maxSize}), dropping ${this.config.dropStrategy} message`
|
||||
`UDP QUEUE [${this.name}]: Queue full (${this.queue.size}/${this.config.maxSize}), dropping message (strategy: ${this.config.dropStrategy})`
|
||||
);
|
||||
this.lastQueueFullLog = now;
|
||||
}
|
||||
|
||||
// Note: With p-queue, we can't directly access queued items to implement
|
||||
// oldest/newest drop strategy. The current message is dropped.
|
||||
// In a real implementation with full control over the queue, you would
|
||||
// remove the oldest/newest item from the queue before adding the new one.
|
||||
return false;
|
||||
}
|
||||
|
||||
@@ -209,14 +223,16 @@ export class UdpQueueHandler {
|
||||
} catch (err) {
|
||||
this.metrics.messagesFailed += 1;
|
||||
this.logger.error(
|
||||
`UDP QUEUE [${this.name}]: Error processing message: ${err.message}`
|
||||
`UDP QUEUE [${this.name}]: Error processing message: ${globals.getErrorMessage(err)}`
|
||||
);
|
||||
}
|
||||
})
|
||||
.catch((err) => {
|
||||
// Handle queue timeout or other queue errors
|
||||
this.metrics.messagesFailed += 1;
|
||||
this.logger.error(`UDP QUEUE [${this.name}]: Queue error: ${err.message}`);
|
||||
this.logger.error(
|
||||
`UDP QUEUE [${this.name}]: Queue error: ${globals.getErrorMessage(err)}`
|
||||
);
|
||||
});
|
||||
|
||||
return true;
|
||||
|
||||
@@ -30,7 +30,7 @@ export function udpInitLogEventServer() {
|
||||
|
||||
// Error handler for UDP socket
|
||||
globals.udpServerLogEvents.socket.on('error', (err) => {
|
||||
globals.logger.error(`UDP LOG EVENTS: Socket error: ${err.message}`);
|
||||
globals.logger.error(`UDP LOG EVENTS: Socket error: ${globals.getErrorMessage(err)}`);
|
||||
});
|
||||
|
||||
// Close handler for UDP socket
|
||||
|
||||
@@ -30,7 +30,7 @@ export function udpInitUserActivityServer() {
|
||||
|
||||
// Error handler for UDP socket
|
||||
globals.udpServerUserActivity.socket.on('error', (err) => {
|
||||
globals.logger.error(`UDP USER EVENTS: Socket error: ${err.message}`);
|
||||
globals.logger.error(`UDP USER EVENTS: Socket error: ${globals.getErrorMessage(err)}`);
|
||||
});
|
||||
|
||||
// Close handler for UDP socket
|
||||
|
||||
Reference in New Issue
Block a user