#!/usr/bin/env python3
# Copyright (c) Meta Platforms, Inc. and affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
from typing import Dict, Iterable, NamedTuple, Set, Tuple
import numpy as np
import torch
from ax.core.batch_trial import BatchTrial
from ax.core.data import Data
from ax.core.experiment import Experiment
from ax.core.objective import MultiObjective
from ax.core.optimization_config import (
    MultiObjectiveOptimizationConfig,
    OptimizationConfig,
)
from ax.core.trial import Trial
from ax.core.types import ComparisonOp
from ax.utils.common.typeutils import not_none
from botorch.utils.multi_objective.box_decompositions.dominated import (
    DominatedPartitioning,
)
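

# An (arm_name, trial_index) pair identifying a single arm within a trial.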
TArmTrial = Tuple[str, int]

# --------------------------- Data integrity utils. ---------------------------


class MissingMetrics(NamedTuple):
    objective: Dict[str, Set[TArmTrial]]
    outcome_constraints: Dict[str, Set[TArmTrial]]
    tracking_metrics: Dict[str, Set[TArmTrial]]


def get_missing_metrics(
    data: Data, optimization_config: OptimizationConfig
) -> MissingMetrics:
    """Return all (arm_name, trial_index) pairs for which some observations
    of optimization config metrics are missing.

    Args:
        data: Data to search.
        optimization_config: Provides the metric names to search for.

    Returns:
        A MissingMetrics NamedTuple whose `objective`, `outcome_constraints`,
        and `tracking_metrics` fields each map metric names to the set of
        (arm_name, trial_index) pairs missing that metric; only metrics with
        at least one missing pair are included.
    """
    objective = optimization_config.objective
    if isinstance(objective, MultiObjective):  # pragma: no cover
        objective_metric_names = [m.name for m in objective.metrics]
    else:
        objective_metric_names = [optimization_config.objective.metric.name]

    outcome_constraints_metric_names = [
        outcome_constraint.metric.name
        for outcome_constraint in optimization_config.outcome_constraints
    ]
    missing_objectives = {
        objective_metric_name: _get_missing_arm_trial_pairs(
            data, objective_metric_name
        )
        for objective_metric_name in objective_metric_names
    }
    missing_outcome_constraints = get_missing_metrics_by_name(
        data, outcome_constraints_metric_names
    )
    all_metric_names = set(data.df["metric_name"])
    optimization_config_metric_names = set(missing_objectives.keys()).union(
        outcome_constraints_metric_names
    )
    missing_tracking_metric_names = all_metric_names.difference(
        optimization_config_metric_names
    )
    missing_tracking_metrics = get_missing_metrics_by_name(
        data=data, metric_names=missing_tracking_metric_names
    )
    return MissingMetrics(
        objective={k: v for k, v in missing_objectives.items() if len(v) > 0},
        outcome_constraints={
            k: v for k, v in missing_outcome_constraints.items() if len(v) > 0
        },
        tracking_metrics={
            k: v for k, v in missing_tracking_metrics.items() if len(v) > 0
        },
    )
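

# Example (illustrative sketch; `exp` is a hypothetical, already-run Experiment
# with an optimization config and attached data):
#
#     missing = get_missing_metrics(
#         data=exp.fetch_data(),
#         optimization_config=not_none(exp.optimization_config),
#     )
#     if missing.objective:
#         # some (arm_name, trial_index) pairs lack objective observations
#         ...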


def get_missing_metrics_by_name(
    data: Data, metric_names: Iterable[str]
) -> Dict[str, Set[TArmTrial]]:
    """Return all (arm_name, trial_index) pairs missing some observations of
    the specified metrics.

    Args:
        data: Data to search.
        metric_names: Metric names to search for.

    Returns:
        A dictionary with one entry per metric name, mapping it to the set of
        (arm_name, trial_index) pairs missing observations of that metric.
    """
    missing_metrics = {
        metric_name: _get_missing_arm_trial_pairs(data=data, metric_name=metric_name)
        for metric_name in metric_names
    }
    return missing_metrics


def _get_missing_arm_trial_pairs(data: Data, metric_name: str) -> Set[TArmTrial]:
    """Return arm_name and trial_index pairs missing a specified metric."""
    metric_df = data.df[data.df.metric_name == metric_name]
    present_metric_df = metric_df[metric_df["mean"].notnull()]
    arm_trial_pairs = set(zip(data.df["arm_name"], data.df["trial_index"]))
    arm_trial_pairs_with_metric = set(
        zip(present_metric_df["arm_name"], present_metric_df["trial_index"])
    )
    missing_arm_trial_pairs = arm_trial_pairs.difference(arm_trial_pairs_with_metric)
    return missing_arm_trial_pairs
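

# Worked example (hypothetical data): for a Data whose df contains
#
#     arm_name  trial_index  metric_name  mean
#     "0_0"     0            "m1"         1.2
#     "0_0"     0            "m2"         NaN
#     "1_0"     1            "m1"         0.7
#
# _get_missing_arm_trial_pairs(data, "m2") returns {("0_0", 0), ("1_0", 1)}:
# the NaN row counts as missing, and ("1_0", 1) has no "m2" row at all.
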
# -------------------- Experiment result extraction utils. ---------------------


def best_feasible_objective(  # pragma: no cover
    optimization_config: OptimizationConfig, values: Dict[str, np.ndarray]
) -> np.ndarray:
    """Compute the best feasible objective value found by each iteration.

    Args:
        optimization_config: Optimization config.
        values: Dictionary from metric name to array of value at each
            iteration. If the optimization config contains outcome constraints,
            values for them must be present in `values`.

    Returns:
        Array of cumulative best feasible objective values.
    """
    # Get objective at each iteration
    objective = optimization_config.objective
    f = values[objective.metric.name]
    # Set infeasible points to have infinitely bad values
    infeas_val = np.inf if objective.minimize else -np.inf
    for oc in optimization_config.outcome_constraints:
        if oc.relative:
            raise ValueError(  # pragma: no cover
                "Benchmark aggregation does not support relative constraints"
            )
        g = values[oc.metric.name]
        feas = g <= oc.bound if oc.op == ComparisonOp.LEQ else g >= oc.bound
        f[~feas] = infeas_val
    # Get cumulative best
    minimize = objective.minimize
    accumulate = np.minimum.accumulate if minimize else np.maximum.accumulate
    return accumulate(f)
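

# Worked example (a sketch with hypothetical names: `opt_config` is assumed to
# be an OptimizationConfig maximizing a metric named "objective"):
#
#     values = {"objective": np.array([1.0, 3.0, 2.0, 5.0])}
#     best_feasible_objective(opt_config, values)  # -> array([1., 3., 3., 5.])
#
# If `opt_config` also had an outcome constraint "c" <= 0 and
# values["c"] = np.array([1.0, -1.0, -1.0, 1.0]), iterations 0 and 3 would be
# set to -inf before accumulating, giving array([-inf, 3., 3., 3.]).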


def feasible_hypervolume(  # pragma: no cover
    optimization_config: MultiObjectiveOptimizationConfig,
    values: Dict[str, np.ndarray],
) -> np.ndarray:
    """Compute the feasible hypervolume at each iteration.

    Args:
        optimization_config: Optimization config.
        values: Dictionary from metric name to array of value at each
            iteration (each array is `n`-dim). If the optimization config
            contains outcome constraints, values for them must be present in
            `values`.

    Returns:
        Array of feasible hypervolumes.
    """
    # Get objective thresholds and objective values at each iteration
    obj_threshold_dict = {
        ot.metric.name: ot.bound for ot in optimization_config.objective_thresholds
    }
    f_vals = np.hstack(
        [values[m.name].reshape(-1, 1) for m in optimization_config.objective.metrics]
    )
    obj_thresholds = np.array(
        [obj_threshold_dict[m.name] for m in optimization_config.objective.metrics]
    )
    # Set infeasible points to be the objective thresholds
    for oc in optimization_config.outcome_constraints:
        if oc.relative:
            raise ValueError(  # pragma: no cover
                "Benchmark aggregation does not support relative constraints"
            )
        g = values[oc.metric.name]
        feas = g <= oc.bound if oc.op == ComparisonOp.LEQ else g >= oc.bound
        f_vals[~feas] = obj_thresholds
    # Flip signs so that every objective is maximized, as assumed by
    # DominatedPartitioning
    obj_weights = np.array(
        [-1 if m.lower_is_better else 1 for m in optimization_config.objective.metrics]
    )
    obj_thresholds = obj_thresholds * obj_weights
    f_vals = f_vals * obj_weights
    partitioning = DominatedPartitioning(
        ref_point=torch.from_numpy(obj_thresholds).double()
    )
    f_vals_torch = torch.from_numpy(f_vals).double()
    # Compute the hypervolume at each iteration
    hvs = []
    for i in range(f_vals.shape[0]):
        # Update the partitioning with the new point
        partitioning.update(Y=f_vals_torch[i : i + 1])
        hv = partitioning.compute_hypervolume().item()
        hvs.append(hv)
    return np.array(hvs)
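

# Example (a sketch with hypothetical names: `moo_config` is assumed to be a
# MultiObjectiveOptimizationConfig with objectives "m1" and "m2", both
# maximized, and objective thresholds m1 >= 0.0 and m2 >= 0.0):
#
#     values = {
#         "m1": np.array([1.0, 2.0]),
#         "m2": np.array([2.0, 1.0]),
#     }
#     feasible_hypervolume(moo_config, values)  # -> array([2., 3.])
#
# After the first point the dominated region is a 1 x 2 box above the
# thresholds; adding (2, 1) contributes an extra 1 x 1 box, for a total of 3.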


def get_model_times(experiment: Experiment) -> Tuple[float, float]:  # pragma: no cover
    """Get total times spent fitting the model and generating candidates in the
    course of the experiment.
    """
    fit_time = 0.0
    gen_time = 0.0
    for trial in experiment.trials.values():
        if isinstance(trial, BatchTrial):  # pragma: no cover
            gr = trial._generator_run_structs[0].generator_run
        elif isinstance(trial, Trial):
            gr = not_none(trial.generator_run)
        else:
            raise ValueError("Unexpected trial type")  # pragma: no cover
        if gr.fit_time is not None:
            fit_time += not_none(gr.fit_time)
        if gr.gen_time is not None:
            gen_time += not_none(gr.gen_time)
    return fit_time, gen_time
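

# Example (illustrative; `exp` is a hypothetical completed Experiment whose
# trials carry generator runs with recorded timings):
#
#     fit_time, gen_time = get_model_times(exp)
#     print(f"model fitting: {fit_time:.1f}s, candidate generation: {gen_time:.1f}s")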