# Source code for ax.benchmark.benchmark

#!/usr/bin/env python3
# Copyright (c) Facebook, Inc. and its affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.

"""
Module for benchmarking Ax algorithms.

Key terms used:

* Trial –– usual Ax `Trial` or `BatchTrial`, one execution of a given arm or
  group of arms.
* Replication –– one run of an optimization loop; 1 method + problem combination.
* Test –– multiple replications, run for statistical significance.
* Full run –– multiple tests: run all methods with all problems.
* Method –– (one of) the algorithm(s) being benchmarked.
* Problem –– a synthetic function, a surrogate surface, or an ML model, on which
  to assess the performance of algorithms.

"""

import time
from types import FunctionType
from typing import Callable, Dict, List, Optional, Tuple, Union

import numpy as np
from ax.benchmark import utils
from ax.benchmark.benchmark_problem import BenchmarkProblem, SimpleBenchmarkProblem
from ax.core.data import Data
from ax.core.experiment import Experiment
from ax.core.generator_run import GeneratorRun
from ax.core.observation import ObservationFeatures
from ax.core.parameter import RangeParameter
from ax.modelbridge.base import gen_arms
from ax.modelbridge.generation_strategy import GenerationStrategy
from ax.runners.synthetic import SyntheticRunner
from ax.service.ax_client import AxClient
from ax.utils.common.logger import get_logger
from ax.utils.common.typeutils import not_none
from ax.utils.measurement.synthetic_functions import SyntheticFunction


logger = get_logger(__name__)


# To bypass catching of exceptions during benchmarking, since all other exceptions
# will be caught and recorded, but will not necessarily terminate the benchmarking
# run.
class NonRetryableBenchmarkingError(ValueError):
    """Error that indicates an issue with the benchmarking setup.

    Examples are an unexpected problem setup or a benchmarking function that
    was called incorrectly –– something that prevents the benchmarking suite
    itself from running, rather than an error that occurs during the runs of
    the benchmarking trials, replications, or tests.

    Subclasses `ValueError`, so handlers written for `ValueError` also
    receive it.
    """
def benchmark_trial(
    parameterization: Optional[np.ndarray] = None,
    evaluation_function: Optional[Union[SyntheticFunction, FunctionType]] = None,
    experiment: Optional[Experiment] = None,
    trial_index: Optional[int] = None,
) -> Union[Tuple[float, float], Data]:  # Mean and SEM or a Data object.
    """Evaluates one trial of a benchmarking replication (an Ax trial or
    batched trial).

    Exactly one of two argument pairs must be supplied: either
    `parameterization` together with `evaluation_function` (Service-API
    style), or `experiment` together with `trial_index` (Dev-API style).

    Note: the evaluation function relies on the ordering of items in the
    parameterization nd-array.

    Args:
        parameterization: The parameterization to evaluate.
        evaluation_function: The evaluation function for the benchmark
            objective.
        experiment: Experiment, for a trial on which to fetch data.
        trial_index: Index of the trial, for which to fetch data.

    Returns:
        For Service-API-style evaluation, a `(mean, sem)` tuple where `sem`
        is 0.0 when the evaluation function is a `SyntheticFunction` and
        `None` otherwise. For Dev-API-style evaluation, the result of
        `fetch_data()` on the requested trial.

    Raises:
        NonRetryableBenchmarkingError: If neither or both of the argument
            pairs are fully specified.
    """
    service_style = (
        parameterization is not None and evaluation_function is not None
    )
    dev_style = experiment is not None and trial_index is not None

    # Exactly one calling convention must be in effect; equality of the two
    # flags means "neither" or "both".
    if service_style == dev_style:
        raise NonRetryableBenchmarkingError(  # TODO[T53975770]: test
            "A parameterization and an evaluation function required for Service-"
            "API-style trial evaluation and an experiment and trial index are "
            "required for Dev API trial evalution via fetching metric data."
        )

    if dev_style:
        trial = not_none(experiment).trials.get(not_none(trial_index))
        return not_none(trial).fetch_data()

    # Synthetic functions are evaluated here without noise, hence SEM of 0;
    # for arbitrary callables the SEM is unknown.
    if isinstance(evaluation_function, SyntheticFunction):
        sem = 0.0
    else:
        sem = None
    mean = evaluation_function(parameterization)  # pyre-ignore[29]: call err.
    return mean, sem
