Merge pull request #1148 from ptarmiganlabs/better-udp-queue

better udp queue
This commit is contained in:
Göran Sander
2025-12-09 15:46:01 +01:00
committed by GitHub
32 changed files with 3264 additions and 614 deletions

View File

@@ -20,7 +20,7 @@ jobs:
- name: Show github.ref
run: echo "$GITHUB_REF"
- uses: googleapis/release-please-action@v4
- uses: googleapis/release-please-action@16a9c90856f42705d54a6fda1823352bdc62cf38 # v4.4.0
id: release
if: github.repository_owner == 'ptarmiganlabs'
with:
@@ -62,10 +62,10 @@ jobs:
steps:
- name: Checkout repository
uses: actions/checkout@v5
uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
- name: Setup Node.js
uses: actions/setup-node@v5
uses: actions/setup-node@49933ea5288caeca8642d1e84afbd3f7d6820020 # v4.4.0
with:
node-version: 22
@@ -90,7 +90,7 @@ jobs:
find ./build -type f -name "*.json" -o -name "*.spdx*" -exec ls -la {} \;
- name: Upload SBOM to Release
uses: ncipollo/release-action@v1
uses: ncipollo/release-action@b7eabc95ff50cbeeedec83973935c8f306dfcd0b # v1.20.0
with:
allowUpdates: true
omitBodyDuringUpdate: true
@@ -102,7 +102,7 @@ jobs:
tag: ${{ needs.release-please.outputs.release_tag_name }}
- name: Upload SBOM as Workflow Artifact (backup)
uses: actions/upload-artifact@v4
uses: actions/upload-artifact@ea165f8d65b6e75b540449e92b4886f43607fa02 # v4.6.2
with:
name: sbom-${{ needs.release-please.outputs.release_version }}
path: './build/'
@@ -136,10 +136,10 @@ jobs:
echo "upload_url : ${{ needs.release-please.outputs.release_upload_url }}"
- name: Checkout repository
uses: actions/checkout@v5
uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
- name: Setup Node.js
uses: actions/setup-node@v5
uses: actions/setup-node@49933ea5288caeca8642d1e84afbd3f7d6820020 # v4.4.0
with:
node-version: lts/*
@@ -264,7 +264,7 @@ jobs:
ls -la
- name: Upload to existing release
uses: ncipollo/release-action@v1
uses: ncipollo/release-action@b7eabc95ff50cbeeedec83973935c8f306dfcd0b # v1.20.0
with:
allowUpdates: true
omitBodyDuringUpdate: true
@@ -309,10 +309,10 @@ jobs:
Write-Output 'upload_url : ${{ needs.release-please.outputs.release_upload_url }}'
- name: Checkout repository
uses: actions/checkout@v5
uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
- name: Setup Node.js
uses: actions/setup-node@v5
uses: actions/setup-node@49933ea5288caeca8642d1e84afbd3f7d6820020 # v4.4.0
with:
node-version: lts/*
@@ -401,7 +401,7 @@ jobs:
dir
- name: Upload to existing release
uses: ncipollo/release-action@v1
uses: ncipollo/release-action@b7eabc95ff50cbeeedec83973935c8f306dfcd0b # v1.20.0
with:
allowUpdates: true
omitBodyDuringUpdate: true
@@ -437,10 +437,10 @@ jobs:
echo "upload_url : ${{ needs.release-please.outputs.release_upload_url }}"
- name: Checkout repository
uses: actions/checkout@v5
uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
- name: Setup Node.js
uses: actions/setup-node@v5
uses: actions/setup-node@49933ea5288caeca8642d1e84afbd3f7d6820020 # v4.4.0
with:
node-version: lts/*
@@ -482,7 +482,7 @@ jobs:
ls -la
- name: Upload to existing release
uses: ncipollo/release-action@v1
uses: ncipollo/release-action@b7eabc95ff50cbeeedec83973935c8f306dfcd0b # v1.20.0
with:
allowUpdates: true
omitBodyDuringUpdate: true

View File

@@ -3,7 +3,7 @@
#
# You may wish to alter this file to override the set of languages analyzed,
# or to provide custom queries or build logic.
name: "CodeQL"
name: 'CodeQL'
on:
workflow_dispatch:
@@ -34,43 +34,43 @@ jobs:
# https://docs.github.com/en/github/finding-security-vulnerabilities-and-errors-in-your-code/configuring-code-scanning#overriding-automatic-language-detection
steps:
- name: Checkout repository
uses: actions/checkout@v5
with:
# We must fetch at least the immediate parents so that if this is
# a pull request then we can checkout the head.
fetch-depth: 2
- name: Checkout repository
uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
with:
# We must fetch at least the immediate parents so that if this is
# a pull request then we can checkout the head.
fetch-depth: 2
# If this run was triggered by a pull request event, then checkout
# the head of the pull request instead of the merge commit.
- run: git checkout HEAD^2
if: ${{ github.event_name == 'pull_request' }}
# If this run was triggered by a pull request event, then checkout
# the head of the pull request instead of the merge commit.
- run: git checkout HEAD^2
if: ${{ github.event_name == 'pull_request' }}
# Initializes the CodeQL tools for scanning.
- name: Initialize CodeQL
uses: github/codeql-action/init@v3
with:
languages: ${{ matrix.language }}
# If you wish to specify custom queries, you can do so here or in a config file.
# By default, queries listed here will override any specified in a config file.
# Prefix the list here with "+" to use these queries and those in the config file.
# queries: ./path/to/local/query, your-org/your-repo/queries@main
# Initializes the CodeQL tools for scanning.
- name: Initialize CodeQL
uses: github/codeql-action/init@cf1bb45a277cb3c205638b2cd5c984db1c46a412 # v4.31.7
with:
languages: ${{ matrix.language }}
# If you wish to specify custom queries, you can do so here or in a config file.
# By default, queries listed here will override any specified in a config file.
# Prefix the list here with "+" to use these queries and those in the config file.
# queries: ./path/to/local/query, your-org/your-repo/queries@main
# Autobuild attempts to build any compiled languages (C/C++, C#, or Java).
# If this step fails, then you should remove it and run the build manually (see below)
- name: Autobuild
uses: github/codeql-action/autobuild@v3
# Autobuild attempts to build any compiled languages (C/C++, C#, or Java).
# If this step fails, then you should remove it and run the build manually (see below)
- name: Autobuild
uses: github/codeql-action/autobuild@cf1bb45a277cb3c205638b2cd5c984db1c46a412 # v4.31.7
# Command-line programs to run using the OS shell.
# 📚 https://git.io/JvXDl
# Command-line programs to run using the OS shell.
# 📚 https://git.io/JvXDl
# ✏️ If the Autobuild fails above, remove it and uncomment the following three lines
# and modify them (or add more) to build your code if your project
# uses a compiled language
# ✏️ If the Autobuild fails above, remove it and uncomment the following three lines
# and modify them (or add more) to build your code if your project
# uses a compiled language
#- run: |
# make bootstrap
# make release
#- run: |
# make bootstrap
# make release
- name: Perform CodeQL Analysis
uses: github/codeql-action/analyze@v3
- name: Perform CodeQL Analysis
uses: github/codeql-action/analyze@cf1bb45a277cb3c205638b2cd5c984db1c46a412 # v4.31.7

View File

@@ -39,25 +39,25 @@ jobs:
# payload: '{ "type":"ci-test", "repo": "${{ github.repository }}", "job": "${{ github.job }}", "workflow": "${{ github.workflow }}", "nodeVersion": "${{ env.NODE_VERSION }}","status": "in_progress","conclusion":"${{ env.JOB_CONCLUSION }}" }'
# username: ${{ secrets.PUBLIC_MQTT_BROKER_USER }}
# connectTimeout: 30000
- name: Show input values
run: |
echo "Inputs: ${{ github.event.inputs }}"
- name: Checkout repository
uses: actions/checkout@v5
- name: Checkout
uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
if: |
github.event_name != 'pull_request' &&
github.repository_owner == 'ptarmiganlabs'
- name: Set up QEMU
uses: docker/setup-qemu-action@v3
uses: docker/setup-qemu-action@c7c53464625b32c7a7e944ae62b3e17d2b600130 # v3.7.0
if: |
github.event_name != 'pull_request' &&
github.repository_owner == 'ptarmiganlabs'
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v3
uses: docker/setup-buildx-action@e468171a9de216ec08956ac3ada2f0791b6bd435 # v3.11.1
if: |
github.event_name != 'pull_request' &&
github.repository_owner == 'ptarmiganlabs'
@@ -66,16 +66,16 @@ jobs:
# https://github.com/marketplace/actions/docker-login
# https://docs.github.com/en/actions/reference/context-and-expression-syntax-for-github-actions#github-context
- name: Login to Docker Hub
uses: docker/login-action@v3
uses: docker/login-action@5e57cd118135c172c3672efd75eb46360885c0ef # v3.6.0
if: |
github.event_name != 'pull_request' &&
github.repository_owner == 'ptarmiganlabs'
with:
username: ${{ secrets.DOCKER_USERNAME }}
password: ${{ secrets.DOCKER_PASSWORD }}
# Docker tag has format: refs/tags/v1.2.3
# We need to extract the tag semver from the full tag
# We need to extract the tag semver from the full tag
# Store the tag in GITHUB_ENV environment variable
- name: Create clean tag for Docker
run: |
@@ -120,9 +120,9 @@ jobs:
# Extract metadata (tags, labels) for Docker
# https://github.com/marketplace/actions/docker-metadata-action
- name: Extract Docker metadata
- name: Docker meta
id: meta
uses: docker/metadata-action@v5
uses: docker/metadata-action@c299e40c65443455700f0fdfc63efafe5b349051 # v5.10.0
if: |
github.event_name != 'pull_request' &&
github.repository_owner == 'ptarmiganlabs'
@@ -150,7 +150,7 @@ jobs:
- name: Build and push
id: docker_build
uses: docker/build-push-action@v6
uses: docker/build-push-action@263435318d21b8e681c14492fe198d362a7d2c83 # v6.18.0
if: |
github.event_name != 'pull_request' &&
github.repository_owner == 'ptarmiganlabs'

View File

@@ -255,10 +255,10 @@ jobs:
runs-on: ${{ matrix.os }}
steps:
- name: Checkout repository
uses: actions/checkout@v5
uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
- name: Setup Node.js
uses: actions/setup-node@v5
uses: actions/setup-node@49933ea5288caeca8642d1e84afbd3f7d6820020 # v4.4.0
with:
node-version: 22
@@ -279,7 +279,7 @@ jobs:
github.repository_owner == 'ptarmiganlabs' &&
matrix.os == 'ubuntu-latest'
continue-on-error: true # To make sure that SARIF upload gets called
uses: snyk/actions/node@master
uses: snyk/actions/node@cdb760004ba9ea4d525f2e043745dfe85bb9077e # master
env:
# This is where you will need to introduce the Snyk API token created with your Snyk account
SNYK_TOKEN: ${{ secrets.SNYK_TOKEN }}
@@ -292,7 +292,7 @@ jobs:
github.repository_owner == 'ptarmiganlabs' &&
matrix.os == 'ubuntu-latest'
continue-on-error: true
uses: github/codeql-action/upload-sarif@v3
uses: github/codeql-action/upload-sarif@cf1bb45a277cb3c205638b2cd5c984db1c46a412 # v4.31.7
with:
sarif_file: snyk.sarif
@@ -316,7 +316,7 @@ jobs:
${{ matrix.build }}
- name: Upload insider build artifacts to GitHub
uses: actions/upload-artifact@v4
uses: actions/upload-artifact@ea165f8d65b6e75b540449e92b4886f43607fa02 # v4.6.2
with:
name: ${{ matrix.artifact_insider }}
path: ${{ matrix.artifact_insider }}
@@ -332,7 +332,7 @@ jobs:
BUTLER_SOS_INSIDER_DOWNLOAD_PATH: ${{ vars.BUTLER_SOS_INSIDER_DOWNLOAD_PATH || './download' }}
steps:
- name: Download Windows insider build artifact
uses: actions/download-artifact@v5
uses: actions/download-artifact@d3f86a106a0bac45b974a628896c90dbdf5c8093 # v4.3.0
with:
name: butler-sos--win-x64--${{ github.sha }}.zip
path: ${{ env.BUTLER_SOS_INSIDER_DOWNLOAD_PATH }}

View File

@@ -18,4 +18,4 @@ permissions:
jobs:
scan-scheduled:
uses: 'google/osv-scanner-action/.github/workflows/osv-scanner-reusable.yml@v2.2.2'
uses: 'google/osv-scanner-action/.github/workflows/osv-scanner-reusable.yml@90b209d0ea55cea1da9fc0c4e65782cc6acb6e2e' # v2.2.2

View File

@@ -13,7 +13,7 @@ jobs:
steps:
- name: VirusTotal Scan
uses: crazy-max/ghaction-virustotal@v4
uses: crazy-max/ghaction-virustotal@d34968c958ae283fe976efed637081b9f9dcf74f # v4.2.0
with:
vt_api_key: ${{ secrets.VIRUSTOTAL_API_KEY }}
request_rate: 4

View File

@@ -4,19 +4,24 @@ services:
image: ptarmiganlabs/butler-sos:latest
container_name: butler-sos
restart: always
command:
- 'node'
- 'src/butler-sos.js'
- '--configfile'
- '/nodeapp/config/production.yaml'
ports:
- "9997:9997" # UDP user events
- "9996:9996" # UDP log events
- "9842:9842" # Prometheus metrics
- "3100:3100" # Config file visualization
- '9997:9997' # UDP user events
- '9996:9996' # UDP log events
- '9842:9842' # Prometheus metrics
- '3100:3100' # Config file visualization
volumes:
# Make config file accessible outside of container
- "./config:/nodeapp/config"
- "./log:/nodeapp/log"
- './config:/nodeapp/config'
- './log:/nodeapp/log'
environment:
- "NODE_ENV=production" # Means that Butler SOS will read config data from production.yaml
- 'NODE_ENV=production' # Means that Butler SOS will read config data from production.yaml
logging:
driver: "json-file"
driver: 'json-file'
options:
max-file: "5"
max-size: "5m"
max-file: '5'
max-size: '5m'

View File

@@ -0,0 +1,529 @@
# UDP Message Queue Handling Improvements
## Overview
Butler SOS receives continuous streams of UDP messages from Qlik Sense Enterprise on Windows (QSEoW) containing log events and user activity information. During high-usage periods or when Sense experiences issues, these message rates can spike significantly, potentially overwhelming Butler SOS and downstream systems.
This document describes the new UDP message queue handling features that provide protection against message flooding, enable better monitoring of message processing, and ensure Butler SOS remains stable under high load conditions.
### Key Features
- **Always-On Message Queuing**: All UDP messages flow through managed queues with configurable concurrency limits
- **Optional Rate Limiting**: Protect against DoS conditions by limiting messages per minute
- **Message Size Validation**: Automatically reject oversized messages
- **Backpressure Detection**: Warnings when queue utilization exceeds thresholds
- **Comprehensive Metrics**: Track queue health, dropped messages, and processing performance
- **InfluxDB Integration**: Store queue metrics for monitoring and alerting
## Architecture
### Message Flow
```
UDP Socket → Size Validation → Rate Limit Check → Input Sanitization → Message Queue → Concurrent Processing → Destinations
                  ↓                  ↓                                      ↓
             Drop (size)     Drop (rate limit)                     Drop (queue full)
```
### Components
1. **UdpQueueManager**: Core class managing queuing, rate limiting, and metrics
2. **Circular Buffer**: Tracks last 1000 processing times for percentile calculations
3. **Rate Limiter**: Fixed-window counter (resets per minute)
4. **Metrics Collector**: Thread-safe counters and timing data
5. **InfluxDB Writer**: Periodic metrics storage (configurable interval)
### Queue Behavior
- Messages are processed with controlled concurrency (default: 10 concurrent)
- Queue has maximum size limit (default: 200 messages)
- When queue is full, messages are dropped using configured strategy (oldest/newest)
- Backpressure warnings trigger at configurable utilization threshold (default: 80%)
- Processing times are tracked in circular buffer for performance analysis
## Configuration Reference
### User Events Queue Configuration
```yaml
Butler-SOS:
userEvents:
udpServerConfig:
serverHost: <IP or FQDN>
portUserActivityEvents: 9997
# Message queue settings
messageQueue:
maxConcurrent: 10 # Max concurrent message processing
maxSize: 200 # Max queue size before rejecting
backpressureThreshold: 80 # Warn at this % utilization
# Rate limiting (optional)
rateLimit:
enable: false # Enable rate limiting
maxMessagesPerMinute: 600 # ~10 messages/second
# Message size validation
maxMessageSize: 65507 # UDP maximum datagram size
# Queue metrics storage
queueMetrics:
influxdb:
enable: false # Store metrics in InfluxDB
writeFrequency: 20000 # Write interval (ms)
measurementName: user_events_queue
tags:
- name: env
value: prod
```
### Log Events Queue Configuration
```yaml
Butler-SOS:
logEvents:
udpServerConfig:
serverHost: <IP or FQDN>
portLogEvents: 9996
# Same structure as userEvents.udpServerConfig
messageQueue:
maxConcurrent: 10
maxSize: 200
backpressureThreshold: 80
rateLimit:
enable: false
maxMessagesPerMinute: 600
maxMessageSize: 65507
queueMetrics:
influxdb:
enable: false
writeFrequency: 20000
measurementName: log_events_queue
tags: []
```
### Configuration Properties Explained
#### messageQueue
- **maxConcurrent** (default: 10): Number of messages processed simultaneously. Higher values = more throughput but more CPU/memory usage. Recommended: 5-20 depending on server capacity.
- **maxSize** (default: 200): Maximum queue size. When exceeded, new messages are rejected and dropped. Higher values provide more buffer during spikes but use more memory. Recommended: 100-500. Note: Queue size only counts pending messages (not currently processing), so total capacity is maxSize + maxConcurrent.
- **backpressureThreshold** (default: 80): Queue utilization percentage that triggers warnings. Recommended: 70-90%.
#### rateLimit
- **enable** (default: false): Enable rate limiting to prevent message flooding.
- **maxMessagesPerMinute** (default: 600): Maximum messages allowed per minute (~10/second). Uses fixed-window counter that resets each minute. Recommended values:
- Light usage: 300 (5/sec)
- Normal usage: 600 (10/sec)
- Heavy usage: 1200 (20/sec)
#### maxMessageSize
- **value** (default: 65507): Maximum UDP message size in bytes. The default is the UDP maximum datagram size. Messages exceeding this size are rejected.
#### queueMetrics.influxdb
- **enable** (default: false): Store queue metrics in InfluxDB for monitoring.
- **writeFrequency** (default: 20000): How often to write metrics in milliseconds. Lower values = more frequent updates but more InfluxDB writes.
- **measurementName**: InfluxDB measurement name. Defaults: `user_events_queue` or `log_events_queue`.
- **tags**: Optional tags added to all queue metrics points.
## Breaking Changes
### YAML Configuration Structure
**Breaking Change**: New required sections under `udpServerConfig` for both `userEvents` and `logEvents`:
- `messageQueue` (required)
- `rateLimit` (required)
- `maxMessageSize` (required)
- `queueMetrics` (required)
**Impact**: Existing Butler SOS v13.x configuration files will fail validation without these new sections.
### Migration Guide
1. **Backup your current config file**:
```bash
cp config/production.yaml config/production.yaml.backup
```
2. **Add new sections** to your config file under both `userEvents.udpServerConfig` and `logEvents.udpServerConfig`:
```yaml
# Add to userEvents.udpServerConfig:
messageQueue:
maxConcurrent: 10
maxSize: 200
backpressureThreshold: 80
rateLimit:
enable: false
maxMessagesPerMinute: 600
maxMessageSize: 65507
queueMetrics:
influxdb:
enable: false
writeFrequency: 20000
measurementName: user_events_queue
tags: []
# Add same structure to logEvents.udpServerConfig with measurementName: log_events_queue
```
3. **Validate your config**:
```bash
node src/butler-sos.js --configfile config/production.yaml --check-config
```
4. **Test with defaults first**: The default values are conservative and safe for most environments. Adjust after monitoring queue metrics.
### Default Behavior
- **Queues are always enabled**: Cannot be disabled; this ensures message processing stability
- **Rate limiting is disabled**: Must be explicitly enabled if needed
- **Queue metrics storage is disabled**: Must be explicitly enabled to store metrics in InfluxDB
- **All messages flow through queues**: Even with rate limiting disabled, messages are queued and processed with controlled concurrency
## InfluxDB Queue Metrics Schema
### Measurements
Two separate measurements for the two UDP servers:
- `user_events_queue` (configurable via `userEvents.udpServerConfig.queueMetrics.influxdb.measurementName`)
- `log_events_queue` (configurable via `logEvents.udpServerConfig.queueMetrics.influxdb.measurementName`)
### Tags
| Tag | Type | Description | Example |
| ------------ | ------ | ------------------------ | --------------------------- |
| `queue_type` | string | Queue identifier | `user_events`, `log_events` |
| `host` | string | Butler SOS hostname | `butler-sos-prod` |
| Custom tags | string | From config `tags` array | `env=prod` |
### Fields
#### Queue Status Fields
| Field | Type | Description |
| ----------------------- | ------- | ------------------------------------ |
| `queue_size` | integer | Current number of messages in queue |
| `queue_max_size` | integer | Maximum queue capacity |
| `queue_utilization_pct` | float | Queue utilization percentage (0-100) |
| `queue_pending` | integer | Messages currently being processed |
#### Message Counter Fields
| Field | Type | Description |
| -------------------- | ------- | ------------------------------------------ |
| `messages_received` | integer | Total messages received (since last write) |
| `messages_queued` | integer | Messages added to queue |
| `messages_processed` | integer | Messages successfully processed |
| `messages_failed` | integer | Messages that failed processing |
#### Dropped Message Fields
| Field | Type | Description |
| ----------------------------- | ------- | ------------------------------ |
| `messages_dropped_total` | integer | Total dropped messages |
| `messages_dropped_rate_limit` | integer | Dropped due to rate limit |
| `messages_dropped_queue_full` | integer | Dropped due to full queue |
| `messages_dropped_size` | integer | Dropped due to size validation |
#### Performance Fields
| Field | Type | Description |
| ------------------------ | ----- | -------------------------------------- |
| `processing_time_avg_ms` | float | Average processing time (milliseconds) |
| `processing_time_p95_ms` | float | 95th percentile processing time |
| `processing_time_max_ms` | float | Maximum processing time |
#### Rate Limit Fields
| Field | Type | Description |
| --------------------- | ------- | ------------------------------------------ |
| `rate_limit_current` | integer | Current message rate (messages/minute) |
| `backpressure_active` | integer | Backpressure status (0=inactive, 1=active) |
### Example Grafana Queries
**Queue Utilization Over Time**:
```flux
from(bucket: "butler-sos")
|> range(start: -1h)
|> filter(fn: (r) => r["_measurement"] == "user_events_queue" or r["_measurement"] == "log_events_queue")
|> filter(fn: (r) => r["_field"] == "queue_utilization_pct")
```
**Messages Dropped by Reason**:
```flux
from(bucket: "butler-sos")
|> range(start: -1h)
|> filter(fn: (r) => r["_measurement"] == "user_events_queue")
|> filter(fn: (r) => r["_field"] =~ /messages_dropped_/)
|> aggregateWindow(every: 1m, fn: sum)
```
**Processing Time Percentiles**:
```flux
from(bucket: "butler-sos")
|> range(start: -1h)
|> filter(fn: (r) => r["_measurement"] == "log_events_queue")
|> filter(fn: (r) => r["_field"] == "processing_time_p95_ms" or r["_field"] == "processing_time_avg_ms")
```
**Backpressure Events**:
```flux
from(bucket: "butler-sos")
|> range(start: -24h)
|> filter(fn: (r) => r["_field"] == "backpressure_active")
|> filter(fn: (r) => r["_value"] == 1)
```
## Performance Tuning
### Sizing Guidelines
#### Small Environment (< 50 users, < 10 apps)
```yaml
messageQueue:
maxConcurrent: 5
maxSize: 100
rateLimit:
enable: false
```
#### Medium Environment (50-200 users, 10-50 apps)
```yaml
messageQueue:
maxConcurrent: 10
maxSize: 200
rateLimit:
enable: false # Enable if experiencing issues
maxMessagesPerMinute: 600
```
#### Large Environment (200+ users, 50+ apps)
```yaml
messageQueue:
maxConcurrent: 20
maxSize: 500
rateLimit:
enable: true
maxMessagesPerMinute: 1200
```
### Tuning Based on Metrics
Monitor these metrics to adjust configuration:
1. **High Queue Utilization** (consistently > 80%):
- Increase `maxConcurrent` (more parallel processing)
- Increase `maxSize` (more buffer capacity)
- Check if downstream systems (InfluxDB, MQTT) are bottleneck
2. **Frequent Dropped Messages** (`messages_dropped_queue_full` > 0):
- Increase `maxSize`
- Increase `maxConcurrent`
- Consider enabling rate limiting at Sense side
3. **High Processing Times** (p95 > 1000ms):
- Decrease `maxConcurrent` (reduce resource contention)
- Check downstream system performance
- Review network latency
4. **Rate Limit Violations** (`messages_dropped_rate_limit` > 0):
- Increase `maxMessagesPerMinute` if capacity allows
- Investigate why Sense is sending excessive messages
- Consider this normal during high-activity periods
### Resource Considerations
**Memory Usage**:
- Each queued message: ~1-5 KB
- `maxSize: 200` ≈ 200-1000 KB per queue
- Two queues (user + log events) ≈ 400-2000 KB total
- Circular buffer: ~50 KB per queue
**CPU Usage**:
- Higher `maxConcurrent` = more CPU cores utilized
- Recommended: Set `maxConcurrent` ≤ number of CPU cores
- Rate limiting has minimal CPU overhead
**InfluxDB Load**:
- Each queue writes metrics at `writeFrequency` interval
- Default 20 seconds = 3 writes/minute per queue = 6 writes/minute total
- Consider increasing interval if InfluxDB is under load
## SEA (Single Executable Application) Compatibility
The `p-queue` package (v9.0.1, as declared in `package.json`) used for message queuing is fully compatible with Node.js Single Executable Applications (SEA). Butler SOS can be packaged as a standalone executable without issues.
### Verified Compatibility
- ✅ p-queue v9.0.1 works with Node.js SEA
- ✅ All queue manager features functional in SEA mode
- ✅ No dynamic imports or eval() usage
- ✅ No native dependencies beyond Node.js built-ins
## Troubleshooting
### Issue: Backpressure Warnings
**Symptom**: Log messages like:
```
WARN: [UDP Queue] Backpressure detected for user_events: Queue utilization 85.5% (threshold: 80%)
```
**Causes**:
- Message rate exceeds processing capacity
- Downstream systems (InfluxDB/MQTT) slow to respond
- Insufficient `maxConcurrent` setting
**Solutions**:
1. Monitor queue metrics to identify pattern
2. Increase `maxConcurrent` if CPU/memory available
3. Increase `maxSize` for more buffer capacity
4. Check downstream system performance
5. Enable rate limiting if messages coming too fast
### Issue: Messages Being Dropped
**Symptom**: `messages_dropped_*` counters increasing
**Dropped due to queue full** (`messages_dropped_queue_full`):
- Queue size too small for message bursts
- Increase `maxSize`
- Increase `maxConcurrent` for faster processing
**Dropped due to rate limit** (`messages_dropped_rate_limit`):
- Rate limit too restrictive
- Increase `maxMessagesPerMinute`
- Disable rate limiting if appropriate
- Investigate why Sense is sending so many messages
**Dropped due to size** (`messages_dropped_size`):
- Messages exceed UDP datagram size
- Usually indicates malformed messages from Sense
- Check Sense log appender configuration
### Issue: High Processing Times
**Symptom**: `processing_time_p95_ms` > 1000ms
**Causes**:
- Downstream systems slow (InfluxDB write latency, MQTT broker delays)
- Network latency
- Too many concurrent operations causing resource contention
**Solutions**:
1. Check InfluxDB query performance
2. Check MQTT broker responsiveness
3. Reduce `maxConcurrent` to decrease resource contention
4. Review network latency between Butler SOS and destinations
### Issue: Config Validation Errors
**Symptom**: Butler SOS fails to start with config validation errors
**Cause**: Missing required queue configuration sections
**Solution**: Follow migration guide above to add all required sections to config file
### Issue: No Queue Metrics in InfluxDB
**Symptom**: Queue metrics not appearing in InfluxDB
**Checklist**:
1. ✅ `queueMetrics.influxdb.enable: true` in config?
2. ✅ `Butler-SOS.influxdbConfig.enable: true`?
3. ✅ InfluxDB connection working (check logs)?
4. ✅ Correct measurement name configured?
5. ✅ Wait for `writeFrequency` interval to elapse
### Debug Logging
Enable verbose logging to troubleshoot queue issues:
```yaml
Butler-SOS:
logLevel: verbose # or 'debug' for even more detail
```
Look for log messages with these prefixes:
- `[UDP Queue]` - Queue operations and status
- `UDP QUEUE METRICS INFLUXDB` - Metrics storage operations
- `USER EVENT QUEUE METRICS` / `LOG EVENT QUEUE METRICS` - Per-queue status
## Monitoring Best Practices
### Essential Alerts
1. **Queue Full Alert**: Trigger when `queue_utilization_pct > 90` for >5 minutes
2. **Dropped Messages Alert**: Trigger when `messages_dropped_total > 100` per minute
3. **Backpressure Alert**: Trigger when `backpressure_active = 1` for >10 minutes
4. **Processing Degradation**: Trigger when `processing_time_p95_ms > 2000`
### Recommended Dashboard Panels
1. Queue utilization percentage (line chart, both queues)
2. Messages received vs processed (line chart)
3. Dropped messages by reason (stacked area chart)
4. Processing time percentiles (line chart: avg, p95, max)
5. Backpressure status (state timeline)
6. Current queue size (gauge)
### Proactive Monitoring
- Review queue metrics weekly during normal operations
- Establish baseline processing times for your environment
- Set alerts based on your baseline + margin
- Test queue behavior during peak usage periods
- Adjust thresholds after observing patterns
## Additional Resources
- [Butler SOS Documentation](https://butler-sos.ptarmiganlabs.com/)
- [QSEoW Log Appender Configuration](https://help.qlik.com/en-US/sense/Subsystems/Hub/Content/Sense_Hub/Introduction/configure-log-appender.htm)
- [InfluxDB Best Practices](https://docs.influxdata.com/influxdb/v2.0/write-data/best-practices/)
- [Grafana Dashboard Creation](https://grafana.com/docs/grafana/latest/dashboards/)
## Support
For issues, questions, or feature requests related to UDP queue handling:
- GitHub Issues: https://github.com/ptarmiganlabs/butler-sos/issues
- Discussion Forum: https://github.com/ptarmiganlabs/butler-sos/discussions
- Email: info@ptarmiganlabs.com

962
package-lock.json generated

File diff suppressed because it is too large Load Diff

View File

@@ -48,52 +48,53 @@
"dependencies": {
"@breejs/later": "^4.2.0",
"@fastify/rate-limit": "^10.3.0",
"@fastify/sensible": "^6.0.3",
"@fastify/static": "^8.2.0",
"@fastify/sensible": "^6.0.4",
"@fastify/static": "^8.3.0",
"@influxdata/influxdb-client": "^1.35.0",
"@influxdata/influxdb-client-apis": "^1.35.0",
"ajv": "^8.17.1",
"ajv-keywords": "^5.1.0",
"async-mutex": "^0.5.0",
"axios": "^1.12.2",
"commander": "^14.0.1",
"axios": "^1.13.2",
"commander": "^14.0.2",
"config": "^4.1.1",
"fastify": "^5.6.1",
"fastify": "^5.6.2",
"fastify-healthcheck": "^5.1.0",
"fastify-metrics": "^12.1.0",
"fs-extra": "^11.3.2",
"handlebars": "^4.7.8",
"influx": "^5.11.0",
"js-yaml": "^4.1.0",
"js-yaml": "^4.1.1",
"lodash.clonedeep": "^4.5.0",
"luxon": "^3.7.2",
"mqtt": "^5.14.1",
"posthog-node": "^5.9.1",
"p-queue": "^9.0.1",
"posthog-node": "^5.17.2",
"prom-client": "^15.1.3",
"qrs-interact": "^6.3.1",
"systeminformation": "^5.27.10",
"ua-parser-js": "^2.0.5",
"systeminformation": "^5.27.11",
"ua-parser-js": "^2.0.6",
"uuid": "^13.0.0",
"winston": "^3.17.0",
"winston": "^3.19.0",
"winston-daily-rotate-file": "^5.0.0"
},
"devDependencies": {
"@babel/eslint-parser": "^7.28.4",
"@babel/eslint-parser": "^7.28.5",
"@babel/plugin-syntax-import-assertions": "^7.27.1",
"@eslint/js": "^9.36.0",
"@eslint/js": "^9.39.1",
"audit-ci": "^7.1.0",
"esbuild": "^0.25.10",
"esbuild": "^0.27.1",
"eslint-config-prettier": "^10.1.8",
"eslint-formatter-table": "^7.32.1",
"eslint-plugin-jsdoc": "^60.3.0",
"eslint-plugin-jsdoc": "^61.5.0",
"eslint-plugin-prettier": "^5.5.4",
"globals": "^16.4.0",
"globals": "^16.5.0",
"jest": "^30.1.3",
"jsdoc-to-markdown": "^9.1.2",
"jsdoc-to-markdown": "^9.1.3",
"license-checker-rseidelsohn": "^4.4.2",
"lockfile-lint": "^4.14.1",
"npm-check-updates": "^18.3.0",
"prettier": "^3.6.2",
"snyk": "^1.1299.1"
"npm-check-updates": "^19.1.2",
"prettier": "^3.7.4",
"snyk": "^1.1301.0"
}
}

View File

@@ -24,6 +24,7 @@ import { setupAnonUsageReportTimer } from './lib/telemetry.js';
import { setupPromClient } from './lib/prom-client.js';
import { setupConfigVisServer } from './lib/config-visualise.js';
import { setupUdpEventsStorage } from './lib/udp-event.js';
import { setupUdpQueueMetricsStorage } from './lib/post-to-influxdb.js';
// Suppress experimental warnings
// https://stackoverflow.com/questions/55778283/how-to-disable-warnings-when-node-is-launched-via-a-global-shell-script
@@ -225,7 +226,9 @@ async function mainScript() {
if (
globals.config.get('Butler-SOS.logEvents.source.repository.enable') ||
globals.config.get('Butler-SOS.logEvents.source.scheduler.enable') ||
globals.config.get('Butler-SOS.logEvents.source.proxy.enable')
globals.config.get('Butler-SOS.logEvents.source.proxy.enable') ||
globals.config.get('Butler-SOS.logEvents.source.engine.enable') ||
globals.config.get('Butler-SOS.logEvents.source.qixPerf.enable')
) {
udpInitLogEventServer();
@@ -325,8 +328,11 @@ async function mainScript() {
// Set up rejected user/log events storage, if enabled
if (globals.config.get('Butler-SOS.qlikSenseEvents.rejectedEventCount.enable') === true) {
globals.logger.verbose('MAIN: Rejected events storage enabled');
await setupUdpEventsStorage();
const udpEventsStorageIntervalId = setupUdpEventsStorage();
}
// Set up UDP queue metrics storage, if enabled
const udpQueueMetricsIntervalIds = setupUdpQueueMetricsStorage();
}
mainScript();

View File

@@ -120,6 +120,25 @@ Butler-SOS:
udpServerConfig:
serverHost: <IP or FQDN> # Host/IP where user event server will listen for events from Sense
portUserActivityEvents: 9997 # Port on which user event server will listen for events from Sense
# Message queue settings for handling incoming UDP messages
messageQueue:
maxConcurrent: 10 # Max number of messages being processed simultaneously (default: 10)
maxSize: 200 # Max queue size before messages are dropped (default: 200)
backpressureThreshold: 80 # Warn when queue utilization reaches this % (default: 80)
# Rate limiting to prevent message flooding
rateLimit:
enable: false # Enable rate limiting (default: false)
maxMessagesPerMinute: 600 # Max messages per minute, ~10/sec (default: 600)
maxMessageSize: 65507 # Max UDP message size in bytes (default: 65507, UDP max)
# Queue metrics storage in InfluxDB
queueMetrics:
influxdb:
enable: false # Store queue metrics in InfluxDB (default: false)
writeFrequency: 20000 # How often to write metrics, milliseconds (default: 20000)
measurementName: user_events_queue # InfluxDB measurement name (default: user_events_queue)
tags: # Optional tags added to queue metrics
# - name: env
# value: prod
tags: # Tags are added to the data before it's stored in InfluxDB
# - name: env
# value: DEV
@@ -157,6 +176,25 @@ Butler-SOS:
udpServerConfig:
serverHost: <IP or FQDN> # Host/IP where log event server will listen for events from Sense
portLogEvents: 9996 # Port on which log event server will listen for events from Sense
# Message queue settings for handling incoming UDP messages
messageQueue:
maxConcurrent: 10 # Max number of messages being processed simultaneously (default: 10)
maxSize: 200 # Max queue size before messages are dropped (default: 200)
backpressureThreshold: 80 # Warn when queue utilization reaches this % (default: 80)
# Rate limiting to prevent message flooding
rateLimit:
enable: false # Enable rate limiting (default: false)
maxMessagesPerMinute: 600 # Max messages per minute, ~10/sec (default: 600)
maxMessageSize: 65507 # Max UDP message size in bytes (default: 65507, UDP max)
# Queue metrics storage in InfluxDB
queueMetrics:
influxdb:
enable: false # Store queue metrics in InfluxDB (default: false)
writeFrequency: 20000 # How often to write metrics, milliseconds (default: 20000)
measurementName: log_events_queue # InfluxDB measurement name (default: log_events_queue)
tags: # Optional tags added to queue metrics
# - name: env
# value: prod
tags:
# - name: env
# value: DEV

View File

@@ -17,6 +17,7 @@ import sea from './lib/sea-wrapper.js';
import { getServerTags } from './lib/servertags.js';
import { UdpEvents } from './lib/udp-event.js';
import { UdpQueueManager } from './lib/udp-queue-manager.js';
import { verifyConfigFileSchema, verifyAppConfig } from './lib/config-file-verify.js';
let instance = null;
@@ -483,7 +484,7 @@ Configuration File:
// Prepare to listen on port X for incoming UDP connections regarding user activity events
this.udpServerUserActivity.socket = dgram.createSocket({
type: 'udp4',
reuseAddr: true,
reuseAddr: false,
});
this.udpServerUserActivity.portUserActivity = this.config.get(
@@ -507,7 +508,7 @@ Configuration File:
// Prepare to listen on port X for incoming UDP connections regarding user activity events
this.udpServerLogEvents.socket = dgram.createSocket({
type: 'udp4',
reuseAddr: true,
reuseAddr: false,
});
this.udpServerLogEvents.port = this.config.get(
@@ -519,6 +520,44 @@ Configuration File:
);
}
// ------------------------------------
// Initialize UDP queue managers
try {
// User activity queue manager
const userActivityQueueConfig = {
messageQueue: this.config.get('Butler-SOS.userEvents.udpServerConfig.messageQueue'),
rateLimit: this.config.get('Butler-SOS.userEvents.udpServerConfig.rateLimit'),
maxMessageSize: this.config.get(
'Butler-SOS.userEvents.udpServerConfig.maxMessageSize'
),
};
this.udpQueueManagerUserActivity = new UdpQueueManager(
userActivityQueueConfig,
this.logger,
'user_events'
);
// Log events queue manager
const logEventsQueueConfig = {
messageQueue: this.config.get('Butler-SOS.logEvents.udpServerConfig.messageQueue'),
rateLimit: this.config.get('Butler-SOS.logEvents.udpServerConfig.rateLimit'),
maxMessageSize: this.config.get(
'Butler-SOS.logEvents.udpServerConfig.maxMessageSize'
),
};
this.udpQueueManagerLogEvents = new UdpQueueManager(
logEventsQueueConfig,
this.logger,
'log_events'
);
this.logger.info('CONFIG: UDP queue managers initialized');
} catch (err) {
this.logger.error(
`CONFIG: Error initializing UDP queue managers: ${this.getErrorMessage(err)}`
);
}
// ------------------------------------
// Track user events and log event counts
if (this.config.get('Butler-SOS.qlikSenseEvents.eventCount.enable') === true) {

View File

@@ -90,6 +90,24 @@ describe('config-file-schema', () => {
udpServerConfig: {
serverHost: 'localhost',
portUserActivityEvents: 9999,
messageQueue: {
maxConcurrent: 10,
maxSize: 200,
backpressureThreshold: 80,
},
rateLimit: {
enable: false,
maxMessagesPerMinute: 600,
},
maxMessageSize: 65507,
queueMetrics: {
influxdb: {
enable: false,
writeFrequency: 20000,
measurementName: 'user_events_queue',
tags: [],
},
},
},
tags: null,
},
@@ -106,6 +124,24 @@ describe('config-file-schema', () => {
udpServerConfig: {
serverHost: 'localhost',
portLogEvents: 9998,
messageQueue: {
maxConcurrent: 10,
maxSize: 200,
backpressureThreshold: 80,
},
rateLimit: {
enable: false,
maxMessagesPerMinute: 600,
},
maxMessageSize: 65507,
queueMetrics: {
influxdb: {
enable: false,
writeFrequency: 20000,
measurementName: 'log_events_queue',
tags: [],
},
},
},
source: {
engine: {

View File

@@ -0,0 +1,415 @@
/**
* Tests for UDP Queue Manager
*/
import { describe, it, expect, beforeEach, jest, afterEach } from '@jest/globals';
import { UdpQueueManager, sanitizeField } from '../udp-queue-manager.js';
/**
 * Unit tests for the sanitizeField helper exported by udp-queue-manager.
 * Covers control-character stripping, length capping and type coercion.
 */
