#!/usr/bin/env python3
# Copyright (c) Meta Platforms, Inc. and affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
# pyre-strict
from abc import ABCMeta, abstractmethod
from collections.abc import Iterable
from functools import partial
from logging import Logger
import numpy as np
import torch
from ax.core.experiment import Experiment
from ax.core.map_data import MapData
from ax.core.objective import ScalarizedObjective
from ax.core.optimization_config import (
MultiObjectiveOptimizationConfig,
OptimizationConfig,
)
from ax.core.types import TModelPredictArm, TParameterization
from ax.modelbridge.generation_strategy import GenerationStrategy
from ax.modelbridge.modelbridge_utils import (
extract_objective_thresholds,
extract_objective_weights,
extract_outcome_constraints,
observed_hypervolume,
predicted_hypervolume,
validate_and_apply_final_transform,
)
from ax.modelbridge.registry import get_model_from_generator_run, ModelRegistryBase
from ax.modelbridge.torch import TorchModelBridge
from ax.modelbridge.transforms.derelativize import Derelativize
from ax.models.torch.botorch_moo_defaults import (
get_outcome_constraint_transforms,
get_weighted_mc_objective_and_objective_thresholds,
)
from ax.plot.pareto_utils import get_tensor_converter_model
from ax.service.utils import best_point as best_point_utils
from ax.service.utils.best_point import (
extract_Y_from_data,
fill_missing_thresholds_from_nadir,
)
from ax.utils.common.logger import get_logger
from ax.utils.common.typeutils import checked_cast
from botorch.utils.multi_objective.box_decompositions import DominatedPartitioning
from pyre_extensions import none_throws
logger: Logger = get_logger(__name__)
NUM_BINS_PER_TRIAL = 3
[docs]
class BestPointMixin(metaclass=ABCMeta):
[docs]
@abstractmethod
def get_best_trial(
self,
optimization_config: OptimizationConfig | None = None,
trial_indices: Iterable[int] | None = None,
use_model_predictions: bool = True,
) -> tuple[int, TParameterization, TModelPredictArm | None] | None:
"""Identifies the best parameterization tried in the experiment so far.
First attempts to do so with the model used in optimization and
its corresponding predictions if available. Falls back to the best raw
objective based on the data fetched from the experiment.
NOTE: ``TModelPredictArm`` is of the form:
({metric_name: mean}, {metric_name_1: {metric_name_2: cov_1_2}})
Args:
optimization_config: Optimization config to use in place of the one stored
on the experiment.
trial_indices: Indices of trials for which to retrieve data. If None will
retrieve data from all available trials.
use_model_predictions: Whether to extract the best point using
model predictions or directly observed values. If ``True``,
the metric means and covariances in this method's output will
also be based on model predictions and may differ from the
observed values.
Returns:
Tuple of trial index, parameterization and model predictions for it.
"""
pass
[docs]
def get_best_parameters(
self,
optimization_config: OptimizationConfig | None = None,
trial_indices: Iterable[int] | None = None,
use_model_predictions: bool = True,
) -> tuple[TParameterization, TModelPredictArm | None] | None:
"""Identifies the best parameterization tried in the experiment so far.
First attempts to do so with the model used in optimization and
its corresponding predictions if available. Falls back to the best raw
objective based on the data fetched from the experiment.
NOTE: ``TModelPredictArm`` is of the form:
({metric_name: mean}, {metric_name_1: {metric_name_2: cov_1_2}})
Args:
optimization_config: Optimization config to use in place of the one stored
on the experiment.
trial_indices: Indices of trials for which to retrieve data. If None will
retrieve data from all available trials.
use_model_predictions: Whether to extract the best point using
model predictions or directly observed values. If ``True``,
the metric means and covariances in this method's output will
also be based on model predictions and may differ from the
observed values.
Returns:
Tuple of parameterization and model predictions for it.
"""
res = self.get_best_trial(
optimization_config=optimization_config,
trial_indices=trial_indices,
use_model_predictions=use_model_predictions,
)
if res is None:
return res
_, parameterization, vals = res
return parameterization, vals
[docs]
@abstractmethod
def get_pareto_optimal_parameters(
self,
optimization_config: OptimizationConfig | None = None,
trial_indices: Iterable[int] | None = None,
use_model_predictions: bool = True,
) -> dict[int, tuple[TParameterization, TModelPredictArm]]:
"""Identifies the best parameterizations tried in the experiment so far,
using model predictions if ``use_model_predictions`` is true and using
observed values from the experiment otherwise. By default, uses model
predictions to account for observation noise.
NOTE: The format of this method's output is as follows:
{ trial_index --> (parameterization, (means, covariances) }, where means
are a dictionary of form { metric_name --> metric_mean } and covariances
are a nested dictionary of form
{ one_metric_name --> { another_metric_name: covariance } }.
Args:
optimization_config: Optimization config to use in place of the one stored
on the experiment.
trial_indices: Indices of trials for which to retrieve data. If None will
retrieve data from all available trials.
use_model_predictions: Whether to extract the Pareto frontier using
model predictions or directly observed values. If ``True``,
the metric means and covariances in this method's output will
also be based on model predictions and may differ from the
observed values.
Returns:
A mapping from trial index to the tuple of:
- the parameterization of the arm in that trial,
- two-item tuple of metric means dictionary and covariance matrix
(model-predicted if ``use_model_predictions=True`` and observed
otherwise).
Raises a `NotImplementedError` if extracting the Pareto frontier is
not possible. Note that the returned dict may be empty.
"""
pass
[docs]
@abstractmethod
def get_hypervolume(
self,
optimization_config: MultiObjectiveOptimizationConfig | None = None,
trial_indices: Iterable[int] | None = None,
use_model_predictions: bool = True,
) -> float:
"""Calculate hypervolume of a pareto frontier based on either the posterior
means of given observation features or observed data.
Args:
optimization_config: Optimization config to use in place of the one stored
on the experiment.
trial_indices: Indices of trials for which to retrieve data. If None will
retrieve data from all available trials.
use_model_predictions: Whether to extract the Pareto frontier using
model predictions or directly observed values. If ``True``,
the metric means and covariances in this method's output will
also be based on model predictions and may differ from the
observed values.
"""
pass
[docs]
@abstractmethod
def get_trace(
optimization_config: OptimizationConfig | None = None,
) -> list[float]:
"""Get the optimization trace of the given experiment.
The output is equivalent to calling `_get_hypervolume` or `_get_best_trial`
repeatedly, with an increasing sequence of `trial_indices` and with
`use_model_predictions = False`, though this does it more efficiently.
Args:
experiment: The experiment to get the trace for.
optimization_config: An optional optimization config to use for computing
the trace. This allows computing the traces under different objectives
or constraints without having to modify the experiment.
Returns:
A list of observed hypervolumes or best values.
"""
pass
[docs]
@abstractmethod
def get_trace_by_progression(
optimization_config: OptimizationConfig | None = None,
bins: list[float] | None = None,
final_progression_only: bool = False,
) -> tuple[list[float], list[float]]:
"""Get the optimization trace with respect to trial progressions instead of
`trial_indices` (which is the behavior used in `get_trace`). Note that this
method does not take into account the parallelism of trials and essentially
assumes that trials are run one after another, in the sense that it considers
the total number of progressions "used" at the end of trial k to be the
cumulative progressions "used" in trials 0,...,k. This method assumes that the
final value of a particular trial is used and does not take the best value
of a trial over its progressions.
The best observed value is computed at each value in `bins` (see below for
details). If `bins` is not supplied, the method defaults to a heuristic of
approximately `NUM_BINS_PER_TRIAL` per trial, where each trial is assumed to
run until maximum progression (inferred from the data).
Args:
experiment: The experiment to get the trace for.
optimization_config: An optional optimization config to use for computing
the trace. This allows computing the traces under different objectives
or constraints without having to modify the experiment.
bins: A list progression values at which to calculate the best observed
value. The best observed value at bins[i] is defined as the value
observed in trials 0,...,j where j = largest trial such that the total
progression in trials 0,...,j is less than bins[i].
final_progression_only: If True, considers the value of the last step to be
the value of the trial. If False, considers the best along the curve to
be the value of the trial.
Returns:
A tuple containing (1) the list of observed hypervolumes or best values and
(2) a list of associated x-values (i.e., progressions) useful for plotting.
"""
pass
@staticmethod
def _get_best_trial(
experiment: Experiment,
generation_strategy: GenerationStrategy,
optimization_config: OptimizationConfig | None = None,
trial_indices: Iterable[int] | None = None,
use_model_predictions: bool = True,
) -> tuple[int, TParameterization, TModelPredictArm | None] | None:
optimization_config = optimization_config or none_throws(
experiment.optimization_config
)
if optimization_config.is_moo_problem:
raise NotImplementedError(
"Please use `get_pareto_optimal_parameters` for multi-objective "
"problems."
)
# TODO[drfreund]: Find a way to include data for last trial in the
# calculation of best parameters.
if use_model_predictions:
current_model = generation_strategy._curr.model_spec_to_gen_from.model_enum
# Cover for the case where source of `self._curr.model` was not a `Models`
# enum but a factory function, in which case we cannot do
# `get_model_from_generator_run` (since we don't have model type and inputs
# recorded on the generator run.
models_enum = (
current_model.__class__
if isinstance(current_model, ModelRegistryBase)
else None
)
if models_enum is not None:
res = best_point_utils.get_best_parameters_from_model_predictions_with_trial_index( # noqa
experiment=experiment,
models_enum=models_enum,
optimization_config=optimization_config,
trial_indices=trial_indices,
)
if res is not None:
return res
return best_point_utils.get_best_by_raw_objective_with_trial_index(
experiment=experiment,
optimization_config=optimization_config,
trial_indices=trial_indices,
)
@staticmethod
def _get_best_observed_value(
experiment: Experiment,
optimization_config: OptimizationConfig | None = None,
trial_indices: Iterable[int] | None = None,
) -> float | None:
"""Identifies the best objective value observed in the experiment
among the trials indicated by `trial_indices`.
Args:
experiment: The experiment to get the best objective value for.
optimization_config: Optimization config to use in place of the one stored
on the experiment.
trial_indices: Indices of trials for which to retrieve data. If None will
retrieve data from all available trials.
Returns:
The best objective value so far.
"""
if optimization_config is None:
optimization_config = none_throws(experiment.optimization_config)
if optimization_config.is_moo_problem:
raise NotImplementedError(
"Please use `get_hypervolume` for multi-objective problems."
)
res = best_point_utils.get_best_by_raw_objective_with_trial_index(
experiment=experiment,
optimization_config=optimization_config,
trial_indices=trial_indices,
)
predictions = res[2] if res is not None else None
if predictions is None:
return None
means = none_throws(predictions)[0]
objective = optimization_config.objective
if isinstance(objective, ScalarizedObjective):
value = 0
for metric, weight in objective.metric_weights:
value += means[metric.name] * weight
return value
else:
name = objective.metric_names[0]
return means[name]
@staticmethod
def _get_pareto_optimal_parameters(
experiment: Experiment,
generation_strategy: GenerationStrategy,
optimization_config: OptimizationConfig | None = None,
trial_indices: Iterable[int] | None = None,
use_model_predictions: bool = True,
) -> dict[int, tuple[TParameterization, TModelPredictArm]]:
optimization_config = optimization_config or none_throws(
experiment.optimization_config
)
if not optimization_config.is_moo_problem:
raise NotImplementedError(
"Please use `get_best_parameters` for single-objective problems."
)
return best_point_utils.get_pareto_optimal_parameters(
experiment=experiment,
generation_strategy=generation_strategy,
optimization_config=optimization_config,
trial_indices=trial_indices,
use_model_predictions=use_model_predictions,
)
@staticmethod
def _get_hypervolume(
experiment: Experiment,
generation_strategy: GenerationStrategy,
optimization_config: MultiObjectiveOptimizationConfig | None = None,
trial_indices: Iterable[int] | None = None,
use_model_predictions: bool = True,
) -> float:
data = experiment.lookup_data()
if len(data.df) == 0:
return 0.0
moo_optimization_config = checked_cast(
MultiObjectiveOptimizationConfig,
optimization_config or experiment.optimization_config,
)
if use_model_predictions:
current_model = generation_strategy._curr.model_spec_to_gen_from.model_enum
# Cover for the case where source of `self._curr.model` was not a `Models`
# enum but a factory function, in which case we cannot do
# `get_model_from_generator_run` (since we don't have model type and inputs
# recorded on the generator run.
models_enum = (
current_model.__class__
if isinstance(current_model, ModelRegistryBase)
else None
)
if models_enum is None:
raise ValueError(
f"Model {current_model} is not in the ModelRegistry, cannot "
"calculate predicted hypervolume."
)
model = get_model_from_generator_run(
generator_run=none_throws(generation_strategy.last_generator_run),
experiment=experiment,
data=experiment.fetch_data(trial_indices=trial_indices),
models_enum=models_enum,
)
if not isinstance(model, TorchModelBridge):
raise ValueError(
f"Model {current_model} is not of type TorchModelBridge, cannot "
"calculate predicted hypervolume."
)
return predicted_hypervolume(
modelbridge=model, optimization_config=optimization_config
)
minimal_model = get_tensor_converter_model(
experiment=experiment,
data=experiment.lookup_data(trial_indices=trial_indices),
)
return observed_hypervolume(
modelbridge=minimal_model, optimization_config=moo_optimization_config
)
@staticmethod
def _get_trace(
experiment: Experiment,
optimization_config: OptimizationConfig | None = None,
) -> list[float]:
"""Compute the optimization trace at each iteration.
Given an experiment and an optimization config, compute the performance
at each iteration. For multi-objective, the performance is computed as
the hypervolume. For single objective, the performance is computed as
the best observed objective value.
An iteration here refers to a completed or early-stopped (batch) trial.
There will be one performance metric in the trace for each iteration.
Args:
experiment: The experiment to get the trace for.
optimization_config: Optimization config to use in place of the one
stored on the experiment.
Returns:
A list of performance values at each iteration.
"""
optimization_config = optimization_config or none_throws(
experiment.optimization_config
)
# Get the names of the metrics in optimization config.
metric_names = set(optimization_config.objective.metric_names)
for cons in optimization_config.outcome_constraints:
metric_names.update({cons.metric.name})
metric_names = list(metric_names)
# Convert data into a tensor.
Y, trial_indices = extract_Y_from_data(
experiment=experiment, metric_names=metric_names
)
if Y.numel() == 0:
return []
# Derelativize the optimization config.
tf = Derelativize(
search_space=None, observations=None, config={"use_raw_status_quo": True}
)
optimization_config = tf.transform_optimization_config(
optimization_config=optimization_config.clone(),
modelbridge=get_tensor_converter_model(
experiment=experiment, data=experiment.lookup_data()
),
fixed_features=None,
)
# Extract weights, constraints, and objective_thresholds.
objective_weights = extract_objective_weights(
objective=optimization_config.objective, outcomes=metric_names
)
outcome_constraints = extract_outcome_constraints(
outcome_constraints=optimization_config.outcome_constraints,
outcomes=metric_names,
)
to_tensor = partial(
torch.as_tensor, dtype=torch.double, device=torch.device("cpu")
)
if optimization_config.is_moo_problem:
objective_thresholds = extract_objective_thresholds(
objective_thresholds=fill_missing_thresholds_from_nadir(
experiment=experiment, optimization_config=optimization_config
),
objective=optimization_config.objective,
outcomes=metric_names,
)
objective_thresholds = to_tensor(none_throws(objective_thresholds))
else:
objective_thresholds = None
(
objective_weights,
outcome_constraints,
_,
_,
_,
) = validate_and_apply_final_transform(
objective_weights=objective_weights,
outcome_constraints=outcome_constraints,
linear_constraints=None,
pending_observations=None,
final_transform=to_tensor,
)
# Get weighted tensor objectives.
if optimization_config.is_moo_problem:
(
obj,
weighted_objective_thresholds,
) = get_weighted_mc_objective_and_objective_thresholds(
objective_weights=objective_weights,
objective_thresholds=none_throws(objective_thresholds),
)
Y_obj = obj(Y)
infeas_value = weighted_objective_thresholds
else:
Y_obj = Y @ objective_weights
infeas_value = Y_obj.min()
# Account for feasibility.
if outcome_constraints is not None:
cons_tfs = none_throws(
get_outcome_constraint_transforms(outcome_constraints)
)
feas = torch.all(torch.stack([c(Y) <= 0 for c in cons_tfs], dim=-1), dim=-1)
# Set the infeasible points to reference point or the worst observed value.
Y_obj[~feas] = infeas_value
# Get unique trial indices. Note: only completed/early-stopped
# trials are present.
unique_trial_indices = trial_indices.unique().sort().values.tolist()
# compute the performance at each iteration (completed/early-stopped
# trial).
# For `BatchTrial`s, there is one performance value per iteration, even
# if the iteration (`BatchTrial`) has multiple arms.
if optimization_config.is_moo_problem:
# Compute the hypervolume trace.
partitioning = DominatedPartitioning(
ref_point=weighted_objective_thresholds.double()
)
# compute hv for each iteration (trial_index)
hvs = []
for trial_index in unique_trial_indices:
new_Y = Y_obj[trial_indices == trial_index]
# update with new point
partitioning.update(Y=new_Y)
hv = partitioning.compute_hypervolume().item()
hvs.append(hv)
return hvs
running_max = float("-inf")
raw_maximum = np.zeros(len(unique_trial_indices))
# Find the best observed value for each iterations.
# Enumerate the unique trial indices because only indices
# of completed/early-stopped trials are present.
for i, trial_index in enumerate(unique_trial_indices):
new_Y = Y_obj[trial_indices == trial_index]
running_max = max(running_max, new_Y.max().item())
raw_maximum[i] = running_max
if optimization_config.objective.minimize:
# Negate the result if it is a minimization problem.
raw_maximum = -raw_maximum
return raw_maximum.tolist()
@staticmethod
def _get_trace_by_progression(
experiment: Experiment,
optimization_config: OptimizationConfig | None = None,
bins: list[float] | None = None,
final_progression_only: bool = False,
) -> tuple[list[float], list[float]]:
optimization_config = optimization_config or none_throws(
experiment.optimization_config
)
objective = optimization_config.objective.metric.name
minimize = optimization_config.objective.minimize
map_data = experiment.lookup_data()
if not isinstance(map_data, MapData):
raise ValueError("`get_trace_by_progression` requires MapData.")
map_df = map_data.map_df
# assume the first map_key is progression
map_key = map_data.map_keys[0]
map_df = map_df[map_df["metric_name"] == objective]
map_df = map_df.sort_values(by=["trial_index", map_key])
df = (
map_df.drop_duplicates(MapData.DEDUPLICATE_BY_COLUMNS, keep="last")
if final_progression_only
else map_df
)
# compute cumulative steps
prev_steps_df = map_df.drop_duplicates(
MapData.DEDUPLICATE_BY_COLUMNS, keep="last"
)[["trial_index", map_key]].copy()
# shift the cumsum by one so that we count cumulative steps not including
# the current trial
prev_steps_df[map_key] = (
prev_steps_df[map_key].cumsum().shift(periods=1).fillna(0)
)
prev_steps_df = prev_steps_df.rename(columns={map_key: "prev_steps"})
df = df.merge(prev_steps_df, on=["trial_index"])
df["cumulative_steps"] = df[map_key] + df["prev_steps"]
progressions = df["cumulative_steps"].to_numpy()
if bins is None:
# this assumes that there is at least one completed trial that
# reached the maximum progression
prog_per_trial = df[map_key].max()
num_trials = len(experiment.trials)
bins = np.linspace(
0, prog_per_trial * num_trials, NUM_BINS_PER_TRIAL * num_trials
)
else:
bins = np.array(bins) # pyre-ignore[9]
# pyre-fixme[9]: bins has type `Optional[List[float]]`; used as
# `ndarray[typing.Any, dtype[typing.Any]]`.
bins = np.expand_dims(bins, axis=0)
# compute for each bin value the largest trial index finished by then
# (interpreting the bin value as a cumulative progression)
best_observed_idcs = np.maximum.accumulate(
np.argmax(np.expand_dims(progressions, axis=1) >= bins, axis=0)
)
obj_vals = (df["mean"].cummin() if minimize else df["mean"].cummax()).to_numpy()
best_observed = obj_vals[best_observed_idcs]
# pyre-fixme[16]: Item `List` of `Union[List[float], ndarray[typing.Any,
# np.dtype[typing.Any]]]` has no attribute `squeeze`.
return best_observed.tolist(), bins.squeeze(axis=0).tolist()