def benchmark_replication(  # One optimization loop.
    problem: BenchmarkProblem,
    method: GenerationStrategy,
    num_trials: int,
    replication_index: Optional[int] = None,
    batch_size: int = 1,
    raise_all_exceptions: bool = False,
    benchmark_trial: FunctionType = benchmark_trial,
    verbose_logging: bool = True,
    # Number of trials that need to fail for a replication to be considered failed.
    failed_trials_tolerated: int = 5,
) -> Experiment:
    """Runs one benchmarking replication (equivalent to one optimization loop).

    Args:
        problem: Problem to benchmark on.
        method: Method to benchmark, represented as generation strategies.
        num_trials: Number of trials in each test experiment.
        replication_index: Index of this replication; when given, it is
            appended to the experiment name as a `__v{index}` suffix.
        batch_size: Batch size for this replication, defaults to 1.
        raise_all_exceptions: If set to True, any encountered exception will
            be raised; alternatively, failure tolerance thresholds are used
            and a small number of trials `failed_trials_tolerated` can fail
            before a replication is considered failed.
        benchmark_trial: Function that runs a single trial. Defaults to
            `benchmark_trial` in this module and must have the same signature.
        verbose_logging: Whether logging level should be set to `INFO`.
        failed_trials_tolerated: How many trials can fail before a replication
            is considered failed and aborted. Defaults to 5.

    Returns:
        The experiment produced by the replication.
    """
    experiment_name = f"{method.name}_on_{problem.name}"
    if replication_index is not None:
        experiment_name += f"__v{replication_index}"

    # Make sure the generation strategy starts from the beginning.
    method = method.clone_reset()

    # Choose whether to run replication via Service or Developer API, based on
    # whether the problem was set up using Ax classes like `SearchSpace` and
    # `OptimizationConfig` or using "RESTful" Service API-like constructs like
    # dict parameter representations and `SyntheticFunction`-s or custom
    # callables for evaluation function.
    replication_runner = (
        _benchmark_replication_Service_API
        if isinstance(problem, SimpleBenchmarkProblem)
        else _benchmark_replication_Dev_API
    )
    experiment, trial_exceptions = replication_runner(
        problem=problem,  # pyre-ignore[6]
        method=method,
        num_trials=num_trials,
        experiment_name=experiment_name,
        batch_size=batch_size,
        raise_all_exceptions=raise_all_exceptions,
        benchmark_trial=benchmark_trial,
        verbose_logging=verbose_logging,
        failed_trials_tolerated=failed_trials_tolerated,
    )

    # Tolerated trial failures are not part of the return value; surface them
    # in the log so that they do not disappear without a trace.
    if trial_exceptions:
        logger.warning(
            f"{len(trial_exceptions)} trial(s) failed (within tolerance) in "
            f"{experiment_name}: {trial_exceptions}"
        )
    return experiment
