Source code for ax.utils.testing.core_stubs

#!/usr/bin/env python3
# Copyright (c) Meta Platforms, Inc. and affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.

# pyre-strict


from __future__ import annotations

from collections import OrderedDict
from datetime import datetime, timedelta
from logging import Logger
from pathlib import Path
from typing import (
    Any,
    cast,
    Dict,
    Iterable,
    List,
    MutableMapping,
    Optional,
    Set,
    Tuple,
    Type,
)

import numpy as np
import pandas as pd
import torch
from ax.core.arm import Arm
from ax.core.base_trial import BaseTrial, TrialStatus
from ax.core.batch_trial import AbandonedArm, BatchTrial
from ax.core.data import Data
from ax.core.experiment import DataType, Experiment
from ax.core.generation_strategy_interface import GenerationStrategyInterface
from ax.core.generator_run import GeneratorRun
from ax.core.map_data import MapData, MapKeyInfo
from ax.core.map_metric import MapMetric
from ax.core.metric import Metric
from ax.core.multi_type_experiment import MultiTypeExperiment
from ax.core.objective import MultiObjective, Objective, ScalarizedObjective
from ax.core.observation import ObservationFeatures
from ax.core.optimization_config import (
    MultiObjectiveOptimizationConfig,
    OptimizationConfig,
)
from ax.core.outcome_constraint import (
    ObjectiveThreshold,
    OutcomeConstraint,
    ScalarizedOutcomeConstraint,
)
from ax.core.parameter import (
    ChoiceParameter,
    FixedParameter,
    Parameter,
    ParameterType,
    RangeParameter,
)
from ax.core.parameter_constraint import (
    OrderConstraint,
    ParameterConstraint,
    SumConstraint,
)
from ax.core.parameter_distribution import ParameterDistribution
from ax.core.risk_measures import RiskMeasure
from ax.core.runner import Runner
from ax.core.search_space import HierarchicalSearchSpace, RobustSearchSpace, SearchSpace
from ax.core.trial import Trial
from ax.core.types import (
    ComparisonOp,
    TModelCov,
    TModelMean,
    TModelPredict,
    TModelPredictArm,
    TParameterization,
    TParamValue,
)
from ax.early_stopping.strategies import (
    BaseEarlyStoppingStrategy,
    PercentileEarlyStoppingStrategy,
    ThresholdEarlyStoppingStrategy,
)
from ax.early_stopping.strategies.logical import (
    AndEarlyStoppingStrategy,
    OrEarlyStoppingStrategy,
)
from ax.exceptions.core import UserInputError
from ax.global_stopping.strategies.base import BaseGlobalStoppingStrategy
from ax.global_stopping.strategies.improvement import ImprovementGlobalStoppingStrategy
from ax.metrics.branin import AugmentedBraninMetric, BraninMetric
from ax.metrics.branin_map import BraninTimestampMapMetric
from ax.metrics.dict_lookup import DictLookupMetric
from ax.metrics.factorial import FactorialMetric
from ax.metrics.hartmann6 import AugmentedHartmann6Metric, Hartmann6Metric
from ax.modelbridge.factory import Cont_X_trans, get_factorial, get_sobol
from ax.modelbridge.generation_strategy import GenerationStrategy
from ax.modelbridge.transition_criterion import (
    MaxGenerationParallelism,
    MaxTrials,
    TrialBasedCriterion,
)
from ax.models.torch.botorch_modular.acquisition import Acquisition
from ax.models.torch.botorch_modular.model import BoTorchModel, SurrogateSpec
from ax.models.torch.botorch_modular.sebo import SEBOAcquisition
from ax.models.torch.botorch_modular.surrogate import Surrogate
from ax.models.winsorization_config import WinsorizationConfig
from ax.runners.synthetic import SyntheticRunner
from ax.service.utils.scheduler_options import SchedulerOptions, TrialType
from ax.utils.common.constants import Keys
from ax.utils.common.logger import get_logger
from ax.utils.common.random import set_rng_seed
from ax.utils.common.typeutils import checked_cast, not_none
from ax.utils.measurement.synthetic_functions import branin
from botorch.acquisition.acquisition import AcquisitionFunction
from botorch.acquisition.monte_carlo import qExpectedImprovement
from botorch.models.gp_regression import SingleTaskGP
from botorch.models.model import Model
from botorch.models.transforms.input import ChainedInputTransform, Normalize, Round
from botorch.utils.datasets import SupervisedDataset
from gpytorch.constraints import Interval
from gpytorch.mlls.exact_marginal_log_likelihood import ExactMarginalLogLikelihood
from gpytorch.mlls.marginal_log_likelihood import MarginalLogLikelihood
from gpytorch.priors.torch_priors import GammaPrior

logger: Logger = get_logger(__name__)

TEST_SOBOL_SEED = 1234

##############################
# Experiments
##############################


def get_experiment(
    with_status_quo: bool = True, constrain_search_space: bool = True
) -> Experiment:
    """Build the basic experiment named "test".

    Args:
        with_status_quo: When True, the arm returned by ``get_status_quo()``
            is passed as the experiment's status quo; otherwise ``None`` is.
        constrain_search_space: Forwarded to ``get_search_space``.

    Returns:
        An ``Experiment`` with the default optimization config and a single
        tracking metric named "tracking".
    """
    space = get_search_space(constrain_search_space=constrain_search_space)
    opt_config = get_optimization_config()
    sq_arm = get_status_quo() if with_status_quo else None
    return Experiment(
        name="test",
        search_space=space,
        optimization_config=opt_config,
        status_quo=sq_arm,
        description="test description",
        tracking_metrics=[Metric(name="tracking")],
        is_test=True,
    )
def get_experiment_with_map_data_type() -> Experiment:
    """Build the "test_map_data" experiment.

    Returns:
        An ``Experiment`` whose ``default_data_type`` is
        ``DataType.MAP_DATA``, using the map optimization config and a
        ``MapMetric`` named "tracking" as its tracking metric.
    """
    space = get_search_space()
    opt_config = get_map_optimization_config()
    sq_arm = get_status_quo()
    return Experiment(
        name="test_map_data",
        search_space=space,
        optimization_config=opt_config,
        status_quo=sq_arm,
        description="test description",
        tracking_metrics=[MapMetric(name="tracking")],
        is_test=True,
        default_data_type=DataType.MAP_DATA,
    )
def get_trial_based_criterion() -> List[TrialBasedCriterion]:
    """Return two sample trial-based transition criteria.

    Returns:
        A list holding a ``MaxTrials`` criterion (threshold 3, counting only
        RUNNING and COMPLETED trials) followed by a
        ``MaxGenerationParallelism`` criterion (threshold 5, excluding
        RUNNING trials).
    """
    max_trials = MaxTrials(
        threshold=3,
        only_in_statuses=[TrialStatus.RUNNING, TrialStatus.COMPLETED],
        not_in_statuses=None,
    )
    max_parallelism = MaxGenerationParallelism(
        threshold=5,
        only_in_statuses=None,
        not_in_statuses=[TrialStatus.RUNNING],
    )
    return [max_trials, max_parallelism]
def get_experiment_with_custom_runner_and_metric(
    constrain_search_space: bool = True,
    immutable: bool = False,
    multi_objective: bool = False,
    num_trials: int = 3,
    has_outcome_constraint: bool = False,
) -> Experiment:
    """Build an experiment with a custom runner/metric and completed trials.

    Args:
        constrain_search_space: Forwarded to ``get_search_space``.
        immutable: When True, Sobol generation receives no optimization
            config and, after the trials are created, the experiment's
            ``_properties`` is replaced by a dict flagging the search space
            and optimization config as immutable.
        multi_objective: Selects the multi-objective optimization config
            (with a custom metric) instead of the single-objective one.
        num_trials: Number of single-arm Sobol trials to create and complete.
        has_outcome_constraint: Forwarded as ``outcome_constraint`` to the
            optimization-config stub.

    Returns:
        The populated ``Experiment``.
    """
    space = get_search_space(constrain_search_space=constrain_search_space)
    if multi_objective:
        opt_config = get_multi_objective_optimization_config(
            custom_metric=True,
            outcome_constraint=has_outcome_constraint,
            relative=False,
        )
    else:
        opt_config = get_optimization_config(
            outcome_constraint=has_outcome_constraint, relative=False
        )

    # The experiment carries both a custom tracking metric and a custom runner.
    experiment = Experiment(
        name="test",
        search_space=space,
        optimization_config=opt_config,
        description="test description",
        tracking_metrics=[
            CustomTestMetric(name="custom_test_metric", test_attribute="test")
        ],
        runner=CustomTestRunner(test_attribute="test"),
        is_test=True,
    )

    # Each iteration: generate one arm, run the trial, attach data for every
    # metric on the experiment, then mark the trial completed.
    for _ in range(num_trials):
        generator = get_sobol(search_space=experiment.search_space)
        generator_run = generator.gen(
            n=1,
            optimization_config=(
                None if immutable else experiment.optimization_config
            ),
        )
        trial = experiment.new_trial(generator_run=generator_run)
        trial.runner = experiment.runner
        trial.mark_running()
        per_metric_data = (
            get_data(
                metric_name=name,
                trial_index=trial.index,
                num_non_sq_arms=len(trial.arms),
                include_sq=False,
            )
            for name in experiment.metrics
        )
        experiment.attach_data(Data.from_multiple_data(per_metric_data))
        trial.mark_completed()

    if immutable:
        experiment._properties = {Keys.IMMUTABLE_SEARCH_SPACE_AND_OPT_CONF: True}

    return experiment
def get_branin_experiment(
    has_optimization_config: bool = True,
    with_batch: bool = False,
    with_trial: bool = False,
    with_status_quo: bool = False,
    with_fidelity_parameter: bool = False,
    with_choice_parameter: bool = False,
    with_str_choice_param: bool = False,
    search_space: Optional[SearchSpace] = None,
    minimize: bool = False,
    named: bool = True,
    with_completed_trial: bool = False,
) -> Experiment:
    """Build a Branin test experiment with optional trials attached.

    Args:
        has_optimization_config: Attach the Branin optimization config if True.
        with_batch: Add a batch trial holding a 15-arm Sobol generator run.
        with_trial: Add a single-arm Sobol trial.
        with_status_quo: Set a status quo arm at ``x1 = x2 = 0.0``; also
            passed as ``optimize_for_power`` for the batch trial.
        with_fidelity_parameter: Forwarded to ``get_branin_search_space``.
        with_choice_parameter: Forwarded to ``get_branin_search_space``.
        with_str_choice_param: Forwarded to ``get_branin_search_space``.
        search_space: Search space to use; when falsy, the Branin search
            space is built from the three flags above.
        minimize: Forwarded to ``get_branin_optimization_config``.
        named: If False the experiment is created with ``name=None``.
        with_completed_trial: Add a single-arm Sobol trial (as with
            ``with_trial``) and additionally attach data and complete it.

    Returns:
        The configured ``Experiment``.
    """
    if not search_space:
        search_space = get_branin_search_space(
            with_fidelity_parameter=with_fidelity_parameter,
            with_choice_parameter=with_choice_parameter,
            with_str_choice_param=with_str_choice_param,
        )

    if has_optimization_config:
        opt_config = get_branin_optimization_config(minimize=minimize)
    else:
        opt_config = None

    exp = Experiment(
        name="branin_test_experiment" if named else None,
        search_space=search_space,
        optimization_config=opt_config,
        runner=SyntheticRunner(),
        is_test=True,
    )

    if with_status_quo:
        exp.status_quo = Arm(parameters={"x1": 0.0, "x2": 0.0})

    if with_batch:
        batch_run = get_sobol(search_space=exp.search_space).gen(n=15)
        batch = exp.new_batch_trial(optimize_for_power=with_status_quo)
        batch.add_generator_run(batch_run)

    if with_trial or with_completed_trial:
        single_run = get_sobol(search_space=exp.search_space).gen(n=1)
        trial = exp.new_trial(generator_run=single_run)
        if with_completed_trial:
            trial.mark_running(no_runner_required=True)
            # Attach data for this one trial before completing it.
            exp.attach_data(get_branin_data(trials=[trial]))
            trial.mark_completed()

    return exp
def get_branin_experiment_with_status_quo_trials(
    num_sobol_trials: int = 5,
    multi_objective: bool = False,
) -> Tuple[Experiment, ObservationFeatures]:
    """Build a Branin experiment whose batch trials each have a status quo.

    Args:
        num_sobol_trials: Number of single-arm Sobol batch trials to add,
            run and complete. Each uses its first arm as status quo with
            weight 0.5.
        multi_objective: Start from the multi-objective Branin experiment
            (with batch and status quo) instead of the plain one.

    Returns:
        The experiment, and ``ObservationFeatures`` built from the status quo
        parameters of trial 0 with ``trial_index=0``.
    """
    exp = (
        get_branin_experiment_with_multi_objective(
            with_batch=True,
            with_status_quo=True,
        )
        if multi_objective
        else get_branin_experiment()
    )

    generator = get_sobol(search_space=exp.search_space)
    for _ in range(num_sobol_trials):
        batch = exp.new_batch_trial().add_generator_run(generator.gen(n=1))
        batch.set_status_quo_with_weight(status_quo=batch.arms[0], weight=0.5)
        batch.run().mark_completed()

    first_trial = exp.trials[0]
    sq_features = ObservationFeatures(
        parameters=first_trial.status_quo.parameters,  # pyre-fixme [16]
        trial_index=0,
    )
    return exp, sq_features