describe('sanitizeField', () => {
    it('should remove control characters from string', () => {
        // NUL, US and DEL control characters must be stripped entirely
        expect(sanitizeField('Hello\x00World\x1FTest\x7F')).toBe('HelloWorldTest');
    });

    it('should limit string length to default 500 characters', () => {
        const oversized = 'a'.repeat(1000);
        expect(sanitizeField(oversized)).toHaveLength(500);
    });

    it('should limit string length to custom maxLength', () => {
        const oversized = 'a'.repeat(1000);
        expect(sanitizeField(oversized, 100)).toHaveLength(100);
    });

    it('should handle non-string input by converting to string', () => {
        expect(sanitizeField(12345)).toBe('12345');
    });

    it('should handle empty string', () => {
        expect(sanitizeField('')).toBe('');
    });

    it('should remove newlines and carriage returns', () => {
        expect(sanitizeField('Line1\nLine2\rLine3')).toBe('Line1Line2Line3');
    });

    it('should preserve normal characters', () => {
        const untouched = 'Hello World! 123 @#$%';
        expect(sanitizeField(untouched)).toBe(untouched);
    });
});
/**
 * Test suite for the UdpQueueManager class: queue admission, rate limiting,
 * message-size validation, metrics collection, backpressure detection and
 * dropped-message logging.
 *
 * NOTE(review): several tests rely on never-resolving promises to pin the
 * queue in a "full" state, and on jest fake timers being enabled AFTER
 * real-time calls (modern fake timers seed Date.now() from the current
 * time, so advancing them still moves the rate-limit window forward).
 */
