Source code for RADAR.federated_data.algorithms.flexanomalies

from RADAR.base_algorithm_module import BaseAnomalyDetection
from flexanomalies.utils import AutoEncoder
from flexanomalies.utils import IsolationForest
from flexanomalies.utils import PCA_Anomaly
from flexanomalies.utils import ClusterAnomaly
from flexanomalies.utils import DeepCNN_LSTM
from flexanomalies.utils.load_data import federate_data
from flexanomalies.pool.aggregators_favg import aggregate_ae
from flexanomalies.pool.aggregators_cl import aggregate_cl
from flexanomalies.pool.aggregators_pca import aggregate_pca
from flexanomalies.pool.primitives_deepmodel import (
    build_server_model_ae,
    copy_model_to_clients_ae,
    train_ae,
    set_aggregated_weights_ae,
    weights_collector_ae,
    evaluate_global_model,
)
from flexanomalies.pool.primitives_cluster import (
    build_server_model_cl,
    copy_model_to_clients_cl,
    train_cl,
    set_aggregated_weights_cl,
    get_clients_weights_cl,
)
from flexanomalies.pool.primitives_iforest import (
    build_server_model_if,
    copy_model_to_clients_if,
    train_if,
    aggregate_if,
    set_aggregated_weights_if,
    get_clients_weights_if,
)
from flexanomalies.pool.primitives_pca import (
    build_server_model_pca,
    copy_model_to_clients_pca,
    train_pca,
    set_aggregated_weights_pca,
    get_clients_weights_pca,
)
from flexanomalies.utils.save_results import save_experiments_results
from flex.pool import FlexPool
from flexanomalies.utils.metrics import *
from inspect import signature

flexanomalies_algorithms = {
    "autoencoder": AutoEncoder,
    "isolationForest": IsolationForest,
    "pcaAnomaly": PCA_Anomaly,
    "clusterAnomaly": ClusterAnomaly,
    "deepCNN_LSTM": DeepCNN_LSTM,
}