def get_robust_branin_experiment(
    risk_measure: Optional[RiskMeasure] = None,
    optimization_config: Optional[OptimizationConfig] = None,
    num_sobol_trials: int = 2,
) -> Experiment:
    """Build a Branin experiment over a ``RobustSearchSpace``.

    Args:
        risk_measure: Risk measure for the default optimization config; when
            falsy, a CVaR measure with ``n_w=16`` and ``alpha=0.8`` is used.
        optimization_config: Optimization config to use; when falsy, one
            minimizing a ``BraninMetric`` with the risk measure is built.
        num_sobol_trials: Number of single-arm Sobol trials to run and
            complete.

    Returns:
        The experiment named "branin_experiment".
    """
    x1_noise = ParameterDistribution(
        parameters=["x1"], distribution_class="norm", distribution_parameters={}
    )
    x1 = RangeParameter(
        name="x1", parameter_type=ParameterType.FLOAT, lower=-5, upper=10
    )
    x2 = RangeParameter(
        name="x2", parameter_type=ParameterType.FLOAT, lower=0, upper=15
    )
    search_space = RobustSearchSpace(
        parameters=[x1, x2],
        parameter_distributions=[x1_noise],
        num_samples=16,
    )

    if not risk_measure:
        risk_measure = RiskMeasure(
            risk_measure="CVaR",
            options={"n_w": 16, "alpha": 0.8},
        )
    if not optimization_config:
        branin = BraninMetric(
            name="branin_metric", param_names=["x1", "x2"], lower_is_better=True
        )
        optimization_config = OptimizationConfig(
            objective=Objective(metric=branin, minimize=True),
            risk_measure=risk_measure,
        )

    exp = Experiment(
        name="branin_experiment",
        search_space=search_space,
        optimization_config=optimization_config,
        runner=SyntheticRunner(),
    )

    generator = get_sobol(search_space=exp.search_space)
    for _ in range(num_sobol_trials):
        trial = exp.new_trial(generator_run=generator.gen(1))
        trial.run().mark_completed()
    return exp
def get_branin_experiment_with_timestamp_map_metric(
    with_status_quo: bool = False,
    rate: Optional[float] = None,
    map_tracking_metric: bool = False,
) -> Experiment:
    """Build a Branin experiment optimizing a timestamp map metric.

    Args:
        with_status_quo: Set a status quo arm at ``x1 = x2 = 0.0``.
        rate: Passed as ``rate`` to every ``BraninTimestampMapMetric``
            created here.
        map_tracking_metric: If True the tracking metric is a map metric
            named "tracking_branin_map"; otherwise it is a plain
            ``BraninMetric`` named "branin".

    Returns:
        An ``Experiment`` with ``default_data_type=DataType.MAP_DATA``.
    """

    def get_map_metric(name: str) -> BraninTimestampMapMetric:
        # All map metrics share the same parameters and rate.
        return BraninTimestampMapMetric(
            name=name,
            param_names=["x1", "x2"],
            rate=rate,
            lower_is_better=True,
        )

    if map_tracking_metric:
        tracking_metric = get_map_metric("tracking_branin_map")
    else:
        tracking_metric = BraninMetric(
            name="branin", param_names=["x1", "x2"], lower_is_better=True
        )

    space = get_branin_search_space()
    objective = Objective(metric=get_map_metric("branin_map"), minimize=True)
    exp = Experiment(
        name="branin_with_timestamp_map_metric",
        search_space=space,
        optimization_config=OptimizationConfig(objective=objective),
        tracking_metrics=[tracking_metric],
        runner=SyntheticRunner(),
        default_data_type=DataType.MAP_DATA,
    )

    if with_status_quo:
        exp.status_quo = Arm(parameters={"x1": 0.0, "x2": 0.0})

    return exp
def run_branin_experiment_with_generation_strategy(
    generation_strategy: GenerationStrategy,
    num_trials: int = 6,
    kwargs_for_get_branin_experiment: Optional[Dict[str, Any]] = None,
) -> Experiment:
    """Create a Branin experiment and run trials from a generation strategy.

    Args:
        generation_strategy: Strategy asked for one arm per trial.
        num_trials: Number of trials to generate, run and complete.
        kwargs_for_get_branin_experiment: Keyword arguments forwarded to
            ``get_branin_experiment``; a falsy value means no arguments.

    Returns:
        The experiment after all trials have data attached and are completed.
    """
    branin_kwargs = kwargs_for_get_branin_experiment or {}
    exp = get_branin_experiment(**branin_kwargs)
    for _ in range(num_trials):
        generator_run = generation_strategy.gen(n=1, experiment=exp)
        trial = exp.new_trial(generator_run=generator_run)
        trial.mark_running(no_runner_required=True)
        exp.attach_data(get_branin_data(trials=[trial]))
        trial.mark_completed()
    return exp
def get_test_map_data_experiment(
    num_trials: int,
    num_fetches: int,
    num_complete: int,
    map_tracking_metric: bool = False,
) -> Experiment:
    """Build a map-data Branin experiment with running and completed trials.

    Args:
        num_trials: Number of single-arm trials to create and run; trial
            ``i`` uses the Branin arm generated with ``seed=i``.
        num_fetches: Number of times ``fetch_data`` is called.
        num_complete: The trials with indices ``0..num_complete - 1`` are
            marked COMPLETED.
        map_tracking_metric: Forwarded to
            ``get_branin_experiment_with_timestamp_map_metric``.

    Returns:
        The populated experiment (created with ``rate=0.5``).
    """
    experiment = get_branin_experiment_with_timestamp_map_metric(
        rate=0.5, map_tracking_metric=map_tracking_metric
    )

    for seed in range(num_trials):
        arm = get_branin_arms(n=1, seed=seed)[0]
        experiment.new_trial().add_arm(arm=arm).run()

    for _ in range(num_fetches):
        # each time we call fetch, we grab another timestamp
        experiment.fetch_data()

    for trial_index in range(num_complete):
        experiment.trials[trial_index].mark_as(status=TrialStatus.COMPLETED)

    return experiment
def get_multi_type_experiment(
    add_trial_type: bool = True, add_trials: bool = False, num_arms: int = 10
) -> MultiTypeExperiment:
    """Build a ``MultiTypeExperiment`` with trial types "type1" and "type2".

    Args:
        add_trial_type: Together with ``add_trials``, gates trial creation;
            trials are only added when both flags are True.
        add_trials: See ``add_trial_type``.
        num_arms: Number of Sobol arms in the generator run shared by the
            two batch trials.

    Returns:
        The experiment; when trials are added, one batch trial per trial
        type has been run, each using its first arm as status quo
        (weight 0.5).
    """
    objective = Objective(BraninMetric("m1", ["x1", "x2"]), minimize=True)
    experiment = MultiTypeExperiment(
        name="test_exp",
        search_space=get_branin_search_space(),
        default_trial_type="type1",
        default_runner=SyntheticRunner(dummy_metadata="dummy1"),
        optimization_config=OptimizationConfig(objective),
    )
    experiment.add_trial_type(
        trial_type="type2", runner=SyntheticRunner(dummy_metadata="dummy2")
    )
    # Switch the order of variables so metric gives different results
    experiment.add_tracking_metric(
        BraninMetric("m2", ["x2", "x1"]), trial_type="type2", canonical_name="m1"
    )

    if not (add_trials and add_trial_type):
        return experiment

    generator_run = get_sobol(experiment.search_space).gen(num_arms)
    first = experiment.new_batch_trial(
        generator_run=generator_run, trial_type="type1"
    )
    second = experiment.new_batch_trial(
        generator_run=generator_run, trial_type="type2"
    )
    first.set_status_quo_with_weight(status_quo=first.arms[0], weight=0.5)
    second.set_status_quo_with_weight(status_quo=second.arms[0], weight=0.5)
    first.run()
    second.run()
    return experiment
def get_multi_type_experiment_with_multi_objective(
    add_trials: bool = False,
) -> MultiTypeExperiment:
    """Build a multi-objective ``MultiTypeExperiment`` with two trial types.

    Args:
        add_trials: When True, one batch trial per trial type is created
            from a shared 10-arm Sobol generator run, given its first arm as
            status quo (weight 0.5), and run.

    Returns:
        The experiment named "test_exp".
    """
    opt_config = get_branin_multi_objective_optimization_config()
    experiment = MultiTypeExperiment(
        name="test_exp",
        search_space=get_branin_search_space(),
        default_trial_type="type1",
        default_runner=SyntheticRunner(dummy_metadata="dummy1"),
        optimization_config=opt_config,
    )
    experiment.add_trial_type(
        trial_type="type2", runner=SyntheticRunner(dummy_metadata="dummy2")
    )

    if not add_trials:
        return experiment

    generator_run = get_sobol(experiment.search_space).gen(10)
    first = experiment.new_batch_trial(
        generator_run=generator_run, trial_type="type1"
    )
    second = experiment.new_batch_trial(
        generator_run=generator_run, trial_type="type2"
    )
    first.set_status_quo_with_weight(status_quo=first.arms[0], weight=0.5)
    second.set_status_quo_with_weight(status_quo=second.arms[0], weight=0.5)
    first.run()
    second.run()
    return experiment
def get_factorial_experiment(
    has_optimization_config: bool = True,
    with_batch: bool = False,
    with_status_quo: bool = False,
) -> Experiment:
    """Build the factorial test experiment.

    Args:
        has_optimization_config: Attach an optimization config maximizing
            the factorial metric if True.
        with_batch: Add a batch trial holding a factorial generator run
            (generated with ``n=-1``).
        with_status_quo: Set a status quo arm using the first level of each
            factor; also passed as ``optimize_for_power`` for the batch.

    Returns:
        The experiment named "factorial_test_experiment".
    """
    space = get_factorial_search_space()
    if has_optimization_config:
        opt_config = OptimizationConfig(
            objective=Objective(metric=get_factorial_metric(), minimize=False)
        )
    else:
        opt_config = None

    exp = Experiment(
        name="factorial_test_experiment",
        search_space=space,
        optimization_config=opt_config,
        runner=SyntheticRunner(),
        is_test=True,
        tracking_metrics=[get_factorial_metric("secondary_metric")],
    )

    if with_status_quo:
        sq_parameters = {
            "factor1": "level11",
            "factor2": "level21",
            "factor3": "level31",
        }
        exp.status_quo = Arm(parameters=sq_parameters)

    if with_batch:
        generator = get_factorial(search_space=exp.search_space)
        generator_run = generator.gen(n=-1)
        batch = exp.new_batch_trial(optimize_for_power=with_status_quo)
        batch.add_generator_run(generator_run)

    return exp
def get_experiment_with_repeated_arms(num_repeated_arms: int) -> Experiment:
    """Return the experiment owning a batch trial with repeated arms.

    Args:
        num_repeated_arms: Forwarded to ``get_batch_trial_with_repeated_arms``.
    """
    return get_batch_trial_with_repeated_arms(num_repeated_arms).experiment
def get_experiment_with_trial() -> Experiment:
    """Return the experiment that owns the trial built by ``get_trial()``."""
    return get_trial().experiment
def get_experiment_with_batch_trial(constrain_search_space: bool = True) -> Experiment:
    """Return the experiment that owns the trial built by ``get_batch_trial``.

    Args:
        constrain_search_space: Forwarded to ``get_batch_trial``.
    """
    return get_batch_trial(constrain_search_space=constrain_search_space).experiment
def get_experiment_with_batch_and_single_trial() -> Experiment:
    """Return an experiment holding a batch trial plus one single-arm trial.

    The single-arm trial is created from a generator run containing the arm
    returned by ``get_arm()``.
    """
    experiment = get_batch_trial().experiment
    single_arm_run = GeneratorRun(arms=[get_arm()])
    experiment.new_trial(generator_run=single_arm_run)
    return experiment
def get_experiment_with_trial_with_ttl() -> Experiment:
    """Return an experiment holding a batch trial plus a trial with a TTL.

    The extra trial is created with ``ttl_seconds=1`` from a generator run
    containing the arm returned by ``get_arm()``.
    """
    experiment = get_batch_trial().experiment
    single_arm_run = GeneratorRun(arms=[get_arm()])
    experiment.new_trial(generator_run=single_arm_run, ttl_seconds=1)
    return experiment
def get_experiment_with_data() -> Experiment:
    """Return a batch-trial experiment with data attached three times.

    Each attachment uses a fresh ``get_data()`` result.
    """
    experiment = get_batch_trial().experiment
    for _ in range(3):
        experiment.attach_data(data=get_data())
    return experiment
