Source code for ax.models.torch.botorch_modular.utils

#!/usr/bin/env python3
# Copyright (c) Meta Platforms, Inc. and affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.

# pyre-strict

import warnings
from collections import OrderedDict
from collections.abc import Sequence
from dataclasses import dataclass, field
from logging import Logger
from typing import Any

import torch
from ax.core.search_space import SearchSpaceDigest
from ax.exceptions.core import AxError, AxWarning, UnsupportedError
from ax.models.torch_base import TorchOptConfig
from ax.models.types import TConfig
from ax.utils.common.constants import Keys
from ax.utils.common.logger import get_logger
from ax.utils.common.typeutils import checked_cast
from botorch.acquisition.acquisition import AcquisitionFunction
from botorch.acquisition.logei import qLogNoisyExpectedImprovement
from botorch.acquisition.multi_objective.logei import (
    qLogNoisyExpectedHypervolumeImprovement,
)
from botorch.fit import fit_fully_bayesian_model_nuts, fit_gpytorch_mll
from botorch.models.fully_bayesian import SaasFullyBayesianSingleTaskGP
from botorch.models.gp_regression import SingleTaskGP
from botorch.models.gp_regression_fidelity import SingleTaskMultiFidelityGP
from botorch.models.gp_regression_mixed import MixedSingleTaskGP
from botorch.models.gpytorch import BatchedMultiOutputGPyTorchModel, GPyTorchModel
from botorch.models.model import Model, ModelList
from botorch.models.multitask import MultiTaskGP
from botorch.models.pairwise_gp import PairwiseGP
from botorch.models.transforms.input import InputTransform
from botorch.models.transforms.outcome import OutcomeTransform
from botorch.utils.datasets import SupervisedDataset
from botorch.utils.transforms import is_fully_bayesian
from gpytorch.kernels.kernel import Kernel
from gpytorch.likelihoods import Likelihood
from gpytorch.mlls.exact_marginal_log_likelihood import ExactMarginalLogLikelihood
from gpytorch.mlls.marginal_log_likelihood import MarginalLogLikelihood
from pyre_extensions import none_throws
from torch import Tensor

MIN_OBSERVED_NOISE_LEVEL = 1e-7
logger: Logger = get_logger(__name__)


@dataclass
class ModelConfig:
    """Configuration for the BoTorch Model used in Surrogate.

    Args:
        botorch_model_class: ``Model`` class to be used as the underlying
            BoTorch model. If None is provided, a model class (either one
            model for all outcomes or a ModelList with separate models for
            each outcome) will be selected automatically based on the
            datasets at `construct` time.
        model_options: Dictionary of options / kwargs for the BoTorch
            ``Model`` constructed during ``Surrogate.fit``.
            Note that the corresponding attribute will later be updated to
            include any additional kwargs passed into ``BoTorchModel.fit``.
        mll_class: ``MarginalLogLikelihood`` class to use for model-fitting.
            This argument is deprecated in favor of model_configs.
        mll_options: Dictionary of options / kwargs for the MLL.
        outcome_transform_classes: List of BoTorch outcome transform classes.
            Passed down to the BoTorch ``Model``. Multiple outcome transforms
            can be chained together using ``ChainedOutcomeTransform``.
        outcome_transform_options: Outcome transform classes kwargs. The keys
            are class string names and the values are dictionaries of outcome
            transform kwargs. For example,
            `
            outcome_transform_classes = [Standardize]
            outcome_transform_options = {
                "Standardize": {"m": 1},
            }
            `
            For more options see `botorch/models/transforms/outcome.py`.
        input_transform_classes: List of BoTorch input transform classes.
            Passed down to the BoTorch ``Model``. Multiple input transforms
            will be chained together using ``ChainedInputTransform``.
        input_transform_options: Input transform classes kwargs. The keys are
            class string names and the values are dictionaries of input
            transform kwargs. For example,
            `
            input_transform_classes = [Normalize, Round]
            input_transform_options = {
                "Normalize": {"d": 3},
                "Round": {"integer_indices": [0], "categorical_features": {1: 2}},
            }
            `
            For more input options see `botorch/models/transforms/input.py`.
        covar_module_class: Covariance module class. This gets initialized after
            parsing the ``covar_module_options`` in ``covar_module_argparse``,
            and gets passed to the model constructor as ``covar_module``.
        covar_module_options: Covariance module kwargs. This argument is
            deprecated in favor of model_configs.
        likelihood_class: ``Likelihood`` class. This gets initialized with
            ``likelihood_options`` and gets passed to the model constructor.
            This argument is deprecated in favor of model_configs.
        likelihood_options: Likelihood options.
        name: Name of the model config. This is used to identify the model config.
    """

    botorch_model_class: type[Model] | None = None
    model_options: dict[str, Any] = field(default_factory=dict)
    mll_class: type[MarginalLogLikelihood] = ExactMarginalLogLikelihood
    mll_options: dict[str, Any] = field(default_factory=dict)
    input_transform_classes: list[type[InputTransform]] | None = None
    input_transform_options: dict[str, dict[str, Any]] | None = field(
        default_factory=dict
    )
    outcome_transform_classes: list[type[OutcomeTransform]] | None = None
    outcome_transform_options: dict[str, dict[str, Any]] = field(default_factory=dict)
    covar_module_class: type[Kernel] | None = None
    covar_module_options: dict[str, Any] = field(default_factory=dict)
    likelihood_class: type[Likelihood] | None = None
    likelihood_options: dict[str, Any] = field(default_factory=dict)
    name: str | None = None
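
# A minimal, illustrative sketch of constructing a ``ModelConfig`` that pins the
# BoTorch model class and covariance module for all outcomes. The kernel choice,
# option values, and config name below are assumptions chosen for illustration.
def _example_model_config() -> ModelConfig:
    from gpytorch.kernels import MaternKernel

    return ModelConfig(
        botorch_model_class=SingleTaskGP,
        covar_module_class=MaternKernel,
        covar_module_options={"nu": 2.5},
        mll_class=ExactMarginalLogLikelihood,
        name="single_task_matern",
    )
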
def use_model_list(
    datasets: Sequence[SupervisedDataset],
    botorch_model_class: type[Model],
    model_configs: list[ModelConfig] | None = None,
    metric_to_model_configs: dict[str, list[ModelConfig]] | None = None,
    allow_batched_models: bool = True,
) -> bool:
    """Determine whether a ``ModelList`` should be used to model the given
    datasets, rather than a single (potentially batched) model."""
    model_configs = model_configs or []
    metric_to_model_configs = metric_to_model_configs or {}
    if len(datasets) == 1 and datasets[0].Y.shape[-1] == 1:
        # There is only one outcome, so we can use a single model.
        return False
    elif (
        len(model_configs) > 1
        or len(metric_to_model_configs) > 0
        or any(len(model_config) for model_config in metric_to_model_configs.values())
    ):
        # There are multiple outcomes and outcomes might be modeled with different
        # models.
        return True
    # Otherwise, the same model class is used for all outcomes.
    # Determine what the model class is.
    if len(model_configs) > 0:
        botorch_model_class = (
            model_configs[0].botorch_model_class or botorch_model_class
        )
    if issubclass(botorch_model_class, SaasFullyBayesianSingleTaskGP):
        # SAAS models do not support multiple outcomes.
        # Use model list if there are multiple outcomes.
        return len(datasets) > 1 or datasets[0].Y.shape[-1] > 1
    elif issubclass(botorch_model_class, MultiTaskGP):
        # We wrap multi-task models into `ModelListGP` when there are
        # multiple outcomes.
        return len(datasets) > 1 or datasets[0].Y.shape[-1] > 1
    elif len(datasets) == 1:
        # This method is called before multiple datasets are merged into
        # one if using a batched model. If there is one dataset here,
        # there should be a reason that a single model should be used:
        # e.g. a contextual model, where we want to jointly model the metric
        # for each context (and context-level metrics are different outcomes).
        return False
    elif issubclass(botorch_model_class, BatchedMultiOutputGPyTorchModel) and all(
        torch.equal(datasets[0].X, ds.X) for ds in datasets[1:]
    ):
        # Use batched models if allowed.
        return not allow_batched_models
    # If there are multiple Xs and they are not all equal, we use `ModelListGP`.
    return True
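
# A minimal, illustrative sketch of ``use_model_list``: a single single-outcome
# dataset does not require a ``ModelList``, while two datasets with different Xs
# do. The feature and outcome names below are assumptions for illustration.
def _example_use_model_list() -> None:
    def _make_ds(name: str, n: int) -> SupervisedDataset:
        return SupervisedDataset(
            X=torch.rand(n, 2),
            Y=torch.rand(n, 1),
            feature_names=["x0", "x1"],
            outcome_names=[name],
        )

    assert not use_model_list(
        datasets=[_make_ds("a", 5)], botorch_model_class=SingleTaskGP
    )
    assert use_model_list(
        datasets=[_make_ds("a", 5), _make_ds("b", 3)],
        botorch_model_class=SingleTaskGP,
    )
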
def choose_model_class(
    datasets: Sequence[SupervisedDataset],
    search_space_digest: SearchSpaceDigest,
) -> type[Model]:
    """Chooses a BoTorch `Model` class using the given data (currently just
    whether observation noise is provided) and the properties of the search
    space (information about task, fidelity, and categorical features).

    Args:
        datasets: A list of `SupervisedDataset` objects, one per outcome,
            where outcomes are in the same order as in Xs.
        search_space_digest: A `SearchSpaceDigest` describing the search
            space, including any task, fidelity, and categorical features.

    Returns:
        A BoTorch `Model` class.
    """
    if len(search_space_digest.fidelity_features) > 1:
        raise NotImplementedError(
            "Only a single fidelity feature supported "
            f"(got: {search_space_digest.fidelity_features})."
        )
    if len(search_space_digest.task_features) > 1:
        raise NotImplementedError(
            "Only a single task feature supported "
            f"(got: {search_space_digest.task_features})."
        )
    if search_space_digest.task_features and search_space_digest.fidelity_features:
        raise NotImplementedError(
            "Multi-task multi-fidelity optimization not yet supported."
        )

    is_fixed_noise = [ds.Yvar is not None for ds in datasets]
    all_inferred = not any(is_fixed_noise)
    if not all_inferred and not all(is_fixed_noise):
        raise ValueError(
            "Mix of known and unknown variances indicates evaluation function "
            "errors. Variances should all be specified, or none should be."
        )

    # Multi-task case (when `task_features` is specified).
    if search_space_digest.task_features:
        model_class = MultiTaskGP
    # Single-task multi-fidelity cases.
    elif search_space_digest.fidelity_features:
        model_class = SingleTaskMultiFidelityGP
    # Mixed optimization case. Note that presence of categorical
    # features in search space digest indicates that downstream in the
    # stack we chose not to perform continuous relaxation on those
    # features.
    elif search_space_digest.categorical_features:
        model_class = MixedSingleTaskGP
    # Single-task single-fidelity cases.
    else:
        model_class = SingleTaskGP

    logger.debug(f"Chose BoTorch model class: {model_class}.")
    return model_class
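
# A minimal, illustrative sketch of how ``choose_model_class`` reacts to the
# search space digest: marking a column as a task feature selects ``MultiTaskGP``.
# The feature names, bounds, and data below are assumptions for illustration.
def _example_choose_model_class() -> None:
    dataset = SupervisedDataset(
        X=torch.rand(8, 2),
        Y=torch.rand(8, 1),
        feature_names=["x0", "task"],
        outcome_names=["objective"],
    )
    digest = SearchSpaceDigest(
        feature_names=["x0", "task"],
        bounds=[(0.0, 1.0), (0.0, 1.0)],
        task_features=[1],
    )
    model_class = choose_model_class(datasets=[dataset], search_space_digest=digest)
    assert model_class is MultiTaskGP
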
def choose_botorch_acqf_class(
    torch_opt_config: TorchOptConfig,
) -> type[AcquisitionFunction]:
    """Chooses a BoTorch ``AcquisitionFunction`` class.

    Current logic relies on the ``TorchOptConfig.is_moo`` field to determine
    whether to use qLogNEHVI (for MOO) or qLogNEI (for SOO).
    """
    if torch_opt_config.is_moo:
        acqf_class = qLogNoisyExpectedHypervolumeImprovement
    else:
        acqf_class = qLogNoisyExpectedImprovement

    logger.debug(f"Chose BoTorch acquisition function class: {acqf_class}.")
    return acqf_class
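
# A minimal, illustrative sketch of ``choose_botorch_acqf_class``: the choice
# follows ``TorchOptConfig.is_moo``. The objective weights below are assumptions
# for illustration.
def _example_choose_botorch_acqf_class() -> None:
    soo_config = TorchOptConfig(objective_weights=torch.tensor([1.0]), is_moo=False)
    moo_config = TorchOptConfig(
        objective_weights=torch.tensor([1.0, 1.0]), is_moo=True
    )
    assert choose_botorch_acqf_class(soo_config) is qLogNoisyExpectedImprovement
    assert (
        choose_botorch_acqf_class(moo_config)
        is qLogNoisyExpectedHypervolumeImprovement
    )
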
def construct_acquisition_and_optimizer_options(
    acqf_options: TConfig, model_gen_options: TConfig | None = None
) -> tuple[TConfig, TConfig]:
    """Extract acquisition and optimizer options from `model_gen_options`."""
    acq_options = acqf_options.copy()
    opt_options = {}

    if model_gen_options:
        acq_options.update(
            checked_cast(dict, model_gen_options.get(Keys.ACQF_KWARGS, {}))
        )
        # TODO: Add this if all acq. functions accept the `subset_model`
        # kwarg or opt for kwarg filtering.
        # acq_options[SUBSET_MODEL] = model_gen_options.get(SUBSET_MODEL)
        opt_options = checked_cast(
            dict, model_gen_options.get(Keys.OPTIMIZER_KWARGS, {})
        ).copy()
    return acq_options, opt_options
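
# A minimal, illustrative sketch of splitting ``model_gen_options`` into
# acquisition and optimizer options. The specific option names and values below
# are assumptions for illustration.
def _example_construct_acquisition_and_optimizer_options() -> None:
    acq_options, opt_options = construct_acquisition_and_optimizer_options(
        acqf_options={"prune_baseline": True},
        model_gen_options={
            Keys.ACQF_KWARGS: {"eta": 1e-3},
            Keys.OPTIMIZER_KWARGS: {"num_restarts": 20, "raw_samples": 1024},
        },
    )
    assert acq_options == {"prune_baseline": True, "eta": 1e-3}
    assert opt_options == {"num_restarts": 20, "raw_samples": 1024}
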
def convert_to_block_design(
    datasets: Sequence[SupervisedDataset],
    force: bool = False,
) -> list[SupervisedDataset]:
    # Convert data to "block design". TODO: Figure out a better
    # solution for this using the data containers (pass outcome
    # names as properties of the data containers).
    is_fixed = [ds.Yvar is not None for ds in datasets]
    if any(is_fixed) and not all(is_fixed):
        raise UnsupportedError(
            "Cannot convert mixed data with and without variance "
            "observations to `block design`."
        )
    is_fixed = all(is_fixed)
    Xs = [dataset.X for dataset in datasets]
    for dset in datasets[1:]:
        if dset.feature_names != datasets[0].feature_names:
            raise ValueError(
                "Feature names must be the same across all datasets, "
                f"got {dset.feature_names} and {datasets[0].feature_names}."
            )
    # Join the outcome names of the datasets.
    outcome_names = sum([ds.outcome_names for ds in datasets], [])

    if len({X.shape for X in Xs}) != 1 or not all(
        torch.equal(X, Xs[0]) for X in Xs[1:]
    ):
        if not force:
            raise UnsupportedError(
                "Cannot convert non-block design data to block design. "
                "To force this and drop data not shared between "
                "outcomes use `force=True`."
            )
        warnings.warn(
            "Forcing conversion of data not complying with a block design "
            "to block design by dropping observations that are not shared "
            "between outcomes.",
            AxWarning,
            stacklevel=3,
        )
        X_shared, idcs_shared = _get_shared_rows(Xs=Xs)
        Y = torch.cat([ds.Y[i] for ds, i in zip(datasets, idcs_shared)], dim=-1)
        if is_fixed:
            Yvar = torch.cat(
                # pyre-fixme[16]: `Optional` has no attribute `__getitem__`.
                [ds.Yvar[i] for ds, i in zip(datasets, idcs_shared)],
                dim=-1,
            )
        else:
            Yvar = None
        datasets = [
            SupervisedDataset(
                X=X_shared,
                Y=Y,
                Yvar=Yvar,
                feature_names=datasets[0].feature_names,
                outcome_names=outcome_names,
            )
        ]
        return datasets

    # Data complies with block design, can concatenate with impunity.
    Y = torch.cat([ds.Y for ds in datasets], dim=-1)
    if is_fixed:
        Yvar = torch.cat([none_throws(ds.Yvar) for ds in datasets], dim=-1)
    else:
        Yvar = None
    datasets = [
        SupervisedDataset(
            X=Xs[0],
            Y=Y,
            Yvar=Yvar,
            feature_names=datasets[0].feature_names,
            outcome_names=outcome_names,
        )
    ]
    return datasets
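
# A minimal, illustrative sketch of ``convert_to_block_design`` with
# ``force=True``: two single-outcome datasets that share only their first three
# rows of X are merged into one dataset over the shared rows. The tensors are
# random and purely illustrative.
def _example_convert_to_block_design() -> None:
    X_full = torch.rand(5, 2)
    ds_a = SupervisedDataset(
        X=X_full,
        Y=torch.rand(5, 1),
        feature_names=["x0", "x1"],
        outcome_names=["a"],
    )
    ds_b = SupervisedDataset(
        X=X_full[:3],
        Y=torch.rand(3, 1),
        feature_names=["x0", "x1"],
        outcome_names=["b"],
    )
    (merged,) = convert_to_block_design(datasets=[ds_a, ds_b], force=True)
    assert merged.Y.shape == (3, 2)
    assert merged.outcome_names == ["a", "b"]
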
def _get_shared_rows(Xs: list[Tensor]) -> tuple[Tensor, list[Tensor]]:
    """Extract shared rows from a list of tensors.

    Args:
        Xs: A list of m two-dimensional tensors with shapes
            `(n_1 x d), ..., (n_m x d)`. It is not required that the
            `n_i` are the same.

    Returns:
        A two-tuple containing (i) a Tensor with the rows that are shared between
        all the Tensors in `Xs`, and (ii) a list of index tensors that indicate the
        location of these rows in the respective elements of `Xs`.
    """
    idcs_shared = []
    Xs_sorted = sorted(Xs, key=len)
    X_shared = Xs_sorted[0].clone()
    for X in Xs_sorted[1:]:
        X_shared = X_shared[(X_shared == X.unsqueeze(-2)).all(dim=-1).any(dim=-2)]
    # Get the indices of the shared rows in each element of Xs.
    for X in Xs:
        same = (X_shared == X.unsqueeze(-2)).all(dim=-1).any(dim=-1)
        idcs_shared.append(torch.arange(same.shape[-1], device=X_shared.device)[same])
    return X_shared, idcs_shared
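
# A minimal, illustrative sketch of ``_get_shared_rows`` on two small tensors:
# only the row present in both inputs is returned, along with its index in each
# input tensor.
def _example_get_shared_rows() -> None:
    X1 = torch.tensor([[0.0, 0.0], [1.0, 1.0], [2.0, 2.0]])
    X2 = torch.tensor([[1.0, 1.0], [3.0, 3.0]])
    X_shared, idcs_shared = _get_shared_rows(Xs=[X1, X2])
    assert torch.equal(X_shared, torch.tensor([[1.0, 1.0]]))
    assert torch.equal(idcs_shared[0], torch.tensor([1]))
    assert torch.equal(idcs_shared[1], torch.tensor([0]))
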
def fit_botorch_model(
    model: Model,
    mll_class: type[MarginalLogLikelihood],
    mll_options: dict[str, Any] | None = None,
) -> None:
    """Fit a BoTorch model."""
    mll_options = mll_options or {}
    models = model.models if isinstance(model, ModelList) else [model]
    for m in models:
        # TODO: Support deterministic models when we support `ModelList`.
        if is_fully_bayesian(m):
            fit_fully_bayesian_model_nuts(
                m,
                disable_progbar=True,
                **mll_options,
            )
        elif isinstance(m, (GPyTorchModel, PairwiseGP)):
            mll = mll_class(likelihood=m.likelihood, model=m, **mll_options)
            fit_gpytorch_mll(mll)
        else:
            raise NotImplementedError(
                f"Model of type {m.__class__.__name__} is currently not supported."
            )
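
# A minimal, illustrative sketch of ``fit_botorch_model`` on a single
# ``SingleTaskGP``. The training data is random and purely illustrative.
def _example_fit_botorch_model() -> None:
    gp = SingleTaskGP(
        train_X=torch.rand(10, 2, dtype=torch.double),
        train_Y=torch.rand(10, 1, dtype=torch.double),
    )
    fit_botorch_model(model=gp, mll_class=ExactMarginalLogLikelihood)
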
def _tensor_difference(A: Tensor, B: Tensor) -> Tensor:
    """Return the rows of B that do not also appear in A."""
    C = torch.cat((A, B), dim=0)
    D, inverse_ind = torch.unique(C, return_inverse=True, dim=0)
    n = A.shape[0]
    A_indices = inverse_ind[:n].tolist()
    B_indices = inverse_ind[n:].tolist()
    Bi_set = set(B_indices) - set(A_indices)
    return D[list(Bi_set)]
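
# A minimal, illustrative sketch of ``_tensor_difference``: the rows of ``B``
# that do not appear in ``A``. Note that the result is de-duplicated and its row
# order follows ``torch.unique``, not the order of ``B``.
def _example_tensor_difference() -> None:
    A = torch.tensor([[0.0, 0.0], [1.0, 1.0]])
    B = torch.tensor([[1.0, 1.0], [2.0, 2.0]])
    assert torch.equal(_tensor_difference(A=A, B=B), torch.tensor([[2.0, 2.0]]))
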
def check_outcome_dataset_match(
    outcome_names: Sequence[str],
    datasets: Sequence[SupervisedDataset],
    exact_match: bool,
) -> None:
    """Check that the given outcome names match those of the datasets.

    Based on `exact_match`, we either require the outcome names to be a
    subset of all outcomes in the datasets or require them to be exactly
    the same. Also checks that there are no duplicates in the outcome names.

    Args:
        outcome_names: A list of outcome names.
        datasets: A list of `SupervisedDataset` objects.
        exact_match: If True, outcome_names must be the same as the union of
            outcome names of the datasets. Otherwise, we check that the
            outcome_names are a subset of all outcomes.

    Raises:
        AxError: If there is no match.
    """
    all_outcomes = sum((ds.outcome_names for ds in datasets), [])
    set_all_outcomes = set(all_outcomes)
    set_all_spec_outcomes = set(outcome_names)
    if len(set_all_outcomes) != len(all_outcomes):
        raise AxError("Found duplicate outcomes in the datasets.")
    if len(set_all_spec_outcomes) != len(outcome_names):
        raise AxError("Found duplicate outcome names.")

    if not exact_match:
        if not set_all_spec_outcomes.issubset(set_all_outcomes):
            raise AxError(
                "Outcome names must be a subset of the outcome names of the "
                f"datasets. Got {outcome_names=} but the datasets model "
                f"{set_all_outcomes}."
            )
    elif set_all_spec_outcomes != set_all_outcomes:
        raise AxError(
            "Each outcome name must correspond to an outcome in the datasets. "
            f"Got {outcome_names=} but the datasets model {set_all_outcomes}."
        )
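
# A minimal, illustrative sketch of ``check_outcome_dataset_match`` with
# ``exact_match=False``: a subset of the dataset outcomes passes, while an
# unknown outcome name raises an ``AxError``. Names and data are illustrative.
def _example_check_outcome_dataset_match() -> None:
    ds = SupervisedDataset(
        X=torch.rand(4, 2),
        Y=torch.rand(4, 1),
        feature_names=["x0", "x1"],
        outcome_names=["a"],
    )
    check_outcome_dataset_match(outcome_names=["a"], datasets=[ds], exact_match=False)
    try:
        check_outcome_dataset_match(
            outcome_names=["unknown"], datasets=[ds], exact_match=False
        )
    except AxError:
        pass
    else:
        raise AssertionError("Expected an AxError for an unknown outcome name.")
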
def get_subset_datasets(
    datasets: Sequence[SupervisedDataset],
    subset_outcome_names: Sequence[str],
) -> list[SupervisedDataset]:
    """Get the list of datasets corresponding to the given subset of
    outcome names. This is used to separate out datasets that are used
    by one surrogate.

    Args:
        datasets: A list of `SupervisedDataset` objects.
        subset_outcome_names: A list of outcome names to get datasets for.

    Returns:
        A list of `SupervisedDataset` objects corresponding to the given
        subset of outcome names.
    """
    check_outcome_dataset_match(
        outcome_names=subset_outcome_names, datasets=datasets, exact_match=False
    )
    single_outcome_datasets = {
        ds.outcome_names[0]: ds for ds in datasets if len(ds.outcome_names) == 1
    }
    multi_outcome_datasets = {
        tuple(ds.outcome_names): ds for ds in datasets if len(ds.outcome_names) > 1
    }
    subset_datasets = []
    outcomes_processed = []
    for outcome_name in subset_outcome_names:
        if outcome_name in outcomes_processed:
            # This can happen if the outcome appears in a multi-outcome
            # dataset that has already been processed.
            continue
        if outcome_name in single_outcome_datasets:
            # The default case of an outcome with a corresponding dataset.
            ds = single_outcome_datasets[outcome_name]
        else:
            # The case of the outcome being part of a multi-outcome dataset.
            for outcome_names in multi_outcome_datasets.keys():
                if outcome_name in outcome_names:
                    ds = multi_outcome_datasets[outcome_names]
                    if not set(ds.outcome_names).issubset(subset_outcome_names):
                        raise UnsupportedError(
                            "Breaking up a multi-outcome dataset between "
                            "surrogates is not supported."
                        )
                    break
        # Pyre-ignore [61]: `ds` may not be defined but it is guaranteed to be defined.
        subset_datasets.append(ds)
        outcomes_processed.extend(ds.outcome_names)
    return subset_datasets
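
# A minimal, illustrative sketch of ``get_subset_datasets``: selecting the
# datasets for the outcomes modeled by one surrogate. Outcome and feature names
# are illustrative.
def _example_get_subset_datasets() -> None:
    def _make_ds(name: str) -> SupervisedDataset:
        return SupervisedDataset(
            X=torch.rand(4, 2),
            Y=torch.rand(4, 1),
            feature_names=["x0", "x1"],
            outcome_names=[name],
        )

    datasets = [_make_ds("a"), _make_ds("b"), _make_ds("c")]
    subset = get_subset_datasets(datasets=datasets, subset_outcome_names=["c", "a"])
    assert [ds.outcome_names[0] for ds in subset] == ["c", "a"]
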
def subset_state_dict(
    state_dict: OrderedDict[str, Tensor],
    submodel_index: int,
) -> OrderedDict[str, Tensor]:
    """Get the state dict for a submodel from the state dict of a model list.

    Args:
        state_dict: A state dict.
        submodel_index: The index of the submodel to extract.

    Returns:
        The state dict for the submodel.
    """
    expected_substring = f"models.{submodel_index}."
    len_substring = len(expected_substring)
    new_items = [
        (k[len_substring:], v)
        for k, v in state_dict.items()
        if k.startswith(expected_substring)
    ]
    return OrderedDict(new_items)
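
# A minimal, illustrative sketch of ``subset_state_dict``: keys prefixed with
# ``models.1.`` are extracted and the prefix is stripped. The parameter names
# below are illustrative.
def _example_subset_state_dict() -> None:
    state_dict = OrderedDict(
        [
            ("models.0.mean_module.raw_constant", torch.tensor(0.0)),
            ("models.1.mean_module.raw_constant", torch.tensor(1.0)),
            ("models.1.likelihood.noise_covar.raw_noise", torch.tensor([0.1])),
        ]
    )
    sub = subset_state_dict(state_dict=state_dict, submodel_index=1)
    assert list(sub.keys()) == [
        "mean_module.raw_constant",
        "likelihood.noise_covar.raw_noise",
    ]
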