Source code for ax.modelbridge.base

#!/usr/bin/env python3
# Copyright (c) Facebook, Inc. and its affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.

import time
from abc import ABC
from collections import OrderedDict
from copy import deepcopy
from typing import Any, Dict, List, MutableMapping, Optional, Set, Tuple, Type

from ax.core.arm import Arm
from ax.core.data import Data
from ax.core.experiment import Experiment
from ax.core.generator_run import GeneratorRun, extract_arm_predictions
from ax.core.observation import (
    Observation,
    ObservationData,
    ObservationFeatures,
    observations_from_data,
    separate_observations,
)
from ax.core.optimization_config import OptimizationConfig
from ax.core.search_space import SearchSpace
from ax.core.types import TConfig, TGenMetadata, TModelCov, TModelMean, TModelPredict
from ax.modelbridge.transforms.base import Transform
from ax.utils.common.logger import get_logger
from ax.utils.common.typeutils import not_none


logger = get_logger("ModelBridge")


[docs]class ModelBridge(ABC): """The main object for using models in Ax. ModelBridge specifies 3 methods for using models: - predict: Make model predictions. This method is not optimized for speed and so should be used primarily for plotting or similar tasks and not inside an optimization loop. - gen: Use the model to generate new candidates. - cross_validate: Do cross validation to assess model predictions. ModelBridge converts Ax types like Data and Arm to types that are meant to be consumed by the models. The data sent to the model will depend on the implementation of the subclass, which will specify the actual API for external model. This class also applies a sequence of transforms to the input data and problem specification which can be used to ensure that the external model receives appropriate inputs. Subclasses will implement what is here referred to as the "terminal transform," which is a transform that changes types of the data and problem specification. """ def __init__( self, search_space: SearchSpace, model: Any, transforms: Optional[List[Type[Transform]]] = None, experiment: Optional[Experiment] = None, data: Optional[Data] = None, transform_configs: Optional[Dict[str, TConfig]] = None, status_quo_name: Optional[str] = None, status_quo_features: Optional[ObservationFeatures] = None, optimization_config: Optional[OptimizationConfig] = None, fit_out_of_design: bool = False, ) -> None: """ Applies transforms and fits model. Args: experiment: Is used to get arm parameters. Is not mutated. search_space: Search space for fitting the model. Constraints need not be the same ones used in gen. data: Ax Data. model: Interface will be specified in subclass. If model requires initialization, that should be done prior to its use here. transforms: List of uninitialized transform classes. Forward transforms will be applied in this order, and untransforms in the reverse order. transform_configs: A dictionary from transform name to the transform config dictionary. status_quo_name: Name of the status quo arm. Can only be used if Data has a single set of ObservationFeatures corresponding to that arm. status_quo_features: ObservationFeatures to use as status quo. Either this or status_quo_name should be specified, not both. optimization_config: Optimization config defining how to optimize the model. """ t_fit_start = time.time() self._metric_names: Set[str] = set() self._training_data: List[Observation] = [] self._optimization_config: Optional[OptimizationConfig] = optimization_config self._training_in_design: List[bool] = [] self._status_quo: Optional[Observation] = None self._arms_by_signature: Optional[Dict[str, Arm]] = None self.transforms: MutableMapping[str, Transform] = OrderedDict() self._model_key: Optional[str] = None self._model_kwargs: Optional[Dict[str, Any]] = None self._bridge_kwargs: Optional[Dict[str, Any]] = None self._model_space = search_space.clone() self._raw_transforms = transforms self._transform_configs: Optional[Dict[str, TConfig]] = transform_configs self._fit_out_of_design = fit_out_of_design if experiment is not None: if self._optimization_config is None: self._optimization_config = experiment.optimization_config self._arms_by_signature = experiment.arms_by_signature observations = ( # pyre-fixme[6]: Expected `Experiment` for 1st param but got `None`. observations_from_data(experiment, data) if experiment is not None and data is not None else [] ) obs_feats_raw, obs_data_raw = self._set_training_data( observations=observations, search_space=search_space ) # Set model status quo # NOTE: training data must be set before setting the status quo. self._set_status_quo( experiment=experiment, status_quo_name=status_quo_name, status_quo_features=status_quo_features, ) obs_feats, obs_data, search_space = self._transform_data( obs_feats=obs_feats_raw, obs_data=obs_data_raw, search_space=search_space, transforms=transforms, transform_configs=transform_configs, ) # Save model, apply terminal transform, and fit self.model = model try: self._fit( model=model, search_space=search_space, observation_features=obs_feats, observation_data=obs_data, ) self.fit_time = time.time() - t_fit_start self.fit_time_since_gen = float(self.fit_time) except NotImplementedError: self.fit_time = 0.0 self.fit_time_since_gen = 0.0 def _transform_data( self, obs_feats: List[ObservationFeatures], obs_data: List[ObservationData], search_space: SearchSpace, transforms: Optional[List[Type[Transform]]], transform_configs: Optional[Dict[str, TConfig]], ) -> Tuple[List[ObservationFeatures], List[ObservationData], SearchSpace]: """Initialize transforms and apply them to provided data.""" # Initialize transforms search_space = search_space.clone() if transforms is not None: if transform_configs is None: transform_configs = {} for t in transforms: t_instance = t( search_space=search_space, observation_features=obs_feats, observation_data=obs_data, config=transform_configs.get(t.__name__, None), ) search_space = t_instance.transform_search_space(search_space) obs_feats = t_instance.transform_observation_features(obs_feats) obs_data = t_instance.transform_observation_data(obs_data, obs_feats) self.transforms[t.__name__] = t_instance return obs_feats, obs_data, search_space def _prepare_training_data( self, observations: List[Observation] ) -> Tuple[List[ObservationFeatures], List[ObservationData]]: observation_features, observation_data = separate_observations(observations) if len(observation_features) != len(set(observation_features)): raise ValueError( "Observation features not unique." "Something went wrong constructing training data..." ) return observation_features, observation_data def _set_training_data( self, observations: List[Observation], search_space: SearchSpace ) -> Tuple[List[ObservationFeatures], List[ObservationData]]: """Store training data, not-transformed. If the modelbridge specifies _fit_out_of_design, all training data is returned. Otherwise, only in design points are returned. """ observation_features, observation_data = self._prepare_training_data( observations=observations ) self._training_data = deepcopy(observations) self._metric_names: Set[str] = set() for obsd in observation_data: self._metric_names.update(obsd.metric_names) return self._process_in_design( search_space=search_space, observation_features=observation_features, observation_data=observation_data, ) def _extend_training_data( self, observations: List[Observation] ) -> Tuple[List[ObservationFeatures], List[ObservationData]]: """Extend and return training data, not-transformed. If the modelbridge specifies _fit_out_of_design, all training data is returned. Otherwise, only in design points are returned. Args: observations: New observations. Returns: observation_features: New + old observation features. observation_data: New + old observation data. """ observation_features, observation_data = self._prepare_training_data( observations=observations ) for obsd in observation_data: for metric_name in obsd.metric_names: if metric_name not in self._metric_names: raise ValueError( f"Unrecognised metric {metric_name}; cannot update " "training data with metrics that were not in the original " "training data." ) # Initialize with all points in design. self._training_data.extend(deepcopy(observations)) all_observation_features, all_observation_data = separate_observations( self.get_training_data() ) return self._process_in_design( search_space=self._model_space, observation_features=all_observation_features, observation_data=all_observation_data, ) def _process_in_design( self, search_space: SearchSpace, observation_features: List[ObservationFeatures], observation_data: List[ObservationData], ) -> Tuple[List[ObservationFeatures], List[ObservationData]]: """Set training_in_design, and decide whether to filter out of design points.""" in_design = [ search_space.check_membership(obsf.parameters) for obsf in observation_features ] self.training_in_design = in_design # Don't filter points. if self._fit_out_of_design: return observation_features, observation_data in_design_indices = [i for i, in_design in enumerate(in_design) if in_design] in_design_features = [observation_features[i] for i in in_design_indices] in_design_data = [observation_data[i] for i in in_design_indices] return in_design_features, in_design_data def _set_status_quo( self, experiment: Optional[Experiment], status_quo_name: Optional[str], status_quo_features: Optional[ObservationFeatures], ) -> None: """Set model status quo. First checks for status quo in inputs status_quo_name and status_quo_features. If neither of these is provided, checks the experiment for a status quo. If that is set, it is handled by name in the same way as input status_quo_name. Args: experiment: Experiment that will be checked for status quo. status_quo_name: Name of status quo arm. status_quo_features: Features for status quo. """ self._status_quo: Optional[Observation] = None if ( status_quo_name is None and status_quo_features is None and experiment is not None and experiment.status_quo is not None ): # pyre-fixme[16]: `Optional` has no attribute `name`. status_quo_name = experiment.status_quo.name if status_quo_name is not None: if status_quo_features is not None: raise ValueError( "Specify either status_quo_name or status_quo_features, not both." ) sq_obs = [ obs for obs in self._training_data if obs.arm_name == status_quo_name ] if len(sq_obs) == 0: logger.warning(f"Status quo {status_quo_name} not present in data") elif len(sq_obs) > 1: logger.warning( # pragma: no cover f"Status quo {status_quo_name} found in data with multiple " "features. Use status_quo_features to specify which to use." ) else: self._status_quo = sq_obs[0] elif status_quo_features is not None: sq_obs = [ obs for obs in self._training_data if (obs.features.parameters == status_quo_features.parameters) and (obs.features.trial_index == status_quo_features.trial_index) ] if len(sq_obs) == 0: logger.warning( f"Status quo features {status_quo_features} not found in data." ) else: # len(sq_obs) will not be > 1, # unique features verified in _set_training_data. self._status_quo = sq_obs[0] @property def status_quo(self) -> Optional[Observation]: """Observation corresponding to status quo, if any.""" return self._status_quo @property def metric_names(self) -> Set[str]: """Metric names present in training data.""" return self._metric_names @property def model_space(self) -> SearchSpace: """SearchSpace used to fit model. """ return self._model_space
[docs] def get_training_data(self) -> List[Observation]: """A copy of the (untransformed) data with which the model was fit. """ return deepcopy(self._training_data)
@property def training_in_design(self) -> List[bool]: """For each observation in the training data, a bool indicating if it is in-design for the model. """ return self._training_in_design @training_in_design.setter def training_in_design(self, training_in_design: List[bool]) -> None: if len(training_in_design) != len(self._training_data): raise ValueError( f"In-design indicators not same length ({len(training_in_design)})" f" as training data ({len(self._training_data)})." ) # Identify out-of-design arms if sum(training_in_design) < len(training_in_design): ood_names = [] for i, obs in enumerate(self._training_data): if not training_in_design[i] and obs.arm_name is not None: ood_names.append(obs.arm_name) ood_str = ", ".join(set(ood_names)) logger.info(f"Leaving out out-of-design observations for arms: {ood_str}") self._training_in_design = training_in_design def _fit( self, model: Any, search_space: SearchSpace, observation_features: List[ObservationFeatures], observation_data: List[ObservationData], ) -> None: """Apply terminal transform and fit model.""" raise NotImplementedError # pragma: no cover def _batch_predict( self, observation_features: List[ObservationFeatures] ) -> List[ObservationData]: """Predict a list of ObservationFeatures together.""" # Get modifiable version observation_features = deepcopy(observation_features) # Transform for t in self.transforms.values(): observation_features = t.transform_observation_features( observation_features ) # Apply terminal transform and predict observation_data = self._predict(observation_features) # Apply reverse transforms, in reverse order # pyre-fixme[6]: Expected `Sequence[_T]` for 1st param but got `ValuesView[Tr... for t in reversed(self.transforms.values()): # noqa T484 observation_features = t.untransform_observation_features( observation_features ) observation_data = t.untransform_observation_data( observation_data, observation_features ) return observation_data def _single_predict( self, observation_features: List[ObservationFeatures] ) -> List[ObservationData]: """Predict one ObservationFeature at a time.""" observation_data = [] for obsf in observation_features: try: obsd = self._batch_predict([obsf]) observation_data += obsd except (TypeError, ValueError) as e: # If the prediction is not out of design, this is a real error. # Let's re-raise. if self.model_space.check_membership(obsf.parameters): logger.debug(obsf.parameters) logger.debug(self.model_space) raise e from None # Prediction is out of design. # Training data is untranformed already. observation = next( ( data for data in self.get_training_data() if obsf.parameters == data.features.parameters and obsf.trial_index == data.features.trial_index ), None, ) if not observation: raise ValueError( "Out-of-design point could not be transformed, and was " "not found in the training data." ) observation_data.append(observation.data) return observation_data
[docs] def predict(self, observation_features: List[ObservationFeatures]) -> TModelPredict: """Make model predictions (mean and covariance) for the given observation features. Predictions are made for all outcomes. If an out-of-design observation can successfully be transformed, the predicted value will be returned. Othwerise, we will attempt to find that observation in the training data and return the raw value. Args: observation_features: observation features Returns: 2-element tuple containing - Dictionary from metric name to list of mean estimates, in same order as observation_features. - Nested dictionary with cov['metric1']['metric2'] a list of cov(metric1@x, metric2@x) for x in observation_features. """ # Predict in single batch. try: observation_data = self._batch_predict(observation_features) # Predict one by one. except (TypeError, ValueError): observation_data = self._single_predict(observation_features) f, cov = unwrap_observation_data(observation_data) return f, cov
def _predict( self, observation_features: List[ObservationFeatures] ) -> List[ObservationData]: """Apply terminal transform, predict, and reverse terminal transform on output. """ raise NotImplementedError # pragma: no cover
[docs] def update(self, data: Data, experiment: Experiment) -> None: """Update the model bridge and the underlying model with new data. This method should be used instead of `fit`, in cases where the underlying model does not need to be re-fit from scratch, but rather updated. Note: `update` expects only new data (obtained since the model initialization or last update) to be passed in, not all data in the experiment. Args: data: data from the experiment obtained since the last update experiment: experiment, in which this data was obtained """ t_update_start = time.time() observations = ( observations_from_data(experiment, data) if experiment is not None and data is not None else [] ) obs_feats_raw, obs_data_raw = self._extend_training_data( observations=observations ) obs_feats, obs_data, search_space = self._transform_data( obs_feats=obs_feats_raw, obs_data=obs_data_raw, search_space=self._model_space, transforms=self._raw_transforms, transform_configs=self._transform_configs, ) self._update(observation_features=obs_feats, observation_data=obs_data) self.fit_time += time.time() - t_update_start self.fit_time_since_gen += time.time() - t_update_start
def _update( self, observation_features: List[ObservationFeatures], observation_data: List[ObservationData], ) -> None: """Apply terminal transform and update model. Note: This function requires ALL observation_features and observation_data observed thus far, not just the new data to update with. Args: observation_features: All observation features observed so far. observation_data: All observation data observed so far. """ raise NotImplementedError # pragma: no cover
[docs] def gen( self, n: int, search_space: Optional[SearchSpace] = None, optimization_config: Optional[OptimizationConfig] = None, pending_observations: Optional[Dict[str, List[ObservationFeatures]]] = None, fixed_features: Optional[ObservationFeatures] = None, model_gen_options: Optional[TConfig] = None, ) -> GeneratorRun: """ Args: n: Number of points to generate search_space: Search space optimization_config: Optimization config pending_observations: A map from metric name to pending observations for that metric. fixed_features: An ObservationFeatures object containing any features that should be fixed at specified values during generation. model_gen_options: A config dictionary that is passed along to the model. """ t_gen_start = time.time() if pending_observations is None: pending_observations = {} if fixed_features is None: fixed_features = ObservationFeatures({}) # Get modifiable versions if search_space is None: search_space = self._model_space search_space = search_space.clone() if optimization_config is None: optimization_config = ( # pyre-fixme[16]: `Optional` has no attribute `clone`. self._optimization_config.clone() if self._optimization_config is not None else None ) else: optimization_config = optimization_config.clone() # TODO(T34225037): replace deepcopy with native clone() in Ax pending_observations = deepcopy(pending_observations) fixed_features = deepcopy(fixed_features) # Transform for t in self.transforms.values(): search_space = t.transform_search_space(search_space) if optimization_config is not None: optimization_config = t.transform_optimization_config( optimization_config=optimization_config, modelbridge=self, fixed_features=fixed_features, ) for metric, po in pending_observations.items(): pending_observations[metric] = t.transform_observation_features(po) fixed_features = t.transform_observation_features([fixed_features])[0] # Apply terminal transform and gen observation_features, weights, best_obsf, gen_metadata = self._gen( n=n, search_space=search_space, optimization_config=optimization_config, pending_observations=pending_observations, fixed_features=fixed_features, model_gen_options=model_gen_options, ) # Apply reverse transforms # pyre-fixme[6]: Expected `Sequence[_T]` for 1st param but got `ValuesView[Tr... for t in reversed(self.transforms.values()): # noqa T484 observation_features = t.untransform_observation_features( observation_features ) if best_obsf is not None: best_obsf = t.untransform_observation_features([best_obsf])[0] best_point_predictions = None try: model_predictions = self.predict(observation_features) if best_obsf is not None: best_point_predictions = extract_arm_predictions( model_predictions=self.predict([best_obsf]), arm_idx=0 ) except NotImplementedError: # pragma: no cover model_predictions = None best_arm = ( None if best_obsf is None else gen_arms( observation_features=[best_obsf], arms_by_signature=self._arms_by_signature, )[0] ) gr = GeneratorRun( arms=gen_arms( observation_features=observation_features, arms_by_signature=self._arms_by_signature, ), weights=weights, optimization_config=optimization_config, search_space=search_space, model_predictions=model_predictions, best_arm_predictions=None if best_arm is None else (best_arm, best_point_predictions), fit_time=self.fit_time_since_gen, gen_time=time.time() - t_gen_start, model_key=self._model_key, model_kwargs=self._model_kwargs, bridge_kwargs=self._bridge_kwargs, gen_metadata=gen_metadata, model_state_after_gen=self._get_model_state(), ) self.fit_time_since_gen = 0.0 return gr
def _gen( self, n: int, search_space: SearchSpace, optimization_config: Optional[OptimizationConfig], pending_observations: Dict[str, List[ObservationFeatures]], fixed_features: ObservationFeatures, model_gen_options: Optional[TConfig], ) -> Tuple[ List[ObservationFeatures], List[float], Optional[ObservationFeatures], TGenMetadata, ]: """Apply terminal transform, gen, and reverse terminal transform on output. """ raise NotImplementedError # pragma: no cover
[docs] def cross_validate( self, cv_training_data: List[Observation], cv_test_points: List[ObservationFeatures], ) -> List[ObservationData]: """Make a set of cross-validation predictions. Args: cv_training_data: The training data to use for cross validation. cv_test_points: The test points at which predictions will be made. Returns: A list of predictions at the test points. """ # Apply transforms to cv_training_data and cv_test_points cv_test_points = deepcopy(cv_test_points) obs_feats, obs_data = separate_observations( observations=cv_training_data, copy=True ) for t in self.transforms.values(): obs_feats = t.transform_observation_features(obs_feats) obs_data = t.transform_observation_data(obs_data, obs_feats) cv_test_points = t.transform_observation_features(cv_test_points) # Apply terminal transform, and get predictions. cv_predictions = self._cross_validate( obs_feats=obs_feats, obs_data=obs_data, cv_test_points=cv_test_points ) # Apply reverse transforms, in reverse order # pyre-fixme[6]: Expected `Sequence[_T]` for 1st param but got `ValuesView[Tr... for t in reversed(self.transforms.values()): cv_test_points = t.untransform_observation_features(cv_test_points) cv_predictions = t.untransform_observation_data( cv_predictions, cv_test_points ) return cv_predictions
def _cross_validate( self, obs_feats: List[ObservationFeatures], obs_data: List[ObservationData], cv_test_points: List[ObservationFeatures], ) -> List[ObservationData]: """Apply the terminal transform, make predictions on the test points, and reverse terminal transform on the results. """ raise NotImplementedError # pragma: no cover def _set_kwargs_to_save( self, model_key: str, model_kwargs: Dict[str, Any], bridge_kwargs: Dict[str, Any], ) -> None: """Set properties used to save the model that created a given generator run, on the `GeneratorRun` object. Each generator run produced by the `gen` method of this model bridge will have the model key and kwargs fields set as provided in arguments to this function. """ self._model_key = model_key self._model_kwargs = model_kwargs self._bridge_kwargs = bridge_kwargs def _get_model_state(self) -> Dict[str, Any]: """Obtains the state of the underlying model if using a stateful one.""" return not_none(self.model)._get_state()
[docs] def feature_importances(self, metric_name: str) -> Dict[str, float]: raise NotImplementedError( "Feature importance not available for this model type" )
[docs]def unwrap_observation_data(observation_data: List[ObservationData]) -> TModelPredict: """Converts observation data to the format for model prediction outputs. That format assumes each observation data has the same set of metrics. """ metrics = set(observation_data[0].metric_names) f: TModelMean = {metric: [] for metric in metrics} cov: TModelCov = {m1: {m2: [] for m2 in metrics} for m1 in metrics} for od in observation_data: if set(od.metric_names) != metrics: raise ValueError( "Each ObservationData should use same set of metrics. " "Expected {exp}, got {got}.".format( exp=metrics, got=set(od.metric_names) ) ) for i, m1 in enumerate(od.metric_names): f[m1].append(od.means[i]) for j, m2 in enumerate(od.metric_names): cov[m1][m2].append(od.covariance[i, j]) return f, cov
[docs]def gen_arms( observation_features: List[ObservationFeatures], arms_by_signature: Optional[Dict[str, Arm]] = None, ) -> List[Arm]: """Converts observation features to arms.""" # TODO(T34225939): handle static context (which is stored on observation_features) arms = [] for of in observation_features: arm = Arm(parameters=of.parameters) if arms_by_signature is not None and arm.signature in arms_by_signature: existing_arm = arms_by_signature[arm.signature] arm = Arm(name=existing_arm.name, parameters=existing_arm.parameters) arms.append(arm) return arms