# Copyright (c) Meta Platforms, Inc. and affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
# pyre-strict
from collections.abc import Iterable, Mapping, Sequence
from dataclasses import dataclass, field
from math import sqrt
from typing import Any
import numpy as np
import numpy.typing as npt
import pandas as pd
from ax.benchmark.benchmark_step_runtime_function import TBenchmarkStepRuntimeFunction
from ax.benchmark.benchmark_test_function import BenchmarkTestFunction
from ax.benchmark.benchmark_trial_metadata import BenchmarkTrialMetadata
from ax.core.base_trial import BaseTrial, TrialStatus
from ax.core.batch_trial import BatchTrial
from ax.core.runner import Runner
from ax.core.types import TParamValue
from ax.exceptions.core import UnsupportedError
from ax.runners.simulated_backend import SimulatedBackendRunner
from ax.utils.common.serialization import TClassDecoderRegistry, TDecoderRegistry
from ax.utils.testing.backend_simulator import BackendSimulator, BackendSimulatorOptions
from pyre_extensions import assert_is_instance


def _dict_of_arrays_to_df(
Y_true_by_arm: Mapping[str, npt.NDArray],
step_duration_by_arm: Mapping[str, float],
outcome_names: Sequence[str],
) -> pd.DataFrame:
"""
    Return a DataFrame with columns
    ["metric_name", "arm_name", "Y_true", "step", "virtual runtime"].

    When the trial produces MapData, the "step" column is 0, 1, 2, ..., and
    "virtual runtime" contains the cumulative time after each element of the
    progression. When the trial does not produce MapData, the "step" column is
    just 0, and "virtual runtime" is the total runtime of the trial.

    Args:
        Y_true_by_arm: A mapping from arm name to a 2D array of shape
            (len(outcome_names), n_steps).
step_duration_by_arm: A mapping from arm name to a number representing
the runtime of each step.
outcome_names: The names of the outcomes; will be mapped to the first
dimension of each array in ``Y_true_by_arm``.
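
    Example:
        For illustration only (all values are hypothetical): with
        ``outcome_names=["f"]``, ``Y_true_by_arm={"0_0": np.array([[1.0, 2.0]])}``,
        and ``step_duration_by_arm={"0_0": 3.0}``, the result has two rows for
        arm "0_0" and metric "f", with ``Y_true`` 1.0 and 2.0, ``step`` 0 and 1,
        and ``virtual runtime`` 3.0 and 6.0.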
"""
df = pd.concat(
[
pd.DataFrame(
{
"metric_name": outcome_name,
"arm_name": arm_name,
"Y_true": y_true[i, :],
"step": np.arange(y_true.shape[1], dtype=int),
"virtual runtime": np.arange(1, y_true.shape[1] + 1, dtype=int)
* step_duration_by_arm[arm_name],
}
)
for i, outcome_name in enumerate(outcome_names)
for arm_name, y_true in Y_true_by_arm.items()
],
ignore_index=True,
)
return df


def _add_noise(
df: pd.DataFrame,
noise_stds: Mapping[str, float],
arm_weights: Mapping[str, float] | None,
) -> pd.DataFrame:
"""
    For each ``Y_true`` in ``df``, with metric name ``metric_name`` and
    arm name ``arm_name``, add noise with standard deviation
    ``noise_stds[metric_name] / sqrt_nlzd_arm_weights[arm_name]``, where
    ``sqrt_nlzd_arm_weights[arm_name] = sqrt(arm_weights[arm_name] /
    sum(arm_weights.values()))``.

    Args:
df: A DataFrame with columns including
["metric_name", "arm_name", "Y_true"].
noise_stds: A mapping from metric name to what the standard
deviation would be if one arm received the entire
sample budget.
arm_weights: Either ``None`` if there is only one ``Arm``, or a
mapping from ``Arm`` name to the arm's allocation. Using arm
weights will increase noise levels, since each ``Arm`` is
            assumed to receive a fraction of the total sample budget.

    Returns:
The original ``df``, now with additional columns ["mean", "sem"].
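
    Example:
        For illustration only (the numbers are hypothetical): with
        ``arm_weights={"a": 1.0, "b": 3.0}``, the normalized square-root weights
        are ``sqrt(1/4) = 0.5`` for "a" and ``sqrt(3/4) ~= 0.866`` for "b", so
        with ``noise_stds={"f": 1.0}`` the "sem" column is ``1.0 / 0.5 = 2.0``
        for arm "a" and ``1.0 / 0.866 ~= 1.15`` for arm "b".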
"""
noiseless = all(v == 0 for v in noise_stds.values())
if not noiseless:
noise_std_ser = df["metric_name"].map(noise_stds)
if arm_weights is not None:
nlzd_arm_weights_sqrt = {
arm_name: sqrt(weight / sum(arm_weights.values()))
for arm_name, weight in arm_weights.items()
}
arm_weights_ser = df["arm_name"].map(nlzd_arm_weights_sqrt)
df["sem"] = noise_std_ser / arm_weights_ser
else:
df["sem"] = noise_std_ser
df["mean"] = df["Y_true"] + np.random.normal(loc=0, scale=df["sem"])
else:
df["sem"] = 0.0
df["mean"] = df["Y_true"]
return df


def get_total_runtime(
trial: BaseTrial,
step_runtime_function: TBenchmarkStepRuntimeFunction | None,
n_steps: int,
) -> float:
    """Get the total virtual runtime of a trial.

    Args:
        trial: The trial whose runtime is being computed.
        step_runtime_function: If provided, maps an arm's parameters to the
            runtime of one step; otherwise, each step takes 1 virtual second.
            For trials with multiple arms, the slowest arm determines the
            trial's runtime.
        n_steps: The number of steps in the trial.
    """
# By default, each step takes 1 virtual second.
if step_runtime_function is not None:
max_step_runtime = max(
(step_runtime_function(arm.parameters) for arm in trial.arms)
)
else:
max_step_runtime = 1
return n_steps * max_step_runtime


@dataclass(kw_only=True)
class BenchmarkRunner(Runner):
"""
    A Runner that produces both observed and ground-truth values.

    Observed values equal ground-truth values plus noise, with the noise added
    according to the standard deviations returned by ``get_noise_stds()``.

    This runner requires that every benchmark has a ground truth, which won't
    necessarily be true for real-world problems. Such problems fall into two
    categories:

    - If they are deterministic, they can be used with this runner by viewing
      them as noiseless problems where the observed values are the ground
      truth. The observed values will be used for tracking the progress of
      optimization.
    - If they are not deterministic, they are not supported. It is not
      conceptually clear how to benchmark such problems, so we decided not to
      over-engineer for that before such a use case arrives.

    If ``max_concurrency`` is left at its default of 1, trials run serially and
    complete immediately. Otherwise, a ``SimulatedBackendRunner`` is constructed
    to track the status of trials.

    Args:
test_function: A ``BenchmarkTestFunction`` from which to generate
deterministic data before adding noise.
noise_std: The standard deviation of the noise added to the data. Can be
a list or dict to be per-metric.
step_runtime_function: A function that takes in parameters
(in ``TParameterization`` format) and returns the runtime of a step.
max_concurrency: The maximum number of trials that can be running at a
given time. Typically, this is ``max_pending_trials`` from the
``scheduler_options`` on the ``BenchmarkMethod``.
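
    Example:
        A minimal sketch; ``tf`` is assumed to be some ``BenchmarkTestFunction``
        with ``outcome_names == ["objective"]``, and the parameter name "x" is
        hypothetical::

            runner = BenchmarkRunner(
                test_function=tf,
                noise_std={"objective": 0.1},
                # Illustrative only: each step takes 1 + x virtual seconds.
                step_runtime_function=lambda params: 1.0 + float(params["x"]),
                max_concurrency=4,
            )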
"""
test_function: BenchmarkTestFunction
noise_std: float | Sequence[float] | Mapping[str, float] = 0.0
step_runtime_function: TBenchmarkStepRuntimeFunction | None = None
max_concurrency: int = 1
simulated_backend_runner: SimulatedBackendRunner | None = field(init=False)

    def __post_init__(self) -> None:
if self.max_concurrency > 1:
simulator = BackendSimulator(
options=BackendSimulatorOptions(
max_concurrency=self.max_concurrency,
# Always use virtual rather than real time for benchmarking
internal_clock=0,
use_update_as_start_time=True,
),
)
self.simulated_backend_runner = SimulatedBackendRunner(
simulator=simulator,
sample_runtime_func=lambda trial: get_total_runtime(
trial=trial,
step_runtime_function=self.step_runtime_function,
n_steps=self.test_function.n_steps,
),
)
else:
self.simulated_backend_runner = None

    @property
def outcome_names(self) -> Sequence[str]:
"""The names of the outcomes."""
return self.test_function.outcome_names

    def get_Y_true(self, params: Mapping[str, TParamValue]) -> npt.NDArray:
"""Evaluates the test problem.

        Returns:
            An array of ground-truth (noiseless) evaluations, with shape
            (len(outcome_names), n_steps) when the test function produces
            time-series (map) data, and (len(outcome_names), 1) otherwise.
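
        Example:
            For illustration, a test function with two outcomes and no time
            dimension yields an array of shape ``(2, 1)``; with ``n_steps=10``
            it would yield shape ``(2, 10)``.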
"""
result = np.atleast_1d(self.test_function.evaluate_true(params=params).numpy())
if result.ndim == 1:
return result[:, None]
return result

    def get_noise_stds(self) -> dict[str, float]:
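        """Return the noise standard deviation for each outcome.

        ``noise_std`` may be a single float (applied to every outcome), a
        mapping from outcome name to standard deviation, or a list with one
        entry per outcome, in the order of ``outcome_names``.
        """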
noise_std = self.noise_std
if isinstance(noise_std, float):
return {name: noise_std for name in self.outcome_names}
elif isinstance(noise_std, dict):
if not set(noise_std.keys()) == set(self.outcome_names):
raise ValueError(
"Noise std must have keys equal to outcome names if given as "
"a dict."
)
return noise_std
# list of floats
return dict(
zip(self.outcome_names, assert_is_instance(noise_std, list), strict=True)
)

    def run(self, trial: BaseTrial) -> dict[str, BenchmarkTrialMetadata]:
"""Run the trial by evaluating its parameterization(s).

        Args:
            trial: The trial to evaluate.

        Returns:
A dictionary {"benchmark_metadata": metadata}, where ``metadata`` is
a ``BenchmarkTrialMetadata``.
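
        Note:
            ``metadata.dfs`` maps each outcome name to a DataFrame with columns
            ["metric_name", "arm_name", "step", "virtual runtime", "sem",
            "mean", "trial_index"].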
"""
Y_true_by_arm = {
arm.name: self.get_Y_true(arm.parameters) for arm in trial.arms
}
step_duration_by_arm = {
arm.name: 1
if self.step_runtime_function is None
else self.step_runtime_function(arm.parameters)
for arm in trial.arms
}
for arm_name, duration in step_duration_by_arm.items():
if duration < 0:
raise ValueError(
"Step duration must be non-negative for each arm. For arm "
f"{arm_name}, duration is {duration}."
)
df = _dict_of_arrays_to_df(
Y_true_by_arm=Y_true_by_arm,
step_duration_by_arm=step_duration_by_arm,
outcome_names=self.outcome_names,
)
arm_weights = (
{arm.name: w for arm, w in trial.arm_weights.items()}
if isinstance(trial, BatchTrial)
else None
)
df = _add_noise(
df=df, noise_stds=self.get_noise_stds(), arm_weights=arm_weights
)
df["trial_index"] = trial.index
df.drop(columns=["Y_true"], inplace=True)
if self.simulated_backend_runner is not None:
self.simulated_backend_runner.run(trial=trial)
dfs = {
outcome_name: df[df["metric_name"] == outcome_name]
for outcome_name in self.outcome_names
}
metadata = BenchmarkTrialMetadata(
dfs=dfs,
backend_simulator=None
if self.simulated_backend_runner is None
else self.simulated_backend_runner.simulator,
)
return {"benchmark_metadata": metadata}

    def poll_trial_status(
self, trials: Iterable[BaseTrial]
) -> dict[TrialStatus, set[int]]:
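        """Poll trial statuses from the backend simulator if one is in use;
        otherwise, report every trial as completed, since trials without a
        simulated backend complete immediately."""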
if self.simulated_backend_runner is None:
return {TrialStatus.COMPLETED: {t.index for t in trials}}
return self.simulated_backend_runner.poll_trial_status(trials=trials)

    @classmethod
    # pyre-fixme [2]: Parameter `obj` must have a type other than `Any`
def serialize_init_args(cls, obj: Any) -> dict[str, Any]:
"""
It is tricky to use SerializationMixin with instances that have Ax
objects as attributes, as BenchmarkRunners do. Therefore, serialization
is not supported.
"""
raise UnsupportedError(
"serialize_init_args is not a supported method for BenchmarkRunners."
)

    @classmethod
def deserialize_init_args(
cls,
args: dict[str, Any],
decoder_registry: TDecoderRegistry | None = None,
class_decoder_registry: TClassDecoderRegistry | None = None,
) -> dict[str, Any]:
"""
It is tricky to use SerializationMixin with instances that have Ax
objects as attributes, as BenchmarkRunners do. Therefore, serialization
is not supported.
"""
raise UnsupportedError(
"deserialize_init_args is not a supported method for BenchmarkRunners."
)

    def stop(self, trial: BaseTrial, reason: str | None = None) -> dict[str, Any]:
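        """Stop the trial in the backend simulator.

        This is only supported when a ``simulated_backend_runner`` is in use
        (i.e. ``max_concurrency > 1``); otherwise, trials complete immediately
        and cannot be stopped.
        """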
if self.simulated_backend_runner is None:
            raise UnsupportedError(
                "stop() is not supported for a `BenchmarkRunner` without a "
                "`simulated_backend_runner`, because trials complete "
                "immediately."
)
return self.simulated_backend_runner.stop(trial=trial, reason=reason)