def benchmark_test(  # One test, multiple replications.
    problem: BenchmarkProblem,
    method: GenerationStrategy,
    num_trials: int,
    num_replications: int = 20,
    batch_size: int = 1,
    raise_all_exceptions: bool = False,
    benchmark_replication: FunctionType = benchmark_replication,
    benchmark_trial: FunctionType = benchmark_trial,
    verbose_logging: bool = True,
    # Number of trials that need to fail for a replication to be considered failed.
    failed_trials_tolerated: int = 5,
    # Number of replications that need to fail for a test to be considered failed.
    failed_replications_tolerated: int = 3,
) -> List[Experiment]:
    """Runs one benchmarking test (equivalent to one problem-method
    combination), which translates into `num_replications` replications, run
    for statistical significance of the results.

    Args:
        problem: Problem to benchmark on.
        method: Method to benchmark, represented as generation strategies.
        num_trials: Number of trials in each test experiment.
        num_replications: Number of times to run each test (each
            problem-method combination), for an aggregated result.
            Defaults to 20.
        batch_size: Batch size for this test, defaults to 1.
        raise_all_exceptions: If set to True, any encountered exception will
            be raised; alternatively, failure tolerance thresholds are used
            and a small number of trials `failed_trials_tolerated` can fail
            before a replication is considered failed, as well as some
            replications `failed_replications_tolerated` can fail before a
            benchmarking test is considered failed.
        benchmark_replication: Function that runs a single benchmarking
            replication. Defaults to `benchmark_replication` in this module
            and must have the same signature.
        benchmark_trial: Function that runs a single trial. Defaults to
            `benchmark_trial` in this module and must have the same signature.
            It is forwarded to `benchmark_replication`.
        verbose_logging: Whether logging level should be set to `INFO`.
        failed_trials_tolerated: How many trials can fail before a replication
            is considered failed and aborted. Defaults to 5.
        failed_replications_tolerated: How many replications can fail before a
            test is considered failed and aborted. Defaults to 3.

    Returns:
        The experiments of the replications that completed successfully.

    Raises:
        NonRetryableBenchmarkingError: If a replication reports a problem with
            the benchmarking setup itself; such errors are never tolerated.
        RuntimeError: If more than `failed_replications_tolerated`
            replications fail.
    """
    replication_exceptions = []
    test_replications = []
    for replication_idx in range(num_replications):
        try:
            test_replications.append(
                benchmark_replication(
                    problem=problem,
                    method=method,
                    replication_index=replication_idx,
                    num_trials=num_trials,
                    batch_size=batch_size,
                    raise_all_exceptions=raise_all_exceptions,
                    # Forward the (possibly custom / wrapped) trial function;
                    # otherwise the argument to this function has no effect.
                    benchmark_trial=benchmark_trial,
                    verbose_logging=verbose_logging,
                    failed_trials_tolerated=failed_trials_tolerated,
                )
            )
        except NonRetryableBenchmarkingError:
            # Setup errors are not transient: repeating the replication cannot
            # succeed, so they bypass the failure-tolerance bookkeeping.
            raise
        except Exception as err:
            if raise_all_exceptions:
                raise
            replication_exceptions.append(err)  # TODO[T53975770]: test
        if len(replication_exceptions) > failed_replications_tolerated:
            raise RuntimeError(  # TODO[T53975770]: test
                f"More than {failed_replications_tolerated} failed for "
                f"{method.name}_on_{problem.name}."
            )
    return test_replications