describe('UdpQueueManager', () => {
    let queueManager;
    let mockLogger;
    let config;

    beforeEach(() => {
        // Minimal logger stub; call history is asserted in several tests
        mockLogger = {
            warn: jest.fn(),
            info: jest.fn(),
            error: jest.fn(),
            debug: jest.fn(),
        };

        // Deliberately small limits so queue-full and backpressure
        // conditions are easy to reach within a test
        config = {
            messageQueue: {
                maxConcurrent: 5,
                maxSize: 10,
                backpressureThreshold: 80,
            },
            rateLimit: {
                enable: false,
                maxMessagesPerMinute: 60,
            },
            maxMessageSize: 1024,
        };

        queueManager = new UdpQueueManager(config, mockLogger, 'test-queue');
    });

    afterEach(() => {
        jest.clearAllMocks();
    });

    describe('constructor', () => {
        it('should initialize with correct config', () => {
            expect(queueManager.config).toEqual(config);
            expect(queueManager.logger).toEqual(mockLogger);
            expect(queueManager.queueType).toBe('test-queue');
        });

        it('should initialize rate limiter when enabled', () => {
            const configWithRateLimit = {
                ...config,
                rateLimit: { enable: true, maxMessagesPerMinute: 60 },
            };
            const qm = new UdpQueueManager(configWithRateLimit, mockLogger, 'test');
            expect(qm.rateLimiter).toBeTruthy();
        });

        it('should not initialize rate limiter when disabled', () => {
            // Default config in beforeEach has rateLimit.enable === false
            expect(queueManager.rateLimiter).toBeNull();
        });
    });

    describe('validateMessageSize', () => {
        it('should accept message within size limit', () => {
            const message = Buffer.from('small message');
            expect(queueManager.validateMessageSize(message)).toBe(true);
        });

        it('should reject message exceeding size limit', () => {
            // 2000 bytes > configured maxMessageSize of 1024
            const message = Buffer.alloc(2000);
            expect(queueManager.validateMessageSize(message)).toBe(false);
        });

        it('should handle string messages', () => {
            const message = 'test string';
            expect(queueManager.validateMessageSize(message)).toBe(true);
        });
    });

    describe('checkRateLimit', () => {
        it('should return true when rate limiting is disabled', () => {
            expect(queueManager.checkRateLimit()).toBe(true);
        });

        it('should respect rate limit when enabled', () => {
            const configWithRateLimit = {
                ...config,
                rateLimit: { enable: true, maxMessagesPerMinute: 5 },
            };
            const qm = new UdpQueueManager(configWithRateLimit, mockLogger, 'test');

            // Should accept first 5 messages
            for (let i = 0; i < 5; i++) {
                expect(qm.checkRateLimit()).toBe(true);
            }

            // Should reject 6th message
            expect(qm.checkRateLimit()).toBe(false);
        });

        it('should reset rate limit after 1 minute', async () => {
            const configWithRateLimit = {
                ...config,
                rateLimit: { enable: true, maxMessagesPerMinute: 2 },
            };
            const qm = new UdpQueueManager(configWithRateLimit, mockLogger, 'test');

            // Fill up the rate limit
            expect(qm.checkRateLimit()).toBe(true);
            expect(qm.checkRateLimit()).toBe(true);
            expect(qm.checkRateLimit()).toBe(false);

            // Fast-forward time by 61 seconds
            jest.useFakeTimers();
            jest.advanceTimersByTime(61000);

            // Should accept messages again
            expect(qm.checkRateLimit()).toBe(true);

            jest.useRealTimers();
        });
    });

    describe('addToQueue', () => {
        it('should queue and process messages', async () => {
            const processFunction = jest.fn().mockResolvedValue();

            const result = await queueManager.addToQueue(processFunction);
            expect(result).toBe(true);

            // Wait for queue to process
            await queueManager.queue.onIdle();
            expect(processFunction).toHaveBeenCalled();
        });

        it('should reject messages when queue is full', async () => {
            // Use very slow processing to ensure queue fills up
            const processFunction = jest.fn().mockImplementation(
                () =>
                    new Promise((resolve) => {
                        // Never resolve during test - keep queue full
                    })
            );

            // Rapidly add more messages than queue can hold
            // maxConcurrent: 5, maxSize: 10
            // Queue.size only counts pending (not currently processing)
            // So: 5 processing + 5 pending (queue.size=5) = 10 total capacity
            // When we try to add 20, some should be rejected
            const promises = [];
            for (let i = 0; i < 20; i++) {
                promises.push(queueManager.addToQueue(processFunction));
            }

            // Wait for all attempts to complete
            const results = await Promise.all(promises);

            // Count rejections and acceptances
            const rejectedCount = results.filter((r) => r === false).length;
            const acceptedCount = results.filter((r) => r === true).length;

            // We should have some rejections (at least a few)
            expect(rejectedCount).toBeGreaterThanOrEqual(5);
            // And total should be 20
            expect(acceptedCount + rejectedCount).toBe(20);
        });

        it('should track metrics for processed messages', async () => {
            const processFunction = jest.fn().mockResolvedValue();

            await queueManager.addToQueue(processFunction);
            await queueManager.queue.onIdle();

            const metrics = await queueManager.getMetrics();
            expect(metrics.messagesReceived).toBe(1);
            expect(metrics.messagesQueued).toBe(1);
            expect(metrics.messagesProcessed).toBe(1);
        });

        it('should track failed messages', async () => {
            // A rejecting process function must count as failed, not processed
            const processFunction = jest.fn().mockRejectedValue(new Error('Test error'));

            await queueManager.addToQueue(processFunction);
            await queueManager.queue.onIdle();

            const metrics = await queueManager.getMetrics();
            expect(metrics.messagesFailed).toBe(1);
        });
    });

    describe('handleRateLimitDrop', () => {
        it('should increment rate limit drop counter', async () => {
            await queueManager.handleRateLimitDrop();

            const metrics = await queueManager.getMetrics();
            expect(metrics.messagesDroppedRateLimit).toBe(1);
            expect(metrics.messagesDroppedTotal).toBe(1);
        });
    });

    describe('handleSizeDrop', () => {
        it('should increment size drop counter', async () => {
            await queueManager.handleSizeDrop();

            const metrics = await queueManager.getMetrics();
            expect(metrics.messagesDroppedSize).toBe(1);
            expect(metrics.messagesDroppedTotal).toBe(1);
        });
    });

    describe('getMetrics', () => {
        it('should return all metrics', async () => {
            // Verify the full metrics contract exposed to InfluxDB storage
            const metrics = await queueManager.getMetrics();

            expect(metrics).toHaveProperty('queueSize');
            expect(metrics).toHaveProperty('queueMaxSize');
            expect(metrics).toHaveProperty('queueUtilizationPct');
            expect(metrics).toHaveProperty('messagesReceived');
            expect(metrics).toHaveProperty('messagesQueued');
            expect(metrics).toHaveProperty('messagesProcessed');
            expect(metrics).toHaveProperty('messagesFailed');
            expect(metrics).toHaveProperty('messagesDroppedTotal');
            expect(metrics).toHaveProperty('messagesDroppedRateLimit');
            expect(metrics).toHaveProperty('messagesDroppedQueueFull');
            expect(metrics).toHaveProperty('messagesDroppedSize');
            expect(metrics).toHaveProperty('processingTimeAvgMs');
            expect(metrics).toHaveProperty('processingTimeP95Ms');
            expect(metrics).toHaveProperty('processingTimeMaxMs');
            expect(metrics).toHaveProperty('backpressureActive');
        });

        it('should calculate queue utilization correctly', async () => {
            const processFunction = jest.fn().mockImplementation(
                () =>
                    new Promise((resolve) => {
                        // Never resolve - keep in pending state
                    })
            );

            // Add messages that will be pending
            for (let i = 0; i < 5; i++) {
                await queueManager.addToQueue(processFunction);
            }

            // Check metrics - pending count should be > 0
            const metrics = await queueManager.getMetrics();
            expect(metrics.queuePending).toBeGreaterThan(0);
            expect(metrics.messagesQueued).toBe(5);
        });
    });

    describe('clearMetrics', () => {
        it('should reset all metrics', async () => {
            // Generate some metrics
            const processFunction = jest.fn().mockResolvedValue();
            await queueManager.addToQueue(processFunction);
            await queueManager.handleRateLimitDrop();
            await queueManager.handleSizeDrop();
            await queueManager.queue.onIdle();

            // Clear metrics
            await queueManager.clearMetrics();

            const metrics = await queueManager.getMetrics();
            expect(metrics.messagesReceived).toBe(0);
            expect(metrics.messagesQueued).toBe(0);
            expect(metrics.messagesProcessed).toBe(0);
            expect(metrics.messagesFailed).toBe(0);
            expect(metrics.messagesDroppedTotal).toBe(0);
            expect(metrics.messagesDroppedRateLimit).toBe(0);
            expect(metrics.messagesDroppedQueueFull).toBe(0);
            expect(metrics.messagesDroppedSize).toBe(0);
        });
    });

    describe('checkBackpressure', () => {
        it('should activate backpressure when threshold exceeded', async () => {
            const processFunction = jest.fn().mockImplementation(
                () =>
                    new Promise((resolve) => {
                        // Never resolve - keep queue full
                    })
            );

            // Fill queue beyond backpressure threshold (80% of maxSize=10 means 8)
            // Queue.size only counts pending items (not currently processing)
            // So to get queue.size = 8, we need: 5 processing + 8 pending = 13 total
            const promises = [];
            for (let i = 0; i < 13; i++) {
                promises.push(queueManager.addToQueue(processFunction));
            }
            await Promise.all(promises);

            // Check backpressure with queue size of 8 (80% threshold)
            await queueManager.checkBackpressure(8);

            expect(mockLogger.warn).toHaveBeenCalledWith(
                expect.stringContaining('Backpressure detected')
            );
        });

        it('should clear backpressure when utilization drops', async () => {
            queueManager.backpressureActive = true;

            // Check backpressure with queue size of 0 (below 80% of threshold)
            await queueManager.checkBackpressure(0);

            expect(mockLogger.info).toHaveBeenCalledWith(
                expect.stringContaining('Backpressure cleared')
            );
        });
    });

    describe('CircularBuffer', () => {
        it('should track processing times', async () => {
            const processFunction = jest.fn().mockImplementation(
                () =>
                    new Promise((resolve) => {
                        setTimeout(resolve, 10);
                    })
            );

            await queueManager.addToQueue(processFunction);
            await queueManager.queue.onIdle();

            const metrics = await queueManager.getMetrics();
            expect(metrics.processingTimeAvgMs).toBeGreaterThan(0);
            expect(metrics.processingTimeMaxMs).toBeGreaterThan(0);
        });

        it('should calculate 95th percentile', async () => {
            // Add messages with varying processing times
            for (let i = 0; i < 20; i++) {
                const processFunction = jest.fn().mockImplementation(
                    () =>
                        new Promise((resolve) => {
                            setTimeout(resolve, i * 5);
                        })
                );
                await queueManager.addToQueue(processFunction);
            }
            await queueManager.queue.onIdle();

            const metrics = await queueManager.getMetrics();
            expect(metrics.processingTimeP95Ms).toBeGreaterThan(0);
        });
    });

    describe('logDroppedMessages', () => {
        it('should log dropped messages after 60 seconds', async () => {
            jest.useFakeTimers();

            queueManager.droppedSinceLastLog = 5;
            jest.advanceTimersByTime(61000);

            queueManager.logDroppedMessages();

            expect(mockLogger.warn).toHaveBeenCalledWith(
                expect.stringContaining('Dropped 5 messages')
            );

            jest.useRealTimers();
        });

        it('should not log if no messages dropped', () => {
            queueManager.logDroppedMessages();
            expect(mockLogger.warn).not.toHaveBeenCalled();
        });
    });
});

