The anomaly detection engine is the core component of ADBox. For every available anomaly detection method, it orchestrates the interaction between the algorithm's bulk functions, data ingestion, data storage, user output, etc. In other words, the engine determines the sequence of actions to be performed to successfully go through the detection pipeline.
The ADEngine class maintains the status of the current detector (the detector to be used if nothing else is specified) and the list of available detectors. It also exposes the actions that are available to the end user, triggering the corresponding pipelines, and defines the default behavior of ADBox when no relevant information is provided.
Currently, the ADBox engine supports the MTAD-GAT algorithm for anomaly detection; the actions available to the user include training and prediction, both described below.
Instructions are given to the engine as use cases. The engine parses and transforms use-case input into data consumable by the various detection pipelines by calling the DetectorsConfigManager.
By default, if the keywords for training and prediction are not provided in the use case, the corresponding request is set to None and the output of the pipeline will be empty.
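This default behavior can be sketched as follows. Note that this is an illustrative sketch only: the keyword names and helper functions below are hypothetical stand-ins, since the real parsing is done by the engine together with the DetectorsConfigManager.

```python
# Illustrative sketch of the default behavior described above.
# The "train"/"predict" keyword names and these helper names are
# hypothetical, not ADBox's actual API.
def get_request_from_uc(use_case: dict, keyword: str):
    """Return the request for `keyword`, or None when it is absent."""
    return use_case.get(keyword)  # missing keyword -> None

def run_pipeline(request):
    """A pipeline fed a None request produces an empty output."""
    if request is None:
        return None
    return {"status": "ok", "request": request}
```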
The engine state is maintained via its attributes. Normally, these attributes are set at initialization time using default values; however, they can be changed automatically by the pipelines. For example, at the end of the training pipeline, current_detector_id is set to the id of the detector that was just trained.
| Attribute | Description | Type | Default |
|---|---|---|---|
| algorithm | Algorithm used by the detection pipelines. Currently only MTAD-GAT is available. | str | MTAD-GAT |
| current_detector_id | UUID of the detector to be used by the pipelines. | str | Last trained ID |
| default_config_path | Path to the default detector configuration. See default behavior and use case parsing. | str | DEFAULT_DETECTOR_INPUT_CONFIG |
| detectors | List of detector IDs available locally. | List[str] | |
| ship_to_indexer | If True, data shipping to Wazuh is enabled. See Wazuh-ADBox integration. | bool | False |
| transform_columns_path | Path to the config file for ingested data type transformation. See Data transformation. | str | WAZUH_COLUMNS_PATH |
| test_env | If True, the engine gathers assets (e.g., use cases) from the test environment. | bool | False |
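The defaults in the table above can be pictured as a small state object. This is a hedged sketch, with placeholder strings standing in for the project's actual DEFAULT_DETECTOR_INPUT_CONFIG and WAZUH_COLUMNS_PATH constants:

```python
from dataclasses import dataclass, field
from typing import List, Optional

# Placeholder values standing in for the project's actual constants.
DEFAULT_DETECTOR_INPUT_CONFIG = "path/to/default_detector_input_config"
WAZUH_COLUMNS_PATH = "path/to/wazuh_columns_config"

@dataclass
class EngineState:
    """Sketch of the ADEngine attributes listed in the table above."""
    algorithm: str = "MTAD-GAT"
    current_detector_id: Optional[str] = None  # updated to the last trained id
    default_config_path: str = DEFAULT_DETECTOR_INPUT_CONFIG
    detectors: List[str] = field(default_factory=list)
    ship_to_indexer: bool = False
    transform_columns_path: str = WAZUH_COLUMNS_PATH
    test_env: bool = False
```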
A detector is the object used to perform detection. In the specific case of anomaly detection via the MTAD-GAT algorithm, it must include, for example, a trained ML model, along with all the configuration used, and a POT object.
In the ADBox implementation, a detector is realized as a collection of files stored under a unique id, which is also the name of the subfolder of siem_mtad_gat/assets/detector_models containing those files. All the outcomes generated by a pipeline associated with a given detector are stored in the corresponding folder. For details see Detector data structure.
```
├── a77c773c-9e6f-4700-92f2-53c0e682f290
│   ├── input
│   │   ├── detector_input_parameters.json
│   │   └── training_config.json
│   ├── prediction
│   │   ├── uc-16_predicted_anomalies_data-1_2024-08-12_13-48-42.json
│   │   └── uc-16_predicted_data-1_2024-08-12_13-48-42.json
│   └── training
│       ├── losses_train_data.json
│       ├── model.pt
│       ├── scaler.pkl
│       ├── spot
│       │   ├── spot_feature-0.pkl
│       │   ├── spot_feature-1.pkl
│       │   ├── spot_feature-2.pkl
│       │   ├── spot_feature-3.pkl
│       │   ├── spot_feature-4.pkl
│       │   └── spot_feature-global.pkl
│       ├── test_output.pkl
│       ├── train_losses.png
│       ├── train_output.pkl
│       └── validation_losses.png
```
Below we describe the train and predict pipelines.
The main goal of the train pipeline is to create a new detector.
See parameters in the Use-case Guide.
The train pipeline parses the use case into a training request (via the get_training_requests_from_uc method) and then runs the steps shown below. Running the training pipeline should produce the input and training folders under the id of the model.
```mermaid
---
title: Training pipeline
---
flowchart LR
s[/Start Engine/] --> P(Parse uc)
P --> Init
Init --> Ingest
Ingest --> Transform
Transform --> Train
Train --> Response
Response --> B{ship?}
B -->|no| D[/Return/]
B -->|yes| C[create detector stream]
C --> D
```
See also the training pipeline sequence diagram.
The main goal of the prediction pipeline is to use a detector to find anomalies in a selected time-frame.
Note that, at prediction time, detector parameters such as window size, granularity, feature aggregation methods, etc. cannot be selected, as they are inherent properties of the detector.
See parameters in the use-case guide.
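Because those inherent parameters are stored with the detector (see detector_input_parameters.json in the folder layout above), the prediction pipeline can read them back from disk rather than taking them from the use case. A hedged sketch; the helper name is ours:

```python
import json
from pathlib import Path

def load_detector_params(detector_id: str, base: Path) -> dict:
    """Read back the inherent detector parameters (window size,
    granularity, aggregation, ...) saved at training time."""
    path = base / detector_id / "input" / "detector_input_parameters.json"
    return json.loads(path.read_text())
```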
The prediction pipeline parses the use case into prediction requests (via the [get_prediction_requests_from_uc](/siem_mtad_gat/ad_engine/mtad_gat/ad_engine.py) method) and then runs the steps shown below.

```mermaid
---
title: Prediction pipeline
---
flowchart LR
s[/Start Engine/] --> P(Parse uc)
P --> Init
Init --> Ingest
subgraph prediction body
Ingest --> Transform
Transform --> Predict
Predict --> Response
Response --> B{ship?}
B -->|no| Ingest
B -->|yes| C[ship]
C --> Ingest
end
Ingest --> r[/Return/]
```
Fetch and output time intervals (start_fetch, end_fetch, out_interval_extrema) are computed by the TimeManager.
```mermaid
sequenceDiagram
Title __prediction_pipeline_body(prediction_request,exec_timestamp,start_fetch,end_fetch,uc_number,out_interval_extrema,detector_stream)
participant p1 as engine:ADEngine
participant p2 as :data_manager.DataStorageManager
participant rr as response_handler:request_response_handler
activate p2
activate p1
p1 ->>+ p1: __prediction_pipeline_body(prediction_request,exec_timestamp,start_fetch,end_fetch,uc_number,out_interval_extrema,detector_stream)
p1 ->>+ p1: ingest_prediction(prediction_request,start_fetch,end_fetch)
p1 -->>- p1: ingested_data
p1 ->>+ p1: transform(prediction_request,ingested_data,settings.CALLER_PREDICT)
p1 -->>- p1: transformed_data
p1 ->>+ p1: predict(transformed_data)
p1 -->>- p1: pred_df, anomalies
p1 ->>+ rr: prediction_pipeline_response(prediction_request,anomalies,column_names,out_interval_extrema,engine.algorithm)
rr -->>- p1: predicted_anomalies_data
p1 ->>+ rr: prediction_pipeline_response(prediction_request,pred_df,column_names,out_interval_extrema,engine.algorithm)
rr -->>- p1: predicted_data
alt engine.ship_to_indexer is True and detector_stream is not None
p1 ->>+ p1: __ship_to_wazuh_prediction_pipeline(predicted_data,detector_stream)
deactivate p1
end
p1 ->> p2: save_predict_output(predicted_anomalies_data,exec_timestamp,uc_number,output_type="predicted_anomalies_data")
p1 ->> p2: save_predict_output(predicted_data,uc_number,exec_timestamp=exec_timestamp,output_type="predicted_data")
p1 -->>- p1: predicted_anomalies_data
deactivate p1
deactivate p2
```
For online prediction run modes, i.e., real-time and batch mode, the prediction body runs in a loop. To stop the prediction, issue an interrupt with ctrl-C, which breaks the loop and closes the running managers.
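The stop-on-interrupt behavior can be sketched as follows, with hypothetical predict_once and close_managers callables standing in for the prediction body and the engine's managers:

```python
import time

def run_online(predict_once, close_managers, interval_s: float = 1.0):
    """Run the prediction body in a loop until ctrl-C is pressed."""
    try:
        while True:
            predict_once()
            time.sleep(interval_s)
    except KeyboardInterrupt:
        # ctrl-C raises KeyboardInterrupt: leave the loop and
        # close the running managers.
        close_managers()
```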
The AD engine can be imported and used from other modules.
For example, suppose we compiled uc_15.yaml and stored it in the drivers folder; the training pipeline can be started as follows:
```python
engine = ADEngine()
train_request = engine.get_training_requests_from_uc(uc_number=15)  # parse request and update configs
response_train = engine.training_pipeline(training_request=train_request)
```
If the training request is None, the response will be None as well.
Similarly, the output of engine.prediction_pipeline is a generator, whose behavior depends on the run mode.
```python
engine = ADEngine()
pred_request = engine.get_prediction_requests_from_uc(uc_number=args.usecase)
engine.run_prediction_pipeline(prediction_request=pred_request, uc_number=args.usecase)
```