def full_benchmark_run(  # Full run, multiple tests.
    problems: Optional[Union[List[BenchmarkProblem], List[str]]] = None,
    methods: Optional[Union[List[GenerationStrategy], List[str]]] = None,
    num_trials: Union[int, List[List[int]]] = 20,
    num_replications: int = 20,
    batch_size: Union[int, List[List[int]]] = 1,
    raise_all_exceptions: bool = False,
    benchmark_test: FunctionType = benchmark_test,
    benchmark_replication: FunctionType = benchmark_replication,
    benchmark_trial: FunctionType = benchmark_trial,
    verbose_logging: bool = True,
    # Number of trials that need to fail for a replication to be considered failed.
    failed_trials_tolerated: int = 5,
    # Number of replications that need to fail for a test to be considered failed.
    failed_replications_tolerated: int = 3,
) -> Dict[str, Dict[str, List[Experiment]]]:
    """Full run of the benchmarking suite.

    Benchmarking can be distributed at the level of a test, a replication, or
    a trial (or any combination of those) by passing in a wrapped (in some
    scheduling logic) version of the corresponding function from this module.

    Args:
        problems: Problems to benchmark on, represented as BenchmarkProblem-s
            or string keys (must be in standard BOProblems). Defaults to all
            standard BOProblems.
        methods: Methods to benchmark, represented as generation strategies
            or string keys (must be in standard BOMethods). Defaults to all
            standard BOMethods.
        num_trials: Number of trials in each test experiment; either a single
            number or a matrix indexed by problem and method. Defaults to 20.
        num_replications: Number of times to run each test (each
            problem-method combination), for an aggregated result.
        batch_size: Batch size for each test; either a single number or a
            matrix indexed by problem and method. Defaults to 1.
        raise_all_exceptions: If set to True, any encountered exception will
            be raised; alternatively, failure tolerance thresholds are used
            and a small number of trials `failed_trials_tolerated` can fail
            before a replication is considered failed, as well as some
            replications `failed_replications_tolerated` can fail before a
            benchmarking test is considered failed.
        benchmark_test: Function that runs a single benchmarking test.
            Defaults to `benchmark_test` in this module and must have the
            same signature.
        benchmark_replication: Function that runs a single benchmarking
            replication. Defaults to `benchmark_replication` in this module
            and must have the same signature.
        benchmark_trial: Function that runs a single trial. Defaults to
            `benchmark_trial` in this module and must have the same signature.
        verbose_logging: Whether logging level should be set to `INFO`.
        failed_trials_tolerated: How many trials can fail before a replication
            is considered failed and aborted. Defaults to 5.
        failed_replications_tolerated: How many replications can fail before a
            test is considered failed and aborted. Defaults to 3.

    Returns:
        A mapping from problem name to a mapping from method name to the list
        of experiments obtained for that combination. A combination whose
        test failed (and was tolerated) maps to an empty list.

    Raises:
        NonRetryableBenchmarkingError: If a test reports a problem with the
            benchmarking setup itself; such errors are never tolerated.
    """
    tests: Dict[str, Dict[str, List[Experiment]]] = {}
    problems, methods = utils.get_problems_and_methods(
        problems=problems, methods=methods
    )
    for problem_idx, problem in enumerate(problems):
        tests[problem.name] = {}
        for method_idx, method in enumerate(methods):
            # Pre-populate, so that a failed test still has an (empty) entry.
            tests[problem.name][method.name] = []
            try:
                tests[problem.name][method.name] = benchmark_test(
                    problem=problem,
                    method=method,
                    num_replications=num_replications,
                    # For arguments passed as either numbers or matrices,
                    # extract corresponding values for the given combination.
                    num_trials=utils.get_corresponding(
                        num_trials, problem_idx, method_idx
                    ),
                    batch_size=utils.get_corresponding(
                        batch_size, problem_idx, method_idx
                    ),
                    benchmark_replication=benchmark_replication,
                    benchmark_trial=benchmark_trial,
                    raise_all_exceptions=raise_all_exceptions,
                    verbose_logging=verbose_logging,
                    failed_replications_tolerated=failed_replications_tolerated,
                    failed_trials_tolerated=failed_trials_tolerated,
                )
            except NonRetryableBenchmarkingError:
                # Setup errors indicate the suite itself cannot run; they
                # bypass the record-and-continue handling below.
                raise
            except Exception as err:
                if raise_all_exceptions:
                    raise
                # TODO[T53975770]: test
                # Record the failure in the log instead of dropping it, so
                # that an empty result list can be traced back to its cause.
                logger.warning(
                    f"Benchmarking test {method.name}_on_{problem.name} "
                    f"failed: {err!r}"
                )
    logger.info(f"Obtained benchmarking test experiments: {tests}")
    return tests