def get_experiment_with_map_data() -> Experiment:
    """Return a map-data experiment with one trial and map data attached.

    A ``MapMetric`` named "ax_test_metric" is added as a tracking metric
    before the result of ``get_map_data()`` is attached.
    """
    exp = get_experiment_with_map_data_type()
    exp.new_trial()
    exp.add_tracking_metric(MapMetric("ax_test_metric"))
    map_data = get_map_data()
    exp.attach_data(data=map_data)
    return exp
def get_experiment_with_multi_objective() -> Experiment:
    """Build a multi-objective experiment over the Branin search space.

    Returns:
        An ``Experiment`` named "test_experiment_multi_objective" with a
        synthetic runner and a tracking metric named "tracking".
    """
    opt_config = get_multi_objective_optimization_config()
    return Experiment(
        name="test_experiment_multi_objective",
        search_space=get_branin_search_space(),
        optimization_config=opt_config,
        description="test experiment with multi objective",
        runner=SyntheticRunner(),
        tracking_metrics=[Metric(name="tracking")],
        is_test=True,
    )
def get_branin_experiment_with_multi_objective(
    has_optimization_config: bool = True,
    has_objective_thresholds: bool = False,
    with_batch: bool = False,
    with_status_quo: bool = False,
    with_fidelity_parameter: bool = False,
    num_objectives: int = 2,
) -> Experiment:
    """Build a multi-objective Branin test experiment.

    Args:
        has_optimization_config: Attach the multi-objective Branin
            optimization config if True.
        has_objective_thresholds: Forwarded to the optimization-config stub.
        with_batch: Add a batch trial holding a 5-arm Sobol generator run
            seeded with ``TEST_SOBOL_SEED``.
        with_status_quo: Set a status quo arm at ``x1 = x2 = 0.0``; also
            passed as ``optimize_for_power`` for the batch trial.
        with_fidelity_parameter: Forwarded to ``get_branin_search_space``.
        num_objectives: Forwarded to the optimization-config stub.

    Returns:
        The experiment named "branin_test_experiment".
    """
    space = get_branin_search_space(with_fidelity_parameter=with_fidelity_parameter)
    if has_optimization_config:
        opt_config = get_branin_multi_objective_optimization_config(
            has_objective_thresholds=has_objective_thresholds,
            num_objectives=num_objectives,
        )
    else:
        opt_config = None

    exp = Experiment(
        name="branin_test_experiment",
        search_space=space,
        optimization_config=opt_config,
        runner=SyntheticRunner(),
        is_test=True,
    )

    if with_status_quo:
        # Experiment chooses the name "status_quo" by default
        exp.status_quo = Arm(parameters={"x1": 0.0, "x2": 0.0})

    if with_batch:
        generator = get_sobol(search_space=exp.search_space, seed=TEST_SOBOL_SEED)
        generator_run = generator.gen(n=5)
        batch = exp.new_batch_trial(optimize_for_power=with_status_quo)
        batch.add_generator_run(generator_run)

    return exp
def get_branin_with_multi_task(with_multi_objective: bool = False) -> Experiment:
    """Build a Branin experiment with two running batch trials.

    Both batch trials are created with ``optimize_for_power=True`` and share
    the same 5-arm Sobol generator run (seeded with ``TEST_SOBOL_SEED``).

    Args:
        with_multi_objective: Use the multi-objective Branin optimization
            config (with objective thresholds) instead of the
            single-objective one.

    Returns:
        The experiment, with a status quo arm named "status_quo".
    """
    space = get_branin_search_space()
    if with_multi_objective:
        opt_config = get_branin_multi_objective_optimization_config(
            has_objective_thresholds=True,
        )
    else:
        opt_config = get_branin_optimization_config()

    exp = Experiment(
        name="branin_test_experiment",
        search_space=space,
        optimization_config=opt_config,
        runner=SyntheticRunner(),
        is_test=True,
    )
    exp.status_quo = Arm(parameters={"x1": 0.0, "x2": 0.0}, name="status_quo")

    generator = get_sobol(search_space=exp.search_space, seed=TEST_SOBOL_SEED)
    generator_run = generator.gen(n=5)
    # Create and immediately run trial 0, then trial 1.
    for trial_index in (0, 1):
        exp.new_batch_trial(optimize_for_power=True).add_generator_run(
            generator_run
        )
        not_none(exp.trials.get(trial_index)).run()

    return exp
def get_experiment_with_scalarized_objective_and_outcome_constraint() -> Experiment:
    """Build an experiment with a scalarized objective and two constraints.

    The outcome constraints are the regular one from
    ``get_outcome_constraint()`` followed by the scalarized one from
    ``get_scalarized_outcome_constraint()``.
    """
    scalarized_objective = get_scalarized_objective()
    constraints = [
        get_outcome_constraint(),
        get_scalarized_outcome_constraint(),
    ]
    opt_config = OptimizationConfig(
        objective=scalarized_objective, outcome_constraints=constraints
    )
    space = get_search_space()
    sq_arm = get_status_quo()
    return Experiment(
        name="test_experiment_scalarized_objective and outcome constraint",
        search_space=space,
        optimization_config=opt_config,
        status_quo=sq_arm,
        description="test experiment with scalarized objective and outcome constraint",
        tracking_metrics=[Metric(name="tracking")],
        is_test=True,
    )
def get_hierarchical_search_space_experiment(
    num_observations: int = 0,
) -> Experiment:
    """Build an experiment over the hierarchical search space.

    Args:
        num_observations: Number of single-arm Sobol trials to create. Each
            gets two rows of data (metrics "m1" and "m2") whose means are
            drawn with ``torch.rand`` and whose SEM is ``None``, and is then
            marked completed.

    Returns:
        The experiment named "test_experiment_hss".
    """
    experiment = Experiment(
        name="test_experiment_hss",
        description="test experiment with hierarchical search space",
        search_space=get_hierarchical_search_space(),
        optimization_config=get_optimization_config(),
    )
    generator = get_sobol(search_space=experiment.search_space)
    for trial_index in range(num_observations):
        trial = experiment.new_trial(generator_run=generator.gen(1))
        trial.mark_running(no_runner_required=True)
        records = [
            {
                "arm_name": f"{trial_index}_0",
                "metric_name": f"m{metric_idx + 1}",
                "mean": mean,
                "sem": None,
                "trial_index": trial_index,
            }
            for metric_idx, mean in enumerate(torch.rand(2).tolist())
        ]
        experiment.attach_data(Data(df=pd.DataFrame.from_records(records)))
        trial.mark_completed()
    return experiment
def get_experiment_with_observations(
    observations: List[List[float]],
    minimize: bool = False,
    scalarized: bool = False,
    constrained: bool = False,
    with_tracking_metrics: bool = False,
    search_space: Optional[SearchSpace] = None,
) -> Experiment:
    """Build an experiment with one completed trial per given observation.

    Entry ``j`` of each observation is attached as the mean of metric
    ``m{j + 1}`` (SEM ``None``). The experiment is treated as
    multi-objective when the first observation has more than one value
    after discounting one for the constraint if ``constrained``.

    Args:
        observations: One list of metric means per trial.
        minimize: Whether "m1" is minimized.
        scalarized: Use a ``ScalarizedObjective`` over "m1" and "m2"; only
            supported in the multi-objective, unconstrained case.
        constrained: Add an outcome constraint (>= 0.0, absolute) on the
            metric after the objectives ("m3" multi-objective, "m2"
            single-objective).
        with_tracking_metrics: Add a tracking metric named after the length
            of the first observation. This indexes ``observations[0]``, so
            it requires at least one observation.
        search_space: Search space to use; when falsy, a two-parameter range
            search space on [0.0, 1.0] is used.

    Returns:
        The populated ``Experiment``.

    Raises:
        NotImplementedError: For scalarized + constrained, or scalarized
            with a single objective.
    """
    multi_objective = bool(observations) and (
        (len(observations[0]) - constrained) > 1
    )

    if multi_objective:
        metrics = [
            Metric(name="m1", lower_is_better=minimize),
            Metric(name="m2", lower_is_better=False),
        ]
        if scalarized:
            optimization_config = OptimizationConfig(
                objective=ScalarizedObjective(metrics)
            )
            if constrained:
                raise NotImplementedError
        else:
            # Threshold direction follows each metric's `lower_is_better`.
            thresholds = [
                ObjectiveThreshold(
                    metric=metric,
                    bound=0.0,
                    relative=False,
                    op=(
                        ComparisonOp.LEQ
                        if metric.lower_is_better
                        else ComparisonOp.GEQ
                    ),
                )
                for metric in metrics
            ]
            if constrained:
                outcome_constraints = [
                    OutcomeConstraint(
                        metric=Metric(name="m3"),
                        op=ComparisonOp.GEQ,
                        bound=0.0,
                        relative=False,
                    )
                ]
            else:
                outcome_constraints = None
            optimization_config = MultiObjectiveOptimizationConfig(
                objective=MultiObjective(metrics=metrics),
                objective_thresholds=thresholds,
                outcome_constraints=outcome_constraints,
            )
    else:
        if scalarized:
            raise NotImplementedError
        objective = Objective(metric=Metric(name="m1"), minimize=minimize)
        if constrained:
            constraint = OutcomeConstraint(
                metric=Metric(name="m2"),
                op=ComparisonOp.GEQ,
                bound=0.0,
                relative=False,
            )
            optimization_config = OptimizationConfig(
                objective=objective, outcome_constraints=[constraint]
            )
        else:
            optimization_config = OptimizationConfig(objective=objective)

    if not search_space:
        search_space = get_search_space_for_range_values(min=0.0, max=1.0)

    if with_tracking_metrics:
        tracking_metrics = [
            Metric(name=f"m{len(observations[0])}", lower_is_better=False)
        ]
    else:
        tracking_metrics = None

    exp = Experiment(
        search_space=search_space,
        optimization_config=optimization_config,
        tracking_metrics=tracking_metrics,
        runner=SyntheticRunner(),
        is_test=True,
    )

    generator = get_sobol(search_space=search_space)
    for trial_index, obs in enumerate(observations):
        # Create a dummy trial to add the observation.
        trial = exp.new_trial(generator_run=generator.gen(n=1))
        records = [
            {
                "arm_name": f"{trial_index}_0",
                "metric_name": f"m{metric_idx + 1}",
                "mean": mean,
                "sem": None,
                "trial_index": trial_index,
            }
            for metric_idx, mean in enumerate(obs)
        ]
        exp.attach_data(Data(df=pd.DataFrame.from_records(records)))
        trial.run().complete()

    return exp
def get_high_dimensional_branin_experiment(with_batch: bool = False) -> Experiment:
    """Build a 50-parameter experiment whose objective is Branin on two of them.

    Parameters ``x0``..``x24`` range over [-5.0, 10.0] and ``x25``..``x49``
    over [0.0, 15.0]; the objective evaluates Branin on ``x19`` and ``x44``.

    Args:
        with_batch: Add a batch trial holding a 15-arm Sobol generator run.

    Returns:
        The experiment named "high_dimensional_branin_experiment".
    """
    first_half = [
        RangeParameter(
            name=f"x{i}",
            parameter_type=ParameterType.FLOAT,
            lower=-5.0,
            upper=10.0,
        )
        for i in range(25)
    ]
    second_half = [
        RangeParameter(
            name=f"x{i + 25}",
            parameter_type=ParameterType.FLOAT,
            lower=0.0,
            upper=15.0,
        )
        for i in range(25)
    ]
    search_space = SearchSpace(
        # pyre-fixme[6]: In call `SearchSpace.__init__`, for 1st parameter `parameters`
        #  expected `List[Parameter]` but got `List[RangeParameter]`.
        parameters=first_half + second_half,
    )

    branin = BraninMetric(
        name="objective",
        param_names=["x19", "x44"],
    )
    optimization_config = OptimizationConfig(
        objective=Objective(metric=branin, minimize=True)
    )

    exp = Experiment(
        name="high_dimensional_branin_experiment",
        search_space=search_space,
        optimization_config=optimization_config,
        runner=SyntheticRunner(),
    )
    if with_batch:
        generator_run = get_sobol(search_space=exp.search_space).gen(n=15)
        exp.new_batch_trial().add_generator_run(generator_run)
    return exp
##############################
# Search Spaces
##############################
def get_search_space(constrain_search_space: bool = True) -> SearchSpace:
    """Build the default four-parameter test search space.

    Args:
        constrain_search_space: When True, the order, parameter and sum
            constraint stubs are attached; otherwise no constraints are.

    Returns:
        A ``SearchSpace`` over the two range parameters, the choice
        parameter and the fixed parameter stubs.
    """
    parameters: List[Parameter] = [
        get_range_parameter(),
        get_range_parameter2(),
        get_choice_parameter(),
        get_fixed_parameter(),
    ]
    if constrain_search_space:
        constraints = [
            get_order_constraint(),
            get_parameter_constraint(),
            get_sum_constraint1(),
        ]
    else:
        constraints = []
    return SearchSpace(
        parameters=parameters,
        parameter_constraints=constraints,
    )
