#!/usr/bin/env python3
# Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved.
import logging
import time
from abc import abstractmethod
from collections import defaultdict
from typing import Dict, List, NamedTuple, Optional, Tuple
import numpy as np
import pandas as pd
from ax.benchmark.benchmark_problem import BenchmarkProblem
from ax.core.batch_trial import BatchTrial
from ax.core.data import Data
from ax.core.experiment import Experiment
from ax.core.generator_run import GeneratorRun
from ax.core.observation import ObservationFeatures
from ax.core.optimization_config import OptimizationConfig
from ax.core.trial import Trial
from ax.core.types import ComparisonOp
from ax.modelbridge.base import gen_arms
from ax.modelbridge.generation_strategy import GenerationStrategy
from ax.runners.synthetic import SyntheticRunner
from ax.utils.common.logger import get_logger
logger: logging.Logger = get_logger(__name__)
ALLOWED_RUN_RETRIES = 5
PROBLEM_METHOD_DELIMETER = "_on_"
RUN_DELIMETER = "_run_"
class BenchmarkResult(NamedTuple):
    """Aggregated results of all benchmark runs of the tested methods on a
    single problem, as produced by ``BenchmarkRunner.aggregate_results``."""
# {method_name -> [[best objective per trial] per benchmark run]}
objective_at_true_best: Dict[str, np.ndarray]
# {method_name -> trials where generation strategy changed}
generator_changes: Dict[str, Optional[List[int]]]
optimum: float
# {method_name -> [total fit time per run]}
fit_times: Dict[str, List[float]]
# {method_name -> [total gen time per run]}
gen_times: Dict[str, List[float]]
class BenchmarkSetup(Experiment):
"""An extension of `Experiment`, specific to benchmarking. Contains
additional data, such as the benchmarking problem, iterations to run per
benchmarking method and problem combination, etc.
Args:
problem: description of the benchmarking problem for
this setup
total_iterations: how many optimization iterations to run
        batch_size: if this benchmark requires batch trials, the batch
            size for those trials. Defaults to 1.
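
    Example:
        An illustrative sketch; ``my_problem`` stands in for any existing
        ``BenchmarkProblem`` instance and is not constructed here::

            setup = BenchmarkSetup(problem=my_problem, total_iterations=30)
            fresh_copy = setup.clone_reset()  # same problem, no attached trials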
"""
problem: BenchmarkProblem
total_iterations: int
batch_size: int
def __init__(
self, problem: BenchmarkProblem, total_iterations: int = 20, batch_size: int = 1
) -> None:
super().__init__(
name=problem.name,
search_space=problem.search_space,
runner=SyntheticRunner(),
optimization_config=problem.optimization_config,
)
self.problem = problem
self.total_iterations = total_iterations
self.batch_size = batch_size
    def clone_reset(self) -> "BenchmarkSetup":
"""Create a clean copy of this benchmarking setup, with no run data
attached to it."""
return BenchmarkSetup(self.problem, self.total_iterations, self.batch_size)
    def evaluation_function(self, x: List[float]) -> float:
"""
An interface for directly calling the benchmark problem evaluation
function. Tracks each call as a Trial. Only works for unconstrained
problems with a batch size of 1.
Args:
x: A vector of the point to evaluate
Returns: Value of the objective at x
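
        Example:
            A sketch assuming a two-parameter, unconstrained, single-objective
            problem; the coordinates are arbitrary and the call consumes one of
            the ``total_iterations`` evaluations::

                y = setup.evaluation_function([0.5, 0.5])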
"""
if len(self.trials) >= self.total_iterations:
raise Exception("Evaluation budget exhausted.")
assert self.batch_size == 1
# Create an ObservationFeatures
param_dict = {
pname: x[i] for i, pname in enumerate(self.search_space.parameters.keys())
}
obsf = ObservationFeatures(parameters=param_dict) # pyre-ignore
# Get the time since last call
if len(self.trials) == 0:
gen_time = None
else:
gen_time = (
time.time() - self.trials[len(self.trials) - 1].time_created.timestamp()
)
        # Create a GeneratorRun for this point
gr = GeneratorRun(
arms=gen_arms(
observation_features=[obsf], arms_by_signature=self.arms_by_signature
),
gen_time=gen_time,
)
# Add it as a trial
trial = self.new_trial().add_generator_run(gr).run()
# Evaluate function
df = trial.fetch_data().df
if len(df) > 1:
raise Exception("Does not support multiple outcomes") # pragma: no cover
return float(df["mean"].values[0])
# pyre-fixme[44]: `BenchmarkRunner` non-abstract class with abstract methods.
class BenchmarkRunner:
"""Runner that keeps track of benchmark runs and failures encountered
during benchmarking.
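
    Subclasses implement ``run_benchmark_run``; a minimal sketch of such a
    subclass (the body below is a placeholder, not real benchmarking logic)::

        class MyBenchmarkRunner(BenchmarkRunner):
            def run_benchmark_run(self, setup, generation_strategy):
                ...  # execute one full run on ``setup``
                return setup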
"""
_failed_runs: List[Tuple[str, str]]
_runs: Dict[Tuple[str, str, int], BenchmarkSetup]
_error_messages: List[str]
_generator_changes: Dict[Tuple[str, str, int], Optional[List[int]]]
def __init__(self) -> None:
self._runs = {}
self._failed_runs = []
self._error_messages = []
self._generator_changes = {}
    @abstractmethod
def run_benchmark_run(
self, setup: BenchmarkSetup, generation_strategy: GenerationStrategy
) -> BenchmarkSetup:
"""Run a single full benchmark run of the given problem and method
combination.
"""
pass # pragma: no cover
    def run_benchmark_test(
self,
setup: BenchmarkSetup,
generation_strategy: GenerationStrategy,
num_runs: int = 20,
raise_all_errors: bool = False,
) -> Dict[Tuple[str, str, int], BenchmarkSetup]:
"""Run full benchmark test for the given method and problem combination.
A benchmark test consists of repeated full benchmark runs.
Args:
            setup: benchmark setup to execute the runs on; includes a
                benchmarking problem, total number of iterations, etc.
            generation_strategy: generation strategy that defines which
                generation methods should be used in this benchmarking test
            num_runs: how many benchmark runs of the given problem and method
                combination to run with the given setup for one benchmark test
            raise_all_errors: if True, re-raise errors encountered during a
                run instead of logging them and retrying
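
        Example:
            A sketch of a full test; ``my_setup`` and ``my_strategy`` stand in
            for an existing ``BenchmarkSetup`` and ``GenerationStrategy``::

                runner = BOBenchmarkRunner()
                runs = runner.run_benchmark_test(my_setup, my_strategy, num_runs=5)
                results = runner.aggregate_results()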
"""
num_failures = 0
benchmark_runs: Dict[Tuple[str, str, int], BenchmarkSetup] = {}
logger.info(f"Testing {generation_strategy.name} on {setup.name}:")
for run_idx in range(num_runs):
logger.info(f"Run {run_idx}")
run_key = (setup.name, generation_strategy.name, run_idx)
# If this run has already been executed, log and skip it.
if run_key in self._runs:
self._error_messages.append( # pragma: no cover
f"Run {run_idx} of {generation_strategy.name} on {setup.name} "
"has already been executed in this benchmarking suite."
"Check that this method + problem combination is not "
"included in the benchmarking suite twice. Only the first "
"run will be recorded."
)
continue
            # When the number of failures in this test exceeds the allowed
            # maximum, the whole method + problem combination is considered failed.
while num_failures < ALLOWED_RUN_RETRIES:
try:
benchmark_runs[run_key] = self.run_benchmark_run(
setup.clone_reset(), generation_strategy.clone_reset()
)
self._generator_changes[
run_key
] = generation_strategy.generator_changes
break
except Exception as err: # pragma: no cover
if raise_all_errors:
raise err
logger.exception(err)
num_failures += 1
self._error_messages.append(f"Error in {run_key}: {err}")
        if num_failures >= ALLOWED_RUN_RETRIES:
            self._error_messages.append(
                f"Considering {generation_strategy.name} on {setup.name} failed"
            )
            self._failed_runs.append((setup.name, generation_strategy.name))
        else:
            self._runs.update(benchmark_runs)
return self._runs
    def aggregate_results(self) -> Dict[str, BenchmarkResult]:
"""Pull results from each of the runs (BenchmarkSetups aka Experiments)
and aggregate them into a BenchmarkResult for each problem.
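
        Example:
            A sketch of reading aggregated results; the problem and method names
            below are placeholders for whatever was actually benchmarked::

                results = runner.aggregate_results()
                best = results["my_problem"].objective_at_true_best["my_method"]
                # ``best`` has one row per benchmark run, one column per iteration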
"""
n_iters: Dict[Tuple[str, str], int] = {}
optima: Dict[str, float] = {}
# Results will be put in nested dictionaries problem -> method -> results
objective_at_true_best: Dict[str, Dict[str, List[np.ndarray]]] = {}
fit_times: Dict[str, Dict[str, List[float]]] = {}
gen_times: Dict[str, Dict[str, List[float]]] = {}
generator_changes: Dict[str, Dict[str, Optional[List[int]]]] = {}
for (p, m, r), setup in self._runs.items():
for res_dict in [
objective_at_true_best,
fit_times,
gen_times,
generator_changes,
]:
if p not in res_dict:
res_dict[p] = defaultdict(list)
optima[p] = setup.problem.fbest
generator_changes[p][m] = self._generator_changes[(p, m, r)]
            # Extract the ordered arm names evaluated in this (problem, method, run)
names = []
for trial in setup.trials.values():
for i, arm in enumerate(trial.arms):
if isinstance(trial, BatchTrial):
reps = int(trial.weights[i])
else:
reps = 1
names.extend([arm.name] * reps)
# Make sure every run has the same number of iterations, so we can safely
# stack them in a matrix.
if (p, m) in n_iters:
if len(names) != n_iters[(p, m)]:
raise ValueError( # pragma: no cover
f"Expected {n_iters[(p, m)]} iterations, got {len(names)}"
)
else:
n_iters[(p, m)] = len(names)
# Get true values for every outcome for each iteration
iters_df = pd.DataFrame({"arm_name": names})
data_df = setup.fetch_data(noisy=False).df
metrics = data_df["metric_name"].unique()
true_values = {}
for metric in metrics:
df_m = data_df[data_df["metric_name"] == metric]
# Get one row per arm
df_m = df_m.groupby("arm_name").first().reset_index()
df_b = pd.merge(iters_df, df_m, how="left", on="arm_name")
true_values[metric] = df_b["mean"].values
# Compute the things we care about
# 1. True best objective value.
objective_at_true_best[p][m].append(
true_best_objective(
optimization_config=setup.problem.optimization_config,
true_values=true_values,
)
)
# 2. Time
fit_time, gen_time = get_model_times(setup)
fit_times[p][m].append(fit_time)
gen_times[p][m].append(gen_time)
# 3. True objective value of model-predicted best (TODO)
            # 4. True feasibility of model-predicted best (TODO)
# 5. Model prediction MSE for each gen run (TODO)
# Combine methods for each problem for the BenchmarkResult
res: Dict[str, BenchmarkResult] = {}
for p in objective_at_true_best:
res[p] = BenchmarkResult(
objective_at_true_best={
m: np.array(v) for m, v in objective_at_true_best[p].items()
},
generator_changes=generator_changes[p],
optimum=optima[p],
fit_times=fit_times[p],
gen_times=gen_times[p],
)
return res
@property
def errors(self) -> List[str]:
"""Messages from errors encoutered while running benchmark test."""
return self._error_messages
class BanditBenchmarkRunner(BenchmarkRunner):
    def run_benchmark_run(
self, setup: BenchmarkSetup, generation_strategy: GenerationStrategy
) -> BenchmarkSetup:
pass # pragma: no cover TODO[drfreund]
class BOBenchmarkRunner(BenchmarkRunner):
    def run_benchmark_run(
        self, setup: BenchmarkSetup, generation_strategy: GenerationStrategy
    ) -> BenchmarkSetup:
        """Run Bayesian optimization on the setup: repeatedly generate candidates
        from the generation strategy, run them as trials, and feed the newly
        observed data back into the strategy until the iteration budget is spent.
        """
        remaining_iterations = setup.total_iterations
updated_trials = []
while remaining_iterations > 0:
num_suggestions = min(remaining_iterations, setup.batch_size)
generator_run = generation_strategy.gen(
experiment=setup,
new_data=Data.from_multiple_data(
[setup._fetch_trial_data(idx) for idx in updated_trials]
),
n=setup.batch_size,
)
updated_trials = []
if setup.batch_size > 1: # pragma: no cover
trial = setup.new_batch_trial().add_generator_run(generator_run).run()
else:
trial = setup.new_trial(generator_run=generator_run).run()
updated_trials.append(trial.index)
remaining_iterations -= num_suggestions
return setup
def true_best_objective(
optimization_config: OptimizationConfig, true_values: Dict[str, np.ndarray]
) -> np.ndarray:
"""Compute the true best objective value found by each iteration.
Args:
optimization_config: Optimization config
        true_values: Dictionary from metric name to array of values at each
            iteration.
Returns: Array of cumulative best feasible value.
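
    Example:
        A sketch for an unconstrained minimization config ``opt_config`` whose
        objective metric is named "f" (neither is constructed here); the running
        best value is accumulated across iterations::

            true_best_objective(opt_config, {"f": np.array([3.0, 1.0, 2.0])})
            # -> array([3., 1., 1.])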
"""
# Get objective at each iteration
objective = optimization_config.objective
f = true_values[objective.metric.name]
    # Set infeasible points to infinitely bad values
    if objective.minimize:
        infeas_val = np.inf
    else:
        infeas_val = -np.inf
for oc in optimization_config.outcome_constraints:
if oc.relative:
raise ValueError(
"Benchmark aggregation does not support relative constraints"
)
g = true_values[oc.metric.name]
if oc.op == ComparisonOp.LEQ:
feas = g <= oc.bound
else:
feas = g >= oc.bound
f[~feas] = infeas_val
# Get cumulative best
if objective.minimize:
return np.minimum.accumulate(f)
else:
return np.maximum.accumulate(f)
def get_model_times(setup: BenchmarkSetup) -> Tuple[float, float]:
    """Sum model fit time and candidate generation time (in seconds) over all
    trials in the given setup, returned as a ``(fit_time, gen_time)`` tuple.
    """
    fit_time = 0.0
gen_time = 0.0
for trial in setup.trials.values():
if isinstance(trial, BatchTrial): # pragma: no cover
gr = trial._generator_run_structs[0].generator_run
elif isinstance(trial, Trial):
gr = trial.generator_run
else:
raise ValueError("Unexpected trial type") # pragma: no cover
if gr is None: # for typing
raise ValueError(
"Unexpected trial with no generator run"
) # pragma: no cover
if gr.fit_time is not None:
# pyre-fixme[6]: Expected `float` for 1st param but got `Optional[float]`.
fit_time += gr.fit_time
if gr.gen_time is not None:
# pyre-fixme[6]: Expected `float` for 1st param but got `Optional[float]`.
gen_time += gr.gen_time
return fit_time, gen_time