Event Management Queue System
This document explains how Redis queues are used in the KillIT v3 Event Management system and how to extend them.
Overview
The event management system uses Redis-backed Bull queues for asynchronous processing of computationally intensive or time-consuming operations. This ensures fast event ingestion while allowing complex processing to happen in the background.
Current Queue Implementation
1. Event Correlation Queue (event-correlation)
- Purpose: Groups related events based on multiple correlation strategies
- Worker: eventCorrelationWorker.js
- Triggered: After every event ingestion
- Strategies:
  - Temporal correlation (5-minute window)
  - Topology correlation (CMDB relationships)
  - Pattern correlation (same signature)
  - Service correlation (same service/app)
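The temporal strategy above can be sketched as a pure function. This is a minimal illustration, not the logic in eventCorrelationWorker.js: the helper name `groupByTemporalWindow` and the numeric `timestamp` field (milliseconds) are assumptions, and the window is measured from the first event of each group.

```javascript
// Hypothetical helper: groups events into temporal clusters.
// Assumes each event carries a numeric `timestamp` in milliseconds.
const WINDOW_MS = 5 * 60 * 1000; // the 5-minute window from the strategy list

function groupByTemporalWindow(events, windowMs = WINDOW_MS) {
  // Sort a copy so the caller's array is left untouched.
  const sorted = [...events].sort((a, b) => a.timestamp - b.timestamp);
  const groups = [];
  for (const event of sorted) {
    const current = groups[groups.length - 1];
    // Start a new group when the event falls outside the window
    // measured from the first event of the current group.
    if (!current || event.timestamp - current[0].timestamp > windowMs) {
      groups.push([event]);
    } else {
      current.push(event);
    }
  }
  return groups;
}
```

Each returned group could then share one correlationId. A sliding window (measured from the most recent event in the group) is an equally valid choice and only changes the comparison inside the loop.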
2. Event AI Analysis Queue (event-ai-analysis)
- Purpose: Analyzes events using AI for anomaly detection and insights
- Worker: eventAIAnalysisWorker.js
- Triggered: For critical and major severity events
- Features:
  - Anomaly scoring (0-100)
  - Root cause analysis
  - Suggested remediation actions
  - Pattern identification
3. Event Enrichment Queue (event-enrichment)
- Purpose: Enriches events with additional context
- Status: Initialized but not actively used
- Potential Use: CI details, historical context, business impact
Queue Architecture
Event Ingestion
    │
    ├──> Save Event to MongoDB
    │
    ├──> Queue for Correlation
    │         └──> eventCorrelationWorker
    │                   └──> Updates correlationId
    │
    └──> Queue for AI Analysis (if critical/major)
              └──> eventAIAnalysisWorker
                        └──> Updates aiAnalysis field
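The flow in the diagram can be sketched in code as follows. This is an illustration under stated assumptions rather than the actual eventIngestionService.js: `ingestEvent`, `saveEvent`, `correlationQueue`, and `aiAnalysisQueue` are hypothetical names, and the queues only need an `add(data)` method, which Bull queues provide.

```javascript
// Severities that trigger AI analysis, per the diagram.
const AI_SEVERITIES = new Set(['critical', 'major']);

// Hypothetical ingestion function with injected dependencies so it can be
// exercised without MongoDB or Redis.
async function ingestEvent(event, { saveEvent, correlationQueue, aiAnalysisQueue }) {
  // 1. Persist the event first so workers can load it by ID.
  const saved = await saveEvent(event);

  // 2. Every event is queued for correlation.
  await correlationQueue.add({ eventId: saved._id });

  // 3. Only critical and major events are queued for AI analysis.
  if (AI_SEVERITIES.has(saved.severity)) {
    await aiAnalysisQueue.add({ eventId: saved._id });
  }
  return saved;
}
```

In this sketch a queue failure rejects the whole call. If ingestion should succeed even when Redis is unavailable, wrap each `add` in its own try/catch and log the error instead.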
Adding New Queue Processors
1. Create a New Worker
// workers/eventNotificationWorker.js
const queueService = require('../services/queueService');
const Event = require('../models/event');
const logger = require('../utils/logger');

// Processes a single job from the event-notification queue.
async function processEventNotification(job) {
  const { eventId, notificationType } = job.data;

  try {
    logger.info('Processing event notification', { eventId, notificationType });

    // Your processing logic here
    const event = await Event.findById(eventId);
    if (!event) {
      // The event may have been deleted after the job was queued
      throw new Error(`Event ${eventId} not found`);
    }

    // Send notifications based on type
    // ...

    await job.progress(100);
    return { success: true, eventId };
  } catch (error) {
    logger.error('Event notification failed', { eventId, notificationType, error: error.message });
    throw error; // Rethrow so the job is marked as failed and can be retried
  }
}

// Registers the processor with the queue service.
async function initializeWorker() {
  await queueService.processQueue('event-notification', processEventNotification);
  logger.info('Event notification worker initialized');
}

module.exports = { processEventNotification, initializeWorker };
2. Add Queue to Event Ingestion
// In eventIngestionService.js
async queueForNotification(event) {
  try {
    // Determine if notification is needed
    if (event.severity === 'critical' || event.businessImpact?.score > 8) {
      await this.notificationQueue.add({
        eventId: event._id,
        notificationType: 'critical-alert'
      }, {
        priority: 1,
        attempts: 5
      });
    }
  } catch (error) {
    logger.error('Queue for notification error:', error);
  }
}
3. Initialize Worker in Server
// In server.js, inside the async startup function
// (CommonJS modules do not support top-level await)
if (process.env.ENABLE_EVENT_WORKERS !== 'false') {
  try {
    const { initializeWorker } = require('./workers/eventNotificationWorker');
    await initializeWorker(); // Logs its own success message
  } catch (error) {
    logger.error('Failed to initialize notification worker:', error);
  }
}
Queue Configuration
Environment Variables
# Redis Configuration
REDIS_HOST=localhost
REDIS_PORT=6379
REDIS_PASSWORD=your-password
# Queue Configuration
QUEUE_PREFIX=killit-v3
MAX_CONCURRENT_JOBS=5
ENABLE_EVENT_WORKERS=true
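As a rough sketch of how these variables might be read at startup, the loader below turns them into a single config object. `loadQueueConfig` is a hypothetical helper, and the fallback values simply reuse the example values above. How queueService actually consumes these variables is not shown in this document.

```javascript
// Hypothetical loader: converts environment variables into a config object.
function loadQueueConfig(env = process.env) {
  const port = Number.parseInt(env.REDIS_PORT || '6379', 10);
  const concurrency = Number.parseInt(env.MAX_CONCURRENT_JOBS || '5', 10);
  if (Number.isNaN(port) || Number.isNaN(concurrency)) {
    throw new Error('REDIS_PORT and MAX_CONCURRENT_JOBS must be integers');
  }
  return {
    redis: {
      host: env.REDIS_HOST || 'localhost',
      port,
      // Leave the password undefined when Redis runs without authentication.
      password: env.REDIS_PASSWORD || undefined,
    },
    prefix: env.QUEUE_PREFIX || 'killit-v3',
    concurrency,
    // Workers stay enabled unless the variable is exactly 'false',
    // matching the check used in server.js.
    workersEnabled: env.ENABLE_EVENT_WORKERS !== 'false',
  };
}
```

Validating the numeric values up front makes a typo in the environment fail at startup instead of surfacing later as a connection error.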
Job Options
{
  attempts: 3,             // Total attempts, including the first run (up to 2 retries)
  backoff: {
    type: 'exponential',   // Delay grows with each retry
    delay: 2000            // Base delay of 2 seconds
  },
  removeOnComplete: true,  // Clean up completed jobs
  removeOnFail: false,     // Keep failed jobs for debugging
  timeout: 60000           // Fail the job if it runs longer than 60 seconds
}
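Individual jobs can override these defaults, as the notification example does with `priority: 1` and `attempts: 5`. The sketch below shows one way to merge per-job overrides with the defaults. `DEFAULT_JOB_OPTIONS` and `buildJobOptions` are hypothetical names introduced for illustration.

```javascript
// Defaults copied from the Job Options listing.
const DEFAULT_JOB_OPTIONS = {
  attempts: 3,
  backoff: { type: 'exponential', delay: 2000 },
  removeOnComplete: true,
  removeOnFail: false,
  timeout: 60000,
};

// Hypothetical helper: per-job overrides win over the defaults.
function buildJobOptions(overrides = {}) {
  return {
    ...DEFAULT_JOB_OPTIONS,
    ...overrides,
    // Merge backoff separately so a partial override keeps the other field.
    backoff: { ...DEFAULT_JOB_OPTIONS.backoff, ...(overrides.backoff || {}) },
  };
}

// Example: a critical alert with more attempts and the highest priority.
const criticalAlertOptions = buildJobOptions({ priority: 1, attempts: 5 });
```

The result can be passed as the second argument to `queue.add(data, options)`.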
Monitoring Queues
Bull Board UI
Access the queue monitoring dashboard at: http://localhost:8080/admin/queues
Features:
- View job counts (waiting, active, completed, failed)
- Inspect individual job data
- Retry failed jobs
- Clean old jobs
Programmatic Monitoring
const queue = queueService.getQueue('event-correlation');

// Get queue statistics
const jobCounts = await queue.getJobCounts();
console.log('Waiting:', jobCounts.waiting);
console.log('Active:', jobCounts.active);
console.log('Failed:', jobCounts.failed);

// Get specific job (getJob resolves to null if the job no longer exists,
// for example after removeOnComplete cleanup)
const job = await queue.getJob(jobId);
const state = job ? await job.getState() : 'not found';
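The counts returned by `getJobCounts()` can feed a simple health check. The sketch below is one possible interpretation: `summarizeQueueHealth` and its thresholds are hypothetical and should be tuned to the workload.

```javascript
// Hypothetical helper: turns job counts into a list of warnings.
function summarizeQueueHealth(jobCounts, { maxWaiting = 1000, maxFailed = 50 } = {}) {
  const warnings = [];
  // Waiting jobs with nothing active usually means no worker is consuming.
  if (jobCounts.waiting > 0 && jobCounts.active === 0) {
    warnings.push('Jobs are waiting but none are active: is a worker running?');
  }
  if (jobCounts.waiting > maxWaiting) {
    warnings.push(`Backlog above ${maxWaiting} waiting jobs`);
  }
  if (jobCounts.failed > maxFailed) {
    warnings.push(`More than ${maxFailed} failed jobs`);
  }
  return { healthy: warnings.length === 0, warnings };
}
```

A periodic task could call this for each queue and log or alert when `healthy` is false.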
Best Practices
1. Queue Naming
- Use descriptive names: event-{action}
- Examples: event-notification, event-archive, event-remediation
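If the naming convention should be enforced rather than only documented, a small check at queue creation time is one option. The pattern and the `isValidQueueName` helper below are assumptions: they accept lowercase, hyphen-separated action names such as event-ai-analysis.

```javascript
// Hypothetical validator for the event-{action} convention.
const QUEUE_NAME_PATTERN = /^event-[a-z]+(?:-[a-z]+)*$/;

function isValidQueueName(name) {
  return QUEUE_NAME_PATTERN.test(name);
}
```

For example, `isValidQueueName('event-archive')` passes, while `'notifications'` or `'event-'` would be rejected.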
2. Job Data
- Keep job data minimal (store IDs, not full objects)
- Include all necessary context for processing
- Add metadata for debugging (timestamp, userId, etc.)
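A minimal sketch of a payload that follows these guidelines is shown below. `buildJobData` is a hypothetical helper, and every field other than `eventId` is illustrative.

```javascript
// Hypothetical builder: stores a reference to the event plus small metadata.
function buildJobData(event, { userId, source } = {}) {
  return {
    // Store the ID only; the worker reloads the event from MongoDB.
    eventId: String(event._id),
    // Small piece of context the worker may need to branch on.
    severity: event.severity,
    // Debugging metadata.
    meta: {
      enqueuedAt: new Date().toISOString(),
      userId: userId ?? null,
      source: source ?? 'event-ingestion',
    },
  };
}
```

Keeping the payload to IDs and small scalars keeps Redis memory usage low and avoids workers acting on a stale copy of the event.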
3. Error Handling
- Always wrap processing in try-catch
- Log errors with context
- Use appropriate retry strategies
- Consider dead letter queues for permanent failures
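One way to approach the dead letter idea is sketched below. `handleFailedJob` and `deadLetterQueue` are hypothetical. The `attemptsMade`, `opts.attempts`, and `queue.name` fields mirror what Bull exposes on a job, and the handler is meant to be attached to Bull's `failed` event.

```javascript
// A job has failed permanently once it has used all of its attempts.
function isPermanentFailure(job) {
  const maxAttempts = (job.opts && job.opts.attempts) || 1;
  return job.attemptsMade >= maxAttempts;
}

// Hypothetical handler: copies exhausted jobs to a dead letter queue.
// `deadLetterQueue` only needs an `add(data)` method.
async function handleFailedJob(job, error, deadLetterQueue) {
  if (!isPermanentFailure(job)) {
    return false; // Attempts remain, so the job will be retried.
  }
  await deadLetterQueue.add({
    originalQueue: job.queue ? job.queue.name : 'unknown',
    jobId: job.id,
    data: job.data,
    failedReason: error.message,
    failedAt: new Date().toISOString(),
  });
  return true;
}
```

It could be wired up with `queue.on('failed', (job, err) => handleFailedJob(job, err, deadLetterQueue))`, where the dead letter queue has no processor and is inspected manually.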
4. Performance
- Set appropriate concurrency limits
- Use job priorities for critical events
- Implement rate limiting for external API calls
- Monitor memory usage in workers
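For job priorities, a severity-to-priority mapping is one simple approach. In Bull, lower numbers mean higher priority, with 1 being the highest. The mapping below is an assumption: only 'critical' and 'major' appear in this document, and the other severity names and all numeric values are illustrative.

```javascript
// Hypothetical mapping from event severity to Bull job priority.
const SEVERITY_PRIORITY = new Map([
  ['critical', 1],
  ['major', 2],
  ['minor', 5],
  ['warning', 8],
  ['info', 10],
]);
const DEFAULT_PRIORITY = 10; // Unknown severities go to the back of the line.

function priorityForSeverity(severity) {
  return SEVERITY_PRIORITY.get(severity) ?? DEFAULT_PRIORITY;
}

// Builds the options object passed as the second argument to queue.add().
function jobOptionsFor(event) {
  return { priority: priorityForSeverity(event.severity) };
}
```

With this in place, `queue.add(data, jobOptionsFor(event))` lets critical events overtake a backlog of lower-severity jobs.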
5. Testing
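// Test queue processing (Jest), inside an async test function.
// Assumes Event.findById is mocked (for example with jest.mock) so that it
// resolves to a test event without a MongoDB connection.
const mockJob = {
  id: 'test-123',
  data: { eventId: 'event-456', notificationType: 'critical-alert' },
  progress: jest.fn(),
  log: jest.fn()
};

await processEventNotification(mockJob);
expect(mockJob.progress).toHaveBeenCalledWith(100);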
Potential Queue Expansions
1. Event Notification Queue
- Send emails, SMS, Slack messages
- Escalation handling
- Notification preferences per user/team
2. Event Remediation Queue
- Trigger automated fixes
- Execute runbooks
- Integration with automation tools
3. Event Metrics Queue
- Calculate MTTR, MTTA
- Update dashboards
- Generate reports
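As an illustration of the MTTA and MTTR calculations, a metrics worker could average the time from creation to acknowledgement and to resolution. The field names `createdAt`, `acknowledgedAt`, and `resolvedAt` are assumptions, since the event schema is not described in this document.

```javascript
// Mean duration in milliseconds between two timestamp fields.
// Events missing either timestamp are skipped; returns null if none qualify.
function meanDurationMs(events, startField, endField) {
  const durations = events
    .filter((e) => e[startField] != null && e[endField] != null)
    .map((e) => new Date(e[endField]).getTime() - new Date(e[startField]).getTime());
  if (durations.length === 0) return null;
  return durations.reduce((sum, d) => sum + d, 0) / durations.length;
}

// Mean time to acknowledge and mean time to resolve (hypothetical field names).
const mtta = (events) => meanDurationMs(events, 'createdAt', 'acknowledgedAt');
const mttr = (events) => meanDurationMs(events, 'createdAt', 'resolvedAt');
```

Two events acknowledged after 2 and 4 minutes give an MTTA of 3 minutes (180000 ms). Unresolved events are excluded from MTTR instead of counting as zero.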
4. Event Integration Queue
- Sync with external ITSM tools
- Webhook deliveries
- Third-party integrations
5. Event Archive Queue
- Move old events to cold storage
- Compress event data
- Generate audit logs
6. Event ML Training Queue
- Prepare training data
- Update ML models
- Improve correlation accuracy
Troubleshooting
Common Issues
- Jobs Not Processing
  - Check Redis connection
  - Verify worker is running
  - Check for errors in logs
- High Memory Usage
  - Reduce concurrent job limit
  - Implement job batching
  - Check for memory leaks
- Jobs Failing Repeatedly
  - Check job data validity
  - Verify external dependencies
  - Review error logs
Debug Commands
# Check Redis connection
redis-cli ping
# Monitor Redis commands
redis-cli monitor
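# Check queue keys (KEYS walks the whole keyspace and blocks Redis while it runs;
# on busy instances use: redis-cli --scan --pattern "bull:killit-v3:*")
redis-cli keys "bull:killit-v3:*"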
# Get queue job count
redis-cli llen "bull:killit-v3:event-correlation:wait"
Conclusion
The Redis queue system provides a robust foundation for asynchronous event processing. By following these patterns and best practices, you can extend the system to handle new requirements while maintaining performance and reliability.