def _benchmark_replication_Service_API(
    problem: SimpleBenchmarkProblem,
    method: GenerationStrategy,
    num_trials: int,
    experiment_name: str,
    batch_size: int = 1,
    raise_all_exceptions: bool = False,
    benchmark_trial: FunctionType = benchmark_trial,
    verbose_logging: bool = True,
    # Number of trials that need to fail for a replication to be considered failed.
    failed_trials_tolerated: int = 5,
) -> Tuple[Experiment, List[Exception]]:
    """Run a benchmark replication via the Service API because the problem was
    set up in a simplified way, without the use of Ax classes like
    `OptimizationConfig` or `SearchSpace`.

    Args:
        problem: Simple benchmark problem to run the replication on.
        method: Generation strategy handed to the `AxClient`.
        num_trials: Number of trials to run; must be positive.
        experiment_name: Name given to the created experiment.
        batch_size: Only a batch size of 1 is currently supported.
        raise_all_exceptions: If True, re-raise any exception encountered
            while evaluating or completing a trial.
        benchmark_trial: Function used to evaluate a single trial.
        verbose_logging: Passed through to `AxClient`.
        failed_trials_tolerated: Number of failed trials tolerated before the
            replication is aborted.

    Returns:
        A tuple of the client's experiment and the list of tolerated trial
        exceptions.

    Raises:
        NotImplementedError: If `batch_size` is greater than 1.
        NonRetryableBenchmarkingError: If trial evaluation reports a problem
            with the benchmarking setup; such errors are never tolerated.
        RuntimeError: If more than `failed_trials_tolerated` trials fail.
    """
    if batch_size != 1:  # pragma: no cover, TODO[T53975770]
        assert batch_size > 1, "Batch size of 1 or greater is expected."
        raise NotImplementedError(
            "Batched benchmarking on `SimpleBenchmarkProblem`-s not yet implemented."
        )

    exceptions = []
    ax_client = AxClient(generation_strategy=method, verbose_logging=verbose_logging)
    ax_client.create_experiment(
        name=experiment_name,
        parameters=problem.domain_as_ax_client_parameters(),
        minimize=problem.minimize,
        objective_name=problem.name,
    )
    # The evaluation function receives values in search-space parameter order.
    parameter_names = list(ax_client.experiment.search_space.parameters.keys())
    assert num_trials > 0
    for _ in range(num_trials):
        # Candidate generation happens outside of the `try`, so generation
        # failures are not counted as tolerated trial failures.
        parameterization, idx = ax_client.get_next_trial()
        param_values = np.array([parameterization.get(x) for x in parameter_names])
        try:
            mean, sem = benchmark_trial(
                parameterization=param_values, evaluation_function=problem.f
            )
            # If problem indicates a noise level and is using a synthetic callable,
            # add normal noise to the measurement of the mean.
            if problem.uses_synthetic_function and problem.noise_sd != 0.0:
                noise = np.random.randn() * problem.noise_sd
                sem = (sem or 0.0) + problem.noise_sd
                logger.info(
                    f"Adding noise of {noise} to the measurement mean ({mean}). "
                    f"Problem noise SD setting: {problem.noise_sd}."
                )
                mean = mean + noise
            ax_client.complete_trial(trial_index=idx, raw_data=(mean, sem))
        except NonRetryableBenchmarkingError:
            # A setup error would repeat on every trial; surface it directly
            # instead of counting it against the failure tolerance.
            raise
        except Exception as err:  # TODO[T53975770]: test
            if raise_all_exceptions:
                raise
            exceptions.append(err)
        if len(exceptions) > failed_trials_tolerated:
            raise RuntimeError(  # TODO[T53975770]: test
                f"More than {failed_trials_tolerated} failed for {experiment_name}."
            )
    return ax_client.experiment, exceptions


def _benchmark_replication_Dev_API(
    problem: BenchmarkProblem,
    method: GenerationStrategy,
    num_trials: int,
    experiment_name: str,
    batch_size: int = 1,
    raise_all_exceptions: bool = False,
    benchmark_trial: FunctionType = benchmark_trial,
    verbose_logging: bool = True,
    # Number of trials that need to fail for a replication to be considered failed.
    failed_trials_tolerated: int = 5,
) -> Tuple[Experiment, List[Exception]]:
    """Run a benchmark replication via the Developer API because the problem was
    set up with Ax classes (likely to allow for additional complexity like
    adding constraints or non-range parameters).

    Args:
        problem: Benchmark problem providing the search space and the
            optimization config for the experiment.
        method: Generation strategy used to generate each trial.
        num_trials: Number of trials (or batch trials) to generate and run.
        experiment_name: Name given to the created experiment.
        batch_size: Number of arms generated per trial; a value of 1 creates
            regular trials, larger values create batch trials.
        raise_all_exceptions: If True, re-raise any exception encountered
            while generating or running a trial.
        benchmark_trial: Not used by this runner; accepted so that both
            replication runners can be called with the same arguments.
        verbose_logging: Not used by this runner; accepted so that both
            replication runners can be called with the same arguments.
        failed_trials_tolerated: Number of failed trials tolerated before the
            replication is aborted.

    Returns:
        A tuple of the experiment and the list of tolerated trial exceptions.

    Raises:
        NonRetryableBenchmarkingError: If generating or running a trial reports
            a problem with the benchmarking setup; never tolerated.
        RuntimeError: If more than `failed_trials_tolerated` trials fail.
    """
    exceptions = []
    experiment = Experiment(
        name=experiment_name,
        search_space=problem.search_space,
        optimization_config=problem.optimization_config,
        runner=SyntheticRunner(),
    )
    for _ in range(num_trials):
        try:
            gr = method.gen(experiment=experiment, n=batch_size)
            if batch_size == 1:
                experiment.new_trial(generator_run=gr).run()
            else:
                assert batch_size > 1
                experiment.new_batch_trial(generator_run=gr).run()
        except NonRetryableBenchmarkingError:
            # Setup errors bypass the failure-tolerance bookkeeping.
            raise
        except Exception as err:  # TODO[T53975770]: test
            if raise_all_exceptions:
                raise
            exceptions.append(err)
        if len(exceptions) > failed_trials_tolerated:
            raise RuntimeError(  # TODO[T53975770]: test
                f"More than {failed_trials_tolerated} failed for {experiment_name}."
            )
    return experiment, exceptions
