#!/usr/bin/env python3
# Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved.
from inspect import signature
from typing import Any, Callable, Dict, List, NamedTuple, Optional
import pandas as pd
from ax.core.data import Data
from ax.core.experiment import Experiment
from ax.core.generator_run import GeneratorRun
from ax.modelbridge.base import ModelBridge
from ax.modelbridge.factory import Models
from ax.utils.common.typeutils import checked_cast, not_none
# Type alias for a model factory: any callable that returns a `ModelBridge`.
TModelFactory = Callable[..., ModelBridge]
# NOTE(review): not referenced in the visible part of this file; presumably an
# upper bound on the number of conditions/arms to generate -- confirm usage.
MAX_CONDITIONS_GENERATED = 10000
def _filter_kwargs(function: Callable, **kwargs: Any) -> Dict[str, Any]:
    """Filter out kwargs that are not applicable for a given function.

    Args:
        function: Callable whose signature determines which kwargs are kept.
        **kwargs: Candidate keyword arguments.

    Returns:
        A new dict holding only those entries of ``kwargs`` whose key is the
        name of a parameter in the signature of ``function``. The input
        kwargs are not modified.
    """
    # Inspect the signature once instead of once per candidate kwarg.
    accepted = signature(function).parameters
    return {k: v for k, v in kwargs.items() if k in accepted}
class GenerationStep(NamedTuple):
    """One step in the generation strategy, corresponds to a single model.

    Describes the model, how many arms will be generated with this model, what
    minimum number of observations is required to proceed to the next model, etc.

    Attributes:
        model: Model factory used for this step; `GenerationStrategy` calls it
            to instantiate the step's model.
        num_arms: Number of arms to generate with this model. `-1` means the
            model is used to generate arms indefinitely; `GenerationStrategy`
            only allows that value on the last step.
        min_arms_observed: Minimum number of arms with observed data in this
            step before `GenerationStrategy` moves on to the next step.
        recommended_max_parallelism: Not read anywhere in this file;
            presumably advisory for callers scheduling trials (TODO confirm).
        enforce_num_arms: When True, `GenerationStrategy.gen` raises rather
            than generating arms beyond `num_arms` or proceeding without
            enough observed data.
        model_kwargs: Optional kwargs to pass into the Models factory function.
        model_gen_kwargs: Optional kwargs to pass into the Model's `.gen`
            function.
        index: Index of this step, set internally by `GenerationStrategy`.
    """

    model: Models
    num_arms: int
    min_arms_observed: int = 0
    recommended_max_parallelism: Optional[int] = None
    enforce_num_arms: bool = True
    # Kwargs to pass into the Models factory function (None means no kwargs).
    model_kwargs: Optional[Dict[str, Any]] = None
    # Kwargs to pass into the Model's `.gen` function (None means no kwargs).
    model_gen_kwargs: Optional[Dict[str, Any]] = None
    index: Optional[int] = None  # Index of this step, set internally.