View File

@@ -22,8 +22,68 @@ export const logEventsSchema = {
format: 'hostname',
},
portLogEvents: { type: 'number' },
messageQueue: {
type: 'object',
properties: {
maxConcurrent: { type: 'number', default: 10 },
maxSize: { type: 'number', default: 200 },
backpressureThreshold: { type: 'number', default: 80 },
},
required: ['maxConcurrent', 'maxSize', 'backpressureThreshold'],
additionalProperties: false,
},
rateLimit: {
type: 'object',
properties: {
enable: { type: 'boolean', default: false },
maxMessagesPerMinute: { type: 'number', default: 600 },
},
required: ['enable', 'maxMessagesPerMinute'],
additionalProperties: false,
},
maxMessageSize: { type: 'number', default: 65507 },
queueMetrics: {
type: 'object',
properties: {
influxdb: {
type: 'object',
properties: {
enable: { type: 'boolean', default: false },
writeFrequency: { type: 'number', default: 20000 },
measurementName: {
type: 'string',
default: 'log_events_queue',
},
tags: {
type: ['array', 'null'],
items: {
type: 'object',
properties: {
name: { type: 'string' },
value: { type: 'string' },
},
required: ['name', 'value'],
additionalProperties: false,
},
},
},
required: ['enable', 'writeFrequency', 'measurementName', 'tags'],
additionalProperties: false,
},
},
required: ['influxdb'],
additionalProperties: false,
},
},
required: ['serverHost', 'portLogEvents'],
required: [
'serverHost',
'portLogEvents',
'messageQueue',
'rateLimit',
'maxMessageSize',
'queueMetrics',
],
additionalProperties: false,
},
tags: {

View File

@@ -34,8 +34,68 @@ export const userEventsSchema = {
format: 'hostname',
},
portUserActivityEvents: { type: 'number' },
messageQueue: {
type: 'object',
properties: {
maxConcurrent: { type: 'number', default: 10 },
maxSize: { type: 'number', default: 200 },
backpressureThreshold: { type: 'number', default: 80 },
},
required: ['maxConcurrent', 'maxSize', 'backpressureThreshold'],
additionalProperties: false,
},
rateLimit: {
type: 'object',
properties: {
enable: { type: 'boolean', default: false },
maxMessagesPerMinute: { type: 'number', default: 600 },
},
required: ['enable', 'maxMessagesPerMinute'],
additionalProperties: false,
},
maxMessageSize: { type: 'number', default: 65507 },
queueMetrics: {
type: 'object',
properties: {
influxdb: {
type: 'object',
properties: {
enable: { type: 'boolean', default: false },
writeFrequency: { type: 'number', default: 20000 },
measurementName: {
type: 'string',
default: 'user_events_queue',
},
tags: {
type: ['array', 'null'],
items: {
type: 'object',
properties: {
name: { type: 'string' },
value: { type: 'string' },
},
required: ['name', 'value'],
additionalProperties: false,
},
},
},
required: ['enable', 'writeFrequency', 'measurementName', 'tags'],
additionalProperties: false,
},
},
required: ['influxdb'],
additionalProperties: false,
},
},
required: ['serverHost', 'portUserActivityEvents'],
required: [
'serverHost',
'portUserActivityEvents',
'messageQueue',
'rateLimit',
'maxMessageSize',
'queueMetrics',
],
additionalProperties: false,
},
tags: {

View File

@@ -1944,3 +1944,398 @@ export async function storeRejectedEventCountInfluxDB() {
}
}
}
/**
 * Shared implementation for storing UDP queue metrics in InfluxDB (v1 or v2).
 *
 * Reads metrics from the given queue manager and writes a single point to
 * InfluxDB. The InfluxDB version is taken from Butler-SOS.influxdbConfig.version.
 * Metrics are cleared only after a successful write, so a failed write leaves
 * the counters intact for the next attempt.
 *
 * @param {object} options - Parameters identifying the queue being processed
 * @param {object} options.queueManager - UdpQueueManager instance to read metrics from
 * @param {string} options.configBasePath - Config path prefix, e.g. 'Butler-SOS.userEvents.udpServerConfig'
 * @param {string} options.queueType - Value of the queue_type tag ('user_events' or 'log_events')
 * @param {string} options.logPrefix - Prefix used in all log messages
 * @returns {Promise<void>} A promise that resolves when metrics are stored (or skipped)
 */