def get_branin_search_space(
    with_fidelity_parameter: bool = False,
    with_choice_parameter: bool = False,
    with_str_choice_param: bool = False,
) -> SearchSpace:
    """Build the Branin search space over ``x1`` and ``x2``.

    Args:
        with_fidelity_parameter: Append a float range parameter "fidelity"
            on [0.0, 1.0] marked as fidelity with target value 1.0.
        with_choice_parameter: Make ``x2`` a float choice parameter over
            0.0..15.0 instead of a range parameter on [0, 15].
        with_str_choice_param: Append a string choice parameter
            "str_param" with values "foo", "bar" and "baz".

    Returns:
        The resulting ``SearchSpace``.
    """
    x1 = RangeParameter(
        name="x1", parameter_type=ParameterType.FLOAT, lower=-5, upper=10
    )
    if with_choice_parameter:
        x2 = ChoiceParameter(
            name="x2",
            parameter_type=ParameterType.FLOAT,
            values=[float(x) for x in range(0, 16)],
        )
    else:
        x2 = RangeParameter(
            name="x2", parameter_type=ParameterType.FLOAT, lower=0, upper=15
        )
    parameters = [x1, x2]

    if with_str_choice_param:
        parameters.append(
            ChoiceParameter(
                name="str_param",
                parameter_type=ParameterType.STRING,
                values=["foo", "bar", "baz"],
            )
        )

    if with_fidelity_parameter:
        parameters.append(
            RangeParameter(
                name="fidelity",
                parameter_type=ParameterType.FLOAT,
                lower=0.0,
                upper=1.0,
                is_fidelity=True,
                target_value=1.0,
            )
        )

    return SearchSpace(parameters=cast(List[Parameter], parameters))
def get_factorial_search_space() -> SearchSpace:
    """Build a search space of three string choice parameters.

    ``factor1`` has 3 levels, ``factor2`` has 2 and ``factor3`` has 4.
    """
    factor1 = ChoiceParameter(
        name="factor1",
        parameter_type=ParameterType.STRING,
        values=["level11", "level12", "level13"],
    )
    factor2 = ChoiceParameter(
        name="factor2",
        parameter_type=ParameterType.STRING,
        values=["level21", "level22"],
    )
    factor3 = ChoiceParameter(
        name="factor3",
        parameter_type=ParameterType.STRING,
        values=["level31", "level32", "level33", "level34"],
    )
    return SearchSpace(parameters=[factor1, factor2, factor3])
def get_large_factorial_search_space(
    num_levels: int = 10, num_parameters: int = 6
) -> SearchSpace:
    """Build a search space of many string choice parameters.

    Args:
        num_levels: Number of values per parameter; the values are
            "level10", "level11", ... (i.e. ``f"level1{i}"``).
        num_parameters: Number of parameters, named "factor0", "factor1", ...

    Returns:
        The resulting ``SearchSpace``.
    """
    factors = []
    for factor_idx in range(num_parameters):
        # A fresh list of level names is built for every parameter.
        levels = [f"level1{level_idx}" for level_idx in range(num_levels)]
        factors.append(
            ChoiceParameter(
                name=f"factor{factor_idx}",
                parameter_type=ParameterType.STRING,
                values=levels,
            )
        )
    return SearchSpace(parameters=factors)
def get_large_ordinal_search_space(
    n_ordinal_choice_parameters: int,
    n_continuous_range_parameters: int,
) -> SearchSpace:
    """Build a search space mixing float ranges and integer choices.

    Args:
        n_ordinal_choice_parameters: Number of integer choice parameters
            ``y{i}``, each over the values 2, 4, 8 and 16.
        n_continuous_range_parameters: Number of float range parameters
            ``x{i}`` on [0.0, 1.0].

    Returns:
        A ``SearchSpace`` listing the range parameters first, then the
        choice parameters.
    """
    range_parameters = [
        RangeParameter(
            name=f"x{i}",
            parameter_type=ParameterType.FLOAT,
            lower=0.0,
            upper=1.0,
        )
        for i in range(n_continuous_range_parameters)
    ]
    choice_parameters = [
        ChoiceParameter(
            name=f"y{i}",
            parameter_type=ParameterType.INT,
            values=[2, 4, 8, 16],
        )
        for i in range(n_ordinal_choice_parameters)
    ]
    return SearchSpace(
        parameters=range_parameters + choice_parameters  # pyre-ignore[6]
    )
def get_hartmann_search_space(with_fidelity_parameter: bool = False) -> SearchSpace:
    """Build the six-parameter Hartmann search space.

    Parameters ``x1``..``x6`` are float ranges on [0.0, 1.0].

    Args:
        with_fidelity_parameter: Append a float range parameter "fidelity"
            on [0.0, 1.0] marked as fidelity with target value 1.0.
    """
    parameters = [
        RangeParameter(
            name=f"x{number}",
            parameter_type=ParameterType.FLOAT,
            lower=0.0,
            upper=1.0,
        )
        for number in range(1, 7)
    ]
    if with_fidelity_parameter:
        fidelity = RangeParameter(
            name="fidelity",
            parameter_type=ParameterType.FLOAT,
            lower=0.0,
            upper=1.0,
            is_fidelity=True,
            target_value=1.0,
        )
        parameters.append(fidelity)

    return SearchSpace(parameters=cast(List[Parameter], parameters))
def get_search_space_for_value(val: float = 3.0) -> SearchSpace:
    """Build a search space with a single fixed float parameter "x".

    Args:
        val: The fixed value of "x".
    """
    fixed_x = FixedParameter("x", ParameterType.FLOAT, val)
    return SearchSpace([fixed_x])
def get_search_space_for_range_value(min: float = 3.0, max: float = 6.0) -> SearchSpace:
    """Build a search space with a single float range parameter "x".

    Args:
        min: Lower bound of "x".
        max: Upper bound of "x".
    """
    range_x = RangeParameter("x", ParameterType.FLOAT, min, max)
    return SearchSpace([range_x])
def get_search_space_for_range_values(
    min: float = 3.0, max: float = 6.0
) -> SearchSpace:
    """Build a search space with float range parameters "x" and "y".

    Args:
        min: Lower bound shared by both parameters.
        max: Upper bound shared by both parameters.
    """
    range_x = RangeParameter("x", ParameterType.FLOAT, min, max)
    range_y = RangeParameter("y", ParameterType.FLOAT, min, max)
    return SearchSpace([range_x, range_y])
def get_discrete_search_space() -> SearchSpace:
    """Build a fully discrete search space.

    It contains integer ranges "x" (0 to 3) and "y" (5 to 7) and a string
    choice "z" over "red", "panda" and "bear".
    """
    int_x = RangeParameter("x", ParameterType.INT, 0, 3)
    int_y = RangeParameter("y", ParameterType.INT, 5, 7)
    choice_z = ChoiceParameter("z", ParameterType.STRING, ["red", "panda", "bear"])
    return SearchSpace([int_x, int_y, choice_z])
def get_small_discrete_search_space() -> SearchSpace:
    """Build a small discrete search space.

    It contains an integer range "x" (0 to 1) and a string choice "y" over
    "red" and "panda".
    """
    int_x = RangeParameter("x", ParameterType.INT, 0, 1)
    choice_y = ChoiceParameter("y", ParameterType.STRING, ["red", "panda"])
    return SearchSpace([int_x, choice_y])
def get_search_space_with_choice_parameters(
    num_ordered_parameters: int = 2,
    num_unordered_choices: int = 5,
) -> SearchSpace:
    """Build a search space of integer choice parameters.

    Args:
        num_ordered_parameters: Number of ordered parameters
            ``ordered_{i}``, each over the integers 0..9.
        num_unordered_choices: Number of values (0..n-1) of the single
            unordered parameter "unordered".

    Returns:
        A ``SearchSpace`` listing the ordered parameters first and the
        unordered one last.
    """
    parameters = [
        ChoiceParameter(
            name=f"ordered_{i}",
            parameter_type=ParameterType.INT,
            values=list(range(10)),  # pyre-ignore
            is_ordered=True,
        )
        for i in range(num_ordered_parameters)
    ]
    unordered = ChoiceParameter(
        name="unordered",
        parameter_type=ParameterType.INT,
        values=list(range(num_unordered_choices)),  # pyre-ignore
        is_ordered=False,
    )
    parameters.append(unordered)
    return SearchSpace(parameters=parameters)
def get_hierarchical_search_space(
    with_fixed_parameter: bool = False,
) -> HierarchicalSearchSpace:
    """Build the hierarchical search space rooted at the "model" parameter.

    Args:
        with_fixed_parameter: Forwarded to ``get_model_parameter``; when
            True the fixed parameter stub is also appended to the
            parameter list.
    """
    model = get_model_parameter(with_fixed_parameter=with_fixed_parameter)
    parameters: List[Parameter] = [
        model,
        get_lr_parameter(),
        get_l2_reg_weight_parameter(),
        get_num_boost_rounds_parameter(),
    ]
    if with_fixed_parameter:
        parameters.append(get_fixed_parameter())
    return HierarchicalSearchSpace(parameters=parameters)
def get_robust_search_space(
    lb: float = 0.0,
    ub: float = 5.0,
    multivariate: bool = False,
    use_discrete: bool = False,
    num_samples: int = 4,  # dummy
) -> RobustSearchSpace:
    """Build a ``RobustSearchSpace`` over "x", "y", "z" and "c".

    Args:
        lb: Lower bound of the three range parameters.
        ub: Upper bound of the three range parameters.
        multivariate: Use a single multivariate normal distribution over
            "x" and "y" instead of univariate distributions.
        use_discrete: In the univariate case, pair the normal distribution
            on "x" with a binomial on "z" instead of an exponential on "y".
        num_samples: Forwarded to ``RobustSearchSpace``.

    Returns:
        The resulting ``RobustSearchSpace``.

    Raises:
        UserInputError: If both ``multivariate`` and ``use_discrete`` are set.
    """
    parameters = [
        RangeParameter("x", ParameterType.FLOAT, lb, ub),
        RangeParameter("y", ParameterType.FLOAT, lb, ub),
        RangeParameter("z", ParameterType.INT, lb, ub),
        ChoiceParameter("c", ParameterType.STRING, ["red", "panda"]),
    ]

    if multivariate and use_discrete:
        raise UserInputError(
            "`multivariate` and `use_discrete` are not supported together."
        )

    if multivariate:
        distributions = [
            ParameterDistribution(
                parameters=["x", "y"],
                distribution_class="multivariate_normal",
                distribution_parameters={
                    "mean": [0.1, 0.2],
                    "cov": [[0.5, 0.1], [0.1, 0.3]],
                },
            )
        ]
    else:
        x_dist = ParameterDistribution(
            parameters=["x"],
            distribution_class="norm",
            distribution_parameters={"loc": 1.0, "scale": ub - lb},
        )
        if use_discrete:
            second_dist = ParameterDistribution(
                parameters=["z"],
                distribution_class="binom",
                distribution_parameters={"n": 20, "p": 0.5},
            )
        else:
            second_dist = ParameterDistribution(
                parameters=["y"],
                distribution_class="expon",
                distribution_parameters={},
            )
        distributions = [x_dist, second_dist]

    return RobustSearchSpace(
        # pyre-ignore Incompatible parameter type [6]
        parameters=parameters,
        parameter_distributions=distributions,
        num_samples=num_samples,
    )
def get_robust_search_space_environmental(
    lb: float = 0.0,
    ub: float = 5.0,
) -> RobustSearchSpace:
    """Build a ``RobustSearchSpace`` with one environmental variable.

    "x" and "y" are float range parameters; "z" is an integer range passed
    as an environmental variable with a binomial (n=5, p=0.5) distribution.

    Args:
        lb: Lower bound of all three range parameters.
        ub: Upper bound of all three range parameters.
    """
    parameters = [
        RangeParameter("x", ParameterType.FLOAT, lb, ub),
        RangeParameter("y", ParameterType.FLOAT, lb, ub),
    ]
    environmental_variables = [RangeParameter("z", ParameterType.INT, lb, ub)]
    z_dist = ParameterDistribution(
        parameters=["z"],
        distribution_class="binom",
        distribution_parameters={"n": 5, "p": 0.5},
    )
    return RobustSearchSpace(
        # pyre-ignore Incompatible parameter type [6]
        parameters=parameters,
        parameter_distributions=[z_dist],
        num_samples=4,
        # pyre-ignore Incompatible parameter type [6]
        environmental_variables=environmental_variables,
    )
##############################
# Trials
##############################
def get_batch_trial(
    abandon_arm: bool = True,
    experiment: Optional[Experiment] = None,
    constrain_search_space: bool = True,
) -> BatchTrial:
    """Create a batch trial populated from ``get_arm_weights1()``.

    Args:
        abandon_arm: When True, the batch's first arm is marked abandoned
            with the reason "abandoned reason".
        experiment: Experiment to add the trial to; when falsy, a new one
            is built with ``get_experiment``.
        constrain_search_space: Forwarded to ``get_experiment`` when a new
            experiment is built.

    Returns:
        The batch trial, with a synthetic runner, the first arm as status
        quo (weight 0.5) and ``_generation_step_index`` set to 0.
    """
    if not experiment:
        experiment = get_experiment(constrain_search_space=constrain_search_space)

    batch = experiment.new_batch_trial()
    arms = get_arms_from_dict(get_arm_weights1())
    weights = get_weights_from_dict(get_arm_weights1())
    batch.add_arms_and_weights(arms=arms, weights=weights, multiplier=0.75)

    if abandon_arm:
        first_arm_name = batch.arms[0].name
        batch.mark_arm_abandoned(first_arm_name, "abandoned reason")

    batch.runner = SyntheticRunner()
    batch.set_status_quo_with_weight(status_quo=arms[0], weight=0.5)
    batch._generation_step_index = 0
    return batch
