Data generation

sample_curves(dataset_specification, f=None, w0=None, random_state=2024, measurement_scale=None, callback=None)

Samples synthetic curves given a dataset specification.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `dataset_specification` | `dict` | A dataset specification which contains all information needed to synthesize curves, in YAML format. Each dataset is encoded with a name and needs latent information. The function `f` and the initial guess `w0` can also be provided here. | *required* |
| `f` | `Callable` | The function to fit the curves. Use this parameter if no function is specified in `dataset_specification`. | `None` |
| `w0` | `ndarray` | The initial guess for the optimization problem used to synthesize curves. Use this parameter if no initial guess is specified in `dataset_specification`. | `None` |
| `random_state` | `int` | The random state for reproducibility. | `2024` |
| `measurement_scale` | `float` | The scale of the noise applied to the evaluated curves. If not set, 5% of the mean of the curves is used. Set to `0.0` to omit this noise. | `None` |

Returns:

| Type | Description |
| --- | --- |
| `ndarray` | The coefficients for each sampled curve. |
| `list[LatentInformation]` | The latent information for each sampled curve. |
| `ndarray` | The evaluated sampled curves. |

Source code in driftbench/data_generation/sample.py
def sample_curves(
    dataset_specification,
    f=None,
    w0=None,
    random_state=2024,
    measurement_scale=None,
    callback=None,
):
    """
    Samples synthetic curves given a dataset specification.

    Args:
        dataset_specification (dict): A dataset specification which contains
        all information to syntethisize curves in yaml-format.
        Each dataset is encoded with a name and needs a latent information provided.
        The function `f` to fit and as well as initial guess `w0`can be provided as well.
        f (Callable): The function to fit the curves. Use this parameter if no function is specified
        in `dataset_specification`.
        w0 (np.ndarray): The inital guess for the optimization problem used to synthesize curves.
        Use this parameter if no initial guess is specified in `dataset_specification`.
        random_state (int): The random state for reproducablity.
        measurement_scale (float): The scale for the noise applied on the evaluated curves. If not
        set, 5% percent of the mean of the curves is used. Set to 0.0 if you want to omit
        this noise.

    Returns:
        (np.ndarray): The coefficients for each sampled curve.
        (list[LatentInformation]): The latent information for each sampled curve.
        (np.ndarray): The evaluated sampled curves.
    """
    dimensions = dataset_specification["dimensions"]
    drifts = dataset_specification.get("drifts")
    x_scale = dataset_specification.get("x_scale", 0.02)
    y_scale = dataset_specification.get("y_scale", 0.1)
    func = _get_func(dataset_specification, f)
    w_init = _get_w_init(dataset_specification, w0)
    rng = np.random.RandomState(random_state)
    latent_information = _generate_latent_information(
        dataset_specification, rng, x_scale, y_scale
    )
    if drifts is not None:
        latent_information = drifts.apply(latent_information)
    data_generator = CurveGenerator(func, w_init)
    w = data_generator.run(latent_information, callback=callback)
    x_range = np.concatenate(
        (
            dataset_specification["latent_information"].x0,
            dataset_specification["latent_information"].x1,
            dataset_specification["latent_information"].x2,
        )
    )
    x_min, x_max = int(np.min(x_range)), int(np.max(x_range))
    xs = np.linspace(x_min, x_max, dimensions)
    curves = np.array([func(w_i, xs) for w_i in w])
    # Apply a default noise of 5% of the mean of the sampled curves
    if measurement_scale is None:
        scale = 0.05 * np.mean(curves)
        curves = rng.normal(loc=curves, scale=scale)
    else:
        curves = rng.normal(loc=curves, scale=measurement_scale)
    return w, latent_information, curves
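
A minimal usage sketch (an illustration, not from the library's documentation). The specification keys below follow the accesses visible in the source above (`dimensions`, `drifts`, `latent_information`); real specifications may carry additional fields consumed by the internal helpers, e.g. the number of curves to sample.

import numpy as np
from driftbench.data_generation.sample import sample_curves
from driftbench.data_generation.latent_information import LatentInformation

def f(w, x):
    # Toy model; any function f(w, x) that JAX can differentiate should work.
    return w[0] * x**2 + w[1] * x + w[2]

spec = {
    "dimensions": 100,  # number of evaluation points per curve
    "drifts": None,     # or a DriftSequence; skipped when None
    "latent_information": LatentInformation(
        y0=np.array([0.0, 1.0]), x0=np.array([0.0, 2.0]),
        y1=np.array([0.5]), x1=np.array([1.0]),
        y2=np.array([1.0]), x2=np.array([1.5]),
    ),
}

w, latents, curves = sample_curves(spec, f=f, w0=np.zeros(3), measurement_scale=0.0)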

Drift

Represents a drift for 1d or 2d input.

Source code in driftbench/data_generation/drifts.py
class Drift(metaclass=ABCMeta):
    """
    Represents a drift for 1d or 2d input.
    """

    def __init__(self, start, end, feature=None, dimension=0) -> None:
        """
        Args:
            start (int): The start index.
            end (int): The end index.
            feature (str): The feature the drift should be applied on.
            dimension (int): The dimension the drift should be applied on.
        """
        self._validate_drift_bounds(start, end)
        self.start = start
        self.end = end
        self.feature = feature
        self.dimension = dimension

    def _validate_drift_bounds(self, start, end):
        if start >= end:
            raise ValueError("End must be greater than start.")
        if start < 0 or end < 0:
            raise ValueError("Drift bounds are not allowed to be negative.")

    @abstractmethod
    def transform(self, X):
        """
        Applies the transformation specified by the drift object on the given input.

        Args:
            X (numpy.ndarray): The 1d- or 2d-input data to be drifted.
        """
        pass

__init__(start, end, feature=None, dimension=0)

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `start` | `int` | The start index. | *required* |
| `end` | `int` | The end index. | *required* |
| `feature` | `str` | The feature the drift should be applied on. | `None` |
| `dimension` | `int` | The dimension the drift should be applied on. | `0` |
Source code in driftbench/data_generation/drifts.py
def __init__(self, start, end, feature=None, dimension=0) -> None:
    """
    Args:
        start (int): The start index.
        end (int): The end index.
        feature (str): The feature the drift should be applied on.
        dimension (int): The dimension the drift should be applied on.
    """
    self._validate_drift_bounds(start, end)
    self.start = start
    self.end = end
    self.feature = feature
    self.dimension = dimension

transform(X) abstractmethod

Applies the transformation specified by the drift object on the given input.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `X` | `numpy.ndarray` | The 1d or 2d input data to be drifted. | *required* |

Source code in driftbench/data_generation/drifts.py
@abstractmethod
def transform(self, X):
    """
    Applies the transformation specified by the drift object on the given input.

    Args:
        X (numpy.ndarray): The 1d- or 2d-input data to be drifted.
    """
    pass
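
To illustrate the abstract interface, here is a hypothetical subclass (not part of driftbench) that shifts the affected index window by a constant offset; it mirrors the structure of the `LinearDrift.transform` implementation shown further below.

import numpy as np
from driftbench.data_generation.drifts import Drift

class OffsetDrift(Drift):
    """Hypothetical drift that adds a constant offset inside [start, end]."""

    def __init__(self, start, end, offset, feature=None, dimension=0):
        super().__init__(start, end, feature=feature, dimension=dimension)
        self.offset = offset

    def transform(self, X):
        drifted = np.copy(X).astype(float)
        if drifted.ndim == 1:
            drifted = drifted.reshape(-1, 1)
        # Shift every value inside the drift window by a constant amount.
        drifted[self.start:self.end + 1, :] += self.offset
        return drifted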

DriftSequence

Represents a sequence of drifts, which will be applied on a latent information object.

Source code in driftbench/data_generation/drifts.py
class DriftSequence:
    """
    Represents a sequence of drifts, which will be applied on a latent information object.
    """

    def __init__(self, drifts):
        """
        Args:
            drifts (list[Drift]): A list of drifts which are being used for the transformation.
        """
        self._validate_drifts(drifts)
        self.drifts = sorted(drifts, key=lambda drift: drift.start)

    def apply(self, X):
        """
        Applies the transformation by the given drifts on the latent information input.

        Args:
            X (list[LatentInformation]): The list of latent information the drifts are applied on.

        Returns:
            (list[LatentInformation]): A list of drifted latent information according to the drift sequence.
        """
        drifted = copy.deepcopy(X)
        for drift in self.drifts:
            feature = np.array([getattr(x, drift.feature) for x in drifted])
            feature[:, drift.dimension] = drift.transform(
                feature[:, drift.dimension]
            ).flatten()
            for i, x in enumerate(drifted):
                setattr(x, drift.feature, feature[i])
        return drifted

    def get_aggregated_drift_bounds(self):
        """
        Returns the aggregated drift bounds, i.e. the maximum range where drifts are applied.

        Returns:
            (tuple[int, int]): A tuple of (int, int), where the first value denotes the start
            index and the second value the end index of the aggregated drift bounds.
        """
        start = self.drifts[0].start
        end = self.drifts[-1].end
        return start, end

    def get_individual_drift_bounds(self):
        """
        Returns the drift bounds for each individual drift in the drift sequence.

        Returns:
            (list[tuple[int, int]]): A list of tuples of (int, int), where the first value denotes
            the start of the drift, and the second value the end of the drift.
        """
        return [(drift.start, drift.end) for drift in self.drifts]

    def get_drift_intensities(self):
        """
        Returns the intensities for each range in the drift sequence. Each drift has a base intensity of 1,
        and when multiple drifts overlap, the intensity becomes the number of the drifts present in the given
        range.

        Returns:
            (dict[tuple[int, int], int]): A dictionary with tuples as keys and ints as values.
            The keys indicate the range of the drift intensity, and the values indicate the intensity.
        """
        intensities = {}
        drift_intensities_array = np.zeros(
            (len(self.drifts), np.max([drift.end for drift in self.drifts]) + 1)
        )
        for i, drift in enumerate(self.drifts):
            drift_intensities_array[i, drift.start : drift.end + 1] = 1
        stacked_drift_intensities = np.sum(drift_intensities_array, axis=0)

        for intensity in range(1, np.max(stacked_drift_intensities).astype(int) + 1):
            indices = np.where(stacked_drift_intensities == intensity)[0]
            split_indices = np.where(np.diff(indices) > 1)[0] + 1
            bounds = np.split(indices, split_indices)
            for start, end in [(bound[0], bound[-1]) for bound in bounds]:
                intensities[(start, end)] = intensity
        return intensities

    def _validate_drifts(self, drifts):
        # Group drifts by their feature and their dimension they apply on.
        drifts_sorted = sorted(
            drifts, key=lambda drift: (drift.feature, drift.dimension)
        )
        drifts_grouped = groupby(
            drifts_sorted, key=lambda drift: (drift.feature, drift.dimension)
        )
        # Check within these groups if an overlap exists.
        for (feature, dimension), curr_drifts in drifts_grouped:
            curr_drifts = list(curr_drifts)
            for i, j in combinations(range(len(curr_drifts)), 2):
                drift1 = curr_drifts[i]
                drift2 = curr_drifts[j]
                if drift1.start <= drift2.end and drift2.start <= drift1.end:
                    raise ValueError(
                        f"Drifts are not allowed to overlap. "
                        f"Overlapping drift at feature {feature} in dimension {dimension}"
                    )

__init__(drifts)

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `drifts` | `list[Drift]` | A list of drifts which are being used for the transformation. | *required* |
Source code in driftbench/data_generation/drifts.py
def __init__(self, drifts):
    """
    Args:
        drifts (list[Drift]): A list of drifts which are being used for the transformation.
    """
    self._validate_drifts(drifts)
    self.drifts = sorted(drifts, key=lambda drift: drift.start)

apply(X)

Applies the transformation by the given drifts on the latent information input.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `X` | `list[LatentInformation]` | The list of latent information the drifts are applied on. | *required* |

Returns:

| Type | Description |
| --- | --- |
| `list[LatentInformation]` | A list of drifted latent information according to the drift sequence. |
Source code in driftbench/data_generation/drifts.py
def apply(self, X):
    """
    Applies the transformation by the given drifts on the latent information input.

    Args:
        X (list[LatentInformation]): The list of latent information the drifts are applied on.

    Returns:
        (list[LatentInformation]): A list of drifted latent information according to the drift sequence.
    """
    drifted = copy.deepcopy(X)
    for drift in self.drifts:
        feature = np.array([getattr(x, drift.feature) for x in drifted])
        feature[:, drift.dimension] = drift.transform(
            feature[:, drift.dimension]
        ).flatten()
        for i, x in enumerate(drifted):
            setattr(x, drift.feature, feature[i])
    return drifted

get_aggregated_drift_bounds()

Returns the aggregated drift bounds, i.e. the maximum range where drifts are applied.

Returns:

| Type | Description |
| --- | --- |
| `tuple[int, int]` | A tuple of (int, int), where the first value denotes the start index and the second value the end index of the aggregated drift bounds. |

Source code in driftbench/data_generation/drifts.py
def get_aggregated_drift_bounds(self):
    """
    Returns the aggregated drift bounds, i.e. the maximum range where drifts are applied.

    Returns:
        (tuple[int, int]): A tuple of (int, int), where the first value denotes the start
        index and the second value the end index of the aggregated drift bounds.
    """
    start = self.drifts[0].start
    end = self.drifts[-1].end
    return start, end

get_drift_intensities()

Returns the intensities for each range in the drift sequence. Each drift has a base intensity of 1, and when multiple drifts overlap, the intensity becomes the number of the drifts present in the given range.

Returns:

| Type | Description |
| --- | --- |
| `dict[tuple[int, int], int]` | A dictionary with tuples as keys and ints as values. The keys indicate the range of the drift intensity, and the values indicate the intensity. |

Source code in driftbench/data_generation/drifts.py
def get_drift_intensities(self):
    """
    Returns the intensities for each range in the drift sequence. Each drift has a base intensity of 1,
    and when multiple drifts overlap, the intensity becomes the number of the drifts present in the given
    range.

    Returns:
        (dict[tuple[int, int], int]): A dictionary with tuples as keys and ints as values.
        The keys indicate the range of the drift intensity, and the values indicate the intensity.
    """
    intensities = {}
    drift_intensities_array = np.zeros(
        (len(self.drifts), np.max([drift.end for drift in self.drifts]) + 1)
    )
    for i, drift in enumerate(self.drifts):
        drift_intensities_array[i, drift.start : drift.end + 1] = 1
    stacked_drift_intensities = np.sum(drift_intensities_array, axis=0)

    for intensity in range(1, np.max(stacked_drift_intensities).astype(int) + 1):
        indices = np.where(stacked_drift_intensities == intensity)[0]
        split_indices = np.where(np.diff(indices) > 1)[0] + 1
        bounds = np.split(indices, split_indices)
        for start, end in [(bound[0], bound[-1]) for bound in bounds]:
            intensities[(start, end)] = intensity
    return intensities

get_individual_drift_bounds()

Returns the drift bounds for each individual drift in the drift sequence.

Returns:

| Type | Description |
| --- | --- |
| `list[tuple[int, int]]` | A list of tuples of (int, int), where the first value denotes the start of the drift, and the second value the end of the drift. |

Source code in driftbench/data_generation/drifts.py
def get_individual_drift_bounds(self):
    """
    Returns the drift bounds for each individual drift in the drift sequence.

    Returns:
        (list[tuple[int, int]]): A list of tuples of (int, int), where the first value denotes
        the start of the drift, and the second value the end of the drift.
    """
    return [(drift.start, drift.end) for drift in self.drifts]
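
A usage sketch of the sequence API. Overlaps are only rejected within the same feature/dimension pair, so two drifts on different features may overlap; the overlap then shows up in `get_drift_intensities()`. The commented results follow from the source above.

from driftbench.data_generation.drifts import DriftSequence, LinearDrift

seq = DriftSequence([
    LinearDrift(start=0, end=10, m=0.1, feature="y0"),
    LinearDrift(start=5, end=15, m=-0.2, feature="y1"),
])
seq.get_aggregated_drift_bounds()   # (0, 15)
seq.get_individual_drift_bounds()   # [(0, 10), (5, 15)]
seq.get_drift_intensities()         # {(0, 4): 1, (11, 15): 1, (5, 10): 2}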

LinearDrift

Bases: Drift

Represents a linear drift for a 1d or 2d input, i.e. a drift where the input data is drifted in a linear fashion.

Source code in driftbench/data_generation/drifts.py
class LinearDrift(Drift):
    """
    Represents a linear drift for a 1d or 2d input, i.e. a drift
    where the input data is drifted in a linear fashion.
    """

    def __init__(self, start, end, m, feature=None, dimension=0):
        """
        Args:
            start (int): The start index.
            end (int): The end index.
            m (float): The slope of the linear drift. Usually in the range (-1, 1).
            feature (str): The feature the drift should be applied on.
            dimension (int): The dimension the drift should be applied on.
        """
        super().__init__(start, end, feature=feature, dimension=dimension)
        self.m = m

    def transform(self, X):
        drifted = np.copy(X).astype(float)
        if drifted.ndim == 1:
            drifted = drifted.reshape(-1, 1)
        # Use 0 based x indices for computing the slope at a given position
        xs = np.arange(self.end - self.start + 1).reshape(-1, 1)
        drifted[self.start : self.end + 1, :] += self.m * xs
        # Maintain data according to new data after drift happened.
        after_drift_idx = drifted.shape[0] - self.end
        drifted[-after_drift_idx + 1 :, :] += self.m * xs[-1]
        return drifted

__init__(start, end, m, feature=None, dimension=0)

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `start` | `int` | The start index. | *required* |
| `end` | `int` | The end index. | *required* |
| `m` | `float` | The slope of the linear drift. Usually in the range (-1, 1). | *required* |
| `feature` | `str` | The feature the drift should be applied on. | `None` |
| `dimension` | `int` | The dimension the drift should be applied on. | `0` |
Source code in driftbench/data_generation/drifts.py
def __init__(self, start, end, m, feature=None, dimension=0):
    """
    Args:
        start (int): The start index.
        end (int): The end index.
        m (float): The slope of the linear drift. Usually in the range (-1, 1).
        feature (str): The feature the drift should be applied on.
        dimension (int): The dimension the drift should be applied on.
    """
    super().__init__(start, end, feature=feature, dimension=dimension)
    self.m = m
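
A small numeric check derived from the `transform` source above: values ramp up linearly inside `[start, end]`, and everything after the window keeps the final offset.

import numpy as np
from driftbench.data_generation.drifts import LinearDrift

drift = LinearDrift(start=2, end=4, m=1.0, feature="y0")
drift.transform(np.zeros(8)).flatten()
# array([0., 0., 0., 1., 2., 2., 2., 2.])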

JaxCurveGenerationSolver

Bases: Solver

Fits latent information according to a given function.

Source code in driftbench/data_generation/solvers.py
class JaxCurveGenerationSolver(Solver):
    """
    Fits latent information according to a given function.
    """

    def __init__(self, f, w0, max_fit_attemps):
        """
        Args:
            f (Callable): The function.
            w0 (list-like): The initial guess for the solution.
            max_fit_attemps (int): The maximum number of attempts to refit a
                curve if the optimization didn't succeed.
        """
        self.f = jit(vmap(partial(f), in_axes=(None, 0)))
        df_dx = grad(f, argnums=1)
        df_dx2 = grad(df_dx, argnums=1)
        self.df_dx = jit(vmap(partial(df_dx), in_axes=(None, 0)))
        self.df_dx2 = jit(vmap(partial(df_dx2), in_axes=(None, 0)))
        self.w0 = jnp.array(w0)
        self.max_fit_attempts = max_fit_attemps

    def solve(self, X, callback=None):
        coefficients = []
        solution = self.w0
        for i, latent in enumerate(X):
            result = _minimize(
                self.f,
                self.df_dx,
                self.df_dx2,
                solution,
                latent.y0,
                latent.x0,
                latent.y1,
                latent.x1,
                latent.y2,
                latent.x2,
            )
            if not result.success:
                result = self._refit(self.f, self.df_dx, self.df_dx2, latent)
            solution = result.x
            if callback:
                jax.debug.callback(callback, i, solution)
            coefficients.append(solution)
        return jnp.array(coefficients)

    def _refit(self, p, dp_dx, dp_dx2, latent):
        # Restart with initial guess in order to be independent of previous solutions.
        solution = self.w0
        current_fit_attempts = 0
        success = False
        result = None
        # Fallback strategy: If fit is not successful, try again and use previous solution
        # for the same problem as starting point until convergence.
        while not success and current_fit_attempts < self.max_fit_attempts:
            current_fit_attempts += 1
            result = _minimize(
                p,
                dp_dx,
                dp_dx2,
                solution,
                latent.y0,
                latent.x0,
                latent.y1,
                latent.x1,
                latent.y2,
                latent.x2,
            )
            solution = result.x
            success = result.success
        return result

__init__(f, w0, max_fit_attemps)

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `f` | `Callable` | The function. | *required* |
| `w0` | `list-like` | The initial guess for the solution. | *required* |
| `max_fit_attemps` | `int` | The maximum number of attempts to refit a curve if the optimization didn't succeed. | *required* |
Source code in driftbench/data_generation/solvers.py
def __init__(self, f, w0, max_fit_attemps):
    """
    Args:
        f (Callable): The function.
        w0 (list-like): The initial guess for the solution.
        max_fit_attemps (int): The maximum number of attempts to refit a
            curve if the optimization didn't succeed.
    """
    self.f = jit(vmap(partial(f), in_axes=(None, 0)))
    df_dx = grad(f, argnums=1)
    df_dx2 = grad(df_dx, argnums=1)
    self.df_dx = jit(vmap(partial(df_dx), in_axes=(None, 0)))
    self.df_dx2 = jit(vmap(partial(df_dx2), in_axes=(None, 0)))
    self.w0 = jnp.array(w0)
    self.max_fit_attempts = max_fit_attemps

Solver

Represents a backend for solving an optimization problem.

Source code in driftbench/data_generation/solvers.py
class Solver(metaclass=ABCMeta):
    """
    Represents a backend for solving an optimization problem.
    """

    @abstractmethod
    def solve(self, X):
        """
        Solves an optimization problem defined by the solver.

        Args:
            X (list-like): Input to optimize according to the solver instance.

        Returns:
            (np.ndarray|jnp.ndarray): The parameters obtained by solving the optimization problem.
        """
        pass

solve(X) abstractmethod

Solves an optimization problem defined by the solver.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `X` | `list-like` | Input to optimize according to the solver instance. | *required* |

Returns:

| Type | Description |
| --- | --- |
| `np.ndarray` or `jnp.ndarray` | The parameters obtained by solving the optimization problem. |

Source code in driftbench/data_generation/solvers.py
@abstractmethod
def solve(self, X):
    """
    Solves an optimization problem defined by the solver.

    Args:
        X (list-like): Input to optimize according to the solver instance.

    Returns:
        (np.ndarray|jnp.ndarray): The parameters obtained by solving the optimization problem.
    """
    pass

LatentInformation dataclass

Represents the local latent information for a high-dimensional object, which is used to generate such high-dimensional data. Currently, this structure is designed for creating curves meeting the conditions provided by the attributes defined in this class.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `y0` | `list-like` | The y-values of a function. | *required* |
| `x0` | `list-like` | The x-values of a function. No duplicates are allowed. | *required* |
| `y1` | `list-like` | The y-values of the first derivative of a function. | *required* |
| `x1` | `list-like` | The x-values of the first derivative of a function. No duplicates are allowed. | *required* |
| `y2` | `list-like` | The y-values of the second derivative of a function. | *required* |
| `x2` | `list-like` | The x-values of the second derivative of a function. No duplicates are allowed. | *required* |
Source code in driftbench/data_generation/latent_information.py
@dataclass
class LatentInformation:
    """
    Represents the local latent information for high-dimensional object,
    which is used to generate such high-dimensional data. Currently, this
    structure is designed for creating curves meeting the conditions provided
    by the attributes defined in this class.

    Args:
        y0 (list-like): The y-values of a function.
        x0 (list-like): The x-values of a function. Hence, no duplicates are allowed.
        y1 (list-like): The y-values of the derivative of a function.
        x1 (list-like): The x-values of the derivative of a function.
        Hence, no duplicates are allowed.
        y2 (list-like): The y-values of the derivative of a function.
        x2 (list-like): The x-values of the second derivative of a function.
        Hence, no duplicates are allowed.
    """

    y0: np.ndarray
    x0: np.ndarray
    y1: np.ndarray
    x1: np.ndarray
    y2: np.ndarray
    x2: np.ndarray

    def __post_init__(self):
        self._validate_duplicates()
        self._validate_1d_array()
        self._validate_matching_shapes()

    def _validate_matching_shapes(self):
        if self.y0.shape != self.x0.shape:
            raise ValueError(
                "Features y0 and x0 are not allowed to have different shape"
            )
        if self.y1.shape != self.x1.shape:
            raise ValueError(
                "Features y1 and x1 are not allowed to have different shape"
            )
        if self.y2.shape != self.x2.shape:
            raise ValueError(
                "Features y2 and x2 are not allowed to have different shape"
            )

    def _validate_1d_array(self):

        if self.y0.ndim != 1:
            raise ValueError("Feature y0 has to be 1d-array.")
        if self.x0.ndim != 1:
            raise ValueError("Feature x0 has to be 1d-array.")
        if self.y1.ndim != 1:
            raise ValueError("Feature y1 has to be 1d-array.")
        if self.x1.ndim != 1:
            raise ValueError("Feature x1 has to be 1d-array.")
        if self.y2.ndim != 1:
            raise ValueError("Feature y2 has to be 1d-array.")
        if self.x2.ndim != 1:
            raise ValueError("Feature x2 has to be 1d-array.")

    def _validate_duplicates(self):
        _, x0_counts = np.unique(self.x0, return_counts=True)
        _, x1_counts = np.unique(self.x1, return_counts=True)
        _, x2_counts = np.unique(self.x2, return_counts=True)
        if np.any(x0_counts > 1):
            raise ValueError("Feature x0 is not allowed to contain duplicates.")
        if np.any(x1_counts > 1):
            raise ValueError("Feature x1 is not allowed to contain duplicates.")
        if np.any(x2_counts > 1):
            raise ValueError("Feature x2 is not allowed to contain duplicates.")

Drift detection

AggregateFeatureAlgorithm

Bases: Detector

Detector that aggregates features over the temporal axis.

Source code in driftbench/drift_detection/detectors.py
class AggregateFeatureAlgorithm(Detector):
    """Detector that aggregates features over temporal axis.
    """
    def __init__(self, agg_feature_func=None, algorithm=None):
        self.algorithm = algorithm

        if agg_feature_func is None:
            agg_feature_func = np.mean
        self.agg_feature_func = agg_feature_func

    def predict(self, X):
        input_with_feature = np.apply_along_axis(self.agg_feature_func, 1, X)
        return self.algorithm.predict(input_with_feature)

    @property
    def name(self):
        return self.algorithm.name

AlwaysGuessDriftDetector

Bases: Detector

A baseline detector.

Source code in driftbench/drift_detection/detectors.py
class AlwaysGuessDriftDetector(Detector):
    """A baseline detector."""
    def predict(self, X):
        return np.ones(X.shape[0])

AutoencoderDetector

Bases: Detector, Module

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `hidden_layers` | `list` | Number of neurons in each encoder layer after the input layer. | *required* |
| `retrain_always` | `bool` | If true, the model is retrained every time `predict` is called. | `False` |
Source code in driftbench/drift_detection/detectors.py
class AutoencoderDetector(Detector, nn.Module):
    """

    Args:
        hidden_layers (list): Number of neurons in each encoder layer after the input layer.
        retrain_always (bool): If true, the model is retrained every time `predict` is called.
    """
    _activation_functions = {
        "relu": nn.ReLU(),
        "sigmoid": nn.Sigmoid(),
        "tanh": nn.Tanh(),
    }

    _optimizers = {
        "adam": optim.Adam,
        "sgd": optim.SGD,
    }

    _device = "cuda:0" if torch.cuda.is_available() else "cpu"

    hparams = ['activation', 'lr', 'optim', 'num_epochs', 'hidden_layers']

    def __init__(self, hidden_layers, detector, activation='tanh', num_epochs=10,
                 batch_size=32, optim="adam", lr=0.001, retrain_always=False):
        Detector.__init__(self)
        nn.Module.__init__(self)

        self.lr = lr
        self.retrain = retrain_always
        self.activation = activation
        self.detector = detector
        self.optim = optim
        self.criterion = nn.MSELoss()
        self.num_epochs = num_epochs
        self.batch_size = batch_size
        self.losses = []
        self.hidden_layers = hidden_layers
        self.is_trained = False
        self.scaler = MinMaxScaler(feature_range=(-1, 1))

    def forward(self, x):
        latent_space = self.encoder(x)
        reconstructed_x = self.decoder(latent_space)
        return reconstructed_x

    def _build_model(self, input_size):
        encoder_layers = [input_size] + self.hidden_layers
        decoder_layers = self.hidden_layers[::-1] + [input_size]
        self.encoder = self._build_encoder(encoder_layers, self.activation)
        self.decoder = self._build_decoder(decoder_layers, self.activation)
        self.optimizer = self._optimizers[self.optim](self.parameters(), lr=self.lr)
        self.to(self._device)

    def _train(self, X):
        # Reset losses from possible previous training
        self.losses = []
        # Actual training loop
        dataset = torch.utils.data.TensorDataset(X)
        dataloader = torch.utils.data.DataLoader(dataset, batch_size=self.batch_size, shuffle=True)
        for epoch in range(self.num_epochs):
            for i, inputs in enumerate(dataloader):
                self.optimizer.zero_grad()
                inputs = inputs[0]
                outputs = self(inputs)
                loss = self.criterion(outputs, inputs)
                self.losses.append(loss.item())
                loss.backward()
                self.optimizer.step()
        self.is_trained = True

    def _build_encoder(self, layers, activation):
        encoder_layers = []
        for i in range(1, len(layers)):
            encoder_layers.append(nn.Linear(layers[i - 1], layers[i]))
            encoder_layers.append(self._activation_functions[activation])
        return nn.Sequential(*encoder_layers)

    def _build_decoder(self, layers, activation):
        decoder_layers = []
        for i in range(1, len(layers)):
            decoder_layers.append(nn.Linear(layers[i - 1], layers[i]))
            # Don't append activation function for output layer
            if i < len(layers) - 1:
                decoder_layers.append(self._activation_functions[activation])
        return nn.Sequential(*decoder_layers)

    def _prepare_data(self, X):
        """
        Scales (if necessary) the data and places it afterwards on device
        """
        if self.retrain or not self.is_trained:
            X = self.scaler.fit_transform(X)
        else:
            X = self.scaler.transform(X)
        return torch.tensor(X, dtype=torch.float32).to(self._device)

    def predict(self, X):
        X = self._prepare_data(X)

        if self.retrain or not self.is_trained:
            self._build_model(X.shape[1])
            self._train(X)

        with torch.no_grad():
            latent_space = self.encoder(X).detach().cpu().numpy()
        return self.detector.predict(latent_space)
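
A composition sketch: the autoencoder compresses the input into its latent space and delegates scoring to a downstream detector, here a `ClusterDetector` (documented below). Sizes and hyperparameters are illustrative.

import numpy as np
from driftbench.drift_detection.detectors import AutoencoderDetector, ClusterDetector

X = np.random.randn(200, 64)   # 200 samples with 64 features each
detector = AutoencoderDetector(
    hidden_layers=[32, 8],                 # encoder: 64 -> 32 -> 8
    detector=ClusterDetector(n_centers=3),
    num_epochs=5,
)
scores = detector.predict(X)               # one drift score per sample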

ClusterDetector

Bases: Detector

Cluster based drift detector.

Source code in driftbench/drift_detection/detectors.py
class ClusterDetector(Detector):
    """Cluster based drift detector."""
    supported_methods = ["kmeans", "gaussian mixture"]

    hparams = ['method', 'n_centers']

    def __init__(self, n_centers, method="kmeans", random_state=42):
        if not self._validate_cluster_method(method):
            raise ValueError(
                f"Unknown method {method}: Supported cluster methods are {ClusterDetector.supported_methods}.")
        self.n_centers = n_centers
        self.method = method
        self.random_state = random_state

    def _validate_cluster_method(self, method):
        return method in ClusterDetector.supported_methods

    def predict(self, X):
        warnings.simplefilter("ignore")
        if self.method == "kmeans":
            return np.min(KMeans(n_clusters=self.n_centers, random_state=self.random_state).fit_transform(X), axis=1)
        elif self.method == "gaussian mixture":
            gm = GaussianMixture(n_components=self.n_centers, random_state=self.random_state)
            gm.fit(X)
            return -1.0 * gm.score_samples(X)

Detector

Detector base class.

Source code in driftbench/drift_detection/detectors.py
class Detector(metaclass=ABCMeta):
    """Detector base class."""
    hparams = []

    @abstractmethod
    def predict(self, X):
        pass

    def evaluate(self, X, y, metric):
        if not isinstance(metric, Metric):
            raise ValueError("The metric has to be an instance of the metric class.")
        prediction = self.predict(X)
        return metric(prediction, y)

    @property
    def name(self):
        return self.__class__.__name__

    def get_hparams(self):
        return {
            hp: getattr(self, hp) for hp in self.hparams
        }
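
The contract is small: subclasses implement `predict`, and `evaluate` combines the prediction with any `Metric` instance. A hypothetical minimal detector (not part of driftbench):

import numpy as np
from driftbench.drift_detection.detectors import Detector
from driftbench.drift_detection.metrics import AUC

class MeanDistanceDetector(Detector):
    """Scores each sample by its L2 distance to the feature-wise mean."""

    def predict(self, X):
        return np.linalg.norm(X - X.mean(axis=0), axis=1)

X = np.vstack([np.zeros((80, 5)), np.ones((20, 5))])
y = np.r_[np.zeros(80), np.ones(20)]   # binary drift labels
score = MeanDistanceDetector().evaluate(X, y, AUC())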

MMDDetector

Bases: Detector

Implementation of the MMD algorithm as a drift detector, based on the Maximum Mean Discrepancy as defined in Arthur Gretton, Karsten M. Borgwardt, Malte J. Rasch, Bernhard Schölkopf, and Alexander Smola. A kernel two-sample test. Journal of Machine Learning Research, 13(Mar):723–773, 2012. This implementation is based on the blog post by Onur Tunali at https://www.onurtunali.com/ml/2019/03/08/maximum-mean-discrepancy-in-machine-learning.html

Source code in driftbench/drift_detection/detectors.py
class MMDDetector(Detector):
    """
    Implementation of MMD algorithm as drift detector based on the Maximum Mean Discrepancy as defined in
    Arthur Gretton, Karsten M Borgwardt, Malte J Rasch, Bernhard Schölkopf, and Alexander Smola.
    A kernel two-sample test.
    Journal of Machine Learning Research, 13(Mar):723–773, 2012.
    This implementation is based on the blog post of Onur Tunali in
    https://www.onurtunali.com/ml/2019/03/08/maximum-mean-discrepancy-in-machine-learning.html
    """
    _device = "cuda:0" if torch.cuda.is_available() else "cpu"

    def __init__(self, window_size, stat_size, offset, kernel="multiscale"):
        self.window_size = window_size
        self.stat_size = stat_size
        self.offset = offset
        self.kernel = kernel

    def _mmd_score(self, P, Q, kernel):
        P = torch.from_numpy(P).to(self._device)
        Q = torch.from_numpy(Q).to(self._device)
        xx, yy, zz = torch.mm(P, P.t()), torch.mm(Q, Q.t()), torch.mm(P, Q.t())
        rx = (xx.diag().unsqueeze(0).expand_as(xx))
        ry = (yy.diag().unsqueeze(0).expand_as(yy))

        dxx = rx.t() + rx - 2. * xx  # Used for A in (1)
        dyy = ry.t() + ry - 2. * yy  # Used for B in (1)
        dxy = rx.t() + ry - 2. * zz  # Used for C in (1)

        XX, YY, XY = (torch.zeros(xx.shape).to(self._device),
                      torch.zeros(xx.shape).to(self._device),
                      torch.zeros(xx.shape).to(self._device))

        if kernel == "multiscale":

            bandwidth_range = [0.2, 0.5, 0.9, 1.3]
            for a in bandwidth_range:
                XX += a ** 2 * (a ** 2 + dxx) ** -1
                YY += a ** 2 * (a ** 2 + dyy) ** -1
                XY += a ** 2 * (a ** 2 + dxy) ** -1

        if kernel == "rbf":

            bandwidth_range = [10, 15, 20, 50]
            for a in bandwidth_range:
                XX += torch.exp(-0.5 * dxx / a)
                YY += torch.exp(-0.5 * dyy / a)
                XY += torch.exp(-0.5 * dxy / a)

        return torch.mean(XX + YY - 2. * XY)

    def predict(self, X):
        N = X.shape[0]
        prediction = np.full(N, np.nan)
        # Store last calculated score for the data batch containing not enough data.
        last_score = 1.
        for i in range(N - self.stat_size + 1):
            # Break if data window doesn't have enough data for next window anymore
            if i + self.offset + self.window_size > N:
                break
            stat_batch = X[i:i + self.stat_size]
            data_batch = X[i + self.offset :i + self.offset + self.window_size]
            score = self._mmd_score(stat_batch, data_batch, kernel=self.kernel)
            prediction[i + self.window_size - 1] = score.detach().cpu().item()
            last_score = score
        prediction = np.nan_to_num(prediction, nan=last_score.detach().cpu().item())
        return prediction
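
A usage sketch: the detector compares a reference window of `stat_size` samples against a test window of `window_size` samples shifted `offset` steps ahead, and emits one score per time step. The input must be a 2d array (samples x features); the values here are illustrative.

import numpy as np
from driftbench.drift_detection.detectors import MMDDetector

X = np.random.randn(200, 16)
X[120:] += 1.5                 # inject a mean shift at t = 120
detector = MMDDetector(window_size=20, stat_size=20, offset=20)
scores = detector.predict(X)   # shape (200,); larger values indicate drift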

RandomGuessDetector

Bases: Detector

A random guess detector.

Source code in driftbench/drift_detection/detectors.py
class RandomGuessDetector(Detector):
    """A random guess detector."""
    def __init__(self, random_seed=42):
        self.rng = np.random.RandomState(random_seed)

    def predict(self, X):
        N = X.shape[0]
        scores = self.rng.normal(size=N)
        scores = np.cumsum(scores)
        return scores

RollingMeanDifferenceDetector

Bases: Detector

Calculates the maximum value over a rolling mean across time and returns the absolute difference between subsequent steps.

Source code in driftbench/drift_detection/detectors.py
class RollingMeanDifferenceDetector(Detector):
    """Calculates the maximum value over a rolling mean across time and returns the absolute
    difference between subsequent steps."""

    hparams = ['window_size', 'center', 'fillna_strategy']

    def __init__(self, window_size, center=False, fillna_strategy=None):
        self.window_size = window_size
        self.center = center
        self.fillna_strategy = fillna_strategy

    def predict(self, X):
        series_data = pd.DataFrame(X)
        prediction = series_data.rolling(self.window_size, center=self.center).mean().max(axis=1).diff().abs()
        if self.fillna_strategy:
            fill_value = self.fillna_strategy(prediction)
            prediction = prediction.fillna(fill_value)
        return prediction.values

RollingMeanStandardDeviationDetector

Bases: Detector

Detector that applies a rolling mean followed by a rolling standard deviation and returns the result as the drift score.

Source code in driftbench/drift_detection/detectors.py
class RollingMeanStandardDeviationDetector(Detector):
    """Detector that applies a rolling mean followed by a rolling 
    standard deviation and returns the result as the drift score."""
    def __init__(self, window_size, center=False, fillna_strategy=None):
        self.window_size = window_size
        self.center = center
        self.fillna_strategy = fillna_strategy

    def predict(self, X):
        df_data = pd.DataFrame(X)
        prediction = df_data.rolling(self.window_size, center=self.center).mean().max(axis=1).rolling(
            self.window_size).std()
        if self.fillna_strategy:
            fill_value = self.fillna_strategy(prediction)
            prediction = prediction.fillna(fill_value)
        return prediction.values

SlidingKSWINDetector

Bases: Detector

Detector based on KS-test.

Source code in driftbench/drift_detection/detectors.py
class SlidingKSWINDetector(Detector):
    """Detector based on KS-test."""
    def __init__(self, window_size, stat_size, offset):
        self.window_size = window_size
        self.stat_size = stat_size
        self.offset = offset

    def predict(self, X):
        N = X.shape[0]
        prediction = np.ones((N,))
        stat_batches = np.array([X[i:i + self.stat_size] for i in range(N - self.stat_size + 1)])
        data_batches = np.array([X[i:i + self.window_size] for i in range(self.offset, N - self.window_size + 1)])
        num_batches = np.min([stat_batches.shape[0], data_batches.shape[0]])
        # Store last calculated score for the data batch containing not enough data.
        last_score = 1.
        for i in range(num_batches):
            stat_batch, data_batch = stat_batches[i], data_batches[i]
            _, p_value = ks_2samp(stat_batch, data_batch, method="auto")
            score = np.log1p(1.0 / p_value)
            prediction[i + self.window_size - 1] = score
            last_score = score
        prediction[:self.window_size] = last_score
        return prediction

The metrics module.

AUC

Bases: Metric

The area under the curve.

Source code in driftbench/drift_detection/metrics.py
class AUC(Metric):
    """The area under the curve."""

    def __call__(self, prediction, target):
        # Replace NaNs with 0 before scoring (the second positional argument
        # of np.nan_to_num is `copy`, so the replacement value must be keyword).
        prediction = np.nan_to_num(prediction, nan=0.0)
        return metrics.roc_auc_score(target, prediction)

SoftTAUC

Bases: Metric

A softened version of the TAUC.

Source code in driftbench/drift_detection/metrics.py
class SoftTAUC(Metric):
    """A softened version of the TAUC."""
    _supported_integration_rules = ["step", "trapez"]

    def __init__(self, rule="step"):
        self.rule = rule

    def __call__(self, prediction, targets, return_scores=False):
        overlap_score = SoftOverlapScore()
        thresholds = np.unique(prediction)
        thresholds = np.append(thresholds, np.inf)
        thresholds.sort()
        scores = np.zeros((thresholds.shape[0], 2))
        for i, threshold in enumerate(thresholds):
            bin_predictions = (prediction >= threshold).astype(int)
            fpr = (bin_predictions[targets == 0] == 1).sum() / (targets == 0).sum()
            scores[i] = [overlap_score(bin_predictions, targets), fpr]
        os, fpr = scores[:, 0], scores[:, 1]

        if return_scores:
            return thresholds, fpr, os

        if self.rule == "trapez":
            return metrics.auc(fpr, os)
        elif self.rule == "step":
            return np.sum(np.diff(fpr[::-1]) * os[::-1][:-1])

TemporalAUC

Bases: Metric

The temporal area under the curve.

Source code in driftbench/drift_detection/metrics.py
class TemporalAUC(Metric):
    """The temporal area under the curve."""
    _supported_integration_rules = ["step", "trapez"]

    def __init__(self, rule="step"):
        if not self._is_valid_rule(rule):
            raise ValueError(
                f"Unknown rule {rule}: Supported integration rules are {TemporalAUC._supported_integration_rules}.")
        self.rule = rule

    def _is_valid_rule(self, rule):
        return rule in TemporalAUC._supported_integration_rules

    def __call__(self, prediction, targets, return_scores=False):
        overlap_score = OverlapScore()
        thresholds = np.unique(prediction)
        thresholds = np.append(thresholds, np.inf)
        thresholds.sort()
        scores = np.zeros((thresholds.shape[0], 2))
        for i, threshold in enumerate(thresholds):
            bin_predictions = (prediction >= threshold).astype(int)
            fpr = (bin_predictions[targets == 0] == 1).sum() / (targets == 0).sum()
            scores[i] = [overlap_score(bin_predictions, targets), fpr]
        os, fpr = scores[:, 0], scores[:, 1]

        if return_scores:
            return thresholds, fpr, os

        if self.rule == "trapez":
            return metrics.auc(fpr, os)
        elif self.rule == "step":
            return np.sum(np.diff(fpr[::-1]) * os[::-1][:-1])

    @property
    def name(self):
        return f'TAUC-{self.rule}'
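
A usage sketch: like `AUC`, the metric consumes one continuous drift score per time step together with binary drift labels, either directly or via `Detector.evaluate`.

import numpy as np
from driftbench.drift_detection.metrics import TemporalAUC

scores = np.random.rand(100)                               # continuous drift scores
targets = np.r_[np.zeros(40), np.ones(20), np.zeros(40)]   # drift between t=40 and t=59
tauc = TemporalAUC(rule="step")
value = tauc(scores, targets)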

Benchmarks

Dataset

Represents a container class for a dataset specification for benchmarking purposes.

Source code in driftbench/benchmarks/data.py
class Dataset:
    """
    Represents a container class for a dataset specification for benchmarking purposes.
    """

    def __init__(self, name, spec, f=None, w0=None, n_variations=5):
        """
        Args:
            name (str): The name of the dataset specification.
            spec (dict): The yaml-specification of the dataset.
            f (Callable): The function to fit the curves.
            w0 (np.ndarray): The initial value for the internal parameters.
            n_variations (int): The number of variations each dataset is sampled.
                Each dataset is sampled as many times as `n_variations` is set,
                each time with a different random seed.
        """
        self.spec = spec
        self.name = name
        self.n_variations = n_variations
        self.w0 = w0
        self.f = f

        drift_bounds = self.spec["drifts"].get_individual_drift_bounds()
        self.Y = transform_drift_segments_into_binary(drift_bounds, self.spec["N"])

    def _generate(self, random_state):
        _, _, curves = sample_curves(
            dataset_specification=self.spec,
            f=self.f,
            w0=self.w0,
            random_state=random_state,
        )
        return curves

    def __iter__(self):
        for i in range(self.n_variations):
            X = self._generate(random_state=i)
            yield i, X, self.Y

__init__(name, spec, f=None, w0=None, n_variations=5)

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `name` | `str` | The name of the dataset specification. | *required* |
| `spec` | `dict` | The YAML specification of the dataset. | *required* |
| `f` | `Callable` | The function to fit the curves. | `None` |
| `w0` | `ndarray` | The initial value for the internal parameters. | `None` |
| `n_variations` | `int` | The number of variations of the dataset. The dataset is sampled `n_variations` times, each time with a different random seed. | `5` |
Source code in driftbench/benchmarks/data.py
def __init__(self, name, spec, f=None, w0=None, n_variations=5):
    """
    Args:
        name (str): The name of the dataset specification.
        spec (dict): The yaml-specification of the dataset.
        f (Callable): The function to fit the curves.
        w0 (np.ndarray): The initial value for the internal parameters.
        n_variations (int): The number of variations each dataset is sampled.
            Each dataset is sampled as many times as `n_variations` is set,
            each time with a different random seed.
    """
    self.spec = spec
    self.name = name
    self.n_variations = n_variations
    self.w0 = w0
    self.f = f

    drift_bounds = self.spec["drifts"].get_individual_drift_bounds()
    self.Y = transform_drift_segments_into_binary(drift_bounds, self.spec["N"])
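
A usage sketch of the benchmark loop, assuming `spec` and `f` as in the `sample_curves` sketch above; on top of that, the source requires a `DriftSequence` under `spec["drifts"]` and the series length under `spec["N"]`.

import numpy as np
from driftbench.benchmarks.data import Dataset
from driftbench.data_generation.drifts import DriftSequence, LinearDrift

spec["drifts"] = DriftSequence([LinearDrift(start=40, end=60, m=0.1, feature="y0")])
spec["N"] = 100

dataset = Dataset(name="example", spec=spec, f=f, w0=np.zeros(3), n_variations=3)
for seed, X, Y in dataset:
    # X: curves sampled with this seed, Y: binary per-sample drift labels
    print(seed, X.shape, Y.sum())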