SecureWatch Data Ingestion Guide v2.1.0¶
📋 Documentation Navigation: Main README | Quick Start | Deployment Guide | Log Formats
📋 Overview¶
SecureWatch provides comprehensive, enterprise-grade data ingestion capabilities designed to rival Splunk’s data collection and processing features. Our modular architecture supports multiple ingestion methods, protocols, and data formats with high-performance processing and guaranteed delivery.
🚀 Ingestion Methods¶
1. HTTP Event Collector (HEC) Service¶
Port: 8888
Protocol: HTTP/HTTPS
Compatibility: Splunk HEC API compatible
Features¶
Token-based Authentication: Secure API access with configurable tokens
Multi-format Support: JSON events, raw data, batch processing
Rate Limiting: Configurable request limits and throttling
High Performance: Async processing with Kafka integration
Health Monitoring: Built-in health checks and metrics
Usage Examples¶
# Single Event
curl -X POST http://localhost:8888/services/collector \
-H "Authorization: Splunk <YOUR_TOKEN>" \
-H "Content-Type: application/json" \
-d '{"event": {"message": "User login", "user": "john.doe"}}'
# Batch Events
curl -X POST http://localhost:8888/services/collector \
-H "Authorization: Splunk <YOUR_TOKEN>" \
-H "Content-Type: application/json" \
-d '{"event": {"message": "Event 1"}}
{"event": {"message": "Event 2"}}'
# Raw Data
curl -X POST http://localhost:8888/services/collector/raw \
-H "Authorization: Splunk <YOUR_TOKEN>" \
-H "Content-Type: text/plain" \
-d 'Jan 1 12:00:00 server1 sshd[1234]: User login from 192.168.1.100'
Token Management¶
# Create new token
curl -X POST http://localhost:8888/admin/tokens \
-H "Content-Type: application/json" \
-d '{
"name": "production-app",
"description": "Production application token",
"permissions": ["write"],
"rateLimit": 1000,
"enabled": true
}'
# List tokens
curl http://localhost:8888/admin/tokens
# Disable token
curl -X DELETE http://localhost:8888/admin/tokens/<TOKEN_ID>
2. Universal Syslog Ingestion¶
Ports: 514 (UDP/TCP), 601 (TCP), 6514 (TLS)
Protocols: UDP, TCP, TLS
Standards: RFC 3164, RFC 5424
Supported Protocols¶
Port |
Protocol |
Description |
Use Case |
|---|---|---|---|
514/udp |
UDP |
Traditional syslog |
High-volume, fire-and-forget |
514/tcp |
TCP |
Reliable syslog |
Guaranteed delivery |
601/tcp |
TCP |
RFC 5425 compliant |
Standards-based deployment |
6514/tcp |
TLS |
Secure syslog |
Encrypted transmission |
Configuration¶
# Environment variables for log-ingestion service
SYSLOG_UDP_PORT=514
SYSLOG_TCP_PORT=514
SYSLOG_RFC5425_PORT=601
SYSLOG_TLS_PORT=6514
Usage Examples¶
# Send via UDP (traditional)
echo '<134>Jan 1 12:00:00 server1 app[1234]: User authenticated' | nc -u localhost 514
# Send via TCP (reliable)
echo '<134>Jan 1 12:00:00 server1 app[1234]: User authenticated' | nc localhost 514
# Send with JSON payload
echo '<134>Jan 1 12:00:00 server1 app[1234]: User authenticated JSON:{"user":"john","ip":"192.168.1.100"}' | nc localhost 514
# TLS syslog (requires proper certificates)
openssl s_client -connect localhost:6514 -cert client.crt -key client.key
JSON Payload Support¶
SecureWatch supports structured data within syslog messages:
<134>Jan 1 12:00:00 server1 app[1234]: User login JSON:{"user":"john.doe","source_ip":"192.168.1.100","session_id":"abc123"}
The JSON portion is automatically extracted and indexed as structured fields.
3. File Upload & Processing API¶
Port: 4000 (Frontend) + 4002 (Processing)
Interface: Web UI + REST API
Formats: CSV, XML, JSON, EVTX, Text
Web Interface¶
Navigate to Settings → Log Sources
Use the File Upload widget
Configure upload options:
Source name
Source type
CSV delimiter
Header detection
Drag and drop files or browse to select
Monitor real-time processing progress
REST API¶
# Upload file via API
curl -X POST http://localhost:4000/api/files/upload \
-F "file=@logfile.csv" \
-F "options={\"source\":\"webserver\",\"sourcetype\":\"access_log\",\"hasHeaders\":true}"
# Check processing status
curl http://localhost:4000/api/files/upload?fileId=<FILE_ID>
Supported File Types¶
Format |
Extensions |
Max Size |
Description |
|---|---|---|---|
CSV |
.csv |
50MB |
Comma-separated values with automatic field detection |
XML |
.xml |
25MB |
Structured XML logs with nested element support |
JSON |
.json |
25MB |
JSON objects and arrays with automatic parsing |
EVTX |
.evtx |
100MB |
Windows Event Logs with full field extraction |
Text |
.txt, .log |
50MB |
Plain text logs with regex parsing |
4. Enhanced Agent with Persistent Queuing¶
Platform: Cross-platform (Windows, macOS, Linux)
Transport: HTTPS with mTLS
Features: Guaranteed delivery, compression, retry logic
Agent Configuration¶
[agent]
agent_id = unique-agent-id
config_update_interval = 300
[transport]
endpoint = https://securewatch.company.com/api/ingest
cert_path = /etc/securewatch/certs/client.crt
key_path = /etc/securewatch/certs/client.key
ca_path = /etc/securewatch/certs/ca.crt
batch_size = 100
[queue]
max_size = 50000
max_age_hours = 72
compression_threshold = 2048
retry_delays = [30, 300, 1800, 7200]
Queue Management¶
# Agent queue statistics
from agent.core.persistent_queue import PersistentQueue
queue = PersistentQueue(config)
stats = await queue.get_stats()
print(f"Pending events: {stats['status_counts']['pending']}")
print(f"Completed events: {stats['status_counts']['completed']}")
print(f"Failed events: {stats['status_counts']['failed']}")
🔧 Configuration & Setup¶
Docker Deployment¶
# docker-compose.yml excerpt
services:
hec-service:
build: ./apps/hec-service
ports:
- "8888:8888"
environment:
- KAFKA_BROKERS=kafka:29092
- RATE_LIMIT_MAX_REQUESTS=1000
log-ingestion:
build: ./apps/log-ingestion
ports:
- "4002:4002"
- "514:514/udp"
- "514:514/tcp"
- "601:601/tcp"
- "6514:6514/tcp"
environment:
- SYSLOG_UDP_PORT=514
- SYSLOG_TCP_PORT=514
- SYSLOG_RFC5425_PORT=601
- SYSLOG_TLS_PORT=6514
Startup¶
# Complete platform startup
./start.sh
# Individual services
docker-compose up -d hec-service
docker-compose up -d log-ingestion
📊 Monitoring & Metrics¶
Health Checks¶
# HEC Service
curl http://localhost:8888/health
# Log Ingestion
curl http://localhost:4002/health
# Syslog adapter statistics
curl http://localhost:4002/adapters/syslog/stats
Performance Metrics¶
# Prometheus metrics
curl http://localhost:8888/metrics
curl http://localhost:4002/metrics
# Kafka topics
docker exec kafka kafka-topics --list --bootstrap-server localhost:9092
Service Statistics¶
{
"hec": {
"tokens_active": 5,
"events_received": 15420,
"events_processed": 15420,
"rate_limit_hits": 0,
"avg_response_time_ms": 45
},
"syslog": {
"ports": {
"udp": 514,
"tcp": 514,
"rfc5425": 601,
"tls": 6514
},
"active_connections": {
"tcp": 12,
"rfc5425": 3
},
"messages_received": 89234,
"parse_errors": 23
},
"agent": {
"agents_connected": 45,
"queue_pending": 1250,
"queue_processed": 234567,
"compression_ratio": 0.65
}
}
🛡️ Security Features¶
Data Protection¶
Encryption in Transit: TLS 1.2+ for all HTTP/HTTPS traffic
Compression: Automatic payload compression to reduce bandwidth
Data Validation: Input validation and sanitization
Audit Logging: Complete audit trail of all ingestion activities
🚀 Performance Optimization¶
High-Volume Scenarios¶
Use HEC Batch API: Send multiple events per request
Enable Compression: Automatic payload compression
Kafka Partitioning: Distribute load across Kafka partitions
Buffer Management: Configure appropriate buffer sizes
Connection Pooling: Reuse connections for better performance
Tuning Parameters¶
# HEC Service
BATCH_SIZE=100
BATCH_TIMEOUT_MS=5000
RATE_LIMIT_MAX_REQUESTS=1000
# Log Ingestion
KAFKA_BATCH_SIZE=100
BUFFER_SIZE=1000000
COMPRESSION_ENABLED=true
# Agent
QUEUE_MAX_SIZE=50000
COMPRESSION_THRESHOLD=2048
BATCH_SIZE=100
🔍 Troubleshooting¶
Common Issues¶
HEC Service Not Responding¶
# Check service status
curl http://localhost:8888/health
# View logs
docker logs securewatch_hec
# Verify Kafka connection
docker exec kafka kafka-topics --list --bootstrap-server localhost:9092
Syslog Messages Not Received¶
# Test UDP connectivity
nc -u localhost 514 < test_message.txt
# Check port bindings
netstat -tulpn | grep :514
# View adapter statistics
curl http://localhost:4002/adapters/syslog/stats
Agent Queue Issues¶
# Check agent status
ps aux | grep event_log_agent.py
# View queue statistics
python3 -c "from agent.core.persistent_queue import PersistentQueue; print('Queue operational')"
# Monitor transport logs
tail -f /var/log/securewatch/agent.log
Performance Issues¶
High Latency: Increase batch sizes, enable compression
Memory Usage: Adjust buffer sizes, enable disk spillover
Network Issues: Check firewall rules, DNS resolution
Queue Backlog: Increase processing threads, optimize Kafka
📚 Integration Examples¶
Splunk Universal Forwarder Migration¶
# Replace Splunk forwarder with SecureWatch agent
# Old Splunk outputs.conf:
[tcpout]
defaultGroup = securewatch
server = splunk.company.com:9997
# New SecureWatch agent config:
[transport]
endpoint = https://securewatch.company.com:8888/services/collector
Application Integration¶
# Python application sending to HEC
import requests
def send_to_securewatch(event_data):
headers = {
'Authorization': 'Splunk your-token-here',
'Content-Type': 'application/json'
}
payload = {
'event': event_data,
'source': 'python-app',
'sourcetype': 'application_log'
}
response = requests.post(
'http://localhost:8888/services/collector',
json=payload,
headers=headers
)
return response.status_code == 200
Rsyslog Integration¶
# /etc/rsyslog.conf
# Forward all logs to SecureWatch
*.* @@localhost:514
# Forward specific facility
mail.* @@localhost:601
# TLS forwarding (requires certificates)
$DefaultNetstreamDriverCAFile /etc/ssl/certs/ca.pem
$ActionSendStreamDriver gtls
$ActionSendStreamDriverMode 1
$ActionSendStreamDriverAuthMode x509/name
*.* @@localhost:6514
📈 Scaling & Production Deployment¶
Horizontal Scaling¶
Load Balancer: Distribute HEC requests across multiple instances
Kafka Partitioning: Scale message processing
Database Sharding: Distribute storage load
Agent Distribution: Deploy agents across multiple regions
Production Checklist¶
TLS certificates configured for all services
Rate limiting configured appropriately
Monitoring and alerting set up
Log retention policies defined
Backup and disaster recovery tested
Security audit completed
Performance testing conducted
Documentation updated
Enterprise-grade data ingestion for comprehensive security monitoring and log management 🚀