def get_batch_trial_with_repeated_arms(num_repeated_arms: int) -> BatchTrial:
    """Create a batch mixing new arms with arms repeated from the last trial.

    The first ``num_repeated_arms`` arms (and their weights) of the
    experiment's last trial are placed ahead of the arms from
    ``get_arm_weights2()`` in a new batch trial.

    Args:
        num_repeated_arms: Number of arms to repeat. If the previous trial
            has fewer arms, a warning is logged and all of them are repeated.

    Returns:
        The new batch trial, with a synthetic runner and its first arm as
        status quo (weight 0.5).

    Raises:
        Exception: If the experiment has no previous trials.
    """
    experiment = get_experiment_with_batch_trial()
    if not len(experiment.trials) > 0:
        raise Exception(
            "There are no previous trials in this experiment. Thus the new "
            "batch was not created as no repeated arms could be added."
        )

    # The last trial is looked up by index `number of trials - 1`.
    last_index = len(experiment.trials) - 1
    prev_trial = experiment.trials[last_index]

    if len(prev_trial.arms) < num_repeated_arms:
        logger.warning(
            "There are less arms in the previous trial than the value of "
            "input parameter 'num_repeated_arms'. Thus all the arms from "
            "the last trial will be repeated in the new trial."
        )
    repeated_arms = prev_trial.arms[:num_repeated_arms]
    if isinstance(prev_trial, BatchTrial):
        repeated_weights = prev_trial.weights[:num_repeated_arms]
    else:
        # Non-batch trials have no weights; give each repeated arm weight 1.
        repeated_weights = [1] * len(repeated_arms)

    new_arms = get_arms_from_dict(get_arm_weights2())
    new_weights = get_weights_from_dict(get_arm_weights2())

    # Repeated arms come first, followed by the new ones.
    arms = repeated_arms + new_arms
    weights = repeated_weights + new_weights
    batch = experiment.new_batch_trial()
    batch.add_arms_and_weights(arms=arms, weights=weights, multiplier=1)
    batch.runner = SyntheticRunner()
    batch.set_status_quo_with_weight(status_quo=arms[0], weight=0.5)
    return batch
def get_trial() -> Trial:
    """Create a single-arm trial on a fresh test experiment.

    Returns:
        A trial created with ``ttl_seconds=72`` holding the first arm from
        ``get_arm_weights1()``, with a synthetic runner,
        ``_generation_step_index`` 0 and a "workflow_run_id" entry in its
        run metadata.
    """
    trial = get_experiment().new_trial(ttl_seconds=72)
    first_arm = get_arms_from_dict(get_arm_weights1())[0]
    trial.add_arm(first_arm)
    trial.runner = SyntheticRunner()
    trial._generation_step_index = 0
    trial.update_run_metadata({"workflow_run_id": [12345]})
    return trial
def get_hss_trials_with_fixed_parameter(exp: Experiment) -> Dict[int, BaseTrial]:
    """Create two single-arm trials for the hierarchical search space.

    The arms are identical except for "model": trial 0 uses "Linear" and
    trial 1 uses "XGBoost". Both specify every parameter, including the
    fixed parameter "z".

    Args:
        exp: Experiment each ``Trial`` is constructed with.

    Returns:
        A dict mapping 0 and 1 to the respective trials, whose arms are
        named "0_0" and "1_0".
    """
    return {
        key: Trial(experiment=exp).add_arm(
            arm=Arm(
                parameters={
                    "model": model,
                    "learning_rate": 0.05,
                    "l2_reg_weight": 1e-4,
                    "num_boost_rounds": 15,
                    "z": True,
                },
                name=f"{key}_0",
            )
        )
        for key, model in enumerate(["Linear", "XGBoost"])
    }
class TestTrial(BaseTrial):
    """Trial class to test unsupported trial type error"""

    # Class-level default returned by `arms` until the setter assigns an
    # instance-level list.
    _arms: List[Arm] = []

    def __repr__(self) -> str:
        return "test"

    @property
    def arms(self) -> List[Arm]:
        """The arms stored on this trial."""
        return self._arms

    @arms.setter
    def arms(self, val: List[Arm]) -> None:
        self._arms = val

    def abandoned_arms(self) -> str:
        """Placeholder implementation; always returns "test"."""
        return "test"

    def arms_by_name(self) -> str:
        """Placeholder implementation; always returns "test"."""
        return "test"

    def generator_runs(self) -> str:
        """Placeholder implementation; always returns "test"."""
        return "test"

    def _get_candidate_metadata(self, arm_name: str) -> Optional[Dict[str, Any]]:
        """Placeholder implementation; there is never candidate metadata."""
        return None

    def _get_candidate_metadata_from_all_generator_runs(
        self,
    ) -> Dict[str, Optional[Dict[str, Any]]]:
        """Placeholder implementation; maps "test" to no metadata."""
        return {"test": None}
############################## # Parameters ##############################
def get_range_parameter() -> RangeParameter:
    """Return the float range parameter ``w`` on [0.5, 5.5] with 5 digits."""
    parameter = RangeParameter(
        name="w",
        parameter_type=ParameterType.FLOAT,
        lower=0.5,
        upper=5.5,
        log_scale=False,
        digits=5,
    )
    return parameter
def get_range_parameter2() -> RangeParameter:
    """Return the integer range parameter ``x`` on [1, 10]."""
    parameter = RangeParameter(
        name="x",
        parameter_type=ParameterType.INT,
        lower=1,
        upper=10,
    )
    return parameter
def get_choice_parameter() -> ChoiceParameter:
    """Return the string choice parameter ``y`` with three values."""
    parameter = ChoiceParameter(
        name="y",
        parameter_type=ParameterType.STRING,
        # The literal is typed `List[str]`, whereas `values` is declared as
        # `List[typing.Optional[typing.Union[bool, float, str]]]`.
        values=["foo", "bar", "baz"],
    )
    return parameter
def get_ordered_choice_parameter() -> ChoiceParameter:
    """Return the ordered integer choice parameter ``y`` with values 1-3."""
    parameter = ChoiceParameter(
        name="y",
        parameter_type=ParameterType.INT,
        # The literal is a list of ints, whereas `values` is declared as
        # `List[typing.Optional[typing.Union[bool, float, str]]]`.
        values=[1, 2, 3],
        is_ordered=True,
    )
    return parameter
def get_task_choice_parameter() -> ChoiceParameter:
    """Return the integer task parameter ``y`` (values 1-3, target value 1)."""
    parameter = ChoiceParameter(
        name="y",
        parameter_type=ParameterType.INT,
        values=[1, 2, 3],
        is_task=True,
        target_value=1,
    )
    return parameter
def get_fixed_parameter() -> FixedParameter:
    """Return the boolean fixed parameter ``z`` whose value is ``True``."""
    parameter = FixedParameter(
        name="z",
        parameter_type=ParameterType.BOOL,
        value=True,
    )
    return parameter
def get_model_parameter(with_fixed_parameter: bool = False) -> ChoiceParameter:
    """Return the ``model`` choice parameter with per-value dependents.

    Args:
        with_fixed_parameter: If True, ``"z"`` is added to the dependents of
            the ``"XGBoost"`` value.
    """
    xgboost_dependents = ["num_boost_rounds"]
    if with_fixed_parameter:
        xgboost_dependents.append("z")
    return ChoiceParameter(
        name="model",
        parameter_type=ParameterType.STRING,
        values=["Linear", "XGBoost"],
        dependents={
            "Linear": ["learning_rate", "l2_reg_weight"],
            "XGBoost": xgboost_dependents,
        },
    )
def get_lr_parameter() -> RangeParameter:
    """Return the float range parameter ``learning_rate`` on [0.001, 0.1]."""
    parameter = RangeParameter(
        name="learning_rate",
        parameter_type=ParameterType.FLOAT,
        lower=0.001,
        upper=0.1,
    )
    return parameter
def get_l2_reg_weight_parameter() -> RangeParameter:
    """Return the float range parameter ``l2_reg_weight`` on [1e-5, 1e-3]."""
    parameter = RangeParameter(
        name="l2_reg_weight",
        parameter_type=ParameterType.FLOAT,
        lower=0.00001,
        upper=0.001,
    )
    return parameter
def get_num_boost_rounds_parameter() -> RangeParameter:
    """Return the integer range parameter ``num_boost_rounds`` on [10, 20]."""
    parameter = RangeParameter(
        name="num_boost_rounds",
        parameter_type=ParameterType.INT,
        lower=10,
        upper=20,
    )
    return parameter
############################## # Parameter Constraints ##############################
def get_order_constraint() -> OrderConstraint:
    """Return an order constraint with ``x`` as lower and ``w`` as upper."""
    upper = get_range_parameter()
    lower = get_range_parameter2()
    return OrderConstraint(lower_parameter=lower, upper_parameter=upper)
def get_parameter_constraint(
    param_x: str = "x", param_y: str = "w"
) -> ParameterConstraint:
    """Return a linear constraint with weight 1.0 / -1.0 and bound 1.0.

    Args:
        param_x: Name of the parameter given weight ``1.0``.
        param_y: Name of the parameter given weight ``-1.0``.
    """
    weights_by_parameter = {param_x: 1.0, param_y: -1.0}
    return ParameterConstraint(constraint_dict=weights_by_parameter, bound=1.0)
def get_sum_constraint1() -> SumConstraint:
    """Return a sum constraint over ``x`` and ``w`` with lower bound 10.0."""
    w_param = get_range_parameter()
    x_param = get_range_parameter2()
    constraint = SumConstraint(
        parameters=[x_param, w_param],
        is_upper_bound=False,
        bound=10.0,
    )
    return constraint
def get_sum_constraint2() -> SumConstraint:
    """Return a sum constraint over ``x`` and ``w`` with upper bound 10.0."""
    w_param = get_range_parameter()
    x_param = get_range_parameter2()
    constraint = SumConstraint(
        parameters=[x_param, w_param],
        is_upper_bound=True,
        bound=10.0,
    )
    return constraint
############################## # Metrics ##############################
def get_metric() -> Metric:
    """Return a metric named ``m1`` with a single ``prop`` property."""
    properties = {"prop": "val"}
    return Metric(name="m1", properties=properties)
def get_branin_metric(
    name: str = "branin", lower_is_better: bool = True
) -> BraninMetric:
    """Return a ``BraninMetric`` over ``x1``/``x2`` with noise SD 0.01.

    Args:
        name: Name of the metric.
        lower_is_better: Forwarded unchanged to the metric.
    """
    metric = BraninMetric(
        name=name,
        param_names=["x1", "x2"],
        noise_sd=0.01,
        lower_is_better=lower_is_better,
    )
    return metric
def get_augmented_branin_metric(name: str = "aug_branin") -> AugmentedBraninMetric:
    """Return an ``AugmentedBraninMetric`` over ``x1``, ``x2``, ``fidelity``.

    Args:
        name: Name of the metric.
    """
    metric = AugmentedBraninMetric(
        name=name,
        param_names=["x1", "x2", "fidelity"],
        noise_sd=0.01,
    )
    return metric
def get_hartmann_metric(name: str = "hartmann") -> Hartmann6Metric:
    """Return a ``Hartmann6Metric`` over ``x1`` .. ``x6`` with noise SD 0.01.

    Args:
        name: Name of the metric.
    """
    input_names = [f"x{number}" for number in range(1, 7)]
    return Hartmann6Metric(name=name, param_names=input_names, noise_sd=0.01)
def get_augmented_hartmann_metric(
    name: str = "aug_hartmann",
) -> AugmentedHartmann6Metric:
    """Return an ``AugmentedHartmann6Metric`` over ``x1`` .. ``x6`` + fidelity.

    Args:
        name: Name of the metric.
    """
    input_names = [*(f"x{number}" for number in range(1, 7)), "fidelity"]
    return AugmentedHartmann6Metric(
        name=name, param_names=input_names, noise_sd=0.01
    )
def get_factorial_metric(name: str = "success_metric") -> FactorialMetric:
    """Return a ``FactorialMetric`` with three factors and batch size 10000.

    Args:
        name: Name of the metric.
    """
    # One coefficient per level of each factor.
    level_coefficients: Dict[str, Dict[TParamValue, float]] = {
        "factor1": {"level11": 0.1, "level12": 0.2, "level13": 0.3},
        "factor2": {"level21": 0.1, "level22": 0.2},
        "factor3": {"level31": 0.1, "level32": 0.2, "level33": 0.3, "level34": 0.4},
    }
    return FactorialMetric(
        name=name,
        coefficients=level_coefficients,
        batch_size=10_000,
    )