[docs] class FlexAnomalyDetection(BaseAnomalyDetection): def __init__(self, **kwargs): super().__init__(**kwargs) self.algorithm_name = kwargs.get("algorithm_", "autoencoder") self.algorithm_ = flexanomalies_algorithms[self.algorithm_name] self.model = None self.pool = None self.flex_data = None self.federated_functions = { "autoencoder": { "build_model": build_server_model_ae, "copy": copy_model_to_clients_ae, "train": train_ae, "collect": weights_collector_ae, "aggregate": aggregate_ae, "set_weights": set_aggregated_weights_ae, }, "isolationForest": { "build_model": build_server_model_if, "copy": copy_model_to_clients_if, "train": train_if, "collect": get_clients_weights_if, "aggregate": aggregate_if, "set_weights": set_aggregated_weights_if, }, "pcaAnomaly": { "build_model": build_server_model_pca, "copy": copy_model_to_clients_pca, "train": train_pca, "collect": get_clients_weights_pca, "aggregate": aggregate_pca, "set_weights": set_aggregated_weights_pca, }, "clusterAnomaly": { "build_model": build_server_model_cl, "copy": copy_model_to_clients_cl, "train": train_cl, "collect": get_clients_weights_cl, "aggregate": aggregate_cl, "set_weights": set_aggregated_weights_cl, }, "deepCNN_LSTM": { "build_model": build_server_model_ae, "copy": copy_model_to_clients_ae, "train": train_ae, "collect": weights_collector_ae, "aggregate": aggregate_ae, "set_weights": set_aggregated_weights_ae, }, } self.set_params(**kwargs)
[docs] @classmethod def register_algorithm(cls, name, model_class): """Register a new algorithm in the class. Parameters: - name (str): The name of the new algorithm. - model_class (class): The class implementing the anomaly detection model. The class should have: - An __init__ method that accepts model-specific parameters. - A fit(X, y) method to train the model. - A predict(X) method to make predictions. - Optionally, a decision_function(X) for scoring anomalies. """ if name in flexanomalies_algorithms: print( f" The algorithm {name} is already registered and will be overwritten." ) flexanomalies_algorithms[name] = model_class
[docs] def register_federated_functions(self, name, functions): """Records the federated functions of a new algorithm. - federated_functions (dict): A dictionary defining federated learning functions. Required keys: - "build_model": Function to initialize the server model. - "copy": Function to distribute the model to clients. - "train": Function to train the model on client data. - "collect": Function to gather updates from clients. - "aggregate": Function to combine client updates. - "set_weights": Function to update the global model. """ self.federated_functions[name] = functions
[docs] def fit(self, X, y): """Fit detector. Parameters ---------- X : numpy array of shape (n_samples, n_features) The input samples. y : numpy array of shape Returns ------- self : object Fitted estimator. """ if self.algorithm_name not in self.federated_functions: raise ValueError( f"No federated functions defined for {self.algorithm_name},model not defined" ) # Define the pool for the federated system self._initialize_pool(X, y) federated_ops = self.federated_functions[self.algorithm_name] for i in range(self.n_rounds): print(f"\nRunning round: {i}\n") self.pool.servers.map(federated_ops["copy"], self.pool.clients) self.pool.clients.map(federated_ops["train"]) self.pool.aggregators.map(federated_ops["collect"], self.pool.clients) if self.algorithm_name in ["clusterAnomaly", "ClusterAnomaly"]: self.pool.aggregators.map(federated_ops["aggregate"], model=self.model) else: self.pool.aggregators.map(federated_ops["aggregate"]) self.pool.aggregators.map(federated_ops["set_weights"], self.pool.servers) # Save model self.model = self.pool.servers._models[f"{self.algorithm_name}_server"]["model"] return self
[docs] def decision_function(self, X, y=None): """Predict raw anomaly score of X using the fitted detector. Parameters ---------- X : numpy array of shape (n_samples, n_features) The training input samples. Sparse matrices are accepted only if they are supported by the base estimator. Returns ------- anomaly_scores : numpy array of shape (n_samples,) The anomaly score of the input samples. """ if self.model is None: raise ValueError("The model must be trained before using decision_function") try: if self.algorithm_name in ["deepCNN_LSTM", "DeepCNN_LSTM"]: return self.model.decision_function(X,y) else: return self.model.decision_function(X) except Exception as e: print(f"{self.algorithm_name}, decision_function():", str(e))
[docs] def predict(self, X,y=None): """Predict raw anomaly scores of X using the fitted detector. The anomaly score of an input sample is computed based on the fitted detector. For consistency, outliers are assigned with higher anomaly scores. If label_parser is an attribute, then we execute the particular predict function Parameters ---------- X : numpy array of shape (n_samples, n_features) The input samples. Sparse matrices are accepted only if they are supported by the base estimator. Returns ------- anomaly_scores : numpy array of shape (n_samples,) The anomaly score of the input samples. """ if "label_parser" in self.get_params().keys() and self.label_parser != None: return self.label_parser(X) else: try: if self.algorithm_name in ["deepCNN_LSTM", "DeepCNN_LSTM"]: return self.model.predict(X,y) else: return self.model.predict(X) except Exception as e: print(f"{self.algorithm_name}, predict():", str(e))
[docs] def evaluate(self, X_test, y_test, label_test=None): if self.model is None: raise ValueError("The model must be trained before evaluate.") if label_test is not None and label_test.size > 0: return evaluate_global_model(self.model, X_test, y_test, label_test) return self.model.evaluate(X_test, y_test)
[docs] def set_params(self, **params): """Set the parameters of this estimator. Returns ------- self : object """ super().set_params(**params) # Simple optimization to gain speed (inspect is slow) if not params: return self # Separate federated parameters (do not belong to the model) self.federated_params = { k: v for k, v in params.items() if k in ["n_clients", "n_rounds"] } # Model parameters (all other) model_params = { k: v for k, v in params.items() if k not in self.federated_params } print( f"Federated Params:{self.federated_params} \n Model Params:{model_params}" ) if self.algorithm_name not in flexanomalies_algorithms: raise ValueError(f"The algorithm '{self.algorithm_name}' is not defined.") # Obtain valid model parameters valid_params = self.get_default_params(**model_params) # Assign algorithm name setattr(self.algorithm_, "algorithm_", valid_params["algorithm_"]) # Verification of invalid parameters for the model for key, value in model_params.items(): if key not in valid_params: raise ValueError( f"Invalid parameter {key!r} for model {self.algorithm_name}. " f"Valid parameters are: {valid_params!r}." ) # Identify mandatory positional parameters of the model positional_params = {} init_signature = signature(self.algorithm_.__init__) for param_name, param in init_signature.parameters.items(): if param_name != "self": if param_name in model_params: positional_params[param_name] = model_params[param_name] # Init Model try: self.model = self.algorithm_(**positional_params) except Exception as e: print("Error instantiating the model:", str(e)) raise # Assign the remaining parameters to the model for key, value in model_params.items(): setattr(self.model, key, value) return self
def _initialize_pool(self, X_train=None, y_train=None): self.n_clients = self.federated_params["n_clients"] self.n_rounds = self.federated_params["n_rounds"] # If flex data is an attribute, then we use the federated data set given as attribute if hasattr(self, "flex_data") and self.flex_data is not None: print("Using external federated dataset.") flex_dataset = self.flex_data else: X, y = X_train, y_train flex_dataset = federate_data(self.n_clients, X, y) federated_ops = self.federated_functions.get(self.algorithm_name) if not federated_ops: raise ValueError( f"No federated functions defined for {self.algorithm_name}" ) # Define pool for clients and server self.pool = FlexPool.client_server_pool( fed_dataset=flex_dataset, server_id=f"{self.algorithm_name}_server", init_func=federated_ops["build_model"], model=self.model, )
[docs] def get_default_params(self, **params): """Get DEFAULT parameters for this estimator, params is used to configure positional parameters in order to obtain default parameters of the object. Returns ------- params : mapping of string to any Parameter names mapped to their values. """ # Separating federated parameters from model parameters federated_params = { k: v for k, v in params.items() if k in ["n_clients", "n_rounds"] } model_params = {k: v for k, v in params.items() if k not in federated_params} out = super().get_params() init_signature = signature(self.algorithm_.__init__) param_names = [ p.name for p in init_signature.parameters.values() if p.name != "self" and p.kind not in (p.VAR_KEYWORD, p.VAR_POSITIONAL) ] # Identify the positional parameters required for the model. positional_params = { p.name: model_params[p.name] for p in init_signature.parameters.values() if p.name in model_params } # Use the existing model or instantiate a new one if necessary. model_instance = self.model or self.algorithm_(**positional_params) # Obtain model parameters out["algorithm_"] = self.algorithm_name for key in param_names: out[key] = getattr(model_instance, key, None) return out
[docs] def get_params(self): """Get parameters for this estimator. Returns ------- params : mapping of string to any Parameter names mapped to their values. """ out = super().get_params() out["algorithm_"] = self.algorithm_name if not self.model: return out # If there is no model, return only the basics # Get parameters directly from the model if hasattr(self.model, "get_params"): out.update(self.model.get_params()) else: print( f"Warning: The model '{self.algorithm_name}' does not have a 'get_params' method." ) print("Inspecting model's attributes:") model_attributes = vars(self.model) # This returns the attribute dictionary for attr, value in model_attributes.items(): print(f"{attr}: {value}") out.update(model_attributes) return out