Source code for ax.benchmark.benchmark_runner

# Copyright (c) Meta Platforms, Inc. and affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.

# pyre-strict

import warnings
from collections.abc import Callable, Iterable, Mapping, Sequence
from dataclasses import dataclass, field
from math import sqrt
from typing import Any

import numpy as np
import numpy.typing as npt
import pandas as pd

from ax.benchmark.benchmark_test_function import BenchmarkTestFunction
from ax.benchmark.benchmark_trial_metadata import BenchmarkTrialMetadata
from ax.core.base_trial import BaseTrial, TrialStatus
from ax.core.batch_trial import BatchTrial
from ax.core.runner import Runner
from ax.core.types import TParamValue
from ax.exceptions.core import UnsupportedError
from ax.runners.simulated_backend import SimulatedBackendRunner
from ax.utils.common.serialization import TClassDecoderRegistry, TDecoderRegistry
from ax.utils.testing.backend_simulator import BackendSimulator, BackendSimulatorOptions
from pyre_extensions import assert_is_instance


def _dict_of_arrays_to_df(
    Y_true_by_arm: Mapping[str, npt.NDArray], outcome_names: Sequence[str]
) -> pd.DataFrame:
    """
    Return a DataFrame with columns ["metric_name", "arm_name",
    "Y_true", "t"].

    Args:
        Y_true_by_arm: A mapping from arm name to a 2D array with shape
            (len(outcome_names), n_time_intervals).
        outcome_names: The names of the outcomes; will be mapped to the first
            dimension of each array in ``Y_true_by_arm``.
    """
    df = pd.concat(
        [
            pd.DataFrame(
                {
                    "metric_name": outcome_name,
                    "arm_name": arm_name,
                    "Y_true": y_true[i, :],
                    "t": np.arange(y_true.shape[1]),
                }
            )
            for i, outcome_name in enumerate(outcome_names)
            for arm_name, y_true in Y_true_by_arm.items()
        ],
        ignore_index=True,
    )
    return df
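# Illustrative sketch (the arm name "0_0" and outcome names "f1"/"f2" are
# hypothetical): two outcomes evaluated over two time intervals yield one
# long-format row per (metric, arm, t) combination:
#
#   >>> Y_true_by_arm = {"0_0": np.array([[1.0, 2.0], [3.0, 4.0]])}
#   >>> _dict_of_arrays_to_df(Y_true_by_arm, outcome_names=["f1", "f2"])
#     metric_name arm_name  Y_true  t
#   0          f1      0_0     1.0  0
#   1          f1      0_0     2.0  1
#   2          f2      0_0     3.0  0
#   3          f2      0_0     4.0  1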


def _add_noise(
    df: pd.DataFrame,
    noise_stds: Mapping[str, float],
    arm_weights: Mapping[str, float] | None,
) -> pd.DataFrame:
    """
    For each ``Y_true`` in ``df``, with metric name ``metric_name`` and
    arm name ``arm_name``, add noise with standard deviation
    ``noise_stds[metric_name] / sqrt_nlzd_arm_weights[arm_name]``,
    where ``sqrt_nlzd_arm_weights[arm_name] = sqrt(arm_weights[arm_name] /
    sum(arm_weights.values()))``.

    Args:
        df: A DataFrame with columns including
            ["metric_name", "arm_name", "Y_true"].
        noise_stds: A mapping from metric name to what the standard
            deviation would be if one arm received the entire
            sample budget.
        arm_weights: Either ``None`` if there is only one ``Arm``, or a
            mapping from ``Arm`` name to the arm's allocation. Using arm
            weights will increase noise levels, since each ``Arm`` is
            assumed to receive a fraction of the total sample budget.

    Returns:
        The original ``df``, now with additional columns ["mean", "sem"].
    """
    noiseless = all(v == 0 for v in noise_stds.values())
    if not noiseless:
        noise_std_ser = df["metric_name"].map(noise_stds)
        if arm_weights is not None:
            nlzd_arm_weights_sqrt = {
                arm_name: sqrt(weight / sum(arm_weights.values()))
                for arm_name, weight in arm_weights.items()
            }
            arm_weights_ser = df["arm_name"].map(nlzd_arm_weights_sqrt)
            df["sem"] = noise_std_ser / arm_weights_ser

        else:
            df["sem"] = noise_std_ser

        df["mean"] = df["Y_true"] + np.random.normal(len(df)) * df["sem"]

    else:
        df["sem"] = 0.0
        df["mean"] = df["Y_true"]
    return df
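# Sketch of the SEM scaling (arm names "a"/"b" are hypothetical): with
# noise_stds = {"f1": 1.0} and two equally weighted arms, each arm receives
# half the sample budget, so each observation's SEM is 1.0 / sqrt(0.5),
# i.e. sqrt(2) times the noise level of giving one arm the full budget:
#
#   >>> df = pd.DataFrame(
#   ...     {"metric_name": ["f1", "f1"], "arm_name": ["a", "b"],
#   ...      "Y_true": [0.0, 0.0]}
#   ... )
#   >>> out = _add_noise(
#   ...     df, noise_stds={"f1": 1.0}, arm_weights={"a": 1.0, "b": 1.0}
#   ... )
#   >>> out["sem"].round(3).tolist()
#   [1.414, 1.414]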


@dataclass(kw_only=True)
class BenchmarkRunner(Runner):
    """
    A Runner that produces both observed and ground-truth values.

    Observed values equal ground-truth values plus noise, with the noise
    added according to the standard deviations returned by
    `get_noise_stds()`.

    This runner requires that every benchmark has a ground truth, which
    won't necessarily be true for real-world problems. Such problems fall
    into two categories:

    - If they are deterministic, they can be used with this runner by
      viewing them as noiseless problems where the observed values are the
      ground truth. The observed values will be used for tracking the
      progress of optimization.
    - If they are not deterministic, they are not supported. It is not
      conceptually clear how to benchmark such problems, so we decided to
      not over-engineer for that before such a use case arrives.

    If ``trial_runtime_func`` and ``max_concurrency`` are both left as
    default, trials run serially and complete immediately. Otherwise, a
    ``SimulatedBackendRunner`` is constructed to track the status of trials.

    Args:
        test_function: A ``BenchmarkTestFunction`` from which to generate
            deterministic data before adding noise.
        noise_std: The standard deviation of the noise added to the data.
            Can be a list or dict to be per-metric.
        trial_runtime_func: A callable that takes a trial and returns its
            runtime, in simulated seconds. If `None`, each trial completes
            in one simulated second.
        max_concurrency: The maximum number of trials that can be running
            at a given time. Typically, this is ``max_pending_trials`` from
            the ``scheduler_options`` on the ``BenchmarkMethod``.
    """

    test_function: BenchmarkTestFunction
    noise_std: float | Sequence[float] | Mapping[str, float] = 0.0
    trial_runtime_func: Callable[[BaseTrial], int] | None = None
    max_concurrency: int = 1
    simulated_backend_runner: SimulatedBackendRunner | None = field(init=False)

    def __post_init__(self) -> None:
        if self.max_concurrency > 1:
            simulator = BackendSimulator(
                options=BackendSimulatorOptions(
                    max_concurrency=self.max_concurrency,
                    # Always use virtual rather than real time for benchmarking
                    internal_clock=0,
                    use_update_as_start_time=True,
                ),
            )
            if self.trial_runtime_func is None:
                warnings.warn(
                    "`trial_runtime_func` is not set and `max_concurrency` > 1."
                    " Each trial will take one simulated second to run.",
                    stacklevel=2,
                )
            self.simulated_backend_runner = SimulatedBackendRunner(
                simulator=simulator,
                sample_runtime_func=self.trial_runtime_func
                if self.trial_runtime_func is not None
                else lambda _: 1,
            )
        else:
            self.simulated_backend_runner = None

    @property
    def outcome_names(self) -> Sequence[str]:
        """The names of the outcomes."""
        return self.test_function.outcome_names
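    # Usage sketch (hedged; ``MyTestFunction`` is a hypothetical
    # ``BenchmarkTestFunction`` subclass with a single outcome "f1"): with
    # ``max_concurrency > 1``, ``__post_init__`` builds a
    # ``SimulatedBackendRunner`` so that trial completion follows the
    # simulated clock rather than finishing instantly:
    #
    #   >>> runner = BenchmarkRunner(
    #   ...     test_function=MyTestFunction(),
    #   ...     noise_std={"f1": 0.1},
    #   ...     trial_runtime_func=lambda trial: 5,  # every trial takes 5s
    #   ...     max_concurrency=2,
    #   ... )
    #   >>> runner.simulated_backend_runner is not None
    #   True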
    def get_Y_true(self, params: Mapping[str, TParamValue]) -> npt.NDArray:
        """Evaluates the test problem.

        Returns:
            An array of ground truth (noiseless) evaluations, with shape
            (len(outcome_names), n_intervals) if is_map is True, and
            (len(outcome_names), 1) otherwise.
        """
        result = np.atleast_1d(
            self.test_function.evaluate_true(params=params).numpy()
        )
        if result.ndim == 1:
            return result[:, None]
        return result
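    # Shape-handling sketch, continuing the hypothetical single-outcome,
    # non-map runner above: a 1D evaluation is promoted to shape
    # (len(outcome_names), 1), so downstream code can always index outcomes
    # on axis 0 and time intervals on axis 1:
    #
    #   >>> runner.get_Y_true({"x": 0.5}).shape
    #   (1, 1)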
    def get_noise_stds(self) -> dict[str, float]:
        noise_std = self.noise_std
        if isinstance(noise_std, float):
            return {name: noise_std for name in self.outcome_names}
        elif isinstance(noise_std, dict):
            if not set(noise_std.keys()) == set(self.outcome_names):
                raise ValueError(
                    "Noise std must have keys equal to outcome names if given as "
                    "a dict."
                )
            return noise_std
        # list of floats
        return dict(
            zip(self.outcome_names, assert_is_instance(noise_std, list), strict=True)
        )
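    # The three accepted forms of ``noise_std`` normalize to the same dict
    # (sketch; outcome names "f1"/"f2" are hypothetical):
    #
    #   0.1                    -> {"f1": 0.1, "f2": 0.1}  (float: shared)
    #   [0.1, 0.2]             -> {"f1": 0.1, "f2": 0.2}  (list: positional)
    #   {"f1": 0.1, "f2": 0.2} -> returned as-is (keys must match outcomes)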
    def run(self, trial: BaseTrial) -> dict[str, BenchmarkTrialMetadata]:
        """Run the trial by evaluating its parameterization(s).

        Args:
            trial: The trial to evaluate.

        Returns:
            A dictionary {"benchmark_metadata": metadata}, where ``metadata``
            is a ``BenchmarkTrialMetadata``.
        """
        Y_true_by_arm = {
            arm.name: self.get_Y_true(arm.parameters) for arm in trial.arms
        }
        df = _dict_of_arrays_to_df(
            Y_true_by_arm=Y_true_by_arm, outcome_names=self.outcome_names
        )
        arm_weights = (
            {arm.name: w for arm, w in trial.arm_weights.items()}
            if isinstance(trial, BatchTrial)
            else None
        )
        df = _add_noise(
            df=df, noise_stds=self.get_noise_stds(), arm_weights=arm_weights
        )
        df["trial_index"] = trial.index
        df.drop(columns=["Y_true"], inplace=True)

        if self.simulated_backend_runner is not None:
            self.simulated_backend_runner.run(trial=trial)

        dfs = {
            outcome_name: df[df["metric_name"] == outcome_name]
            for outcome_name in self.outcome_names
        }
        metadata = BenchmarkTrialMetadata(
            dfs=dfs,
            backend_simulator=None
            if self.simulated_backend_runner is None
            else self.simulated_backend_runner.simulator,
        )

        return {"benchmark_metadata": metadata}
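    # Sketch of the returned payload (``trial`` and outcome name are
    # hypothetical): the metadata holds one long-format DataFrame per
    # outcome; the noiseless "Y_true" column is dropped before the data
    # leave the runner:
    #
    #   >>> out = runner.run(trial)
    #   >>> out["benchmark_metadata"].dfs["f1"].columns.tolist()
    #   ['metric_name', 'arm_name', 't', 'sem', 'mean', 'trial_index']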
    def poll_trial_status(
        self, trials: Iterable[BaseTrial]
    ) -> dict[TrialStatus, set[int]]:
        if self.simulated_backend_runner is None:
            return {TrialStatus.COMPLETED: {t.index for t in trials}}
        return self.simulated_backend_runner.poll_trial_status(trials=trials)
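    # Behavior sketch: without a simulated backend, every trial is reported
    # as complete immediately (trial objects and indices hypothetical):
    #
    #   >>> runner.poll_trial_status(trials=[trial_0, trial_1])
    #   {TrialStatus.COMPLETED: {0, 1}}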
    @classmethod
    # pyre-fixme [2]: Parameter `obj` must have a type other than `Any`
    def serialize_init_args(cls, obj: Any) -> dict[str, Any]:
        """
        It is tricky to use SerializationMixin with instances that have Ax
        objects as attributes, as BenchmarkRunners do. Therefore,
        serialization is not supported.
        """
        raise UnsupportedError(
            "serialize_init_args is not a supported method for BenchmarkRunners."
        )
    @classmethod
    def deserialize_init_args(
        cls,
        args: dict[str, Any],
        decoder_registry: TDecoderRegistry | None = None,
        class_decoder_registry: TClassDecoderRegistry | None = None,
    ) -> dict[str, Any]:
        """
        It is tricky to use SerializationMixin with instances that have Ax
        objects as attributes, as BenchmarkRunners do. Therefore,
        serialization is not supported.
        """
        raise UnsupportedError(
            "deserialize_init_args is not a supported method for BenchmarkRunners."
        )
    def stop(self, trial: BaseTrial, reason: str | None = None) -> dict[str, Any]:
        if self.simulated_backend_runner is None:
            raise UnsupportedError(
                "stop() is not supported for a `BenchmarkRunner` without a "
                "`simulated_backend_runner`, because trials complete "
                "immediately."
            )
        return self.simulated_backend_runner.stop(trial=trial, reason=reason)