def get_dict_lookup_metric() -> DictLookupMetric:
    """Return a ``DictLookupMetric`` over ``p1``/``p2`` with two entries."""
    lookup = {
        (0, 0): 0,
        (0, 1): 1,
    }
    return DictLookupMetric(
        name="test metric",
        param_names=["p1", "p2"],
        lookup_dict=lookup,
    )
############################## # Outcome Constraints ##############################
def get_objective_threshold(
    metric_name: str = "m1",
    bound: float = -0.25,
    comparison_op: ComparisonOp = ComparisonOp.GEQ,
) -> ObjectiveThreshold:
    """Return an objective threshold on a freshly created metric.

    Args:
        metric_name: Name of the metric the threshold applies to.
        bound: Threshold value.
        comparison_op: Comparison operator used as the threshold's ``op``.
    """
    metric = Metric(name=metric_name)
    return ObjectiveThreshold(metric=metric, bound=bound, op=comparison_op)
def get_outcome_constraint(relative: bool = True) -> OutcomeConstraint:
    """Return the constraint ``m2 >= -0.25``.

    Args:
        relative: Forwarded unchanged to the constraint.
    """
    constraint = OutcomeConstraint(
        metric=Metric(name="m2"),
        op=ComparisonOp.GEQ,
        bound=-0.25,
        relative=relative,
    )
    return constraint
def get_scalarized_outcome_constraint() -> ScalarizedOutcomeConstraint:
    """Return a GEQ constraint on ``oc_m3``/``oc_m4`` weighted 0.2/0.8."""
    constrained_metrics = [Metric(name="oc_m3"), Metric(name="oc_m4")]
    return ScalarizedOutcomeConstraint(
        metrics=constrained_metrics,
        weights=[0.2, 0.8],
        op=ComparisonOp.GEQ,
        bound=-0.25,
    )
def get_branin_outcome_constraint() -> OutcomeConstraint:
    """Return the constraint ``branin <= 0`` on the default Branin metric."""
    branin_metric = get_branin_metric()
    return OutcomeConstraint(metric=branin_metric, op=ComparisonOp.LEQ, bound=0)
############################## # Objectives ##############################
def get_objective(minimize: bool = False) -> Objective:
    """Return an objective on a plain metric named ``m1``.

    Args:
        minimize: Forwarded unchanged to the objective.
    """
    metric = Metric(name="m1")
    return Objective(metric=metric, minimize=minimize)
def get_map_objective(minimize: bool = False) -> Objective:
    """Return an objective on a ``MapMetric`` named ``m1``.

    Args:
        minimize: Forwarded unchanged to the objective.
    """
    metric = MapMetric(name="m1")
    return Objective(metric=metric, minimize=minimize)
def get_multi_objective() -> Objective:
    """Return a two-part ``MultiObjective``: maximize ``m1``, minimize ``m3``."""
    maximize_m1 = Objective(metric=Metric(name="m1"), minimize=False)
    minimize_m3 = Objective(
        metric=Metric(name="m3", lower_is_better=True), minimize=True
    )
    return MultiObjective(objectives=[maximize_m1, minimize_m3])
def get_custom_multi_objective() -> Objective:
    """Return a ``MultiObjective`` built from two ``CustomTestMetric``s.

    ``m1`` is maximized and ``m3`` (``lower_is_better=True``) is minimized.
    """
    maximize_m1 = Objective(
        metric=CustomTestMetric(name="m1", test_attribute="test"),
        minimize=False,
    )
    minimize_m3 = Objective(
        metric=CustomTestMetric(
            name="m3", lower_is_better=True, test_attribute="test"
        ),
        minimize=True,
    )
    return MultiObjective(objectives=[maximize_m1, minimize_m3])
def get_many_branin_objective_opt_config(
    n_objectives: int,
) -> MultiObjectiveOptimizationConfig:
    """Return a multi-objective config with ``n_objectives`` Branin objectives.

    Args:
        n_objectives: Number of objectives; they are named ``m0``, ``m1``, ...
    """
    branin_objectives = [
        get_branin_objective(name=f"m{index}") for index in range(n_objectives)
    ]
    combined = MultiObjective(objectives=branin_objectives)
    return MultiObjectiveOptimizationConfig(objective=combined)
def get_scalarized_objective() -> Objective:
    """Return a ``ScalarizedObjective`` on ``m1``/``m3`` weighted 1.0/2.0."""
    scalarized_metrics = [Metric(name="m1"), Metric(name="m3")]
    return ScalarizedObjective(
        metrics=scalarized_metrics,
        weights=[1.0, 2.0],
        minimize=False,
    )
def get_branin_objective(name: str = "branin", minimize: bool = False) -> Objective:
    """Return an objective on a Branin metric.

    Args:
        name: Name of the Branin metric.
        minimize: Used both as the objective's ``minimize`` flag and as the
            metric's ``lower_is_better`` flag.
    """
    branin_metric = get_branin_metric(name=name, lower_is_better=minimize)
    return Objective(metric=branin_metric, minimize=minimize)
def get_branin_multi_objective(num_objectives: int = 2) -> Objective:
    """Return a ``MultiObjective`` minimizing 2 or 3 Branin metrics.

    Args:
        num_objectives: Number of objectives; validated by
            ``_validate_num_objectives``.
    """
    _validate_num_objectives(num_objectives=num_objectives)
    metric_names = ["branin_a", "branin_b"]
    if num_objectives == 3:
        metric_names.append("branin_c")
    return MultiObjective(
        objectives=[
            Objective(metric=get_branin_metric(name=metric_name), minimize=True)
            for metric_name in metric_names
        ]
    )
def get_augmented_branin_objective() -> Objective:
    """Return a maximization objective on the augmented Branin metric."""
    metric = get_augmented_branin_metric()
    return Objective(metric=metric, minimize=False)
def get_hartmann_objective() -> Objective:
    """Return a maximization objective on the Hartmann metric."""
    metric = get_hartmann_metric()
    return Objective(metric=metric, minimize=False)
def get_augmented_hartmann_objective() -> Objective:
    """Return a maximization objective on the augmented Hartmann metric."""
    metric = get_augmented_hartmann_metric()
    return Objective(metric=metric, minimize=False)
############################## # Optimization Configs ##############################
def get_optimization_config(
    outcome_constraint: bool = True, relative: bool = True
) -> OptimizationConfig:
    """Return a config with the default objective and an optional constraint.

    Args:
        outcome_constraint: If True, include the constraint produced by
            ``get_outcome_constraint``; otherwise use no constraints.
        relative: Forwarded to ``get_outcome_constraint``.
    """
    objective = get_objective()
    constraints: List[OutcomeConstraint] = []
    if outcome_constraint:
        constraints.append(get_outcome_constraint(relative=relative))
    return OptimizationConfig(objective=objective, outcome_constraints=constraints)
def get_map_optimization_config() -> OptimizationConfig:
    """Return a config whose objective comes from ``get_map_objective``."""
    return OptimizationConfig(objective=get_map_objective())
def get_multi_objective_optimization_config(
    custom_metric: bool = False,
    relative: bool = True,
    outcome_constraint: bool = True,
) -> MultiObjectiveOptimizationConfig:
    """Return a multi-objective config with thresholds on ``m1`` and ``m3``.

    Args:
        custom_metric: If True, use ``get_custom_multi_objective`` for the
            objective; otherwise use ``get_multi_objective``.
        relative: Forwarded to ``get_outcome_constraint``.
        outcome_constraint: If True, include the constraint produced by
            ``get_outcome_constraint``; otherwise use no constraints.
    """
    if custom_metric:
        objective = get_custom_multi_objective()
    else:
        objective = get_multi_objective()

    constraints: List[OutcomeConstraint] = []
    if outcome_constraint:
        constraints.append(get_outcome_constraint(relative=relative))

    m1_threshold = get_objective_threshold(metric_name="m1")
    m3_threshold = get_objective_threshold(
        metric_name="m3", comparison_op=ComparisonOp.LEQ
    )
    return MultiObjectiveOptimizationConfig(
        objective=objective,
        outcome_constraints=constraints,
        objective_thresholds=[m1_threshold, m3_threshold],
    )
def get_optimization_config_no_constraints(
    minimize: bool = False,
) -> OptimizationConfig:
    """Return a config whose only content is an objective on ``test_metric``.

    Args:
        minimize: Forwarded unchanged to the objective.
    """
    objective = Objective(metric=Metric("test_metric"), minimize=minimize)
    return OptimizationConfig(objective=objective)
def get_branin_optimization_config(minimize: bool = False) -> OptimizationConfig:
    """Return a config whose objective comes from ``get_branin_objective``.

    Args:
        minimize: Forwarded to ``get_branin_objective``.
    """
    objective = get_branin_objective(minimize=minimize)
    return OptimizationConfig(objective=objective)
def _validate_num_objectives(num_objectives: int) -> None:
    """Reject any objective count other than 2 or 3.

    Raises:
        NotImplementedError: If ``num_objectives`` is neither 2 nor 3.
    """
    if num_objectives in (2, 3):
        return
    raise NotImplementedError("Only 2 and 3 objectives are supported.")
def get_branin_multi_objective_optimization_config(
    has_objective_thresholds: bool = False,
    num_objectives: int = 2,
) -> MultiObjectiveOptimizationConfig:
    """Return a Branin multi-objective config, optionally with thresholds.

    Args:
        has_objective_thresholds: If True, attach one absolute ``LEQ``
            threshold per objective; otherwise thresholds are ``None``.
        num_objectives: Number of objectives; validated by
            ``_validate_num_objectives``.
    """
    _validate_num_objectives(num_objectives=num_objectives)

    objective_thresholds = None
    if has_objective_thresholds:
        # minimum Branin value is 0.397887
        bounds_by_metric = [("branin_a", 10), ("branin_b", 20)]
        if num_objectives == 3:
            bounds_by_metric.append(("branin_c", 5.0))
        objective_thresholds = [
            ObjectiveThreshold(
                metric=get_branin_metric(name=metric_name),
                bound=bound,
                op=ComparisonOp.LEQ,
                relative=False,
            )
            for metric_name, bound in bounds_by_metric
        ]

    return MultiObjectiveOptimizationConfig(
        objective=get_branin_multi_objective(num_objectives=num_objectives),
        objective_thresholds=objective_thresholds,
    )
def get_augmented_branin_optimization_config() -> OptimizationConfig:
    """Return a config built on ``get_augmented_branin_objective``."""
    objective = get_augmented_branin_objective()
    return OptimizationConfig(objective=objective)
def get_hartmann_optimization_config() -> OptimizationConfig:
    """Return a config built on ``get_hartmann_objective``."""
    objective = get_hartmann_objective()
    return OptimizationConfig(objective=objective)
def get_augmented_hartmann_optimization_config() -> OptimizationConfig:
    """Return a config built on ``get_augmented_hartmann_objective``."""
    objective = get_augmented_hartmann_objective()
    return OptimizationConfig(objective=objective)
############################## # Arms ##############################
def get_arm() -> Arm:
    """Return an unnamed arm with values for ``w``, ``x``, ``y`` and ``z``."""
    arm = Arm(
        # The literal is narrower than the declared
        # `Dict[str, typing.Optional[typing.Union[bool, float, str]]]` type of
        # the `parameters` argument of `ax.core.arm.Arm.__init__`.
        parameters={"w": 0.75, "x": 1, "y": "foo", "z": True}
    )
    return arm
def get_status_quo() -> Arm:
    """Return the arm named ``status_quo``."""
    status_quo = Arm(
        # The literal is narrower than the declared
        # `Dict[str, typing.Optional[typing.Union[bool, float, str]]]` type of
        # the `parameters` argument of `ax.core.arm.Arm.__init__`.
        parameters={"w": 0.2, "x": 1, "y": "bar", "z": False},
        name="status_quo",
    )
    return status_quo
def get_arm_weights1() -> MutableMapping[Arm, float]:
    """Return an ordered mapping of three arms to weights 0.25/0.5/0.25."""
    parameterizations: List[TParameterization] = [
        {"w": 0.85, "x": 1, "y": "baz", "z": False},
        {"w": 0.75, "x": 1, "y": "foo", "z": True},
        {"w": 1.4, "x": 2, "y": "bar", "z": True},
    ]
    weights = [0.25, 0.5, 0.25]
    arm_weights: MutableMapping[Arm, float] = OrderedDict()
    for parameterization, weight in zip(parameterizations, weights):
        arm_weights[Arm(parameterization)] = weight
    return arm_weights
def get_arm_weights2() -> MutableMapping[Arm, float]:  # update
    """Return a second ordered mapping of three arms to weights.

    The arms differ from those of ``get_arm_weights1``; the weights are again
    0.25/0.5/0.25.
    """
    parameterizations: List[TParameterization] = [
        {"w": 0.96, "x": 3, "y": "hello", "z": True},
        {"w": 0.16, "x": 4, "y": "dear", "z": True},
        {"w": 3.1, "x": 5, "y": "world", "z": False},
    ]
    weights = [0.25, 0.5, 0.25]
    arm_weights: MutableMapping[Arm, float] = OrderedDict()
    for parameterization, weight in zip(parameterizations, weights):
        arm_weights[Arm(parameterization)] = weight
    return arm_weights