class GenerationStrategy:
    """GenerationStrategy describes which model should be used to generate new
    points for which trials, enabling and automating use of different models
    throughout the optimization process. For instance, it allows to use one
    model for the initialization trials, and another one for all subsequent
    trials. In the general case, this allows to automate use of an arbitrary
    number of models to generate an arbitrary number of arms, as described
    by the `steps` argument.
    """

    _name: Optional[str]
    _steps: List[GenerationStep]
    _generated: List[str]  # Arms generated in the current generation step.
    _observed: List[str]  # Arms in the current step for which we observed data.
    _model: Optional[ModelBridge]  # Current model.
    _data: Data  # All data this strategy has been updated with.
    _curr: GenerationStep  # Current step in the strategy.

    def __init__(self, steps: List[GenerationStep], name: Optional[str] = None) -> None:
        """Validate the steps and initialize an empty strategy state.

        Args:
            steps: Generation steps, in the order in which they are used.
                Only the last step may have `num_arms` set to -1.
            name: Optional explicit name; when omitted, the `name` property
                derives one from the steps' model factories.

        Raises:
            AssertionError: If `steps` is not a list.
            ValueError: If a step other than the last has `num_arms == -1`, or
                any step has a `num_arms` that is neither positive nor -1.
        """
        # Kept as an assert so the exception type seen by callers is unchanged.
        assert isinstance(steps, list), "Steps must be a GenerationStep list."
        self._name = name
        # Work on a copy: the indexed steps are written back below, and that
        # must not rewrite the caller's list (or a list shared via clone_reset).
        self._steps = list(steps)
        last_idx = len(self._steps) - 1
        for idx, step in enumerate(self._steps):
            if step.num_arms == -1:
                if idx < last_idx:
                    raise ValueError(
                        "Only last step in generation strategy can have num_arms "
                        "set to -1 to indicate that the model in the step should "
                        "be used to generate new arms indefinitely."
                    )
            elif step.num_arms < 1:
                raise ValueError("`num_arms` must be positive or -1 for all models.")
            # Steps are immutable named tuples, so store an indexed copy.
            self._steps[idx] = step._replace(index=idx)
        self._generated = []
        self._observed = []
        self._model = None
        self._data = Data()
        self._curr = self._steps[0]

    @property
    def name(self) -> str:
        """Name of this generation strategy. Defaults to a combination of model
        names provided in generation steps."""
        if self._name:
            return self._name
        # pyre-ignore[16]: "`Models` have no attribute `__name__`", but they do.
        factory_names = (checked_cast(str, step.model.__name__) for step in self._steps)
        # Trim the "get_" beginning of the factory function if it's there.
        factory_names = (n[4:] if n[:4] == "get_" else n for n in factory_names)
        return "+".join(factory_names)

    @property
    def generator_changes(self) -> List[int]:
        """List of arm indices where a transition happened from one model to
        another.

        Each entry is the cumulative `num_arms` of all steps up to and
        including a given step; the last step is excluded since no transition
        follows it.
        """
        changes: List[int] = []
        running_total = 0
        for step in self._steps[:-1]:
            running_total += step.num_arms
            changes.append(running_total)
        return changes

    @property
    def model(self) -> Optional[ModelBridge]:
        """Current model in this strategy."""
        return self._model  # pragma: no cover

    def gen(
        self,
        experiment: Experiment,
        new_data: Optional[Data] = None,  # Take in just the new data.
        n: int = 1,
        **kwargs: Any,
    ) -> GeneratorRun:
        """Produce the next points in the experiment.

        Args:
            experiment: Experiment for which arms are generated.
            new_data: Only the data not yet passed to this strategy, if any.
            n: Number of arms to generate.
            **kwargs: Extra kwargs forwarded (after filtering by signature) to
                the model factory when a model is instantiated.

        Returns:
            The generator run produced by the current model.

        Raises:
            ValueError: If the current step enforces `num_arms` and either all
                its arms were generated without enough observed data, or
                fewer than `n` arms remain in the step; if `new_data` overlaps
                data already seen; or if the strategy has no further steps.
        """
        # Get arm signatures for each entry in new_data that is indeed new.
        new_arms = self._get_new_arm_signatures(
            experiment=experiment, new_data=new_data
        )
        step = self._curr
        num_generated = len(self._generated)
        enough_observed = (
            len(self._observed) + len(new_arms)
        ) >= step.min_arms_observed
        unlimited_arms = step.num_arms == -1
        enough_generated = not unlimited_arms and num_generated >= step.num_arms
        # Only meaningful when the step is not unlimited (checked below).
        remaining_arms = step.num_arms - num_generated
        # Check that minimum observed_arms is satisfied if it's enforced.
        if step.enforce_num_arms and enough_generated and not enough_observed:
            raise ValueError(
                "All trials for current model have been generated, but not enough "
                "data has been observed to fit next model. Try again when more data "
                "are available."
            )
        # TODO[Lena, T44021164]: take into account failed trials. Potentially
        # reduce `_generated` count when a trial mentioned in new data failed.
        if step.enforce_num_arms and not unlimited_arms and 0 < remaining_arms < n:
            raise ValueError(
                f"Cannot generate {n} new arms as there are only {remaining_arms} "
                "remaining arms to generate using the current model."
            )
        # Same `is not None` test as the update branch below, so both agree
        # on whether new data was supplied.
        if new_data is not None:
            all_data = Data.from_multiple_data(data=[self._data, new_data])
        else:
            all_data = self._data
        if self._model is None:
            # Instantiate the first model.
            self._set_current_model(experiment=experiment, data=all_data, **kwargs)
        elif enough_generated and enough_observed:
            # Change to the next model.
            self._change_model(experiment=experiment, data=all_data, **kwargs)
        elif new_data is not None:
            # We're sticking with the current model, but update with new data
            self._model.update(experiment=experiment, data=new_data)
        gen_run = not_none(self._model).gen(n=n, **(self._curr.model_gen_kwargs or {}))
        # If nothing failed, update known data, _generated, and _observed.
        self._data = all_data
        self._observed.extend(new_arms)
        self._generated.extend(a.signature for a in gen_run.arms)
        return gen_run

    def clone_reset(self) -> "GenerationStrategy":
        """Copy this generation strategy without its state."""
        return GenerationStrategy(name=self.name, steps=self._steps)

    def _set_current_model(
        self, experiment: Experiment, data: Data, **kwargs: Any
    ) -> None:
        """Instantiate the current model with all available data.

        The current step's factory is called with `experiment`, `data`, the
        experiment's search space, the step's `model_kwargs` and `kwargs`,
        keeping only those arguments that appear in the factory's signature.
        """
        self._model = self._curr.model(  # pyre-ignore[29] T41922457
            **_filter_kwargs(
                self._curr.model,
                experiment=experiment,
                data=data,
                search_space=experiment.search_space,
                **(self._curr.model_kwargs or {}),
                **kwargs,
            )
        )

    def _change_model(self, experiment: Experiment, data: Data, **kwargs: Any) -> None:
        """Get a new model for the next step.

        Raises:
            ValueError: If the current step is already the last one.
        """
        # Increment the model
        next_index = not_none(self._curr.index) + 1
        if len(self._steps) == next_index:
            raise ValueError(f"Generation strategy {self.name} is completed.")
        self._curr = self._steps[next_index]
        # New step => reset _generated and _observed.
        self._generated, self._observed = [], []
        self._set_current_model(experiment=experiment, data=data, **kwargs)

    def _get_new_arm_signatures(
        self, experiment: Experiment, new_data: Optional[Data]
    ) -> List[str]:
        """Collect signatures of arms that `new_data` provides observations for.

        Args:
            experiment: Experiment used to look up arms and trial statuses.
            new_data: Data not yet seen by this strategy, or None.

        Returns:
            One signature per row of `new_data` whose arm is on the experiment
            and whose trial is not failed; empty if there is no new data.

        Raises:
            ValueError: If any row of `new_data` shares its arm name, metric
                name and trial index with data this strategy has already seen.
        """
        if new_data is None:
            return []
        new_df = new_data.df
        if new_df.empty:
            return []
        seen_df = self._data.df
        if not seen_df.empty:
            # If a row with the same trial index, arm name, and metric name
            # has already been seen in this generation strategy, the
            # data passed into this function is not entirely new. This check
            # does not depend on the row, so it runs once for the whole frame.
            overlap = pd.merge(
                new_df, seen_df, on=["arm_name", "metric_name", "trial_index"]
            )
            if not overlap.empty:
                # Report a row that is actually duplicated.
                duplicate = overlap.iloc[0]
                arm = duplicate["arm_name"]
                trial = duplicate["trial_index"]
                metric = duplicate["metric_name"]
                raise ValueError(
                    f"Data for arm {arm} in trial {trial} for metric "
                    f"{metric} has already been seen. Please only pass "
                    "new data to `GenerationStrategy.gen`."
                )
        # NOTE(review): an arm with rows for several metrics contributes one
        # signature per row; confirm whether duplicates are intended here.
        new_signatures = []
        for _, row in new_df.iterrows():
            if (
                row["arm_name"] in experiment.arms_by_name
                and not experiment.trials.get(row["trial_index"]).status.is_failed
            ):
                new_signatures.append(
                    experiment.arms_by_name.get(row["arm_name"]).signature
                )
        return new_signatures