Merge pull request #1153 from ptarmiganlabs/copilot/sub-pr-1148-again

Fix race condition in UDP queue backpressure detection
This commit is contained in:
Göran Sander
2025-12-07 20:29:34 +01:00
committed by GitHub
2 changed files with 14 additions and 8 deletions

View File

@@ -334,8 +334,8 @@ describe('UdpQueueManager', () => {
}
await Promise.all(promises);
// Check backpressure
await queueManager.checkBackpressure();
// Check backpressure with queue size of 8 (80% threshold)
await queueManager.checkBackpressure(8);
expect(mockLogger.warn).toHaveBeenCalledWith(
expect.stringContaining('Backpressure detected')
@@ -345,7 +345,8 @@ describe('UdpQueueManager', () => {
it('should clear backpressure when utilization drops', async () => {
queueManager.backpressureActive = true;
await queueManager.checkBackpressure();
// Check backpressure with queue size of 0 (below 80% of threshold)
await queueManager.checkBackpressure(0);
expect(mockLogger.info).toHaveBeenCalledWith(
expect.stringContaining('Backpressure cleared')

View File

@@ -265,10 +265,11 @@ export class UdpQueueManager {
/**
* Check backpressure and log warning if threshold exceeded
*
* @param {number} queueSize - The current queue size (captured while holding mutex)
* @returns {Promise<void>}
*/
async checkBackpressure() {
const utilizationPercent = (this.queue.size / this.config.messageQueue.maxSize) * 100;
async checkBackpressure(queueSize) {
const utilizationPercent = (queueSize / this.config.messageQueue.maxSize) * 100;
const threshold = this.config.messageQueue.backpressureThreshold;
if (utilizationPercent >= threshold && !this.backpressureActive) {
@@ -288,7 +289,7 @@ export class UdpQueueManager {
const now = Date.now();
if (this.backpressureActive && now - this.lastBackpressureWarning > 60000) {
this.logger.warn(
`[UDP Queue] Backpressure continues for ${this.queueType}: Queue size ${this.queue.size}/${this.config.messageQueue.maxSize}`
`[UDP Queue] Backpressure continues for ${this.queueType}: Queue size ${queueSize}/${this.config.messageQueue.maxSize}`
);
this.lastBackpressureWarning = now;
}
@@ -317,6 +318,7 @@ export class UdpQueueManager {
* @returns {Promise<boolean>} True if message was queued, false if dropped
*/
async addToQueue(processFunction) {
let queueSize;
const release = await this.metricsMutex.acquire();
try {
this.metrics.messagesReceived++;
@@ -331,12 +333,15 @@ export class UdpQueueManager {
}
this.metrics.messagesQueued++;
// Capture queue size while holding mutex to avoid race condition
queueSize = this.queue.size;
} finally {
release();
}
// Check backpressure
await this.checkBackpressure();
// Check backpressure with captured queue size
await this.checkBackpressure(queueSize);
// Add to queue
this.queue