def get_arms_from_dict(arm_weights_dict: MutableMapping[Arm, float]) -> List[Arm]:
    """Return the keys (arms) of ``arm_weights_dict`` as a new list."""
    return [*arm_weights_dict.keys()]
def get_weights_from_dict(arm_weights_dict: MutableMapping[Arm, float]) -> List[float]:
    """Return the values (weights) of ``arm_weights_dict`` as a new list."""
    return [*arm_weights_dict.values()]
def get_arms() -> List[Arm]:
    """Return the arms of ``get_arm_weights1`` as a list."""
    arm_weights = get_arm_weights1()
    return [*arm_weights.keys()]
def get_weights() -> List[float]:
    """Return the weights of ``get_arm_weights1`` as a list."""
    arm_weights = get_arm_weights1()
    return [*arm_weights.values()]
def get_branin_arms(n: int, seed: int) -> List[Arm]:
    """Return ``n`` arms with seeded random ``x1``/``x2`` values.

    NumPy's global random state is seeded with ``seed``. ``x1`` is drawn as
    ``-5 + u * 15`` and ``x2`` as ``u * 15`` for uniform draws ``u``.

    Args:
        n: Number of arms to generate.
        seed: Seed passed to ``np.random.seed``.
    """
    np.random.seed(seed)
    # Draw all x1 samples first, then all x2 samples.
    x1_draws = np.random.rand(n)
    x2_draws = np.random.rand(n)
    arms = []
    for x1_draw, x2_draw in zip(x1_draws, x2_draws):
        arms.append(Arm(parameters={"x1": -5 + x1_draw * 15, "x2": x2_draw * 15}))
    return arms
def get_abandoned_arm() -> AbandonedArm:
    """Return an ``AbandonedArm`` named ``0_0`` stamped with the current time."""
    abandoned_at = datetime.now()
    return AbandonedArm(name="0_0", reason="foobar", time=abandoned_at)
############################## # Generator Runs ##############################
def get_generator_run() -> GeneratorRun:
    """Return a fully populated ``GeneratorRun`` over ``get_arm_weights1``.

    The first arm is used as the best arm, and every arm gets a candidate
    metadata entry keyed by its signature.
    """
    arm_weights = get_arm_weights1()
    arms = get_arms_from_dict(arm_weights)
    weights = get_weights_from_dict(arm_weights)
    optimization_config = get_optimization_config()
    search_space = get_search_space()
    predictions_by_signature = get_model_predictions_per_arm()

    best_arm = arms[0]
    candidate_metadata = {}
    for arm in arms:
        candidate_metadata[arm.signature] = {"md_key": f"md_val_{arm.signature}"}

    return GeneratorRun(
        arms=arms,
        weights=weights,
        optimization_config=optimization_config,
        search_space=search_space,
        model_predictions=get_model_predictions(),
        best_arm_predictions=(best_arm, predictions_by_signature[best_arm.signature]),
        fit_time=10.0,
        gen_time=5.0,
        model_key="Sobol",
        model_kwargs={"scramble": False, "torch_device": torch.device("cpu")},
        bridge_kwargs={"transforms": Cont_X_trans, "torch_dtype": torch.double},
        generation_step_index=0,
        candidate_metadata_by_arm_signature=candidate_metadata,
    )
def get_generator_run2() -> GeneratorRun:
    """Return a minimal ``GeneratorRun`` with only arms and weights set."""
    arm_weights = get_arm_weights1()
    return GeneratorRun(
        arms=get_arms_from_dict(arm_weights),
        weights=get_weights_from_dict(arm_weights),
    )
############################## # Runners ##############################
def get_synthetic_runner() -> SyntheticRunner:
    """Return a ``SyntheticRunner`` with ``dummy_metadata="foobar"``."""
    runner = SyntheticRunner(dummy_metadata="foobar")
    return runner
############################## # Data ##############################
def get_data(
    metric_name: str = "ax_test_metric",
    trial_index: int = 0,
    num_non_sq_arms: int = 4,
    include_sq: bool = True,
) -> Data:
    """Return ``Data`` for one metric on one trial with fixed values.

    Args:
        metric_name: Value of the ``metric_name`` column.
        trial_index: Value of the ``trial_index`` column; also the prefix of
            the generated arm names (``"<trial_index>_<i>"``).
        num_non_sq_arms: Number of non-status-quo arms; must be below 5.
        include_sq: If True, a ``status_quo`` row comes first.
    """
    assert num_non_sq_arms < 5, "Only up to 4 arms currently handled."

    if include_sq:
        arm_names = ["status_quo"]
        num_arms = num_non_sq_arms + 1
    else:
        arm_names = []
        num_arms = num_non_sq_arms
    arm_names.extend(f"{trial_index}_{i}" for i in range(num_non_sq_arms))

    # Fixed per-row values, truncated to the number of rows requested.
    all_means = [1, 3, 2, 2.25, 1.75]
    all_sems = [0, 0.5, 0.25, 0.40, 0.15]
    all_counts = [100] * 5
    columns = {
        "trial_index": trial_index,
        "metric_name": metric_name,
        "arm_name": arm_names,
        "mean": all_means[:num_arms],
        "sem": all_sems[:num_arms],
        "n": all_counts[:num_arms],
    }
    return Data(df=pd.DataFrame.from_records(columns))
def get_non_monolithic_branin_moo_data() -> Data:
    """Return ``Data`` for ``branin_a`` and ``branin_b`` with differing windows.

    Each metric has a ``status_quo`` row (mean 2.0) and a ``0_0`` row
    (mean 1.0). ``branin_a`` spans from three days ago until now, while
    ``branin_b`` spans from two days ago until one day ago.
    """
    now = datetime.now()
    # (metric name, start time, end time); both are obj. metrics for experiment.
    metric_windows = [
        ("branin_a", now - timedelta(days=3), now),
        ("branin_b", now - timedelta(days=2), now - timedelta(days=1)),
    ]
    arm_means = [("status_quo", 2.0), ("0_0", 1.0)]

    records = []
    for metric_name, start_time, end_time in metric_windows:
        for arm_name, mean in arm_means:
            records.append(
                {
                    "arm_name": arm_name,
                    "trial_index": "0",
                    "metric_name": metric_name,
                    "mean": mean,
                    "sem": 0.01,
                    "start_time": start_time,
                    "end_time": end_time,
                }
            )
    return Data(df=pd.DataFrame.from_records(records))
def get_map_data(trial_index: int = 0) -> MapData:
    """Return ``MapData`` with four epochs of ``ax_test_metric`` per arm.

    Args:
        trial_index: Trial index passed to ``MapData.from_map_evaluations``.
    """
    # Mean per epoch (epochs 1-4) for each arm; the SEM is always 0.5.
    means_by_arm = {
        "status_quo": [1.0, 2.0, 3.0, 4.0],
        "0_0": [3.7, 3.8, 3.9, 4.0],
        "0_1": [3.0, 5.0, 6.0, 1.0],
    }
    evaluations = {
        arm_name: [
            ({"epoch": epoch}, {"ax_test_metric": (mean, 0.5)})
            for epoch, mean in enumerate(epoch_means, start=1)
        ]
        for arm_name, epoch_means in means_by_arm.items()
    }
    return MapData.from_map_evaluations(
        evaluations=evaluations,  # pyre-ignore [6]: Spurious param type mismatch.
        trial_index=trial_index,
        map_key_infos=[get_map_key_info()],
    )
# pyre-fixme[24]: Generic type `MapKeyInfo` expects 1 type parameter.
def get_map_key_info() -> MapKeyInfo:
    """Return a ``MapKeyInfo`` for the ``epoch`` key with default 0.0."""
    key_info = MapKeyInfo(key="epoch", default_value=0.0)
    return key_info
def get_branin_data(
    trial_indices: Optional[Iterable[int]] = None,
    trials: Optional[Iterable[Trial]] = None,
) -> Data:
    """Return ``branin`` metric ``Data`` for the given trials or indices.

    Args:
        trial_indices: Trial indices to emit one fixed row (mean 5.0) for.
            Used only when ``trials`` is falsy; defaults to ``[0]``.
        trials: Trials whose arm's ``x1``/``x2`` values are run through
            ``branin`` to produce the mean.

    Raises:
        ValueError: If both ``trial_indices`` and ``trials`` are truthy.
    """
    if trial_indices and trials:
        raise ValueError("Expected `trial_indices` or `trials`, not both.")

    records = []
    if trials:
        for trial in trials:
            index = trial.index
            arm = not_none(checked_cast(Trial, trial).arm)
            records.append(
                {
                    "trial_index": index,
                    "metric_name": "branin",
                    "arm_name": arm.name,
                    "mean": branin(
                        float(not_none(arm.parameters["x1"])),
                        float(not_none(arm.parameters["x2"])),
                    ),
                    "sem": 0.0,
                }
            )
    else:
        for trial_index in trial_indices or [0]:
            records.append(
                {
                    "trial_index": trial_index,
                    "metric_name": "branin",
                    "arm_name": f"{trial_index}_0",
                    "mean": 5.0,
                    "sem": 0.0,
                }
            )
    return Data(df=pd.DataFrame.from_records(records))
def get_branin_data_batch(batch: BatchTrial) -> Data:
    """Return ``branin`` metric ``Data`` with one row per arm of ``batch``.

    Each mean is ``branin`` applied to the arm's ``x1``/``x2`` values; the SEM
    is 0.1 for every row.
    """
    trial_index = batch.index
    arm_names = [arm.name for arm in batch.arms]
    means = []
    for arm in batch.arms:
        x1 = float(not_none(arm.parameters["x1"]))
        x2 = float(not_none(arm.parameters["x2"]))
        means.append(branin(x1, x2))
    frame = pd.DataFrame(
        {
            "trial_index": trial_index,
            "arm_name": arm_names,
            "metric_name": "branin",
            "mean": means,
            "sem": 0.1,
        }
    )
    return Data(frame)
def get_branin_data_multi_objective(
    trial_indices: Optional[Iterable[int]] = None, num_objectives: int = 2
) -> Data:
    """Return fixed ``Data`` for 2 or 3 ``branin_<suffix>`` metrics.

    Args:
        trial_indices: Trial indices to emit rows for; defaults to ``[0]``.
        num_objectives: Number of metrics; validated by
            ``_validate_num_objectives``.
    """
    _validate_num_objectives(num_objectives=num_objectives)
    suffixes = ["a", "b", "c"] if num_objectives == 3 else ["a", "b"]

    records = []
    for trial_index in trial_indices or [0]:
        for suffix in suffixes:
            records.append(
                {
                    "trial_index": trial_index,
                    "metric_name": f"branin_{suffix}",
                    "arm_name": f"{trial_index}_0",
                    "mean": 5.0,
                    "sem": 0.0,
                }
            )
    return Data(df=pd.DataFrame.from_records(records))
def get_percentile_early_stopping_strategy() -> PercentileEarlyStoppingStrategy:
    """Return a ``PercentileEarlyStoppingStrategy`` with fixed settings."""
    strategy = PercentileEarlyStoppingStrategy(
        percentile_threshold=0.25,
        min_progression=0.2,
        min_curves=10,
        trial_indices_to_ignore=[0, 1, 2],
        normalize_progressions=True,
    )
    return strategy
def get_percentile_early_stopping_strategy_with_non_objective_metric_name() -> (
    PercentileEarlyStoppingStrategy
):
    """Return a percentile strategy that names the metric ``foo`` explicitly.

    All other settings match ``get_percentile_early_stopping_strategy``.
    """
    strategy = PercentileEarlyStoppingStrategy(
        metric_names=["foo"],
        percentile_threshold=0.25,
        min_progression=0.2,
        min_curves=10,
        trial_indices_to_ignore=[0, 1, 2],
        normalize_progressions=True,
    )
    return strategy
def get_threshold_early_stopping_strategy() -> ThresholdEarlyStoppingStrategy:
    """Return a ``ThresholdEarlyStoppingStrategy`` with fixed settings."""
    strategy = ThresholdEarlyStoppingStrategy(
        metric_threshold=0.1,
        min_progression=0.2,
        trial_indices_to_ignore=[0, 1, 2],
        normalize_progressions=True,
    )
    return strategy
def get_and_early_stopping_strategy() -> AndEarlyStoppingStrategy:
    """Return an AND of the percentile and threshold stub strategies."""
    percentile_strategy = get_percentile_early_stopping_strategy()
    threshold_strategy = get_threshold_early_stopping_strategy()
    return AndEarlyStoppingStrategy(
        left=percentile_strategy, right=threshold_strategy
    )
def get_or_early_stopping_strategy() -> OrEarlyStoppingStrategy:
    """Return an OR of the percentile and threshold stub strategies."""
    percentile_strategy = get_percentile_early_stopping_strategy()
    threshold_strategy = get_threshold_early_stopping_strategy()
    return OrEarlyStoppingStrategy(
        left=percentile_strategy, right=threshold_strategy
    )