def benchmark_minimize_callable(
    problem: BenchmarkProblem,
    num_trials: int,
    method_name: str,
    replication_index: Optional[int] = None,
) -> Tuple[Experiment, Callable[[List[float]], float]]:
    """
    An interface for evaluating external methods on Ax benchmark problems. The
    arms run and performance will be tracked by Ax, so the external method can
    be evaluated alongside Ax methods.

    It is designed around methods that implement an interface like
    scipy.optimize.minimize. This function returns a callable evaluation
    function that takes in an array of parameter values and returns a float
    objective value. The evaluation function should always be minimized: if
    the benchmark problem is a maximization problem, then the value returned
    by the evaluation function is negated so it can be used directly by
    methods that minimize. This callable can be given to an external
    minimization function, and Ax will track all of the calls made to it and
    the arms that were evaluated.

    This also returns an Experiment object that tracks the arms evaluated by
    the external method in the same way as done for Ax internal benchmarks.
    This function should thus be used for each benchmark replication.

    Args:
        problem: The Ax benchmark problem to be used to construct the
            evaluation function.
        num_trials: The maximum number of trials for a benchmark run.
        method_name: Name of the method being tested.
        replication_index: Replicate number, if multiple replicates are being
            run.

    Returns:
        A tuple of the tracking experiment and the evaluation function.

    Raises:
        NonRetryableBenchmarkingError: If the problem is a
            `SimpleBenchmarkProblem`, has non-range parameters, or has
            log-scale parameters.
    """
    # Some validation
    if isinstance(problem, SimpleBenchmarkProblem):
        raise NonRetryableBenchmarkingError("`SimpleBenchmarkProblem` not supported.")
    search_space_params = list(problem.search_space.parameters.values())
    for param in search_space_params:
        if not isinstance(param, RangeParameter):
            raise NonRetryableBenchmarkingError(
                "Only continuous search spaces supported."
            )
    for param in search_space_params:
        if param.log_scale:  # pyre-ignore
            raise NonRetryableBenchmarkingError("Log-scale parameters not supported.")

    # Create Ax experiment
    suffix = "" if replication_index is None else f"__v{replication_index}"
    experiment = Experiment(
        name=f"{method_name}_on_{problem.name}" + suffix,
        search_space=problem.search_space,
        optimization_config=problem.optimization_config,
        runner=SyntheticRunner(),
    )
    budget = num_trials  # evaluation budget enforced by the closure below

    # Construct the evaluation function
    def evaluation_function(x: List[float]) -> float:
        """Evaluate `x` as a new trial on the experiment and return the
        objective value, negated for maximization problems."""
        trials_so_far = len(experiment.trials)
        # Check if we have exhausted the evaluation budget
        if trials_so_far >= budget:
            raise ValueError(f"Evaluation budget ({budget} trials) exhuasted.")

        # Create an ObservationFeatures; values in `x` are matched to
        # parameters by position.
        names = problem.search_space.parameters.keys()
        obsf = ObservationFeatures(  # pyre-ignore
            parameters={pname: x[i] for i, pname in enumerate(names)}
        )

        # Time elapsed since the previous trial was created stands in for the
        # candidate-generation time; unknown for the very first trial.
        gen_time = None
        if trials_so_far != 0:
            last_trial = experiment.trials[trials_so_far - 1]
            gen_time = time.time() - last_trial.time_created.timestamp()

        # Create a GR
        arms = gen_arms(
            observation_features=[obsf],
            arms_by_signature=experiment.arms_by_signature,
        )
        generator_run = GeneratorRun(arms=arms, gen_time=gen_time)

        # Add it as a trial
        new_trial = experiment.new_trial()
        trial = new_trial.add_generator_run(generator_run).run()

        # Evaluate function
        df = trial.fetch_data().df
        if len(df) > 1:
            raise Exception("Does not support multiple outcomes")  # pragma: no cover
        objective_value = float(df["mean"].values[0])
        if problem.optimization_config.objective.minimize:
            return objective_value
        return -objective_value

    return experiment, evaluation_function