Source code for ax.benchmark.benchmark_method

# Copyright (c) Meta Platforms, Inc. and affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.

# pyre-strict

import warnings
from dataclasses import dataclass

from ax.core.experiment import Experiment
from ax.core.optimization_config import (
    MultiObjectiveOptimizationConfig,
    OptimizationConfig,
)
from ax.core.types import TParameterization
from ax.early_stopping.strategies.base import BaseEarlyStoppingStrategy

from ax.modelbridge.generation_strategy import GenerationStrategy
from ax.service.utils.best_point_mixin import BestPointMixin
from ax.service.utils.scheduler_options import SchedulerOptions, TrialType
from ax.utils.common.base import Base
from pyre_extensions import none_throws


@dataclass(kw_only=True)
class BenchmarkMethod(Base):
    """Benchmark method, represented in terms of Ax generation strategy (which
    tells us which models to use when) and scheduler options (which tell us
    extra execution information like maximum parallelism, early stopping
    configuration, etc.).

    Args:
        name: String description.
        generation_strategy: The ``GenerationStrategy`` to use.
        timeout_hours: Number of hours after which to stop a benchmark
            replication.
        distribute_replications: Indicates whether the replications should be
            run in a distributed manner. Ax itself does not use this attribute.
        use_model_predictions_for_best_point: Whether to use model predictions
            with ``get_pareto_optimal_parameters`` (if multi-objective) or
            ``BestPointMixin._get_best_trial`` (if single-objective). However,
            note that if multi-objective, best-point selection is not currently
            supported and ``get_pareto_optimal_parameters`` will raise a
            ``NotImplementedError``.
        batch_size: Number of arms per trial. If greater than 1, trials are
            ``BatchTrial``s; otherwise, they are ``Trial``s. Defaults to 1.
            This and the following arguments are passed to
            ``SchedulerOptions``.
        run_trials_in_batches: Passed to ``SchedulerOptions``.
        max_pending_trials: Passed to ``SchedulerOptions``.
        early_stopping_strategy: Passed to ``SchedulerOptions``. If provided,
            its ``seconds_between_polls`` is forced to 0 for benchmarking.
    """

    name: str = "DEFAULT"
    generation_strategy: GenerationStrategy
    timeout_hours: float = 4.0
    distribute_replications: bool = False
    use_model_predictions_for_best_point: bool = False
    batch_size: int = 1
    run_trials_in_batches: bool = False
    max_pending_trials: int = 1
    early_stopping_strategy: BaseEarlyStoppingStrategy | None = None

    def __post_init__(self) -> None:
        # Default the method name to the generation strategy's name.
        if self.name == "DEFAULT":
            self.name = self.generation_strategy.name
        early_stopping_strategy = self.early_stopping_strategy
        if early_stopping_strategy is not None:
            seconds_between_polls = early_stopping_strategy.seconds_between_polls
            if seconds_between_polls > 0:
                warnings.warn(
                    "`early_stopping_strategy.seconds_between_polls` is "
                    f"{seconds_between_polls}, but benchmarking uses 0 seconds "
                    "between polls. Setting "
                    "`early_stopping_strategy.seconds_between_polls` to 0.",
                    stacklevel=1,
                )
                early_stopping_strategy.seconds_between_polls = 0

    @property
    def scheduler_options(self) -> SchedulerOptions:
        return SchedulerOptions(
            # No new candidates can be generated while any are pending.
            # If batched, an entire batch must finish before the next can be
            # generated.
            max_pending_trials=self.max_pending_trials,
            # Do not throttle, as is often necessary when polling real endpoints
            init_seconds_between_polls=0,
            min_seconds_before_poll=0,
            trial_type=TrialType.TRIAL
            if self.batch_size == 1
            else TrialType.BATCH_TRIAL,
            batch_size=self.batch_size,
            run_trials_in_batches=self.run_trials_in_batches,
            early_stopping_strategy=self.early_stopping_strategy,
        )
    def get_best_parameters(
        self,
        experiment: Experiment,
        optimization_config: OptimizationConfig,
        n_points: int,
    ) -> list[TParameterization]:
        """
        Get ``n_points`` promising points. NOTE: Only SOO with ``n_points=1``
        is supported.

        The expected use case is that these points will be evaluated against an
        oracle for hypervolume (if multi-objective) or for the value of the
        best parameter (if single-objective).

        For multi-objective cases, ``n_points > 1`` is needed. For SOO,
        ``n_points > 1`` reflects setups where we can choose some points, which
        will then be evaluated noiselessly or at high fidelity, and then use
        the best one.

        Args:
            experiment: The experiment to get the data from. This should
                contain values that would be observed in a realistic setting
                and not contain oracle values.
            optimization_config: The ``optimization_config`` for the
                corresponding ``BenchmarkProblem``.
            n_points: The number of points to return.
        """
        if isinstance(optimization_config, MultiObjectiveOptimizationConfig):
            raise NotImplementedError(
                "BenchmarkMethod.get_pareto_optimal_parameters is not currently "
                "supported for multi-objective problems."
            )
        if n_points != 1:
            raise NotImplementedError(
                f"Currently only n_points=1 is supported. Got {n_points=}."
            )

        # SOO, n=1 case.
        # Note: This has the same effect as Scheduler.get_best_parameters
        result = BestPointMixin._get_best_trial(
            experiment=experiment,
            generation_strategy=self.generation_strategy,
            optimization_config=optimization_config,
            use_model_predictions=self.use_model_predictions_for_best_point,
        )

        if result is None:
            # This can happen if no points are predicted to satisfy all outcome
            # constraints.
            return []

        i, params, prediction = none_throws(result)
        return [params]
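

# ---------------------------------------------------------------------------
# Usage sketch (illustrative addition, not part of the Ax module above). It
# shows how a ``BenchmarkMethod`` is typically constructed and how
# ``scheduler_options`` is derived from its fields. The Sobol + BoTorch
# generation strategy and its name are assumptions chosen for this example,
# not defaults of the module.
if __name__ == "__main__":
    from ax.modelbridge.generation_strategy import GenerationStep
    from ax.modelbridge.registry import Models

    # A hypothetical two-step strategy: 5 quasi-random Sobol trials, then
    # Bayesian optimization via ``Models.BOTORCH_MODULAR`` for all remaining
    # trials (``num_trials=-1``).
    gs = GenerationStrategy(
        name="Sobol+BoTorch",
        steps=[
            GenerationStep(model=Models.SOBOL, num_trials=5),
            GenerationStep(model=Models.BOTORCH_MODULAR, num_trials=-1),
        ],
    )
    method = BenchmarkMethod(generation_strategy=gs)

    # ``__post_init__`` replaces the "DEFAULT" name with the strategy's name.
    assert method.name == "Sobol+BoTorch"

    # With the default ``batch_size=1``, the derived options use plain
    # ``Trial``s, no polling delay, and at most one pending trial at a time.
    opts = method.scheduler_options
    assert opts.trial_type == TrialType.TRIAL
    assert opts.max_pending_trials == 1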
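
# Best-point sketch (hedged; ``experiment`` and ``soo_config`` below are
# hypothetical placeholders for a completed single-objective experiment and
# its ``OptimizationConfig``, which this module does not construct):
#
#     best = method.get_best_parameters(
#         experiment=experiment,
#         optimization_config=soo_config,
#         n_points=1,
#     )
#
# ``best`` is a list with at most one ``TParameterization``; it is empty when
# no trial is predicted to satisfy all outcome constraints. Passing a
# ``MultiObjectiveOptimizationConfig`` or ``n_points != 1`` raises
# ``NotImplementedError``.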