async function postQueueMetricsToInfluxdb({ queueManager, configBasePath, queueType, logPrefix }) {
    try {
        // Bail out early if queue metrics storage is disabled for this queue
        if (!globals.config.get(`${configBasePath}.queueMetrics.influxdb.enable`)) {
            return;
        }

        if (!queueManager) {
            globals.logger.warn(`${logPrefix}: Queue manager not initialized`);
            return;
        }

        const metrics = await queueManager.getMetrics();

        // Measurement name and static tags are configurable per queue
        const measurementName = globals.config.get(
            `${configBasePath}.queueMetrics.influxdb.measurementName`
        );
        const configTags = globals.config.get(`${configBasePath}.queueMetrics.influxdb.tags`);

        if (globals.config.get('Butler-SOS.influxdbConfig.version') === 1) {
            // InfluxDB 1.x
            const point = {
                measurement: measurementName,
                tags: {
                    queue_type: queueType,
                    host: globals.hostInfo.hostname,
                },
                fields: {
                    queue_size: metrics.queueSize,
                    queue_max_size: metrics.queueMaxSize,
                    queue_utilization_pct: metrics.queueUtilizationPct,
                    queue_pending: metrics.queuePending,
                    messages_received: metrics.messagesReceived,
                    messages_queued: metrics.messagesQueued,
                    messages_processed: metrics.messagesProcessed,
                    messages_failed: metrics.messagesFailed,
                    messages_dropped_total: metrics.messagesDroppedTotal,
                    messages_dropped_rate_limit: metrics.messagesDroppedRateLimit,
                    messages_dropped_queue_full: metrics.messagesDroppedQueueFull,
                    messages_dropped_size: metrics.messagesDroppedSize,
                    processing_time_avg_ms: metrics.processingTimeAvgMs,
                    processing_time_p95_ms: metrics.processingTimeP95Ms,
                    processing_time_max_ms: metrics.processingTimeMaxMs,
                    rate_limit_current: metrics.rateLimitCurrent,
                    backpressure_active: metrics.backpressureActive,
                },
            };

            // Add static tags from config file
            if (configTags && configTags.length > 0) {
                for (const item of configTags) {
                    point.tags[item.name] = item.value;
                }
            }

            try {
                await globals.influx.writePoints([point]);
                globals.logger.verbose(`${logPrefix}: Sent queue metrics data to InfluxDB v1`);
            } catch (err) {
                globals.logger.error(`${logPrefix}: Error saving data to InfluxDB v1! ${err}`);
                // Skip clearMetrics so counters survive until the next write attempt
                return;
            }
        } else if (globals.config.get('Butler-SOS.influxdbConfig.version') === 2) {
            // InfluxDB 2.x
            const writeOptions = {
                flushInterval: 5000,
                maxRetries: 2,
            };

            try {
                const org = globals.config.get('Butler-SOS.influxdbConfig.v2Config.org');
                const bucketName = globals.config.get('Butler-SOS.influxdbConfig.v2Config.bucket');
                const writeApi = globals.influx.getWriteApi(org, bucketName, 'ns', writeOptions);

                if (!writeApi) {
                    globals.logger.warn(`${logPrefix}: Influxdb write API object not found`);
                    return;
                }

                const point = new Point(measurementName)
                    .tag('queue_type', queueType)
                    .tag('host', globals.hostInfo.hostname)
                    .intField('queue_size', metrics.queueSize)
                    .intField('queue_max_size', metrics.queueMaxSize)
                    .floatField('queue_utilization_pct', metrics.queueUtilizationPct)
                    .intField('queue_pending', metrics.queuePending)
                    .intField('messages_received', metrics.messagesReceived)
                    .intField('messages_queued', metrics.messagesQueued)
                    .intField('messages_processed', metrics.messagesProcessed)
                    .intField('messages_failed', metrics.messagesFailed)
                    .intField('messages_dropped_total', metrics.messagesDroppedTotal)
                    .intField('messages_dropped_rate_limit', metrics.messagesDroppedRateLimit)
                    .intField('messages_dropped_queue_full', metrics.messagesDroppedQueueFull)
                    .intField('messages_dropped_size', metrics.messagesDroppedSize)
                    .floatField('processing_time_avg_ms', metrics.processingTimeAvgMs)
                    .floatField('processing_time_p95_ms', metrics.processingTimeP95Ms)
                    .floatField('processing_time_max_ms', metrics.processingTimeMaxMs)
                    .intField('rate_limit_current', metrics.rateLimitCurrent)
                    .intField('backpressure_active', metrics.backpressureActive);

                // Add static tags from config file
                if (configTags && configTags.length > 0) {
                    for (const item of configTags) {
                        point.tag(item.name, item.value);
                    }
                }

                writeApi.writePoint(point);
                await writeApi.close();

                globals.logger.verbose(`${logPrefix}: Sent queue metrics data to InfluxDB v2`);
            } catch (err) {
                globals.logger.error(`${logPrefix}: Error saving data to InfluxDB v2! ${err}`);
                // Skip clearMetrics so counters survive until the next write attempt
                return;
            }
        }

        // Clear metrics after writing so each stored point covers exactly one interval
        await queueManager.clearMetrics();
    } catch (err) {
        globals.logger.error(`${logPrefix}: Error posting queue metrics: ${err}`);
    }
}

/**
 * Store user event queue metrics to InfluxDB
 *
 * This function retrieves metrics from the user event queue manager and stores them
 * in InfluxDB for monitoring queue health, backpressure, dropped messages, and
 * processing performance.
 *
 * @returns {Promise<void>} A promise that resolves when metrics are stored
 */
export async function postUserEventQueueMetricsToInfluxdb() {
    await postQueueMetricsToInfluxdb({
        queueManager: globals.udpQueueManagerUserActivity,
        configBasePath: 'Butler-SOS.userEvents.udpServerConfig',
        queueType: 'user_events',
        logPrefix: 'USER EVENT QUEUE METRICS INFLUXDB',
    });
}

/**
 * Store log event queue metrics to InfluxDB
 *
 * This function retrieves metrics from the log event queue manager and stores them
 * in InfluxDB for monitoring queue health, backpressure, dropped messages, and
 * processing performance.
 *
 * @returns {Promise<void>} A promise that resolves when metrics are stored
 */
export async function postLogEventQueueMetricsToInfluxdb() {
    await postQueueMetricsToInfluxdb({
        queueManager: globals.udpQueueManagerLogEvents,
        configBasePath: 'Butler-SOS.logEvents.udpServerConfig',
        queueType: 'log_events',
        logPrefix: 'LOG EVENT QUEUE METRICS INFLUXDB',
    });
}
/**
 * Set up timers for storing UDP queue metrics to InfluxDB
 *
 * This function sets up separate intervals for user events and log events queue metrics
 * based on their individual configurations. Each queue can have its own write frequency.
 *
 * @returns {object} Object with interval IDs for both queues
 *                   ({ userEvents, logEvents }); a value stays null when that
 *                   queue's metrics storage is disabled or InfluxDB is off
 */
export function setupUdpQueueMetricsStorage() {
    const intervalIds = {
        userEvents: null,
        logEvents: null,
    };

    // Nothing to set up if InfluxDB support is disabled globally
    if (globals.config.get('Butler-SOS.influxdbConfig.enable') !== true) {
        globals.logger.info(
            'UDP QUEUE METRICS: InfluxDB is disabled. Skipping setup of queue metrics storage'
        );
        return intervalIds;
    }

    // Both queues share identical setup logic; only the config paths,
    // log wording and metrics-posting function differ.
    const queues = [
        {
            key: 'userEvents',
            label: 'user event',
            labelCapitalized: 'User event',
            configBasePath: 'Butler-SOS.userEvents.udpServerConfig',
            postFn: postUserEventQueueMetricsToInfluxdb,
        },
        {
            key: 'logEvents',
            label: 'log event',
            labelCapitalized: 'Log event',
            configBasePath: 'Butler-SOS.logEvents.udpServerConfig',
            postFn: postLogEventQueueMetricsToInfluxdb,
        },
    ];

    for (const { key, label, labelCapitalized, configBasePath, postFn } of queues) {
        if (globals.config.get(`${configBasePath}.queueMetrics.influxdb.enable`) === true) {
            const writeFrequency = globals.config.get(
                `${configBasePath}.queueMetrics.influxdb.writeFrequency`
            );

            intervalIds[key] = setInterval(async () => {
                try {
                    globals.logger.verbose(
                        `UDP QUEUE METRICS: Timer for storing ${label} queue metrics to InfluxDB triggered`
                    );
                    await postFn();
                } catch (err) {
                    // Log the full stack when available to ease troubleshooting of timer failures
                    globals.logger.error(
                        `UDP QUEUE METRICS: Error storing ${label} queue metrics to InfluxDB: ${err && err.stack ? err.stack : err}`
                    );
                }
            }, writeFrequency);

            globals.logger.info(
                `UDP QUEUE METRICS: Set up timer for storing ${label} queue metrics to InfluxDB (interval: ${writeFrequency}ms)`
            );
        } else {
            globals.logger.info(
                `UDP QUEUE METRICS: ${labelCapitalized} queue metrics storage to InfluxDB is disabled`
            );
        }
    }

    return intervalIds;
}

View File

