This guide explains how to use SONAR’s data shipping feature to send anomaly detection results to dedicated Wazuh data streams for real-time monitoring and integration with RADAR automated responses.
Data shipping is an add-on feature; use the matrix below to decide when to enable it:
| Scenario | Use Shipping? | Reason |
|---|---|---|
| Real-time detection with automated response | ✅ Yes | RADAR can monitor specific data streams for high-confidence anomalies |
| Historical analysis | ❌ No | Standard anomaly index sufficient for post-mortem investigation |
| Testing/development | ❌ No | Debug mode with local test data is more appropriate |
| Production deployment | ✅ Yes | Enables integration with SIEM dashboards and alerting |
┌─────────────────┐
│ SONAR Training │ --ship flag
│ (cmd_train) │
└────────┬────────┘
│
│ Creates data stream template
↓
┌─────────────────────────────────────┐
│ Wazuh Indexer (OpenSearch) │
│ │
│ ┌───────────────────────────────┐ │
│ │ Index Template: │ │
│ │ sonar_stream_template_mvad │ │
│ │ │ │
│ │ Components: │ │
│ │ - component_template_sonar_ │ │
│ │ mvad (algorithm fields) │ │
│ │ - component_template_sonar_ │ │
│ │ {scenario_id} (features) │ │
│ └───────────────────────────────┘ │
│ │
│ ┌───────────────────────────────┐ │
│ │ Data Stream: │ │
│ │ sonar_anomalies_mvad_{id} │ │
│ │ │ │
│ │ Documents: │ │
│ │ - timestamp │ │
│ │ - is_anomaly: true │ │
│ │ - anomaly_score: 0.95 │ │
│ │ - threshold: 0.85 │ │
│ │ - scenario_name │ │
│ │ - feature_values │ │
│ └───────────────────────────────┘ │
└─────────────────────────────────────┘
↑
│ Ships anomaly documents
│
┌────────┴────────┐
│ SONAR Detection │ --ship flag
│ (cmd_detect) │
└─────────────────┘
Base Template (sonar_stream_template)
│
├── Priority: 1
├── Index Pattern: sonar_anomalies_*
└── Fields:
├── is_anomaly (boolean)
├── anomaly_score (float)
├── threshold (float)
├── scenario_name (keyword)
└── detection_timestamp (date)
MVAD Algorithm Template (sonar_stream_template_mvad)
│
├── Priority: 2
├── Index Pattern: sonar_anomalies_mvad_*
├── Components:
│ └── component_template_sonar_mvad
└── Additional Fields:
├── mvad_score (float)
├── mvad_threshold (float)
├── sliding_window (integer)
├── bucket_minutes (integer)
└── context (object)
Scenario-Specific Template (sonar_detector_mvad_{scenario_id})
│
├── Priority: 3
├── Index Pattern: sonar_anomalies_mvad_{scenario_id}
├── Components:
│ ├── component_template_sonar_mvad
│ └── component_template_sonar_{scenario_id}
└── Additional Fields:
└── scenario_features (object)
├── feature_rule_level (float)
├── feature_srcip_count (float)
└── ... (scenario-specific features)
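The three-layer hierarchy above maps onto OpenSearch composable index templates. As an illustration only (the actual template bodies are installed by SONAR and may differ in detail), the base template could be expressed as a Python dict like this:

```python
# Hypothetical sketch of the base template body; field names follow the
# hierarchy above, but the exact JSON SONAR installs may differ.
base_template = {
    "index_patterns": ["sonar_anomalies_*"],
    "priority": 1,       # lowest priority: algorithm/scenario templates override it
    "data_stream": {},   # marks matching indices as data streams
    "template": {
        "mappings": {
            "properties": {
                "is_anomaly": {"type": "boolean"},
                "anomaly_score": {"type": "float"},
                "threshold": {"type": "float"},
                "scenario_name": {"type": "keyword"},
                "detection_timestamp": {"type": "date"},
            }
        }
    },
}
```

Higher-priority templates with narrower index patterns (e.g. `sonar_anomalies_mvad_*`) take precedence for the streams they match, which is how the MVAD and scenario-specific fields get layered on top.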
SONAR supports two ways to enable data shipping:

1. The `--ship` CLI flag:

poetry run sonar train --scenario my_scenario.yaml --ship

2. The scenario YAML configuration:

# In your scenario YAML file
shipping:
  enabled: true               # Enable data shipping
  install_templates: true     # Install templates on first run
  scenario_id: "my_custom_id" # Optional: custom scenario ID
Shipping is enabled when either condition is met:

- the `--ship` flag is set, OR
- `shipping.enabled: true` is set in the scenario YAML

The `--debug` flag disables shipping automatically (safety measure).

Decision matrix:
| CLI `--ship` | YAML `shipping.enabled` | CLI `--debug` | Shipping Active? |
|---|---|---|---|
| ✅ True | ❌ False | ❌ False | ✅ Yes |
| ❌ False | ✅ True | ❌ False | ✅ Yes |
| ✅ True | ✅ True | ❌ False | ✅ Yes |
| ✅ True | ✅ True | ✅ True | ❌ No (debug mode) |
| ❌ False | ❌ False | ❌ False | ❌ No |
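The decision matrix reduces to a one-line rule, sketched here as a hypothetical helper (not part of the SONAR codebase):

```python
def shipping_active(cli_ship: bool, yaml_enabled: bool, cli_debug: bool) -> bool:
    """Shipping runs when requested via CLI or YAML, unless debug mode is on."""
    return (cli_ship or yaml_enabled) and not cli_debug

# Mirrors the decision matrix above:
assert shipping_active(True, False, False) is True
assert shipping_active(False, True, False) is True
assert shipping_active(True, True, True) is False   # debug mode wins
assert shipping_active(False, False, False) is False
```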
Best practices:

- For testing, run with `--debug` (shipping auto-disabled)
- For production scenarios, set `shipping.enabled: true` in YAML
- For ad-hoc runs, use the `--ship` flag without modifying YAML
- Don't combine `--debug` and `--ship` (debug mode wins)

Create a data stream for your scenario:
poetry run sonar train \
--config configs/my_config.yaml \
--lookback-hours 168 \
--ship
Output:
[INFO] Training on 2400 time points...
[INFO] Saving model to ./mvad_model.pkl...
[INFO] Initializing data shipper for stream creation...
[INFO] Creating data stream for scenario: sonar_scenario_a1b2c3d4
[INFO] ✓ Data stream ready: sonar_anomalies_mvad_a1b2c3d4
[INFO] Future detection runs with --ship will index anomalies to this stream
[INFO] ✓ Training complete.
Ship anomalies to the created stream:
# Batch mode (single detection run)
poetry run sonar detect \
--config configs/my_config.yaml \
--lookback-minutes 60 \
--ship
# Real-time mode (continuous monitoring) - from project root
poetry run sonar scenario \
--use-case sonar/scenarios/brute_force_detection.yaml \
--mode realtime \
--ship
Output:
[INFO] Running detection on 240 time points...
[INFO] Found 3 anomalies.
[INFO] Shipping anomalies to Wazuh data stream...
[INFO] Shipped anomaly to sonar_anomalies_mvad_a1b2c3d4: abc123
[INFO] Shipped anomaly to sonar_anomalies_mvad_a1b2c3d4: def456
[INFO] Shipped anomaly to sonar_anomalies_mvad_a1b2c3d4: ghi789
[INFO] ✓ Shipped 3 anomalies to data stream
sonar train [OPTIONS] --ship
What happens:

- The base index templates are installed (if `install_templates` is enabled)
- The scenario-specific data stream (`sonar_anomalies_mvad_{scenario_id}`) is created

Important: Shipping during training only creates the infrastructure. No anomalies are shipped during training.
sonar detect [OPTIONS] --ship
sonar scenario --use-case YAML --ship
What happens:

- Instead of indexing to the standard `wazuh-anomalies-mvad` index, anomalies are shipped to the scenario-specific data stream

# From project root
sonar scenario --use-case sonar/scenarios/brute_force_detection.yaml --ship
Behavior: in realtime mode, SONAR runs detection continuously and ships each batch of detected anomalies to the scenario stream.
Add a shipping section to your scenario YAML file for persistent configuration:
# Example: sonar/scenarios/brute_force_detection.yaml
name: "Brute Force Attack Detection"
description: "Detect SSH brute force attempts"

# ... wazuh, training, detection sections ...

shipping:
  enabled: true               # Enable data shipping for this scenario
  install_templates: true     # Install base templates on first run
  scenario_id: "brute_force"  # Custom scenario ID (optional)
| Field | Type | Default | Description |
|---|---|---|---|
| `enabled` | boolean | `false` | Enable/disable data shipping for this scenario |
| `install_templates` | boolean | `true` | Whether to install base index templates on first run |
| `scenario_id` | string | `null` | Custom scenario ID (defaults to hash of model path if not specified) |
The --ship CLI flag overrides YAML configuration:
# YAML has shipping.enabled: false
# CLI flag enables it anyway:
poetry run sonar scenario --use-case my_scenario.yaml --ship # Shipping enabled
# YAML has shipping.enabled: true
# CLI flag not provided, but shipping still enabled:
poetry run sonar scenario --use-case my_scenario.yaml # Shipping enabled
# YAML has shipping.enabled: true
# Debug mode always disables shipping:
poetry run sonar scenario --use-case my_scenario.yaml --debug # Shipping disabled
name: "Lateral Movement Detection"
description: "Detect lateral movement patterns using PSExec, WMI, RDP connections"

wazuh:
  host: "wazuh.production.local"
  port: 9200
  index_pattern: "wazuh-alerts-*"
  username: "admin"
  password_env: "WAZUH_PASSWORD"

training:
  lookback_days: 14
  min_samples: 1000
  bucket_minutes: 10
  sliding_window: 144
  categorical_top_k: 20

shipping:
  enabled: true                    # Always ship in production
  install_templates: true
  scenario_id: "lateral_movement"  # Consistent ID for RADAR

detection:
  mode: "realtime"
  lookback_minutes: 60
  threshold: 0.75
  min_consecutive: 2
Result: Every training run creates sonar_anomalies_mvad_lateral_movement stream, and all detected anomalies are shipped to it.
Each trained model gets a unique scenario ID based on its model path:
import hashlib
scenario_id = hashlib.md5(model_path.encode()).hexdigest()[:8]
# Example: "a1b2c3d4"
Stream name format: sonar_anomalies_mvad_{scenario_id}
Example: sonar_anomalies_mvad_a1b2c3d4
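Combining the hashing rule with the stream name format, the full derivation can be reproduced with a small helper (hypothetical; SONAR performs this internally):

```python
import hashlib

def stream_name_for(model_path: str) -> str:
    """Derive the data stream name from a model path, per the rule above."""
    scenario_id = hashlib.md5(model_path.encode()).hexdigest()[:8]
    return f"sonar_anomalies_mvad_{scenario_id}"

name = stream_name_for("./models/brute_force_model.pkl")
print(name)  # deterministic for a given model path
```

Because the ID is a deterministic hash of the model path, retraining the same model file always targets the same stream.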
Each anomaly document contains:
{
  "timestamp": "2026-01-06T15:30:00.000Z",
  "is_anomaly": true,
  "anomaly_score": 0.95,
  "threshold": 0.85,
  "mvad_score": 0.95,
  "mvad_threshold": 0.85,
  "scenario_name": "sonar_scenario_a1b2c3d4",
  "detection_timestamp": "2026-01-06T15:35:00.000Z",
  "alert_count": 450,
  "feature_count": 3,
  "sliding_window": 200,
  "bucket_minutes": 5,
  "context": {
    "model_path": "./models/brute_force_model.pkl",
    "feature_names": ["rule.level", "srcip_count", "dstip_count"],
    "training_samples": 2400
  },
  "scenario_features": {
    "feature_rule_level": 8.5,
    "feature_srcip_count": 15.0,
    "feature_dstip_count": 3.0
  }
}
| Field | Type | Description |
|---|---|---|
| `timestamp` | date | Timestamp of the detection window |
| `is_anomaly` | boolean | Always `true` for shipped documents |
| `anomaly_score` | float | Anomaly severity score (0-1) |
| `threshold` | float | Detection threshold used |
| `mvad_score` | float | MVAD algorithm-specific score |
| `scenario_name` | keyword | Unique scenario identifier |
| `detection_timestamp` | date | When detection was performed |
| `alert_count` | integer | Number of alerts in detection window |
| `feature_count` | integer | Number of features analyzed |
| `sliding_window` | integer | MVAD sliding window size |
| `bucket_minutes` | integer | Time bucket size in minutes |
| `context` | object | Model metadata (path, features, training info) |
| `scenario_features` | object | Feature values that triggered the anomaly |
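As a sanity check on the schema, a minimal document matching the table can be assembled and validated in Python. This is a hypothetical helper for illustration, not a SONAR API:

```python
from datetime import datetime, timezone

def make_anomaly_doc(score: float, threshold: float, scenario: str,
                     features: dict) -> dict:
    """Build a minimal anomaly document following the field table above."""
    now = datetime.now(timezone.utc).isoformat()
    return {
        "timestamp": now,
        "is_anomaly": True,          # shipped documents are always anomalies
        "anomaly_score": score,
        "threshold": threshold,
        "scenario_name": scenario,
        "detection_timestamp": now,
        "scenario_features": features,
    }

doc = make_anomaly_doc(0.95, 0.85, "sonar_scenario_a1b2c3d4",
                       {"feature_rule_level": 8.5})
assert doc["is_anomaly"] and doc["anomaly_score"] >= doc["threshold"]
```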
RADAR can monitor SONAR data streams for high-confidence anomalies:
# RADAR playbook configuration
- name: Monitor SONAR brute force anomalies
  monitor:
    index: "sonar_anomalies_mvad_a1b2c3d4"
    query:
      bool:
        must:
          - match: { is_anomaly: true }
          - range: { anomaly_score: { gte: 0.9 } }
    polling_interval: 60
  actions:
    - block_srcip
    - notify_soc
    - create_ticket
GET /sonar_anomalies_mvad_*/_search
{
  "query": {
    "bool": {
      "must": [
        { "term": { "is_anomaly": true } },
        { "range": { "anomaly_score": { "gte": 0.85 } } }
      ]
    }
  },
  "sort": [
    { "detection_timestamp": "desc" }
  ]
}
Monitor specific attack types:
# Monitor only brute force scenario
GET /sonar_anomalies_mvad_a1b2c3d4/_search
# Monitor all SONAR anomalies
GET /sonar_anomalies_mvad_*/_search
# Monitor recent anomalies (last hour)
GET /sonar_anomalies_mvad_*/_search
{
  "query": {
    "range": {
      "detection_timestamp": {
        "gte": "now-1h"
      }
    }
  }
}
SONAR automatically installs a rollover policy for data streams:
Policy: sonar_rollover_policy
{
  "policy": {
    "description": "SONAR anomaly data stream rollover policy",
    "states": [
      {
        "name": "active",
        "transitions": [
          {
            "state_name": "rollover",
            "conditions": {
              "min_index_age": "7d",
              "min_doc_count": 100000
            }
          }
        ]
      },
      {
        "name": "rollover",
        "actions": [{ "rollover": {} }],
        "transitions": [
          {
            "state_name": "delete",
            "conditions": { "min_index_age": "30d" }
          }
        ]
      },
      {
        "name": "delete",
        "actions": [{ "delete": {} }]
      }
    ]
  }
}
Behavior: backing indices roll over after 7 days or 100,000 documents (whichever comes first), and rolled-over indices are deleted once they are 30 days old.
Policy is automatically installed during first training with --ship. To manually install:
from sonar.shipper.wazuh_data_shipper import WazuhDataShipper
from sonar.config import WazuhIndexerConfig

config = WazuhIndexerConfig(
    base_url="https://localhost:9200",
    username="admin",
    password="admin",
)

shipper = WazuhDataShipper(config, install=True)
shipper.add_rollover_policy()
Cause: Cannot connect to OpenSearch/Wazuh Indexer.
Solution:
# Check Wazuh Indexer status
curl -k -u admin:admin https://localhost:9200/_cluster/health
# Verify credentials in config
cat configs/my_config.yaml
Cause: Base templates not installed.
Solution:
# Re-run training with --ship to install templates
poetry run sonar train --config configs/my_config.yaml --ship
Symptoms:

- Anomalies are detected, but no documents appear in `sonar_anomalies_mvad_*`

Diagnosis:
# Check if data stream exists
GET /_data_stream/sonar_anomalies_mvad_*
# Check template exists
GET /_index_template/sonar_stream_template_mvad*
# Check recent documents
GET /sonar_anomalies_mvad_*/_search?size=1&sort=detection_timestamp:desc
Solution:

- Run training with `--ship` before detection

Symptoms: Error during shipper.ship_single()
Common causes:
- The data stream does not exist: run training with `--ship` first

Fallback behavior: If shipping fails, SONAR automatically falls back to standard anomaly indexing:
[ERROR] Failed to ship anomalies: ...
[INFO] Falling back to standard anomaly indexing...
[INFO] Indexed anomaly: abc123
Behavior: Shipping is automatically disabled in debug mode.
# This will skip shipping (no Wazuh connection)
poetry run sonar train --debug --ship
# Output:
[INFO] 🔧 DEBUG MODE ENABLED - Using local test data
[INFO] ✓ Training complete.
# (No shipping messages)
Why: Debug mode uses local test data and doesn’t connect to Wazuh Indexer, so shipping is not applicable.
By default, scenario ID is derived from model path. To use custom naming:
from sonar.shipper.wazuh_data_shipper import WazuhDataShipper

shipper = WazuhDataShipper(config, install=False)

# Custom scenario ID
scenario_id = "brute_force_v1"
scenario_name = "Brute Force Detection v1.0"

# Define feature types
features = {
    "rule.level": "float",
    "srcip_count": "integer",
    "failed_login_rate": "float",
}

stream_name = shipper.create_scenario_stream(
    scenario_id=scenario_id,
    scenario_name=scenario_name,
    features=features,
)

print(f"Stream created: {stream_name}")
# Output: sonar_anomalies_mvad_brute_force_v1
For high-throughput scenarios:
from sonar.shipper.wazuh_data_shipper import WazuhDataShipper

shipper = WazuhDataShipper(config)

# Prepare bulk request
actions = ["index"] * len(anomaly_docs)
bulk_body = shipper.get_bulk_request(
    data=anomaly_docs,
    stream_name="sonar_anomalies_mvad_a1b2c3d4",
    actions=actions,
)

# Ship in bulk
response = shipper.ship_bulk(bulk_body)
print(f"Indexed {len(anomaly_docs)} documents")
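Under the hood, an OpenSearch `_bulk` request body is newline-delimited JSON: one action line followed by one document line per document. The sketch below shows what a body like the one `get_bulk_request` builds might look like; it is an assumption, not SONAR's actual implementation. Note that data streams are append-only, so the `_bulk` action for a data stream must be `create`:

```python
import json

def build_bulk_body(docs: list, stream_name: str) -> str:
    """Build an NDJSON _bulk body: an action line plus a doc line per document.

    Data streams only accept the 'create' action, so that is used here.
    """
    lines = []
    for doc in docs:
        lines.append(json.dumps({"create": {"_index": stream_name}}))
        lines.append(json.dumps(doc))
    return "\n".join(lines) + "\n"  # _bulk bodies must end with a newline

body = build_bulk_body(
    [{"is_anomaly": True, "anomaly_score": 0.95}],
    "sonar_anomalies_mvad_a1b2c3d4",
)
print(body)
```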
# Delete a specific data stream
shipper.delete_data_stream("sonar_anomalies_mvad_a1b2c3d4")
# Via OpenSearch API
DELETE /sonar_anomalies_mvad_a1b2c3d4
Warning: This deletes all anomaly records in the stream.
# Wrong: Detection without prior training
poetry run sonar detect --ship # Stream doesn't exist!
# Correct: Train first, then detect
poetry run sonar train --ship
poetry run sonar detect --ship
# Best: Use scenarios for reproducible workflows (from project root)
poetry run sonar scenario \
--use-case sonar/scenarios/brute_force_detection.yaml \
--mode realtime \
--ship
Set up monitoring for shipping failures and data stream health.
# Config: my_config.yaml
model_path: "./models/brute_force_ssh_v1.0.pkl" # Clear naming
This creates identifiable scenario IDs and makes debugging easier.
# 1. Test with standard indexing
poetry run sonar train
poetry run sonar detect
# 2. Verify anomalies in wazuh-anomalies-mvad
# 3. Enable shipping for production
poetry run sonar train --ship
poetry run sonar detect --ship
| Command | With `--ship` | Without `--ship` |
|---|---|---|
| `sonar train` | Creates data stream template | Trains model only |
| `sonar detect` | Ships to data stream | Indexes to `wazuh-anomalies-mvad` |
| `sonar scenario` | Full workflow with streaming | Standard workflow |
Key takeaway: Shipping is a production add-on for real-time monitoring and RADAR integration. Use standard indexing for testing and development.