class DummyEarlyStoppingStrategy(BaseEarlyStoppingStrategy):
    """Early stopping stub that always returns a preset answer."""

    def __init__(
        self, early_stop_trials: Optional[Dict[int, Optional[str]]] = None
    ) -> None:
        """Store the preset answer.

        The base-class ``__init__`` is not called; only the two attributes
        below are set.

        Args:
            early_stop_trials: Mapping returned by every call to
                ``should_stop_trials_early``. A falsy value is replaced by a
                new empty dict.
        """
        if not early_stop_trials:
            early_stop_trials = {}
        self.early_stop_trials: Dict[int, Optional[str]] = early_stop_trials
        self.seconds_between_polls = 1

    def should_stop_trials_early(
        self,
        trial_indices: Set[int],
        experiment: Experiment,
        **kwargs: Dict[str, Any],
    ) -> Dict[int, Optional[str]]:
        """Return the preset mapping, ignoring all arguments."""
        return self.early_stop_trials
def get_improvement_global_stopping_strategy() -> ImprovementGlobalStoppingStrategy:
    """Return an ``ImprovementGlobalStoppingStrategy`` with fixed settings."""
    strategy = ImprovementGlobalStoppingStrategy(
        min_trials=30,
        window_size=10,
        improvement_bar=0.05,
        inactive_when_pending_trials=True,
    )
    return strategy
class DummyGlobalStoppingStrategy(BaseGlobalStoppingStrategy):
    """
    A dummy Global Stopping Strategy which stops the optimization after
    a pre-specified number of trials are completed.
    """

    def __init__(self, min_trials: int, trial_to_stop: int) -> None:
        """Forward ``min_trials`` to the base class and store ``trial_to_stop``."""
        super().__init__(min_trials=min_trials)
        self.trial_to_stop = trial_to_stop

    def _should_stop_optimization(
        self, experiment: Experiment, **kwargs: Dict[str, Any]
    ) -> Tuple[bool, str]:
        """Decide whether to stop, based on the count of completed trials.

        Returns:
            ``(True, "Stop the optimization.")`` once the number of completed
            trials reaches the larger of ``min_trials`` and ``trial_to_stop``;
            ``(False, "")`` otherwise.
        """
        completed = experiment.trials_by_status[TrialStatus.COMPLETED]
        required = max(self.min_trials, self.trial_to_stop)
        if len(completed) < required:
            return False, ""
        return True, "Stop the optimization."
############################## # Instances of types from core/types.py ##############################
def get_model_mean() -> TModelMean:
    """Return fixed per-metric mean lists for two test metrics."""
    return {
        "test_metric_1": [1, 2, 3],
        "test_metric_2": [3, 4, 5],
    }
def get_model_covariance() -> TModelCov:
    """Return fixed nested per-metric covariance lists for two test metrics."""
    return {
        "test_metric_1": {
            "test_metric_1": [5, 6, 7],
            "test_metric_2": [7, 8, 9],
        },
        "test_metric_2": {
            "test_metric_1": [9, 10, 11],
            "test_metric_2": [11, 12, 13],
        },
    }
def get_model_predictions() -> TModelPredict:
    """Return the ``(mean, covariance)`` pair of the stub model outputs."""
    mean = get_model_mean()
    covariance = get_model_covariance()
    return (mean, covariance)
def get_model_predictions_per_arm() -> Dict[str, TModelPredictArm]:
    """Return per-arm predictions keyed by arm signature.

    The ``i``-th arm of ``get_arm_weights1`` receives the ``i``-th entry of
    every mean and covariance list, restricted to the first two metrics.
    """
    means = get_model_mean()
    covariances = get_model_covariance()
    metric_names = list(means.keys())
    tracked = (metric_names[0], metric_names[1])

    predictions: Dict[str, TModelPredictArm] = {}
    for position, arm in enumerate(get_arm_weights1().keys()):
        arm_means = {metric: means[metric][position] for metric in tracked}
        arm_covariances = {
            row: {col: covariances[row][col][position] for col in tracked}
            for row in tracked
        }
        predictions[arm.signature] = (arm_means, arm_covariances)
    return predictions
############################## # Modular BoTorch Model Components ##############################
def get_botorch_model() -> BoTorchModel:
    """Return a ``BoTorchModel`` using the stub surrogate and acquisition."""
    surrogate = get_surrogate()
    acquisition_class = get_acquisition_type()
    return BoTorchModel(surrogate=surrogate, acquisition_class=acquisition_class)
def get_botorch_model_with_default_acquisition_class() -> BoTorchModel:
    """Return a ``BoTorchModel`` with ``Acquisition`` and a stub acqf class."""
    surrogate = get_surrogate()
    acqf_class = get_acquisition_function_type()
    return BoTorchModel(
        surrogate=surrogate,
        acquisition_class=Acquisition,
        botorch_acqf_class=acqf_class,
    )
def get_botorch_model_with_surrogate_specs() -> BoTorchModel:
    """Return a ``BoTorchModel`` built from one ``SurrogateSpec`` under ``name``."""
    spec = SurrogateSpec(botorch_model_kwargs={"some_option": "some_value"})
    return BoTorchModel(surrogate_specs={"name": spec})
def get_surrogate() -> Surrogate:
    """Return a ``Surrogate`` using the stub model and MLL classes."""
    model_class = get_model_type()
    mll_class = get_mll_type()
    return Surrogate(botorch_model_class=model_class, mll_class=mll_class)
def get_acquisition_type() -> Type[Acquisition]:
    """Return the ``Acquisition`` class itself (not an instance)."""
    acquisition_class: Type[Acquisition] = Acquisition
    return acquisition_class
def get_model_type() -> Type[Model]:
    """Return the ``SingleTaskGP`` class itself (not an instance)."""
    model_class: Type[Model] = SingleTaskGP
    return model_class
def get_mll_type() -> Type[MarginalLogLikelihood]:
    """Return the ``ExactMarginalLogLikelihood`` class itself."""
    mll_class: Type[MarginalLogLikelihood] = ExactMarginalLogLikelihood
    return mll_class
def get_acquisition_function_type() -> Type[AcquisitionFunction]:
    """Return the ``qExpectedImprovement`` class itself (not an instance)."""
    acqf_class: Type[AcquisitionFunction] = qExpectedImprovement
    return acqf_class
def get_sebo_acquisition_class() -> Type[SEBOAcquisition]:
    """Return the ``SEBOAcquisition`` class itself (not an instance)."""
    acquisition_class: Type[SEBOAcquisition] = SEBOAcquisition
    return acquisition_class
def get_winsorization_config() -> WinsorizationConfig:
    """Return a ``WinsorizationConfig`` with fixed margins and boundaries."""
    config = WinsorizationConfig(
        lower_quantile_margin=0.2,
        upper_quantile_margin=0.3,
        lower_boundary=20,
        upper_boundary=50,
    )
    return config
def get_gamma_prior() -> GammaPrior:
    """Return a ``GammaPrior`` with concentration 0.9 and rate 10.0."""
    prior = GammaPrior(concentration=0.9, rate=10.0)
    return prior
def get_interval() -> Interval:
    """Return an ``Interval`` with bounds 1e-6 and 0.1."""
    interval = Interval(lower_bound=1e-6, upper_bound=0.1)
    return interval
def get_chained_input_transform() -> ChainedInputTransform:
    """Return a ``round`` then ``normalize`` chained input transform.

    ``Round`` is configured for integer index 1, applied on eval but not on
    train; ``Normalize`` uses ``d=2`` with bounds ``[[0, 0], [3, 5]]``.
    """
    bounds = torch.tensor([[0, 0], [3, 5]], dtype=torch.double)
    round_transform = Round(
        integer_indices=[1],
        transform_on_eval=True,
        transform_on_train=False,
    )
    normalize_transform = Normalize(d=2, bounds=bounds)
    return ChainedInputTransform(
        round=round_transform,
        normalize=normalize_transform,
    )
############################## # Scheduler ##############################
def get_default_scheduler_options() -> SchedulerOptions:
    """Return ``SchedulerOptions`` constructed with no arguments."""
    options = SchedulerOptions()
    return options
def get_scheduler_options_batch_trial() -> SchedulerOptions:
    """Return ``SchedulerOptions`` with ``trial_type`` set to batch trials."""
    options = SchedulerOptions(trial_type=TrialType.BATCH_TRIAL)
    return options
############################## # Other ##############################
def get_risk_measure() -> RiskMeasure:
    """Return an ``Expectation`` risk measure with ``n_w`` set to 8."""
    options = {"n_w": 8}
    return RiskMeasure(risk_measure="Expectation", options=options)
def get_parameter_distribution() -> ParameterDistribution:
    """Return a ``norm`` distribution (loc 1.0, scale 0.5) over ``x``."""
    distribution = ParameterDistribution(
        parameters=["x"],
        distribution_class="norm",
        distribution_parameters={"loc": 1.0, "scale": 0.5},
    )
    return distribution
def get_pathlib_path() -> Path:
    """Return the relative path ``some/meaningless/path``."""
    return Path("some") / "meaningless" / "path"
def get_dataset(
    num_samples: int = 2,
    d: int = 2,
    m: int = 2,
    has_observation_noise: bool = False,
    feature_names: Optional[List[str]] = None,
    outcome_names: Optional[List[str]] = None,
    tkwargs: Optional[Dict[str, Any]] = None,
    seed: Optional[int] = None,
) -> SupervisedDataset:
    """Constructs a SupervisedDataset of random tensors.

    Args:
        num_samples: The number of samples in the dataset.
        d: The dimension of the features.
        m: The number of outcomes.
        has_observation_noise: If True, includes Yvar in the dataset.
        feature_names: A list of feature names. Defaults to x0, x1...
        outcome_names: A list of outcome names. Defaults to y0, y1...
        tkwargs: Optional dictionary of tensor kwargs, such as dtype and device.
        seed: An optional seed used to generate the data.
    """
    if seed is not None:
        set_rng_seed(seed)
    if not feature_names:
        feature_names = [f"x{i}" for i in range(d)]
    if not outcome_names:
        outcome_names = [f"y{i}" for i in range(m)]
    tensor_kwargs = tkwargs if tkwargs else {}

    # Draw order (X, Y, then Yvar) determines the values for a given seed.
    X = torch.rand(num_samples, d, **tensor_kwargs)
    Y = torch.rand(num_samples, m, **tensor_kwargs)
    Yvar = None
    if has_observation_noise:
        Yvar = torch.rand(num_samples, m, **tensor_kwargs) * 0.01

    return SupervisedDataset(
        X=X,
        Y=Y,
        Yvar=Yvar,
        feature_names=feature_names,
        outcome_names=outcome_names,
    )
############################## # Custom runner and metric ##############################
class CustomTestRunner(Runner):
    """Runner stub that carries one extra attribute and returns fixed metadata."""

    def __init__(self, test_attribute: str) -> None:
        """Store ``test_attribute`` on the instance."""
        self.test_attribute = test_attribute

    def run(self, trial: BaseTrial) -> Dict[str, Any]:
        """Return the fixed metadata ``{"foo": "bar"}``, ignoring ``trial``."""
        run_metadata: Dict[str, Any] = {"foo": "bar"}
        return run_metadata
class CustomTestMetric(Metric):
    """Metric stub that carries one extra attribute, ``test_attribute``."""

    def __init__(
        self,
        name: str,
        test_attribute: str,
        lower_is_better: Optional[bool] = None,
    ) -> None:
        """Store ``test_attribute`` and initialize the base ``Metric``.

        Args:
            name: Forwarded to ``Metric.__init__``.
            test_attribute: Stored on the instance as ``test_attribute``.
            lower_is_better: Forwarded to ``Metric.__init__``.
        """
        # The extra attribute is set before the base initializer runs.
        self.test_attribute = test_attribute
        super().__init__(name=name, lower_is_better=lower_is_better)
class SpecialGenerationStrategy(GenerationStrategyInterface):
    """A subclass of `GenerationStrategyInterface` to be used
    for testing how methods respond to subtypes other than
    `GenerationStrategy`."""

    def __init__(self) -> None:
        """Set the name to ``"special"`` and start with no generator runs.

        The base-class ``__init__`` is not called; only the two attributes
        below are set.
        """
        self._name = "special"
        self._generator_runs: List[GeneratorRun] = []

    def gen_for_multiple_trials_with_multiple_models(
        self,
        experiment: Experiment,
        num_generator_runs: int,
        data: Optional[Data] = None,
        n: int = 1,
    ) -> List[List[GeneratorRun]]:
        """Return an empty list, ignoring all arguments."""
        no_generator_runs: List[List[GeneratorRun]] = []
        return no_generator_runs

    def clone_reset(self) -> SpecialGenerationStrategy:
        """Return a new ``SpecialGenerationStrategy`` that copies only ``_name``."""
        fresh = SpecialGenerationStrategy()
        fresh._name = self._name
        return fresh