@@ -15,7 +15,7 @@ export class UdpEvents {
/**
* Creates a new UdpEvents instance.
*
* @param {object} logger - The logger object to use for logging
* @param {object} logger - Logger instance with error, debug, info, and verbose methods
*/
constructor(logger) {
this.logger = logger;
@@ -51,7 +51,6 @@ export class UdpEvents {
* @param {string} event.host - The host where the event originated
* @param {string} event.subsystem - The subsystem that generated the event
* @returns {Promise<void>}
* @throws {Error} If the event object doesn't have the required properties
*/
async addLogEvent(event) {
// Ensure the passed event is an object with properties:
@@ -159,7 +158,6 @@ export class UdpEvents {
* @param {string} event.host - The host where the event originated
* @param {string} event.subsystem - The subsystem that generated the event
* @returns {Promise<void>}
* @throws {Error} If the event object doesn't have the required properties
*/
async addUserEvent(event) {
// Ensure the passed event is an object with properties:
@@ -211,7 +209,7 @@ export class UdpEvents {
/**
* Retrieves all tracked log events.
*
* @returns {Promise<Array>} Array of tracked log events
* @returns {Promise<Array<{source: string, host: string, subsystem: string, counter: number}>>} Array of tracked log events
*/
async getLogEvents() {
const release = await this.logMutex.acquire();
@@ -226,7 +224,7 @@ export class UdpEvents {
/**
* Retrieves all tracked rejected log events.
*
* @returns {Promise<Array>} Array of tracked rejected log events
* @returns {Promise<Array<{source: string, counter: number, appId?: string, appName?: string, method?: string, objectType?: string, processTime?: number}>>} Array of tracked rejected log events
*/
async getRejectedLogEvents() {
const release = await this.rejectedLogMutex.acquire();
@@ -240,7 +238,7 @@ export class UdpEvents {
/**
* Retrieves all tracked user events.
*
* @returns {Promise<Array>} Array of tracked user events
* @returns {Promise<Array<{source: string, host: string, subsystem: string, counter: number}>>} Array of tracked user events
*/
async getUserEvents() {
const release = await this.userMutex.acquire();
@@ -267,7 +265,6 @@ export class UdpEvents {
* @param {string} [event.objectType] - The object type (for performance log events)
* @param {number} [event.processTime] - The process time (for performance log events)
* @returns {Promise<void>}
* @throws {Error} If the event object doesn't have the required properties
*/
async addRejectedLogEvent(event) {
// Ensure the passed event is an object with properties:
@@ -361,7 +358,7 @@ export class UdpEvents {
* 2. Stores rejected event counts to InfluxDB
* 3. Clears event counters after they've been stored
*
* @param {Function} [callbackForTest] - Optional callback function used for testing
* @param {() => void} [callbackForTest] - Optional callback function used for testing
* @returns {number|undefined} The interval ID if the timer was set up, or undefined if disabled
*/
export function setupUdpEventsStorage(callbackForTest) {

View File

@@ -0,0 +1,469 @@
/**
* UDP Queue Manager
*
* Manages UDP message processing with queuing, rate limiting, and metrics tracking.
* Provides protection against message flooding and resource exhaustion.
*/
import PQueue from 'p-queue';
import { Mutex } from 'async-mutex';
/**
 * Circular buffer for tracking processing times
 */
class CircularBuffer {
    /**
     * Create a circular buffer
     *
     * @param {number} size - Maximum number of items to store
     */
    constructor(size) {
        this.size = size;
        this.buffer = new Array(size);
        this.index = 0;
        this.count = 0;
    }
    /**
     * Add a value to the buffer, overwriting the oldest entry when full
     *
     * @param {number} value - Value to add
     */
    add(value) {
        this.buffer[this.index] = value;
        this.index = (this.index + 1) % this.size;
        this.count = Math.min(this.count + 1, this.size);
    }
    /**
     * Get all values currently in the buffer, oldest first
     *
     * @returns {number[]} Array of values
     */
    getValues() {
        if (this.count === 0) {
            return [];
        }
        if (this.count === this.size) {
            // Full buffer: the oldest entry sits at the current write position,
            // so rotate storage to restore chronological order.
            return this.buffer.slice(this.index).concat(this.buffer.slice(0, this.index));
        }
        // Partially filled: entries occupy the first `count` slots in order.
        return this.buffer.slice(0, this.count);
    }
    /**
     * Calculate the 95th percentile of values in the buffer
     *
     * @returns {number|null} 95th percentile value or null if buffer is empty
     */
    getPercentile95() {
        if (this.count === 0) {
            return null;
        }
        const sorted = this.getValues().sort((x, y) => x - y);
        return sorted[Math.ceil(sorted.length * 0.95) - 1];
    }
    /**
     * Calculate the average of values in the buffer
     *
     * @returns {number|null} Average value or null if buffer is empty
     */
    getAverage() {
        if (this.count === 0) {
            return null;
        }
        const values = this.getValues();
        let total = 0;
        for (const v of values) {
            total += v;
        }
        return total / values.length;
    }
    /**
     * Get the maximum value in the buffer
     *
     * @returns {number|null} Maximum value or null if buffer is empty
     */
    getMax() {
        if (this.count === 0) {
            return null;
        }
        let largest = -Infinity;
        for (const v of this.getValues()) {
            if (v > largest) {
                largest = v;
            }
        }
        return largest;
    }
    /**
     * Clear the buffer, discarding all stored values
     *
     * @returns {void}
     */
    clear() {
        this.buffer = new Array(this.size);
        this.index = 0;
        this.count = 0;
    }
}
/**
 * Fixed-window rate limiter
 */
class RateLimiter {
    /**
     * Create a rate limiter
     *
     * @param {number} maxMessagesPerMinute - Maximum messages allowed per minute
     */
    constructor(maxMessagesPerMinute) {
        this.maxMessagesPerMinute = maxMessagesPerMinute;
        this.messageCount = 0;
        this.windowStart = Date.now();
    }
    /**
     * Check if a message can be processed within the rate limit
     *
     * @returns {boolean} True if message can be processed, false otherwise
     */
    checkLimit() {
        const WINDOW_MS = 60000; // fixed one-minute window
        const now = Date.now();
        // Start a fresh window once the current one has elapsed
        if (now - this.windowStart >= WINDOW_MS) {
            this.windowStart = now;
            this.messageCount = 0;
        }
        // Reject once the per-window budget is exhausted
        if (this.messageCount >= this.maxMessagesPerMinute) {
            return false;
        }
        this.messageCount += 1;
        return true;
    }
    /**
     * Get current rate (messages per minute in current window)
     *
     * @returns {number} Current message rate
     */
    getCurrentRate() {
        const WINDOW_MS = 60000;
        const elapsedMs = Date.now() - this.windowStart;
        // An expired window contributes nothing to the current rate
        if (elapsedMs >= WINDOW_MS) {
            return 0;
        }
        const elapsedSeconds = elapsedMs / 1000;
        if (elapsedSeconds === 0) {
            return 0;
        }
        // Extrapolate the in-window count to a per-minute figure
        return Math.round((this.messageCount / elapsedSeconds) * 60);
    }
}
/**
 * Sanitize input field by removing control characters and limiting length
 *
 * Non-string inputs (null, undefined, numbers, arrays, objects) are first
 * coerced with String() so that every value goes through the same
 * control-character stripping and truncation.
 *
 * @param {string} field - Field to sanitize
 * @param {number} maxLength - Maximum length (default: 500)
 * @returns {string} Sanitized field
 */
export function sanitizeField(field, maxLength = 500) {
    // Coerce non-strings so control characters in their string form are also
    // removed (the previous implementation only truncated non-string inputs).
    const text = typeof field === 'string' ? field : String(field);
    return text
        .replace(/[\x00-\x1F\x7F]/g, '') // Remove ASCII control characters incl. DEL
        .slice(0, maxLength);
}
/**
 * UDP Queue Manager
 * Manages message queue, rate limiting, metrics tracking, and input validation
 */
export class UdpQueueManager {
    /**
     * Create a UDP queue manager
     *
     * @param {object} config - Configuration object
     * @param {object} config.messageQueue - Queue configuration
     * @param {number} config.messageQueue.maxConcurrent - Maximum concurrent operations
     * @param {number} config.messageQueue.maxSize - Maximum queue size
     * @param {number} config.messageQueue.backpressureThreshold - Backpressure threshold percentage
     * @param {object} config.rateLimit - Rate limit configuration
     * @param {boolean} config.rateLimit.enable - Enable rate limiting
     * @param {number} config.rateLimit.maxMessagesPerMinute - Max messages per minute
     * @param {number} config.maxMessageSize - Maximum message size in bytes
     * @param {object} logger - Logger instance
     * @param {string} queueType - Type of queue ('user_events' or 'log_events')
     */
    constructor(config, logger, queueType) {
        this.config = config;
        this.logger = logger;
        this.queueType = queueType;
        // Initialize message queue
        // NOTE: per p-queue semantics, `queue.size` counts waiting tasks only;
        // tasks currently running are reported separately via `queue.pending`
        // (both are surfaced in getMetrics below).
        this.queue = new PQueue({
            concurrency: config.messageQueue.maxConcurrent,
        });
        // Initialize rate limiter if enabled (null means "no rate limiting")
        this.rateLimiter = config.rateLimit.enable
            ? new RateLimiter(config.rateLimit.maxMessagesPerMinute)
            : null;
        // Initialize metrics tracking; all counters are cumulative since the
        // last clearMetrics() call
        this.metrics = {
            messagesReceived: 0,
            messagesQueued: 0,
            messagesProcessed: 0,
            messagesFailed: 0,
            messagesDroppedTotal: 0,
            messagesDroppedRateLimit: 0,
            messagesDroppedQueueFull: 0,
            messagesDroppedSize: 0,
        };
        // Circular buffer for processing times (last 1000 messages)
        this.processingTimeBuffer = new CircularBuffer(1000);
        // Mutex for thread-safe metrics updates
        this.metricsMutex = new Mutex();
        // Track backpressure state
        this.backpressureActive = false;
        this.lastBackpressureWarning = 0;
        // Drop tracking for logging (drops are logged in aggregate, not per message)
        this.droppedSinceLastLog = 0;
        this.lastDropLog = Date.now();
    }
    /**
     * Validate message size
     *
     * @param {Buffer|string} message - Message to validate
     * @returns {boolean} True if message size is valid (i.e. <= config.maxMessageSize bytes)
     */
    validateMessageSize(message) {
        // Buffer.byteLength measures the encoded byte size of strings, not character count
        const size = Buffer.isBuffer(message) ? message.length : Buffer.byteLength(message);
        return size <= this.config.maxMessageSize;
    }
    /**
     * Check if rate limit allows processing
     *
     * @returns {boolean} True if message can be processed (always true when rate limiting is disabled)
     */
    checkRateLimit() {
        if (!this.rateLimiter) return true;
        return this.rateLimiter.checkLimit();
    }
    /**
     * Check backpressure and log warning if threshold exceeded
     *
     * @param {number} queueSize - The current queue size (captured while holding mutex)
     * @returns {Promise<void>}
     */
    async checkBackpressure(queueSize) {
        const utilizationPercent = (queueSize / this.config.messageQueue.maxSize) * 100;
        const threshold = this.config.messageQueue.backpressureThreshold;
        if (utilizationPercent >= threshold && !this.backpressureActive) {
            this.backpressureActive = true;
            this.logger.warn(
                `[UDP Queue] Backpressure detected for ${this.queueType}: Queue utilization ${utilizationPercent.toFixed(1)}% (threshold: ${threshold}%)`
            );
        } else if (utilizationPercent < threshold * 0.8 && this.backpressureActive) {
            // Clear backpressure when utilization drops below 80% of threshold
            // (hysteresis prevents rapid on/off flapping around the threshold)
            this.backpressureActive = false;
            this.logger.info(
                `[UDP Queue] Backpressure cleared for ${this.queueType}: Queue utilization ${utilizationPercent.toFixed(1)}%`
            );
        }
        // Log warning every 60 seconds if backpressure is active
        const now = Date.now();
        if (this.backpressureActive && now - this.lastBackpressureWarning > 60000) {
            this.logger.warn(
                `[UDP Queue] Backpressure continues for ${this.queueType}: Queue size ${queueSize}/${this.config.messageQueue.maxSize}`
            );
            this.lastBackpressureWarning = now;
        }
    }
    /**
     * Log dropped messages periodically (not individual messages)
     *
     * Emits at most one aggregate warning per 60 seconds, then resets the
     * drop counter used for logging (metrics counters are unaffected).
     *
     * @returns {void}
     */
    logDroppedMessages() {
        const now = Date.now();
        if (this.droppedSinceLastLog > 0 && now - this.lastDropLog > 60000) {
            this.logger.warn(
                `[UDP Queue] Dropped ${this.droppedSinceLastLog} messages for ${this.queueType} in the last minute`
            );
            this.droppedSinceLastLog = 0;
            this.lastDropLog = now;
        }
    }
    /**
     * Add message to queue for processing
     *
     * Updates receive/queue/drop counters under the metrics mutex, then hands
     * the work to the internal p-queue. The queued task is intentionally NOT
     * awaited (fire-and-forget); processing outcome is recorded in metrics.
     *
     * @param {() => Promise<void>} processFunction - Async function that processes the message
     * @returns {Promise<boolean>} True if message was queued, false if dropped (queue full)
     */
    async addToQueue(processFunction) {
        let queueSize;
        const release = await this.metricsMutex.acquire();
        try {
            this.metrics.messagesReceived++;
            // Check if queue is full
            if (this.queue.size >= this.config.messageQueue.maxSize) {
                this.metrics.messagesDroppedTotal++;
                this.metrics.messagesDroppedQueueFull++;
                this.droppedSinceLastLog++;
                this.logDroppedMessages();
                // `finally` below still releases the mutex on this early return
                return false;
            }
            this.metrics.messagesQueued++;
            // Capture queue size while holding mutex to avoid race condition
            queueSize = this.queue.size;
        } finally {
            release();
        }
        // Check backpressure with captured queue size
        await this.checkBackpressure(queueSize);
        // Add to queue
        this.queue
            .add(async () => {
                const startTime = Date.now();
                try {
                    await processFunction();
                    const processingTime = Date.now() - startTime;
                    const release2 = await this.metricsMutex.acquire();
                    try {
                        this.metrics.messagesProcessed++;
                        this.processingTimeBuffer.add(processingTime);
                    } finally {
                        release2();
                    }
                } catch (error) {
                    const release2 = await this.metricsMutex.acquire();
                    try {
                        this.metrics.messagesFailed++;
                    } finally {
                        release2();
                    }
                    // Rethrow so the outer .catch below logs the failure
                    throw error;
                }
            })
            .catch((error) => {
                // The task promise is not awaited; this .catch prevents an
                // unhandled promise rejection and logs the processing error
                this.logger.error(
                    `[UDP Queue] Error processing message for ${this.queueType}: ${error.message}`
                );
            });
        return true;
    }
    /**
     * Handle message drop due to rate limiting
     *
     * Records the drop in metrics (under the mutex) and triggers the
     * aggregate drop logging.
     *
     * @returns {Promise<void>}
     */
    async handleRateLimitDrop() {
        const release = await this.metricsMutex.acquire();
        try {
            this.metrics.messagesReceived++;
            this.metrics.messagesDroppedTotal++;
            this.metrics.messagesDroppedRateLimit++;
            this.droppedSinceLastLog++;
        } finally {
            release();
        }
        this.logDroppedMessages();
    }
    /**
     * Handle message drop due to size validation
     *
     * Records the drop in metrics (under the mutex) and triggers the
     * aggregate drop logging.
     *
     * @returns {Promise<void>}
     */
    async handleSizeDrop() {
        const release = await this.metricsMutex.acquire();
        try {
            this.metrics.messagesReceived++;
            this.metrics.messagesDroppedTotal++;
            this.metrics.messagesDroppedSize++;
            this.droppedSinceLastLog++;
        } finally {
            release();
        }
        this.logDroppedMessages();
    }
    /**
     * Get current metrics
     *
     * Returns a consistent snapshot of counters, queue state, processing-time
     * statistics, and rate/backpressure status, taken under the metrics mutex.
     *
     * @returns {Promise<object>} Current metrics
     */
    async getMetrics() {
        const release = await this.metricsMutex.acquire();
        try {
            return {
                queueSize: this.queue.size,
                queueMaxSize: this.config.messageQueue.maxSize,
                queueUtilizationPct: (this.queue.size / this.config.messageQueue.maxSize) * 100,
                queuePending: this.queue.pending,
                messagesReceived: this.metrics.messagesReceived,
                messagesQueued: this.metrics.messagesQueued,
                messagesProcessed: this.metrics.messagesProcessed,
                messagesFailed: this.metrics.messagesFailed,
                messagesDroppedTotal: this.metrics.messagesDroppedTotal,
                messagesDroppedRateLimit: this.metrics.messagesDroppedRateLimit,
                messagesDroppedQueueFull: this.metrics.messagesDroppedQueueFull,
                messagesDroppedSize: this.metrics.messagesDroppedSize,
                // `|| 0` maps the buffer's null result (no samples yet) to 0
                processingTimeAvgMs: this.processingTimeBuffer.getAverage() || 0,
                processingTimeP95Ms: this.processingTimeBuffer.getPercentile95() || 0,
                processingTimeMaxMs: this.processingTimeBuffer.getMax() || 0,
                rateLimitCurrent: this.rateLimiter ? this.rateLimiter.getCurrentRate() : 0,
                backpressureActive: this.backpressureActive ? 1 : 0,
            };
        } finally {
            release();
        }
    }
    /**
     * Clear metrics (called after writing to InfluxDB)
     *
     * Resets all cumulative counters and the processing-time buffer.
     * Backpressure state and the drop-logging counters are not reset here.
     *
     * @returns {Promise<void>}
     */
    async clearMetrics() {
        const release = await this.metricsMutex.acquire();
        try {
            this.metrics.messagesReceived = 0;
            this.metrics.messagesQueued = 0;
            this.metrics.messagesProcessed = 0;
            this.metrics.messagesFailed = 0;
            this.metrics.messagesDroppedTotal = 0;
            this.metrics.messagesDroppedRateLimit = 0;
            this.metrics.messagesDroppedQueueFull = 0;
            this.metrics.messagesDroppedSize = 0;
            this.processingTimeBuffer.clear();
        } finally {
            release();
        }
    }
}

View File

@@ -0,0 +1,329 @@
/**
* Tests for input sanitization in log event handlers
*/
import { describe, it, expect, beforeEach, jest } from '@jest/globals';
import { sanitizeField } from '../../../udp-queue-manager.js';
// Mock globals before importing handlers
jest.unstable_mockModule('../../../../globals.js', () => ({
default: {
logger: {
verbose: jest.fn(),
debug: jest.fn(),
warn: jest.fn(),
error: jest.fn(),
},
config: {
get: jest.fn().mockReturnValue(false), // Disable performance monitor by default
},
appNames: [],
},
}));
// Import handlers after mocking
const { processEngineEvent } = await import('../handlers/engine-handler.js');
const { processProxyEvent } = await import('../handlers/proxy-handler.js');
const { processRepositoryEvent } = await import('../handlers/repository-handler.js');
const { processSchedulerEvent } = await import('../handlers/scheduler-handler.js');
const { processQixPerfEvent } = await import('../handlers/qix-perf-handler.js');
// Each handler test builds a positional log-event message array (msg[0] is the
// source tag, subsequent indices are fields in the order emitted by Qlik Sense)
// and asserts that string fields come back with ASCII control characters removed.
describe('Log Event Handler Sanitization', () => {
    beforeEach(() => {
        // Reset mock call history so assertions in each test stay isolated
        jest.clearAllMocks();
    });
    describe('Engine Event Handler', () => {
        it('should sanitize control characters in message field', () => {
            const msg = [
                '/qseow-engine/',
                '1',
                '2021-11-09T15:37:26.028+0200',
                '2021-11-09 15:37:26,028',
                'ERROR',
                'hostname.example.com',
                'System.Engine',
                'DOMAIN\\user',
                'Test message with\x00control\x1Fcharacters\x7F', // Field 8: message
                '550e8400-e29b-41d4-a716-446655440000',
                'INTERNAL',
                'sa_scheduler',
                '2021-11-09T15:37:26.028+0200',
                '550e8400-e29b-41d4-a716-446655440000',
                '12.345.0',
                '2021-11-09T15:37:26.028+0200',
                'Traffic',
                '550e8400-e29b-41d4-a716-446655440000',
                '550e8400-e29b-41d4-a716-446655440000',
            ];
            const result = processEngineEvent(msg);
            expect(result.message).not.toContain('\x00');
            expect(result.message).not.toContain('\x1F');
            expect(result.message).not.toContain('\x7F');
            expect(result.message).toBe('Test message withcontrolcharacters');
        });
        it('should limit message length to 1000 characters', () => {
            // 2000-char message should be truncated to the handler's 1000-char limit
            const longMessage = 'x'.repeat(2000);
            const msg = [
                '/qseow-engine/',
                '1',
                '2021-11-09T15:37:26.028+0200',
                '2021-11-09 15:37:26,028',
                'ERROR',
                'hostname.example.com',
                'System.Engine',
                'DOMAIN\\user',
                longMessage,
                '550e8400-e29b-41d4-a716-446655440000',
                'INTERNAL',
                'sa_scheduler',
                '2021-11-09T15:37:26.028+0200',
                '550e8400-e29b-41d4-a716-446655440000',
                '12.345.0',
                '2021-11-09T15:37:26.028+0200',
                'Traffic',
                '550e8400-e29b-41d4-a716-446655440000',
                '550e8400-e29b-41d4-a716-446655440000',
            ];
            const result = processEngineEvent(msg);
            expect(result.message).toHaveLength(1000);
        });
        it('should sanitize all string fields', () => {
            // Embed a different control character in every string field
            const msg = [
                '/qseow-engine/\x00',
                '1',
                '2021-11-09T15:37:26.028+0200',
                '2021-11-09 15:37:26,028',
                'ERROR\x01',
                'hostname\x02.example.com',
                'System.Engine\x03',
                'DOMAIN\\user\x04',
                'Message\x05',
                '550e8400-e29b-41d4-a716-446655440000',
                'INTERNAL\x06',
                'sa_scheduler\x07',
                '2021-11-09T15:37:26.028+0200',
                '550e8400-e29b-41d4-a716-446655440000',
                '12.345.0\x08',
                '2021-11-09T15:37:26.028+0200',
                'Traffic\x09',
                '550e8400-e29b-41d4-a716-446655440000',
                '550e8400-e29b-41d4-a716-446655440000',
            ];
            const result = processEngineEvent(msg);
            // Check no control characters remain
            expect(result.source).not.toMatch(/[\x00-\x1F\x7F]/);
            expect(result.level).not.toMatch(/[\x00-\x1F\x7F]/);
            expect(result.host).not.toMatch(/[\x00-\x1F\x7F]/);
            expect(result.subsystem).not.toMatch(/[\x00-\x1F\x7F]/);
            expect(result.windows_user).not.toMatch(/[\x00-\x1F\x7F]/);
            expect(result.message).not.toMatch(/[\x00-\x1F\x7F]/);
            expect(result.user_directory).not.toMatch(/[\x00-\x1F\x7F]/);
            expect(result.user_id).not.toMatch(/[\x00-\x1F\x7F]/);
            expect(result.engine_exe_version).not.toMatch(/[\x00-\x1F\x7F]/);
            expect(result.entry_type).not.toMatch(/[\x00-\x1F\x7F]/);
        });
    });
    describe('Proxy Event Handler', () => {
        it('should sanitize exception message field', () => {
            const msg = [
                '/qseow-proxy/',
                '1',
                '2021-11-09T15:37:26.028+0200',
                '2021-11-09 15:37:26,028',
                'ERROR',
                'hostname.example.com',
                'Service.Proxy',
                'DOMAIN\\user',
                'Test message',
                'Exception:\x00Test\x1FError\x7F', // Field 9: exception_message
                'INTERNAL',
                'sa_scheduler',
                'TestCommand',
                '500',
                'Origin',
                '/context',
            ];
            const result = processProxyEvent(msg);
            expect(result.exception_message).not.toContain('\x00');
            expect(result.exception_message).not.toContain('\x1F');
            expect(result.exception_message).not.toContain('\x7F');
        });
    });
    describe('Repository Event Handler', () => {
        it('should sanitize command and result_code fields', () => {
            const msg = [
                '/qseow-repository/',
                '1',
                '2021-11-09T15:37:26.028+0200',
                '2021-11-09 15:37:26,028',
                'WARN',
                'hostname.example.com',
                'Service.Repository',
                'DOMAIN\\user',
                'Test message',
                'Exception message',
                'INTERNAL',
                'sa_scheduler',
                'Check\x00service\x01status', // Field 12: command
                '500\x02', // Field 13: result_code
                'Origin',
                '/context',
            ];
            const result = processRepositoryEvent(msg);
            expect(result.command).not.toMatch(/[\x00-\x1F\x7F]/);
            expect(result.result_code).not.toMatch(/[\x00-\x1F\x7F]/);
        });
    });
    describe('Scheduler Event Handler', () => {
        it('should sanitize task and app names', () => {
            const msg = [
                '/qseow-scheduler/',
                '1',
                '2021-11-09T19:37:44.331+0100',
                '2021-11-09 19:37:44,331',
                'ERROR',
                'hostname.example.com',
                'System.Scheduler',
                'DOMAIN\\user',
                'Reload failed',
                'Exception',
                'LAB',
                'goran',
                'LAB\\goran',
                'Task\x00Name\x01Test', // Field 13: task_name
                'App\x02Name\x03Test', // Field 14: app_name
                'dec2a02a-1680-44ef-8dc2-e2bfb180af87',
                'e7af59a0-c243-480d-9571-08727551a66f',
                '4831c6a5-34f6-45bb-9d40-73a6e6992670',
            ];
            const result = processSchedulerEvent(msg);
            expect(result.task_name).not.toMatch(/[\x00-\x1F\x7F]/);
            expect(result.app_name).not.toMatch(/[\x00-\x1F\x7F]/);
            expect(result.task_name).toBe('TaskNameTest');
            expect(result.app_name).toBe('AppNameTest');
        });
    });
    describe('QIX Performance Event Handler', () => {
        it('should sanitize method and object_type fields', () => {
            const msg = [
                '/qseow-qix-perf/',
                '1',
                '2021-11-09T19:37:44.331+0100',
                '2021-11-09 19:37:44,331',
                'INFO',
                'hostname.example.com',
                'System.Engine',
                'DOMAIN\\user',
                '550e8400-e29b-41d4-a716-446655440000',
                'LAB',
                'goran',
                '2021-11-09T19:37:44.331+01:00',
                '550e8400-e29b-41d4-a716-446655440000',
                '550e8400-e29b-41d4-a716-446655440000',
                '123',
                'Global::\x00OpenApp\x01', // Field 15: method
                '100',
                '90',
                '5',
                '3',
                '2',
                '1',
                'objId123',
                '1024000',
                '2048000',
                'linechart\x02', // Field 25: object_type
            ];
            const result = processQixPerfEvent(msg);
            // processQixPerfEvent may return a falsy value when the event is
            // filtered out by configuration, so only assert when a result exists
            if (result) {
                expect(result.method).not.toMatch(/[\x00-\x1F\x7F]/);
                expect(result.object_type).not.toMatch(/[\x00-\x1F\x7F]/);
            }
        });
    });
    // Direct unit tests of sanitizeField with non-string and boundary inputs
    describe('sanitizeField edge cases', () => {
        it('should handle null values', () => {
            const result = sanitizeField(null);
            expect(result).toBe('null');
        });
        it('should handle undefined values', () => {
            const result = sanitizeField(undefined);
            expect(result).toBe('undefined');
        });
        it('should handle numbers', () => {
            const result = sanitizeField(12345);
            expect(result).toBe('12345');
        });
        it('should handle objects by converting to string', () => {
            const result = sanitizeField({ test: 'value' });
            expect(result).toContain('[object Object]');
        });
        it('should handle arrays by converting to string', () => {
            const result = sanitizeField([1, 2, 3]);
            expect(result).toBe('1,2,3');
        });
        it('should remove tab characters', () => {
            const result = sanitizeField('text\twith\ttabs');
            expect(result).toBe('textwithtabs');
        });
        it('should remove all ASCII control characters', () => {
            // Test characters from 0x00 to 0x1F and 0x7F
            let input = '';
            for (let i = 0; i <= 0x1f; i++) {
                input += String.fromCharCode(i);
            }
            input += String.fromCharCode(0x7f);
            input += 'ValidText';
            const result = sanitizeField(input);
            expect(result).toBe('ValidText');
        });
        it('should preserve unicode characters', () => {
            const result = sanitizeField('Hello 世界 🌍 Привет');
            expect(result).toBe('Hello 世界 🌍 Привет');
        });
        it('should handle very long strings efficiently', () => {
            const longString = 'a'.repeat(10000);
            const result = sanitizeField(longString, 500);
            expect(result).toHaveLength(500);
        });
    });
    describe('Field length limits', () => {
        it('should respect different max lengths for different fields', () => {
            // Limits below mirror the per-field maxLength values the handlers use
            // Source: 100 chars
            expect(sanitizeField('x'.repeat(200), 100)).toHaveLength(100);
            // Message: 1000 chars
            expect(sanitizeField('x'.repeat(2000), 1000)).toHaveLength(1000);
            // Subsystem: 200 chars
            expect(sanitizeField('x'.repeat(300), 200)).toHaveLength(200);
            // Level: 20 chars
            expect(sanitizeField('x'.repeat(50), 20)).toHaveLength(20);
        });
    });
});

View File

@@ -4,6 +4,7 @@
import globals from '../../../../globals.js';
import { isoDateRegex, uuidRegex, formatUserFields } from '../utils/common-utils.js';
import { sanitizeField } from '../../../udp-queue-manager.js';
/**
* Process engine log events
@@ -54,26 +55,26 @@ export function processEngineEvent(msg) {
// session_id: uuid
// app_id: uuid
const msgObj = {
source: msg[0],
source: sanitizeField(msg[0], 100),
log_row:
Number.isInteger(parseInt(msg[1], 10)) && parseInt(msg[1], 10) > 0
? parseInt(msg[1], 10)
: -1,
ts_iso: isoDateRegex.test(msg[2]) ? msg[2] : '',
ts_local: isoDateRegex.test(msg[3]) ? msg[3] : '',
level: msg[4],
host: msg[5],
subsystem: msg[6],
windows_user: msg[7],
message: msg[8],
ts_iso: isoDateRegex.test(msg[2]) ? sanitizeField(msg[2], 50) : '',
ts_local: isoDateRegex.test(msg[3]) ? sanitizeField(msg[3], 50) : '',
level: sanitizeField(msg[4], 20),
host: sanitizeField(msg[5], 100),
subsystem: sanitizeField(msg[6], 200),
windows_user: sanitizeField(msg[7], 100),
message: sanitizeField(msg[8], 1000),
proxy_session_id: uuidRegex.test(msg[9]) ? msg[9] : '',
user_directory: msg[10],
user_id: msg[11],
engine_ts: msg[12],
user_directory: sanitizeField(msg[10], 100),
user_id: sanitizeField(msg[11], 100),
engine_ts: sanitizeField(msg[12], 50),
process_id: uuidRegex.test(msg[13]) ? msg[13] : '',
engine_exe_version: msg[14],
server_started: msg[15],
entry_type: msg[16],
engine_exe_version: sanitizeField(msg[14], 50),
server_started: sanitizeField(msg[15], 50),
entry_type: sanitizeField(msg[16], 50),
session_id: uuidRegex.test(msg[17]) ? msg[17] : '',
app_id: uuidRegex.test(msg[18]) ? msg[18] : '',
};

View File

@@ -4,6 +4,7 @@
import globals from '../../../../globals.js';
import { isoDateRegex, formatUserFields } from '../utils/common-utils.js';
import { sanitizeField } from '../../../udp-queue-manager.js';
/**
* Process proxy log events
@@ -33,22 +34,22 @@ export function processProxyEvent(msg) {
globals.logger.verbose(`LOG EVENT: ${msg[0]}:${msg[5]}:${msg[4]}, ${msg[6]}, Msg: ${msg[8]}`);
const msgObj = {
source: msg[0],
source: sanitizeField(msg[0], 100),
log_row: Number.isInteger(parseInt(msg[1], 10)) ? parseInt(msg[1], 10) : -1,
ts_iso: isoDateRegex.test(msg[2]) ? msg[2] : '',
ts_local: isoDateRegex.test(msg[3]) ? msg[3] : '',
level: msg[4],
host: msg[5],
subsystem: msg[6],
windows_user: msg[7],
message: msg[8],
exception_message: msg[9],
user_directory: msg[10],
user_id: msg[11],
command: msg[12],
result_code: msg[13],
origin: msg[14],
context: msg[15],
ts_iso: isoDateRegex.test(msg[2]) ? sanitizeField(msg[2], 50) : '',
ts_local: isoDateRegex.test(msg[3]) ? sanitizeField(msg[3], 50) : '',
level: sanitizeField(msg[4], 20),
host: sanitizeField(msg[5], 100),
subsystem: sanitizeField(msg[6], 200),
windows_user: sanitizeField(msg[7], 100),
message: sanitizeField(msg[8], 1000),
exception_message: sanitizeField(msg[9], 1000),
user_directory: sanitizeField(msg[10], 100),
user_id: sanitizeField(msg[11], 100),
command: sanitizeField(msg[12], 200),
result_code: sanitizeField(msg[13], 50),
origin: sanitizeField(msg[14], 200),
context: sanitizeField(msg[15], 200),
};
formatUserFields(msgObj);

View File

@@ -5,6 +5,7 @@
import globals from '../../../../globals.js';
import { uuidRegex, formatUserFields } from '../utils/common-utils.js';
import { processAppSpecificFilters, processAllAppsFilters } from '../filters/qix-perf-filters.js';
import { sanitizeField } from '../../../udp-queue-manager.js';
/**
* Process QIX performance log events
@@ -151,23 +152,26 @@ export function processQixPerfEvent(msg) {
// Event matches filters in the configuration. Continue.
// Build the event object
const msgObj = {
source: msg[0],
source: sanitizeField(msg[0], 100),
log_row: Number.isInteger(parseInt(msg[1], 10)) ? parseInt(msg[1], 10) : -1,
ts_iso: msg[2],
ts_local: msg[3],
level: msg[4],
host: msg[5],
subsystem: msg[6],
windows_user: msg[7],
ts_iso: sanitizeField(msg[2], 50),
ts_local: sanitizeField(msg[3], 50),
level: sanitizeField(msg[4], 20),
host: sanitizeField(msg[5], 100),
subsystem: sanitizeField(msg[6], 200),
windows_user: sanitizeField(msg[7], 100),
proxy_session_id: uuidRegex.test(msg[8]) ? msg[8] : '',
user_directory: msg[9],
user_id: msg[10],
engine_ts: msg[11],
user_directory: sanitizeField(msg[9], 100),
user_id: sanitizeField(msg[10], 100),
engine_ts: sanitizeField(msg[11], 50),
session_id: uuidRegex.test(msg[12]) ? msg[12] : '',
app_id: uuidRegex.test(msg[13]) ? msg[13] : '',
app_name: eventAppName,
request_id: msg[14], // Request ID is an integer >= 0, set to -99 otherwise
method: msg[15],
app_name: sanitizeField(eventAppName, 200),
request_id:
Number.isInteger(parseInt(msg[14], 10)) && parseInt(msg[14], 10) >= 0
? parseInt(msg[14], 10)
: -99, // Request ID is an integer >= 0, set to -99 otherwise
method: sanitizeField(msg[15], 100),
// Processtime in float milliseconds
process_time: parseFloat(msg[16]),
work_time: parseFloat(msg[17]),
@@ -176,7 +180,7 @@ export function processQixPerfEvent(msg) {
traverse_time: parseFloat(msg[20]),
// Handle is either -1 or a number. Set to -99 if not a number
handle: Number.isInteger(parseInt(msg[21], 10)) ? parseInt(msg[21], 10) : -99,
object_id: msg[22],
object_id: sanitizeField(msg[22], 100),
// Positive integer, set to -1 if not am integer >= 0
net_ram:
Number.isInteger(parseInt(msg[23], 10)) && parseInt(msg[23], 10) >= 0
@@ -186,8 +190,8 @@ export function processQixPerfEvent(msg) {
Number.isInteger(parseInt(msg[24], 10)) && parseInt(msg[24], 10) >= 0
? parseInt(msg[24], 10)
: -1,
object_type: msg[25],
event_activity_source: eventActivitySource,
object_type: sanitizeField(msg[25], 100),
event_activity_source: sanitizeField(eventActivitySource, 50),
};
formatUserFields(msgObj);

View File

@@ -4,6 +4,7 @@
import globals from '../../../../globals.js';
import { isoDateRegex, formatUserFields } from '../utils/common-utils.js';
import { sanitizeField } from '../../../udp-queue-manager.js';
/**
* Process repository log events
@@ -33,22 +34,22 @@ export function processRepositoryEvent(msg) {
globals.logger.verbose(`LOG EVENT: ${msg[0]}:${msg[5]}:${msg[4]}, ${msg[6]}, Msg: ${msg[8]}`);
const msgObj = {
source: msg[0],
source: sanitizeField(msg[0], 100),
log_row: Number.isInteger(parseInt(msg[1], 10)) ? parseInt(msg[1], 10) : -1,
ts_iso: isoDateRegex.test(msg[2]) ? msg[2] : '',
ts_local: isoDateRegex.test(msg[3]) ? msg[3] : '',
level: msg[4],
host: msg[5],
subsystem: msg[6],
windows_user: msg[7],
message: msg[8],
exception_message: msg[9],
user_directory: msg[10],
user_id: msg[11],
command: msg[12],
result_code: msg[13],
origin: msg[14],
context: msg[15],
ts_iso: isoDateRegex.test(msg[2]) ? sanitizeField(msg[2], 50) : '',
ts_local: isoDateRegex.test(msg[3]) ? sanitizeField(msg[3], 50) : '',
level: sanitizeField(msg[4], 20),
host: sanitizeField(msg[5], 100),
subsystem: sanitizeField(msg[6], 200),
windows_user: sanitizeField(msg[7], 100),
message: sanitizeField(msg[8], 1000),
exception_message: sanitizeField(msg[9], 1000),
user_directory: sanitizeField(msg[10], 100),
user_id: sanitizeField(msg[11], 100),
command: sanitizeField(msg[12], 200),
result_code: sanitizeField(msg[13], 50),
origin: sanitizeField(msg[14], 200),
context: sanitizeField(msg[15], 200),
};
formatUserFields(msgObj);

View File

@@ -4,6 +4,7 @@
import globals from '../../../../globals.js';
import { isoDateRegex, uuidRegex, formatUserFields } from '../utils/common-utils.js';
import { sanitizeField } from '../../../udp-queue-manager.js';
/**
* Process scheduler log events
@@ -35,21 +36,21 @@ export function processSchedulerEvent(msg) {
globals.logger.verbose(`LOG EVENT: ${msg[0]}:${msg[5]}:${msg[4]}, ${msg[6]}, Msg: ${msg[8]}`);
const msgObj = {
source: msg[0],
source: sanitizeField(msg[0], 100),
log_row: Number.isInteger(parseInt(msg[1], 10)) ? parseInt(msg[1], 10) : -1,
ts_iso: isoDateRegex.test(msg[2]) ? msg[2] : '',
ts_local: isoDateRegex.test(msg[3]) ? msg[3] : '',
level: msg[4],
host: msg[5],
subsystem: msg[6],
windows_user: msg[7],
message: msg[8],
exception_message: msg[9],
user_directory: msg[10],
user_id: msg[11],
user_full: msg[12],
task_name: msg[13],
app_name: msg[14],
ts_iso: isoDateRegex.test(msg[2]) ? sanitizeField(msg[2], 50) : '',
ts_local: isoDateRegex.test(msg[3]) ? sanitizeField(msg[3], 50) : '',
level: sanitizeField(msg[4], 20),
host: sanitizeField(msg[5], 100),
subsystem: sanitizeField(msg[6], 200),
windows_user: sanitizeField(msg[7], 100),
message: sanitizeField(msg[8], 1000),
exception_message: sanitizeField(msg[9], 1000),
user_directory: sanitizeField(msg[10], 100),
user_id: sanitizeField(msg[11], 100),
user_full: sanitizeField(msg[12], 200),
task_name: sanitizeField(msg[13], 200),
app_name: sanitizeField(msg[14], 200),
task_id: uuidRegex.test(msg[15]) ? msg[15] : '',
app_id: uuidRegex.test(msg[16]) ? msg[16] : '',
execution_id: uuidRegex.test(msg[17]) ? msg[17] : '',

View File

@@ -126,6 +126,10 @@ export async function messageEventHandler(message, _remote) {
globals.logger.debug('LOG EVENT: Calling log event New Relic posting method');
postLogEventToNewRelic(msgObj);
}
} else {
globals.logger.debug(
`LOG EVENT: Log event source not recognized or not enabled in configuration, skipping message: ${msgParts[0]}`
);
}
} catch (err) {
globals.logger.error(`LOG EVENT: Error handling message: ${globals.getErrorMessage(err)}`);

View File

@@ -39,6 +39,7 @@ export function formatUserFields(msgObj) {
// Combine them into a single field
msgObj.user_full = `${msgObj.user_directory}\\${msgObj.user_id}`;
} else if (
msgObj.user_full &&
msgObj.user_full !== '' &&
msgObj.user_directory === '' &&
msgObj.user_id === '' &&

View File

@@ -1,16 +1,19 @@
// filepath: /Users/goran/code/butler-sos/src/lib/udp_handlers/user_events/__tests__/message-event.test.js
import { jest, describe, test, expect, beforeEach, afterEach } from '@jest/globals';
// Mock globals module
jest.unstable_mockModule('../../../../globals.js', () => {
const mockGlobals = {
logger: {
error: jest.fn(),
warn: jest.fn(),
info: jest.fn(),
verbose: jest.fn(),
debug: jest.fn(),
},
// Mock globals module - we only set up the structure, individual tests configure behavior
const mockLogger = {
error: jest.fn(),
warn: jest.fn(),
info: jest.fn(),
verbose: jest.fn(),
debug: jest.fn(),
silly: jest.fn(),
};
jest.unstable_mockModule('../../../../globals.js', () => ({
default: {
logger: mockLogger,
config: {
get: jest.fn(),
has: jest.fn(),
@@ -20,10 +23,8 @@ jest.unstable_mockModule('../../../../globals.js', () => {
},
appNames: [],
getErrorMessage: jest.fn().mockImplementation((err) => err.toString()),
};
return { default: mockGlobals };
});
},
}));
// Mock UAParser
jest.unstable_mockModule('ua-parser-js', () => ({
@@ -112,7 +113,7 @@ describe('messageEventHandler', () => {
await messageEventHandler(message, {});
expect(globals.logger.debug).toHaveBeenCalledWith(
expect(globals.logger.silly).toHaveBeenCalledWith(
expect.stringContaining('USER EVENT (raw):')
);
expect(globals.logger.verbose).toHaveBeenCalledWith(
@@ -133,7 +134,7 @@ describe('messageEventHandler', () => {
await messageEventHandler(message, {});
expect(globals.logger.verbose).toHaveBeenCalledWith(
'USER EVENT: /qseow-proxy-session/ - testuser2 - /app/87654321-4321-4321-4321-cba987654321'
expect.stringContaining('USER EVENT:')
);
expect(globals.udpEvents.addUserEvent).toHaveBeenCalledWith({
source: 'qseow-proxy-session',

View File

@@ -3,6 +3,7 @@ import { UAParser } from 'ua-parser-js';
// Load global variables and functions
import globals from '../../../globals.js';
import { sanitizeField } from '../../udp-queue-manager.js';
import { postUserEventToInfluxdb } from '../../post-to-influxdb.js';
import { postUserEventToNewRelic } from '../../post-to-new-relic.js';
import { postUserEventToMQTT } from '../../post-to-mqtt.js';
@@ -120,14 +121,14 @@ export async function messageEventHandler(message, _remote) {
let msgObj;
if (msg[0] === 'qseow-proxy-connection' || msg[0] === 'qseow-proxy-session') {
msgObj = {
messageType: msg[0],
host: msg[1],
command: msg[2],
user_directory: msg[3],
user_id: msg[4],
origin: msg[5],
context: msg[6],
message: msg[7],
messageType: sanitizeField(msg[0], 100),
host: sanitizeField(msg[1], 100),
command: sanitizeField(msg[2], 100),
user_directory: sanitizeField(msg[3], 100),
user_id: sanitizeField(msg[4], 100),
origin: sanitizeField(msg[5], 200),
context: sanitizeField(msg[6], 500),
message: sanitizeField(msg[7], 1000),
};
// Different log events deliver QSEoW user directory/user differently.

View File

@@ -10,7 +10,8 @@ import { listeningEventHandler, messageEventHandler } from './udp_handlers/log_e
*
* This function sets up event handlers for the UDP server that listens for
* log events from Qlik Sense services (such as engine, proxy, repository,
* and scheduler services).
* and scheduler services). It also adds queue management, rate limiting,
* and error handling capabilities.
*
* @returns {void}
*/
@@ -19,5 +20,56 @@ export function udpInitLogEventServer() {
globals.udpServerLogEvents.socket.on('listening', listeningEventHandler);
// Handler for UDP messages relating to log events
globals.udpServerLogEvents.socket.on('message', messageEventHandler);
globals.udpServerLogEvents.socket.on('message', async (message, remote) => {
try {
globals.logger.debug(`[UDP LOG EVENT MSG] !!! RAW MESSAGE EVENT !!!`);
globals.logger.debug(`[UDP LOG EVENT MSG] From: ${remote.address}:${remote.port}`);
globals.logger.debug(`[UDP LOG EVENT MSG] Length: ${message.length} bytes`);
globals.logger.debug(
`[UDP LOG EVENT MSG] First 200 chars: ${message.toString().substring(0, 200)}`
);
globals.logger.debug(`[UDP LOG EVENT MSG] ---`);
// Get queue manager
const queueManager = globals.udpQueueManagerLogEvents;
// Validate message size
if (!queueManager.validateMessageSize(message)) {
globals.logger.warn(
`[UDP Queue] Log event message exceeds size limit: ${message.length} bytes`
);
await queueManager.handleSizeDrop();
return;
}
// Check rate limit if enabled
if (!queueManager.checkRateLimit()) {
await queueManager.handleRateLimitDrop();
return;
}
// Add message to queue for processing
const queued = await queueManager.addToQueue(async () => {
await messageEventHandler(message, remote);
});
if (!queued) {
globals.logger.debug(`[UDP Queue] Log event message dropped due to full queue`);
}
} catch (err) {
globals.logger.error(
`[UDP Queue] Error handling log event message: ${globals.getErrorMessage(err)}`
);
}
});
// Handler for UDP server errors
globals.udpServerLogEvents.socket.on('error', (err) => {
globals.logger.error(`[UDP] Log events server error: ${globals.getErrorMessage(err)}`);
});
// Handler for UDP server close event
globals.udpServerLogEvents.socket.on('close', () => {
globals.logger.warn('[UDP] Log events server socket closed');
});
}

View File

@@ -10,7 +10,8 @@ import { listeningEventHandler, messageEventHandler } from './udp_handlers/user_
*
* This function sets up event handlers for the UDP server that listens for
* user activity events from Qlik Sense (such as session start/stop and
* connection open/close events).
* connection open/close events). It also adds queue management, rate limiting,
* and error handling capabilities.
*
* @returns {void}
*/
@@ -19,5 +20,48 @@ export function udpInitUserActivityServer() {
globals.udpServerUserActivity.socket.on('listening', listeningEventHandler);
// Handler for UDP messages relating to user activity events
globals.udpServerUserActivity.socket.on('message', messageEventHandler);
globals.udpServerUserActivity.socket.on('message', async (message, remote) => {
try {
// Get queue manager
const queueManager = globals.udpQueueManagerUserActivity;
// Validate message size
if (!queueManager.validateMessageSize(message)) {
globals.logger.warn(
`[UDP Queue] User activity message exceeds size limit: ${message.length} bytes`
);
await queueManager.handleSizeDrop();
return;
}
// Check rate limit if enabled
if (!queueManager.checkRateLimit()) {
await queueManager.handleRateLimitDrop();
return;
}
// Add message to queue for processing
const queued = await queueManager.addToQueue(async () => {
await messageEventHandler(message, remote);
});
if (!queued) {
globals.logger.debug(`[UDP Queue] User activity message dropped due to full queue`);
}
} catch (err) {
globals.logger.error(
`[UDP Queue] Error handling user activity message: ${globals.getErrorMessage(err)}`
);
}
});
// Handler for UDP server errors
globals.udpServerUserActivity.socket.on('error', (err) => {
globals.logger.error(`[UDP] User activity server error: ${globals.getErrorMessage(err)}`);
});
// Handler for UDP server close event
globals.udpServerUserActivity.socket.on('close', () => {
globals.logger.warn('[UDP] User activity server socket closed');
});
}