This document provides a comprehensive overview of the software architecture, design principles, and implementation patterns of SONAR (SIEM-Oriented Neural Anomaly Recognition). It serves as the authoritative reference for understanding and extending the system.
SONAR is a multivariate time-series anomaly detection subsystem for IDPS-ESCAPE. It analyzes Wazuh security alerts to identify unusual patterns that may indicate security threats.
| Component | Technology |
|---|---|
| Anomaly detection | time-series-anomaly-detector (Microsoft MVAD) |
| Data processing | pandas, NumPy |
| Alert storage | OpenSearch (Wazuh Indexer) |
| Configuration | YAML, Python dataclasses |
| CLI | argparse |
| Package management | Poetry |
Each module has a single, well-defined responsibility:
Data Ingestion → Feature Engineering → Model Training/Prediction → Post-processing → Result Storage
This separation enables:
All operational parameters are externalized to configuration:
The LocalDataProvider implements the same interface as WazuhIndexerClient, enabling:
All external interactions include:
Each operation (training, detection) is self-contained:
```
┌────────────────────────────────────────────────────────────┐
│ CLI Layer (cli.py)                                         │
│   Commands: train | detect | scenario | check              │
├────────────────────────────────────────────────────────────┤
│ Configuration Layer                                        │
│   config.py (dataclasses) | scenario.py (YAML)             │
├────────────────────────────────────────────────────────────┤
│ Data Ingestion Layer                                       │
│   WazuhIndexerClient | LocalDataProvider (debug mode)      │
├────────────────────────────────────────────────────────────┤
│ Feature Engineering Layer                                  │
│   WazuhFeatureBuilder                                      │
│   Timestamp parsing | Bucketing | Aggregation | Encoding   │
├────────────────────────────────────────────────────────────┤
│ ML Engine Layer                                            │
│   MVADModelEngine                                          │
│   Train | Predict | Save | Load operations                 │
├────────────────────────────────────────────────────────────┤
│ Post-processing Layer                                      │
│   MVADPostProcessor                                        │
│   Anomaly document generation for Wazuh indexing           │
├────────────────────────────────────────────────────────────┤
│ Storage Layer                                              │
│   Wazuh Indexer (OpenSearch REST API)                      │
│   wazuh-alerts-* (read) | wazuh-anomalies-mvad (write)     │
└────────────────────────────────────────────────────────────┘
```
```mermaid
flowchart LR
    subgraph Input
        Wazuh[(Wazuh Indexer)]
        JSON[(Local JSON)]
    end
    subgraph Processing
        Client[Data Client]
        Features[Feature Builder]
        Engine[MVAD Engine]
        Post[Post-processor]
    end
    subgraph Output
        Model[(Model File)]
        Anomalies[(Anomaly Index)]
    end
    Wazuh --> Client
    JSON --> Client
    Client --> Features
    Features --> Engine
    Engine --> Model
    Engine --> Post
    Post --> Anomalies
```
| Module | File | Purpose | Key classes/functions |
|---|---|---|---|
| Configuration | `config.py` | Dataclass definitions for all settings | `PipelineConfig`, `WazuhIndexerConfig`, `MVADConfig`, `FeatureConfig`, `DebugConfig` |
| Scenarios | `scenario.py` | YAML-based workflow definitions | `UseCase`, `TrainingScenario`, `DetectionScenario` |
| Data ingestion | `wazuh_client.py` | OpenSearch REST API client | `WazuhIndexerClient` |
| Debug provider | `local_data_provider.py` | Local JSON file loader | `LocalDataProvider` |
| Feature engineering | `features.py` | Time-series construction | `WazuhFeatureBuilder` |
| ML engine | `engine.py` | MVAD model wrapper | `MVADModelEngine` |
| Post-processing | `pipeline.py` | Result document generation | `MVADPostProcessor` |
| Data shipping | `shipper/wazuh_data_shipper.py` | Data stream management for anomalies | `WazuhDataShipper`, `TemplateHandler` |
| CLI | `cli.py` | Command-line interface | `main()`, `cmd_train()`, `cmd_detect()`, `cmd_scenario()` |
```
cli.py
├── config.py
├── scenario.py
├── wazuh_client.py OR local_data_provider.py
├── features.py
├── engine.py
├── pipeline.py
└── shipper/wazuh_data_shipper.py (optional, when --ship flag used)

engine.py
├── config.py
└── time-series-anomaly-detector (external)

features.py
├── config.py
└── pandas

wazuh_client.py
├── config.py
└── requests

pipeline.py
└── config.py

shipper/wazuh_data_shipper.py
├── config.py
├── opensearchpy
└── shipper/wazuh_base_template.py
```
The shipper module provides production-grade data streaming for anomaly results.
Purpose: Create and manage dedicated OpenSearch data streams for SONAR anomaly results, enabling:
Components:
| File | Purpose |
|---|---|
| `shipper/__init__.py` | Abstract `DataShipper` base class |
| `shipper/wazuh_base_template.py` | Index template definitions for SONAR streams |
| `shipper/wazuh_data_shipper.py` | `WazuhDataShipper` implementation with template management |
**When used:** via the `--ship` flag during training or detection.

See data-shipping-guide.md for the complete usage guide, including the template hierarchy.
```mermaid
sequenceDiagram
    participant CLI
    participant Config
    participant Client
    participant Features
    participant Engine
    CLI->>Config: Load PipelineConfig
    CLI->>Client: Initialize (Wazuh or Local)
    CLI->>Client: search_alerts(start, end)
    Client-->>CLI: List[Dict] alerts
    CLI->>Features: build_timeseries(alerts)
    Features-->>CLI: DataFrame ts_data
    CLI->>Engine: train(ts_data)
    Engine-->>CLI: Model trained
    CLI->>Engine: save()
    Engine-->>CLI: Model persisted to disk
```
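The training sequence above can be sketched as a plain orchestration function. This is a hedged illustration: `run_training` and its duck-typed `client`/`features`/`engine` arguments are stand-ins, not SONAR's actual API.

```python
from datetime import datetime, timedelta
from typing import Any, Dict, List

def run_training(client: Any, features: Any, engine: Any,
                 lookback_hours: int = 24) -> None:
    """Mirror the training flow: ingest -> build features -> train -> save."""
    end = datetime.utcnow()
    start = end - timedelta(hours=lookback_hours)
    alerts: List[Dict] = client.search_alerts(start, end)  # data ingestion
    ts_data = features.build_timeseries(alerts)            # feature engineering
    engine.train(ts_data)                                  # fit the MVAD model
    engine.save()                                          # persist to disk
```

Any object exposing the four methods can be injected, which is exactly what makes the debug-mode `LocalDataProvider` swap possible.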
```mermaid
sequenceDiagram
    participant CLI
    participant Config
    participant Client
    participant Features
    participant Engine
    participant Post
    CLI->>Config: Load PipelineConfig
    CLI->>Engine: load()
    Engine-->>CLI: Model loaded
    CLI->>Client: search_alerts(start, end)
    Client-->>CLI: List[Dict] alerts
    CLI->>Features: build_timeseries(alerts)
    Features-->>CLI: DataFrame ts_data
    CLI->>Engine: predict(ts_data)
    Engine-->>CLI: Anomaly results
    CLI->>Post: build_wazuh_anomaly_docs(results)
    Post-->>CLI: List[Dict] anomaly_docs
    CLI->>Client: index_anomaly(doc) for each
```
```
Raw Wazuh Alerts (JSON)
         │
         ▼
┌─────────────────────────────────────┐
│ Parse timestamps (ISO 8601)         │
│ Extract numeric fields (rule.level) │
│ Extract categorical fields          │
└─────────────────────────────────────┘
         │
         ▼
┌─────────────────────────────────────┐
│ Bucket by time interval (5 min)     │
│ Aggregate: mean, count, sum         │
│ One-hot encode categories (top-k)   │
└─────────────────────────────────────┘
         │
         ▼
pandas DataFrame (time-indexed, numeric columns)
         │
         ▼
┌─────────────────────────────────────┐
│ MVAD sliding window analysis        │
│ Multivariate anomaly scoring        │
└─────────────────────────────────────┘
         │
         ▼
Anomaly Results (scores, timestamps, is_anomaly flags)
         │
         ▼
┌─────────────────────────────────────┐
│ Generate Wazuh-compatible documents │
│ Add metadata (scenario, threshold)  │
└─────────────────────────────────────┘
         │
         ▼
Indexed to wazuh-anomalies-mvad
```
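The bucketing and aggregation stage can be reproduced with plain pandas. The snippet below is a minimal sketch using the default field names from the pipeline above (`rule.level`, 5-minute buckets); it is illustrative, not SONAR's exact code.

```python
import pandas as pd

# Three synthetic alerts with ISO 8601 timestamps and a numeric rule.level
alerts = [
    {"timestamp": "2024-01-01T00:01:00Z", "rule.level": 3},
    {"timestamp": "2024-01-01T00:02:30Z", "rule.level": 7},
    {"timestamp": "2024-01-01T00:06:00Z", "rule.level": 5},
]

df = pd.DataFrame(alerts)
df["timestamp"] = pd.to_datetime(df["timestamp"])  # parse ISO 8601

# Bucket into 5-minute intervals and aggregate (mean + count)
ts = df.set_index("timestamp").resample("5min").agg(
    level_mean=("rule.level", "mean"),
    alert_count=("rule.level", "count"),
)
```

The result is the time-indexed, all-numeric DataFrame that the MVAD engine consumes: here two buckets, one with two alerts (mean level 5.0) and one with a single alert.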
---
## Configuration system
### Unified configuration approach
SONAR separates infrastructure configuration from detection logic:
**Config files** (`default_config.yaml`, `resource_monitoring_config.yaml`):
- Wazuh connection settings
- Debug mode configuration
- Model storage paths
- Default feature extraction parameters
**Scenario files** (`scenarios/*.yaml`):
- Training parameters (lookback, features, sliding window)
- Detection parameters (mode, threshold, lookback)
- Query filters (scenario-specific)
### Configuration hierarchy
```
PipelineConfig (root)
├── wazuh: WazuhIndexerConfig
│   ├── base_url: str
│   ├── username: str
│   ├── password: str
│   ├── verify_ssl: bool
│   ├── alerts_index_pattern: str
│   └── anomalies_index: str
├── mvad: MVADConfig
│   ├── sliding_window: int
│   ├── device: str (cpu|cuda)
│   └── extra_params: Dict
├── features: FeatureConfig
│   ├── numeric_fields: List[str]
│   ├── bucket_minutes: int
│   ├── categorical_fields: List[str]
│   ├── categorical_top_k: int
│   └── derived_features: bool
├── debug: DebugConfig
│   ├── enabled: bool
│   ├── data_dir: str
│   ├── training_data_file: str
│   └── detection_data_file: str
├── shipping: ShippingConfig
│   ├── enabled: bool
│   ├── install_templates: bool
│   └── scenario_id: Optional[str]
└── model_path: str
```
### Configuration loading
1. **Default**: `default_config.yaml` (auto-loaded if no `--config` specified)
2. **Custom**: Specified via `--config` CLI argument
3. **Scenario**: Merged from scenario YAML (overrides config defaults)
4. **CLI args**: Override specific values (e.g., `--lookback-hours`)
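The four precedence levels can be illustrated with a simple dict merge. This is a sketch under stated assumptions: the real loader works on dataclasses, and `merge_config` is a hypothetical name, not SONAR's actual function.

```python
from typing import Any, Dict, Optional

def merge_config(defaults: Dict[str, Any],
                 custom: Optional[Dict[str, Any]] = None,
                 scenario: Optional[Dict[str, Any]] = None,
                 cli_args: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
    """Apply the precedence order: defaults < custom < scenario < CLI args."""
    merged = dict(defaults)
    for layer in (custom, scenario, cli_args):
        if layer:
            # Later layers override earlier ones, key by key;
            # unset (None) values do not clobber lower layers.
            merged.update({k: v for k, v in layer.items() if v is not None})
    return merged
```

A CLI flag such as `--lookback-hours 12` would therefore win over both the scenario YAML and the config file defaults.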
### Example infrastructure configuration
```yaml
# default_config.yaml
wazuh:
  base_url: "https://localhost:9200"
  username: "admin"
  password: "admin"
  verify_ssl: false
  alerts_index_pattern: "wazuh-alerts-*"
  anomalies_index: "wazuh-anomalies-mvad"

mvad:
  sliding_window: 200
  device: "cpu"
  extra_params: {}

features:
  numeric_fields: ["rule.level"]
  bucket_minutes: 5
  categorical_fields: []
  categorical_top_k: 10
  derived_features: true

debug:
  enabled: false
  data_dir: "./test_data/synthetic_alerts"
  training_data_file: "normal_baseline.json"
  detection_data_file: "with_anomalies.json"

model_path: "./model/mvad_model.pkl"
```
### Example scenario configuration

```yaml
# scenarios/brute_force_detection.yaml
name: "Brute Force Detection"
description: "Detect authentication attack patterns"
enabled: true
model_name: "brute_force_baseline_v1"  # Optional: custom model name

# Optional: Filter specific alerts before processing
query_filter:
  bool:
    should:
      - match: {"rule.groups": "authentication"}

training:
  lookback_hours: 168
  numeric_fields: ["rule.level"]
  categorical_fields: ["agent.id", "rule.groups"]
  bucket_minutes: 5
  sliding_window: 200
  device: "cpu"

detection:
  mode: "historical"
  lookback_minutes: 60
  threshold: 0.7
  min_consecutive: 2
```
### Scenario file structure

```yaml
name: "Scenario Name"
description: "Description of the use case"
enabled: true

# Optional: Filter specific alerts
query_filter:
  bool:
    should: [...]

training:  # Optional section
  lookback_hours: 24
  numeric_fields: ["rule.level"]
  categorical_fields: []
  bucket_minutes: 5
  sliding_window: 200
  device: "cpu"

detection:  # Optional section
  mode: "batch"  # historical | batch | realtime
  lookback_minutes: 10
  threshold: 0.7
  min_consecutive: 2
```
| Scenario sections | Execution behavior | Use case |
|---|---|---|
| `training` + `detection` | Train model → detect anomalies | Full workflow |
| `training` only | Train and save model (no detection) | Baseline establishment |
| `detection` only | Load existing model → detect | Ad-hoc investigation |
| Mode | Behavior | Use case |
|---|---|---|
| historical | One-shot on recent data | Ad-hoc investigation |
| batch | One-shot after training | Baseline + immediate detection |
| realtime | Continuous polling loop | Production monitoring |
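A minimal sketch of what the `realtime` mode's continuous polling loop could look like. The function name, the `detect_once` callback, and the interval parameter are assumptions for illustration, not SONAR's actual API.

```python
import time
from datetime import datetime, timedelta
from typing import Callable, Optional

def realtime_loop(detect_once: Callable[[datetime, datetime], None],
                  lookback_minutes: int = 10,
                  interval_seconds: float = 60,
                  max_iterations: Optional[int] = None) -> int:
    """Continuously run one-shot detection over a sliding lookback window."""
    iterations = 0
    while max_iterations is None or iterations < max_iterations:
        end = datetime.utcnow()
        start = end - timedelta(minutes=lookback_minutes)
        detect_once(start, end)  # one-shot detection on the current window
        iterations += 1
        time.sleep(interval_seconds)
    return iterations
```

`historical` and `batch` correspond to a single pass through the same window logic; only `realtime` keeps the loop alive, which is why it is the mode intended for production monitoring.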
```mermaid
flowchart TD
    Start([Start]) --> Load[Load scenario YAML]
    Load --> CheckEnabled{enabled?}
    CheckEnabled -->|No| Skip([Skip scenario])
    CheckEnabled -->|Yes| CheckTraining{has_training<br/>section?}
    CheckTraining -->|Yes| Train[Phase 1: Training]
    Train --> SaveModel[Save model.pkl]
    SaveModel --> CheckDetection{has_detection<br/>section?}
    CheckTraining -->|No| LoadModel[Load existing model.pkl]
    LoadModel --> CheckDetection
    CheckDetection -->|No| Done([Done])
    CheckDetection -->|Yes| GetMode{detection<br/>mode?}
    GetMode -->|historical| Historical[Phase 2: One-shot detection]
    GetMode -->|batch| Batch[Phase 2: One-shot detection]
    GetMode -->|realtime| Realtime[Phase 2: Continuous polling]
    Historical --> Index[Index anomalies to Wazuh]
    Batch --> Index
    Realtime --> Index
    Index --> Done
```
SONAR automatically handles feature mismatches between training and detection:
```python
# engine.py - MVADModelEngine (abridged)

def predict(self, ts_data: pd.DataFrame) -> Any:
    # self.training_columns was stored during training:
    #   self.training_columns = ts_clean.columns.tolist()
    ...
    # During detection: align columns automatically
    if hasattr(self, "training_columns"):
        # Add missing columns (fill with 0)
        for col in self.training_columns:
            if col not in ts_clean.columns:
                ts_clean[col] = 0.0
        # Drop extra columns and reorder to match training
        ts_clean = ts_clean[self.training_columns]
```
This ensures that detection-time input always has exactly the columns, order, and fill values the model was trained on.
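The alignment rule can be exercised as a standalone, runnable function; the sketch below simplifies the engine's logic to its essence (zero-fill missing training columns, drop extras, restore training order).

```python
import pandas as pd
from typing import List

def align_columns(ts: pd.DataFrame, training_columns: List[str]) -> pd.DataFrame:
    """Return ts with exactly the training columns, in training order."""
    ts = ts.copy()
    for col in training_columns:
        if col not in ts.columns:
            ts[col] = 0.0          # feature absent at detection time
    return ts[training_columns]    # drop extras and restore training order
```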
`FeatureConfig` in `config.py`:

```python
@dataclass
class FeatureConfig:
    # Existing fields...
    new_feature_fields: Sequence[str] = field(default_factory=list)
```
`WazuhFeatureBuilder` in `features.py`:

```python
def _extract_single_alert_features(self, alert: Dict) -> Dict:
    features = {}
    # Existing extraction...
    # Add new feature extraction
    for field_path in self.cfg.new_feature_fields:
        value = self._get_nested_field(alert, field_path)
        features[f"new_{field_path}"] = process_value(value)
    return features
```
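The `_get_nested_field` helper is referenced but not shown; a standalone dotted-path lookup along these lines would behave as follows. This is a hypothetical reimplementation for illustration, not the method's actual source.

```python
from typing import Any, Dict, Optional

def get_nested_field(alert: Dict[str, Any], path: str) -> Optional[Any]:
    """Resolve a dotted path like "rule.level" to alert["rule"]["level"]."""
    node: Any = alert
    for key in path.split("."):
        if not isinstance(node, dict) or key not in node:
            return None  # missing field -> None rather than KeyError
        node = node[key]
    return node
```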
`DetectionScenario` in `scenario.py`:

```python
mode: Literal["historical", "batch", "realtime", "new_mode"] = "historical"
```
`cli.py`:

```python
def _execute_detection_phase(use_case, engine, client, fb, args):
    if use_case.detection.mode == "new_mode":
        return _run_new_mode_detection(...)
```
Create a new provider implementing the same interface as WazuhIndexerClient:
```python
class NewDataProvider:
    def search_alerts(self, start: datetime, end: datetime,
                      query: Optional[Dict] = None) -> List[Dict]:
        """Return alerts matching the time range and optional query."""

    def check_connection(self) -> bool:
        """Return True if data source is accessible."""

    def index_anomaly(self, doc: Dict) -> str:
        """Index an anomaly document, return document ID."""
```
Example: local_data_provider.py for debug mode.
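As a concrete (toy) instance, an in-memory provider satisfying the same interface might look like this; the real `LocalDataProvider` reads JSON files instead of a Python list.

```python
import uuid
from datetime import datetime
from typing import Dict, List, Optional

class InMemoryDataProvider:
    """Duck-typed drop-in for WazuhIndexerClient, backed by a list."""

    def __init__(self, alerts: List[Dict]):
        self._alerts = alerts
        self._anomalies: Dict[str, Dict] = {}

    def search_alerts(self, start: datetime, end: datetime,
                      query: Optional[Dict] = None) -> List[Dict]:
        # Filter by timestamp only; a real provider would also apply `query`
        return [a for a in self._alerts
                if start <= datetime.fromisoformat(a["timestamp"]) <= end]

    def check_connection(self) -> bool:
        return True  # nothing to connect to

    def index_anomaly(self, doc: Dict) -> str:
        doc_id = str(uuid.uuid4())
        self._anomalies[doc_id] = doc
        return doc_id
```

Because the CLI only calls these three methods, the provider can be swapped in without touching the rest of the pipeline.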
```python
def cmd_new_command(args, cfg: PipelineConfig) -> int:
    """Implementation of new command."""
    # Your logic here
    return 0  # Success

def main():
    parser = argparse.ArgumentParser()
    subparsers = parser.add_subparsers()
    # Add new command
    new_parser = subparsers.add_parser("new_command", help="Description")
    new_parser.add_argument("--option", help="Option description")
    new_parser.set_defaults(func=cmd_new_command)
    # ... parse arguments and dispatch to args.func as usual
```
SONAR’s data shipping feature enables integration with RADAR (automated response system) by publishing anomaly results to dedicated Wazuh data streams. This is an optional add-on feature for production deployments.
The `ShippingConfig` dataclass controls data shipping behavior:

```python
@dataclass
class ShippingConfig:
    """Configuration for data shipping to Wazuh data streams."""

    enabled: bool = False
    """Enable data shipping (ship anomalies to dedicated data streams)."""

    install_templates: bool = True
    """Install base templates on first run (disable if already installed)."""

    scenario_id: Optional[str] = None
    """Optional custom scenario ID (auto-generated from model_path if None)."""
```
During training with shipping enabled:

1. `_should_ship(cfg, args)` determines whether shipping is enabled (`--ship` flag OR `shipping.enabled=true` in config).
2. `_create_scenario_stream(cfg, ts_data, scenario_name)` is called, creating the data stream `sonar_anomalies_mvad_{scenario_id}`.

Template hierarchy:

```
Base Template (sonar_stream_template)
└── MVAD Algorithm Template (sonar_stream_template_mvad)
    └── Scenario-Specific Template (sonar_detector_mvad_{scenario_id})
        └── Component: scenario features (feature_rule_level, etc.)
```
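The `_should_ship` decision can be sketched as follows. This is a hedged approximation: the real helper inspects the config object and parsed CLI arguments, and the exact precedence (including the debug-mode exclusion) is taken from the behavior described in this document.

```python
def should_ship(shipping_enabled: bool, ship_flag: bool, debug: bool) -> bool:
    """Shipping is on if the --ship flag or config enables it,
    but debug mode always disables it."""
    if debug:
        return False  # no shipping in debug mode
    return ship_flag or shipping_enabled
```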
During detection with shipping enabled:

1. `MVADPostProcessor.build_wazuh_anomaly_docs()` creates anomaly documents.
2. `_should_ship(cfg, args)` checks the configuration.
3. `_ship_anomalies(cfg, client, anomaly_docs)` is called, shipping each document to `sonar_anomalies_mvad_{scenario_id}` via `WazuhDataShipper.ship_single()`.

Data flow: SONAR → Data Stream → RADAR
```
┌─────────────────────┐
│ SONAR Detection     │
│ (detect anomalies)  │
└──────────┬──────────┘
           │
           │ Ships anomalies
           ↓
┌─────────────────────────────────────┐
│ Wazuh Indexer (OpenSearch)          │
│                                     │
│ Data Stream:                        │
│  sonar_anomalies_mvad_{scenario_id} │
│                                     │
│ Documents:                          │
│  - is_anomaly: true                 │
│  - anomaly_score: 0.95              │
│  - threshold: 0.85                  │
│  - scenario_name: "Brute Force"     │
│  - feature_values: {...}            │
└──────────┬──────────────────────────┘
           │
           │ RADAR monitors stream
           ↓
┌─────────────────────┐
│ RADAR               │
│ (automated          │
│  response)          │
└─────────────────────┘
```
RADAR configuration example:
```yaml
# RADAR monitors specific SONAR data streams
monitors:
  - name: "SONAR Brute Force Detector"
    data_stream: "sonar_anomalies_mvad_12ab34cd"
    threshold: 0.85
    action: "block_ip"
    playbook: "ansible/playbooks/block_malicious_ip.yml"
```
```
Detection completes
│
├─ shipping.enabled=true OR --ship flag? ──► No ──► Standard indexing
│                                                   to wazuh-anomalies-mvad
└─ Yes
   │
   ├─ Debug mode? ─────────────────────────► Yes ─► Standard indexing
   │                                                (no shipping in debug)
   └─ No
      │
      └─ Ship to sonar_anomalies_mvad_{scenario_id}
         │
         ├─ Success ──────────────► Logged and indexed
         │
         └─ Failure ──────────────► Fall back to standard indexing
```
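The failure branch above (ship, and on error fall back to standard indexing) can be sketched as follows. The `ship`/`index` callables are injected placeholders, not SONAR's real functions.

```python
from typing import Callable, Dict, List

def ship_with_fallback(docs: List[Dict],
                       ship: Callable[[Dict], None],
                       index: Callable[[Dict], None]) -> int:
    """Ship each document; on failure, fall back to standard indexing.
    Returns the number of documents successfully shipped."""
    shipped = 0
    for doc in docs:
        try:
            ship(doc)
            shipped += 1
        except Exception:
            index(doc)  # fall back to wazuh-anomalies-mvad-style indexing
    return shipped
```

Doing the fallback per document (rather than aborting the batch) means a transient data-stream error never loses anomaly results.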
| Module | Role |
|---|---|
| `cli.py` | `_should_ship()`, `_create_scenario_stream()`, `_ship_anomalies()` helper functions |
| `shipper/wazuh_data_shipper.py` | `WazuhDataShipper` class for index template management and document shipping |
| `shipper/wazuh_base_template.py` | Base template definitions for MVAD data streams |
| `config.py` | `ShippingConfig` dataclass |
| `scenario.py` | Scenario YAML support for the `shipping` section |
1. **Via scenario YAML:**

```yaml
shipping:
  enabled: true
  install_templates: true
  scenario_id: "brute_force_v2"
```
2. **Via CLI flag (overrides config):**

```bash
poetry run sonar train --scenario my_scenario.yaml --ship
poetry run sonar detect --scenario my_scenario.yaml --ship
```
For the complete usage guide, see data-shipping-guide.md.
For detailed UML diagrams, see uml-diagrams.md.
| Document | Description |
|---|---|
| README.md | Documentation index and quick reference |
| setup-guide.md | Installation, configuration, and usage |
| scenario-guide.md | Complete scenario system guide |
| troubleshooting.md | Error solutions and diagnostics |
| uml-diagrams.md | Visual system design diagrams |