# Source code for RADAR.visualization_module

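"""
Visualization Module

Plotly-based helpers for exploring data and anomaly-detection results:
``DataVisualization`` (general-purpose plots with optional PCA / t-SNE / UMAP
dimensionality reduction) and ``DataVisualizationScoresTS`` (anomaly-score
series with a threshold line).
"""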

import numpy as np
import plotly.express as px
import plotly.graph_objects as go
from sklearn.decomposition import PCA
from sklearn.manifold import TSNE
import umap


class DataVisualization:
    """
    Build Plotly figures from array data, optionally after dimensionality reduction.

    Typical usage: construct, call ``fit()`` so that ``reduced_data`` is
    populated, then call ``show()`` to display the figure or ``to_json()`` to
    serialize it. Only ``show()`` displays anything; ``to_json()`` builds the
    figure silently so it can be used from an API/server context.
    """

    # Accepted values for ``dim_reduction_technique``.
    _DIM_REDUCTION_TECHNIQUES = (None, "PCA", "t-SNE", "UMAP")

    # ``plot_technique`` -> name of the private method that builds the figure.
    # Names (not functions) are stored so the lookup goes through ``getattr``
    # and therefore honours overrides in subclasses.
    _PLOT_BUILDERS = {
        "prediction_forecasting": "_plot_prediction_forecasting",
        "anomaly_labels": "_plot_anomaly_labels",
        "plot_anomaly": "_plot_anomaly",
        "scatter": "_plot_scatter",
        "line": "_plot_line",
        "hist": "_plot_hist",
        "boxplot": "_plot_boxplot",
        "heatmap": "_plot_heatmap",
    }

    # Marker colors used by the anomaly scatter plot and its legend.
    _NORMAL_COLOR = "rgba(0, 0, 255, 0.6)"
    _ANOMALY_COLOR = "rgba(255, 0, 0, 0.6)"
    _MISMATCH_COLOR = "rgba(255, 165, 0, 0.8)"

    def __init__(
        self,
        data,
        plot_technique="scatter",
        dim_reduction_technique=None,
        n_components=2,
        y_true=None,
        y_pred=None,
        color_map=None,
        point_size=5,
        opacity=0.8,
        heatmap_color="magma",
        subset_size_percent=0.2,
        **plot_kwargs
    ):
        """
        A class for data visualization with dimensionality reduction options and customizable plots.

        Parameters:
        - data: array-like, input data (converted with ``np.array``)
        - plot_technique: str, visualization technique ('scatter', 'line', 'hist', 'boxplot',
          'heatmap', 'prediction_forecasting', 'anomaly_labels', 'plot_anomaly')
        - dim_reduction_technique: str, dimensionality reduction method ('PCA', 't-SNE', 'UMAP', None)
        - n_components: int, number of components for dimensionality reduction
        - y_true: array-like, true values for prediction/anomaly visualization
        - y_pred: array-like, predicted values for prediction/anomaly visualization
        - color_map: list/np.ndarray, per-point color values for the scatter plot
          (any other value, including a string, is ignored by the scatter plot)
        - point_size: int, size of points (only for scatter plots)
        - opacity: float, opacity of points (0 to 1)
        - heatmap_color: str, color scheme for the heatmap (e.g., 'viridis', 'plasma', 'cividis')
        - subset_size_percent: float, proportion of the data set that is randomly
          selected for the 'plot_anomaly' plot
        - plot_kwargs: additional arguments for customizing plots

        Raises:
            AssertionError: If 'dim_reduction_technique' is not one of the supported values.
        """
        self.data = np.array(data)
        self.plot_technique = plot_technique
        self.dim_reduction_technique = dim_reduction_technique
        self.n_components = n_components
        self.y_true = np.array(y_true) if y_true is not None else None
        self.y_pred = np.array(y_pred) if y_pred is not None else None
        self.reduced_data = None  # populated by fit()
        self.fig = None  # populated by show()
        self.color_map = color_map
        self.point_size = point_size
        self.opacity = opacity
        self.heatmap_color = heatmap_color  # heatmap color scheme
        self.subset_size_percent = subset_size_percent
        self.plot_kwargs = plot_kwargs  # extra arguments for plot customization

        # Raised explicitly instead of using an ``assert`` statement so the
        # check is not stripped under ``python -O``; the exception type and
        # message are kept for backward compatibility with existing callers.
        if self.dim_reduction_technique not in self._DIM_REDUCTION_TECHNIQUES:
            raise AssertionError("Invalid dimensionality reduction technique.")

    def fit(self):
        """
        Applies dimensionality reduction if needed.

        With no technique configured, ``reduced_data`` is simply ``data``.

        Raises:
            ValueError: If 'dim_reduction_technique' was changed to an
                unsupported value after construction.
        """
        technique = self.dim_reduction_technique
        if technique is None:
            self.reduced_data = self.data
            return

        if technique == "PCA":
            reducer = PCA(n_components=self.n_components)
        elif technique == "t-SNE":
            reducer = TSNE(n_components=self.n_components)
        elif technique == "UMAP":
            reducer = umap.UMAP(n_components=self.n_components)
        else:
            # Previously fell through and failed on an unbound local.
            raise ValueError("Invalid dimensionality reduction technique.")

        self.reduced_data = reducer.fit_transform(self.data)

    def transform(self):
        """Returns the transformed data (``None`` until ``fit()`` has run)."""
        return self.reduced_data

    def show(self):
        """
        Generates and displays the data visualization.

        The generated figure is also stored on ``self.fig``.

        Raises:
            ValueError: If 'plot_technique' is not recognized, or the selected
                plot is missing required inputs.
        """
        self.fig = self._build_figure()
        self.fig.show()

    def to_json(self):
        """
        Returns the plot as a JSON string (for API usage).

        The figure is built but not displayed.

        Raises:
            ValueError: If 'plot_technique' is not recognized, or the selected
                plot is missing required inputs.
        """
        return self._build_figure().to_json()

    def _build_figure(self):
        """Builds (without displaying) the figure for the configured 'plot_technique'."""
        builder_name = self._PLOT_BUILDERS.get(self.plot_technique)
        if builder_name is None:
            raise ValueError("Unrecognized plotting technique.")
        return getattr(self, builder_name)()

    def _require_fitted(self):
        """Raises ValueError when 'fit()' has not populated 'reduced_data' yet."""
        if self.reduced_data is None:
            raise ValueError("You must run 'fit()' before 'show()'.")

    def _require_labels(self, purpose):
        """Raises ValueError when 'y_true' or 'y_pred' is missing; 'purpose' completes the message."""
        if self.y_true is None or self.y_pred is None:
            raise ValueError(
                "Both 'y_true' and 'y_pred' are required for " + purpose + "."
            )

    def _plot_prediction_forecasting(self):
        """
        Builds the comparison between true values and predicted values.

        This method creates a line plot with two traces:
        1. True values (`y_true`) represented as solid lines.
        2. Predicted values (`y_pred`) represented as dashed lines.

        Returns:
            The Plotly figure (not displayed).

        Raises:
            ValueError: If either 'y_true' or 'y_pred' is None.
        """
        self._require_labels("prediction visualization")

        fig = go.Figure()
        fig.add_trace(
            go.Scatter(
                x=np.arange(len(self.y_true)),
                y=self.y_true,
                mode="lines",
                name="True Values",
            )
        )
        fig.add_trace(
            go.Scatter(
                x=np.arange(len(self.y_pred)),
                y=self.y_pred,
                mode="lines",
                name="Predictions",
                line=dict(dash="dash"),
            )
        )
        fig.update_layout(
            title="Comparison of True and Predicted Values",
            xaxis_title="Index",
            yaxis_title="Value",
            width=800,
            height=500,
        )
        return fig

    def _plot_anomaly_labels(self):
        """
        Builds the comparison between true anomaly labels and predicted anomaly labels.

        This method creates a scatter plot with two sets of markers:
        1. True labels (`y_true`) represented by blue circles.
        2. Predicted labels (`y_pred`) represented by red crosses.

        Returns:
            The Plotly figure (not displayed).

        Raises:
            ValueError: If either 'y_true' or 'y_pred' is None.
        """
        self._require_labels("prediction visualization")

        fig = go.Figure()
        fig.add_trace(
            go.Scatter(
                x=np.arange(len(self.y_true)),
                y=self.y_true,
                mode="markers",
                name="True Labels",
                marker=dict(color="blue", symbol="circle", size=10, opacity=0.7),
            )
        )
        fig.add_trace(
            go.Scatter(
                x=np.arange(len(self.y_pred)),
                y=self.y_pred,
                mode="markers",
                name="Predicted Labels",
                marker=dict(color="red", symbol="x", size=8, opacity=0.8),
            )
        )
        fig.update_layout(
            title="Comparison of True and Predicted Labels",
            xaxis_title="Index",
            yaxis_title="Label (0 or 1)",
            yaxis=dict(tickvals=[0, 1], ticktext=["0", "1"]),
            width=800,
            height=500,
        )
        return fig

    def _plot_anomaly(self):
        """
        Builds a 2D scatter plot of anomalies comparing true and predicted labels.

        The points are the first two columns of the reduced data, color coded:
        1. True normal values (0) are filled in blue.
        2. True anomalies (any other label) are filled in red.
        3. Incorrect predictions get an orange outline.
        A random subset of the data (``subset_size_percent``) is plotted.

        Returns:
            The Plotly figure (not displayed).

        Raises:
            ValueError: If either 'reduced_data', 'y_true', or 'y_pred' is None.
        """
        self._require_fitted()
        self._require_labels("anomaly visualization")

        n_samples = len(self.data)
        requested = int(self.subset_size_percent * n_samples)
        # Clamp so tiny data sets still plot at least one point and a
        # proportion above 1 cannot request more samples than exist.
        subset_size = min(max(requested, 1), n_samples)
        subset_indices = np.random.choice(n_samples, subset_size, replace=False)

        y_true_subset = self.y_true[subset_indices]
        y_pred_subset = self.y_pred[subset_indices]
        points_2d = self.reduced_data[subset_indices]

        colors = np.array(
            [
                self._NORMAL_COLOR if label == 0 else self._ANOMALY_COLOR
                for label in y_true_subset
            ]
        )
        # Correct predictions keep the fill color as outline; wrong ones are
        # outlined in orange.
        edge_colors = np.where(
            y_true_subset == y_pred_subset, colors, self._MISMATCH_COLOR
        )

        fig = go.Figure()
        fig.add_trace(
            go.Scatter(
                x=points_2d[:, 0],
                y=points_2d[:, 1],
                mode="markers",
                marker=dict(
                    color=colors, size=12, line=dict(color=edge_colors, width=2)
                ),
                text=["True label: " + str(label) for label in y_true_subset],
                hoverinfo="text",
            )
        )

        # Data-less traces whose only purpose is to create the legend entries.
        legend_entries = (
            (self._NORMAL_COLOR, "Normal (y_true=0)"),
            (self._ANOMALY_COLOR, "Anomaly (y_true=1)"),
            (self._MISMATCH_COLOR, "Incorrect Prediction"),
        )
        for legend_color, legend_name in legend_entries:
            fig.add_trace(
                go.Scatter(
                    x=[None],
                    y=[None],
                    mode="markers",
                    marker=dict(
                        color=legend_color,
                        size=12,
                        line=dict(color=legend_color, width=2),
                    ),
                    name=legend_name,
                    showlegend=True,
                )
            )

        fig.update_layout(
            title="Visualization of Anomalies",
            xaxis_title="Principle Component 1",
            yaxis_title="Principle Component 2",
            showlegend=True,
            legend=dict(
                title="Legend",
                font=dict(size=12),  # legend font size
                itemsizing="constant",  # fixed size of legend icons
                bordercolor="Black",  # black border for the legend
                borderwidth=2.0,  # width of the legend border
                orientation="h",  # horizontal legend
                x=0.5,  # center the legend on the x-axis
                xanchor="center",
            ),
            width=900,
            height=700,
        )
        return fig

    def _plot_scatter(self):
        """
        Builds a scatter plot of the first two columns of the reduced data.

        ``color_map`` is used as per-point color only when it is a list or
        ``np.ndarray``; ``plot_kwargs`` are forwarded to ``px.scatter``.

        Returns:
            The Plotly figure (not displayed).

        Raises:
            ValueError: If 'reduced_data' is None.
        """
        self._require_fitted()

        color = (
            self.color_map if isinstance(self.color_map, (list, np.ndarray)) else None
        )
        # NOTE(review): Plotly Express treats ``size`` as a data column scaled
        # relative to ``size_max``, so a constant list presumably renders every
        # marker at the same size regardless of ``point_size`` - confirm, and
        # consider ``fig.update_traces(marker=dict(size=...))`` instead.
        fig = px.scatter(
            x=self.reduced_data[:, 0],
            y=self.reduced_data[:, 1],
            title="Data Visualization (Scatter)",
            labels={"x": "Component 1", "y": "Component 2"},
            color=color,
            opacity=self.opacity,
            size=[self.point_size] * len(self.reduced_data),
            **self.plot_kwargs
        )
        return fig

    def _plot_line(self):
        """
        Builds a line plot of the first column of the reduced data over its index.

        Returns:
            The Plotly figure (not displayed).

        Raises:
            ValueError: If 'reduced_data' is None.
        """
        self._require_fitted()

        first_component = self.reduced_data[:, 0]
        fig = px.line(
            x=np.arange(len(first_component)),
            y=first_component,
            title="Data Visualization (Line)",
            labels={"x": "Index", "y": "Value"},
            **self.plot_kwargs
        )
        return fig

    def _plot_hist(self):
        """
        Builds a 30-bin histogram of all (flattened) values of the reduced data.

        Returns:
            The Plotly figure (not displayed).

        Raises:
            ValueError: If 'reduced_data' is None.
        """
        self._require_fitted()

        fig = px.histogram(
            self.reduced_data.flatten(),
            nbins=30,
            title="Data Histogram",
            labels={"value": "Value"},
            **self.plot_kwargs
        )
        return fig

    def _plot_boxplot(self):
        """
        Builds a box plot of the reduced data.

        Returns:
            The Plotly figure (not displayed).

        Raises:
            ValueError: If 'reduced_data' is None.
        """
        self._require_fitted()

        fig = px.box(
            self.reduced_data,
            title="Box Plot",
            labels={"value": "Value"},
            **self.plot_kwargs
        )
        return fig

    def _plot_heatmap(self):
        """
        Builds a heatmap of the reduced data using the 'heatmap_color' color scale.

        ``plot_kwargs`` are forwarded to the figure layout.

        Returns:
            The Plotly figure (not displayed).

        Raises:
            ValueError: If 'reduced_data' is None.
        """
        self._require_fitted()

        fig = go.Figure(
            data=go.Heatmap(z=self.reduced_data, colorscale=self.heatmap_color)
        )
        fig.update_layout(
            title="Data Heatmap", width=800, height=500, **self.plot_kwargs
        )
        return fig
class DataVisualizationScoresTS:
    """
    Class for visualizing anomalies in temporal data from scores.
    """

    def __init__(self, scores):
        """
        Initializes the display with the scores.

        Parameters
        ----------
        scores : array-like
            List or array with the anomaly scores.
        """
        self.scores = np.array(scores)
        # Positional index used as the x-axis of the plot.
        self.x = np.arange(len(self.scores))

    def to_json(self, method="percentile", threshold=0.95, top_k=None):
        """
        Returns the plot as a JSON string (for API usage).

        Parameters
        ----------
        method, threshold, top_k
            Forwarded unchanged to ``visualize``; the defaults reproduce the
            95th-percentile plot.

        Raises
        ------
        ValueError
            Under the same conditions as ``visualize``.
        """
        fig = self.visualize(method=method, threshold=threshold, top_k=top_k)
        return fig.to_json()

    def _detect_anomalies(self, method, threshold, top_k):
        """
        Computes which scores are anomalous for the given thresholding method.

        Returns
        -------
        tuple
            ``(anomalies, threshold_value)`` where ``anomalies`` is a boolean
            array shaped like ``self.scores`` and ``threshold_value`` is the
            score level drawn as the threshold line.

        Raises
        ------
        ValueError
            If ``method`` is unknown, ``top_k`` is missing for "topk", or
            ``top_k`` is smaller than 1.
        """
        scores = self.scores

        if method == "percentile":
            threshold_value = np.quantile(scores, threshold)
            return scores > threshold_value, threshold_value

        if method == "std":
            threshold_value = np.mean(scores) + threshold * np.std(scores)
            return scores > threshold_value, threshold_value

        if method == "topk" and top_k is not None:
            # A slice of ``[-0:]`` selects everything and a negative ``top_k``
            # slices from the wrong end, so reject both explicitly.
            if top_k < 1:
                raise ValueError("'top_k' must be a positive integer.")
            idx_top = np.argsort(scores)[-top_k:]
            anomalies = np.zeros_like(scores, dtype=bool)
            anomalies[idx_top] = True
            # The lowest selected score doubles as the displayed threshold.
            return anomalies, scores[idx_top].min()

        raise ValueError("Invalid method or missing 'top_k'.")

    def visualize(
        self, method="percentile", threshold=0.95, top_k=None
    ) -> "go.Figure":
        """
        Visualize anomalies from anomaly scores without ground-truth labels.

        The figure contains the score series, the anomalous points highlighted
        in red, and a dashed horizontal threshold line. The figure is returned,
        not displayed.

        Parameters
        ----------
        method : str, optional
            Method to determine the threshold. Options:
            - "percentile" -> use the given quantile in 'threshold'
            - "std" -> mean + threshold*std
            - "topk" -> mark the 'top_k' highest scores
        threshold : float, optional
            Value used as quantile (0-1) or number of standard deviations.
            E.g., 0.95 (95th percentile) or 3 (mean + 3*std).
        top_k : int, optional
            If method="topk", number of top anomalies to mark (must be >= 1).

        Returns
        -------
        go.Figure
            The Plotly figure.

        Raises
        ------
        ValueError
            If there are no scores, ``method`` is unknown, ``top_k`` is
            missing for "topk", or ``top_k`` is smaller than 1.
        """
        scores = self.scores
        x = self.x

        if scores.size == 0:
            raise ValueError("Cannot visualize an empty 'scores' array.")

        anomalies, threshold_value = self._detect_anomalies(method, threshold, top_k)

        # Interactive chart built with Plotly.
        fig = go.Figure()

        # Score line.
        fig.add_trace(
            go.Scatter(
                x=x,
                y=scores,
                mode="lines+markers",
                name="Scores",
                line=dict(color="blue"),
                marker=dict(size=6),
            )
        )

        # Anomalies in red.
        fig.add_trace(
            go.Scatter(
                x=x[anomalies],
                y=scores[anomalies],
                mode="markers",
                name="Anomalies",
                marker=dict(color="red", size=10, symbol="circle"),
            )
        )

        # Threshold line spanning the whole index range.
        fig.add_trace(
            go.Scatter(
                x=[x[0], x[-1]],
                y=[threshold_value, threshold_value],
                mode="lines",
                name=f"Threshold ({method})",
                line=dict(color="orange", dash="dash"),
            )
        )

        fig.update_layout(
            title="Anomaly Detection Based on Scores",
            xaxis_title="Index",
            yaxis_title="Anomaly Score",
            template="plotly_white",
            legend=dict(yanchor="top", y=0.99, xanchor="left", x=0.01),
        )

        return fig