Source code for ax.models.torch.utils

#!/usr/bin/env python3
# Copyright (c) Facebook, Inc. and its affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.

from typing import Callable, Dict, List, Optional, Tuple

import botorch.utils.sampling as botorch_sampling
import numpy as np
import torch
from ax.exceptions.model import ModelError
from ax.models.model_utils import filter_constraints_and_fixed_features, get_observed
from ax.models.random.sobol import SobolGenerator
from botorch.models import ModelListGP, SingleTaskGP
from botorch.models.model import Model
from torch import Tensor


NOISELESS_MODELS = {SingleTaskGP}


# Distributions
SIMPLEX = "simplex"
HYPERSPHERE = "hypersphere"


[docs]def is_noiseless(model: Model) -> bool: """Check if a given (single-task) botorch model is noiseless""" if isinstance(model, ModelListGP): raise ModelError( "Checking for noisless models only applies to sub-models of ModelListGP" ) return model.__class__ in NOISELESS_MODELS
def _filter_X_observed( Xs: List[Tensor], objective_weights: Tensor, bounds: List[Tuple[float, float]], outcome_constraints: Optional[Tuple[Tensor, Tensor]] = None, linear_constraints: Optional[Tuple[Tensor, Tensor]] = None, fixed_features: Optional[Dict[int, float]] = None, ) -> Optional[Tensor]: r"""Filter input points to those appearing in objective or constraints. Args: Xs: The input tensors of a model. objective_weights: The objective is to maximize a weighted sum of the columns of f(x). These are the weights. bounds: A list of (lower, upper) tuples for each column of X. outcome_constraints: A tuple of (A, b). For k outcome constraints and m outputs at f(x), A is (k x m) and b is (k x 1) such that A f(x) <= b. (Not used by single task models) linear_constraints: A tuple of (A, b). For k linear constraints on d-dimensional x, A is (k x d) and b is (k x 1) such that A x <= b. (Not used by single task models) fixed_features: A map {feature_index: value} for features that should be fixed to a particular value during generation. Returns: Tensor: All points that are feasible and appear in the objective or the constraints. None if there are no such points. """ # Get points observed for all objective and constraint outcomes X_obs = get_observed( Xs=Xs, objective_weights=objective_weights, outcome_constraints=outcome_constraints, ) # Filter to those that satisfy constraints. X_obs = filter_constraints_and_fixed_features( X=X_obs, bounds=bounds, linear_constraints=linear_constraints, fixed_features=fixed_features, ) if len(X_obs) > 0: return torch.as_tensor(X_obs) # please the linter def _get_X_pending_and_observed( Xs: List[Tensor], objective_weights: Tensor, bounds: List[Tuple[float, float]], pending_observations: Optional[List[Tensor]] = None, outcome_constraints: Optional[Tuple[Tensor, Tensor]] = None, linear_constraints: Optional[Tuple[Tensor, Tensor]] = None, fixed_features: Optional[Dict[int, float]] = None, ) -> Tuple[Optional[Tensor], Optional[Tensor]]: r"""Get pending and observed points. Args: Xs: The input tensors of a model. pending_observations: A list of m (k_i x d) feature tensors X for m outcomes and k_i pending observations for outcome i. (Only used if n > 1). objective_weights: The objective is to maximize a weighted sum of the columns of f(x). These are the weights. bounds: A list of (lower, upper) tuples for each column of X. outcome_constraints: A tuple of (A, b). For k outcome constraints and m outputs at f(x), A is (k x m) and b is (k x 1) such that A f(x) <= b. (Not used by single task models) linear_constraints: A tuple of (A, b). For k linear constraints on d-dimensional x, A is (k x d) and b is (k x 1) such that A x <= b. (Not used by single task models) fixed_features: A map {feature_index: value} for features that should be fixed to a particular value during generation. Returns: Tensor: Pending points that are feasible and appear in the objective or the constraints. None if there are no such points. Tensor: Observed points that are feasible and appear in the objective or the constraints. None if there are no such points. """ if pending_observations is None: X_pending = None else: X_pending = _filter_X_observed( Xs=pending_observations, objective_weights=objective_weights, outcome_constraints=outcome_constraints, bounds=bounds, linear_constraints=linear_constraints, fixed_features=fixed_features, ) X_observed = _filter_X_observed( Xs=Xs, objective_weights=objective_weights, outcome_constraints=outcome_constraints, bounds=bounds, linear_constraints=linear_constraints, fixed_features=fixed_features, ) return X_pending, X_observed def _generate_sobol_points( n_sobol: int, bounds: List[Tuple[float, float]], device: torch.device, linear_constraints: Optional[Tuple[Tensor, Tensor]] = None, fixed_features: Optional[Dict[int, float]] = None, rounding_func: Optional[Callable[[Tensor], Tensor]] = None, ) -> Tensor: linear_constraints_array = None if linear_constraints is not None: linear_constraints_array = ( linear_constraints[0].detach().numpy(), linear_constraints[1].detach().numpy(), ) array_rounding_func = None if rounding_func is not None: array_rounding_func = tensor_callable_to_array_callable( tensor_func=rounding_func, device=device ) sobol = SobolGenerator(deduplicate=False, seed=np.random.randint(10000)) array_X, _ = sobol.gen( n=n_sobol, bounds=bounds, linear_constraints=linear_constraints_array, fixed_features=fixed_features, rounding_func=array_rounding_func, ) return torch.from_numpy(array_X).to(device)
[docs]def normalize_indices(indices: List[int], d: int) -> List[int]: r"""Normalize a list of indices to ensure that they are positive. Args: indices: A list of indices (may contain negative indices for indexing "from the back"). d: The dimension of the tensor to index. Returns: A normalized list of indices such that each index is between `0` and `d-1`. """ normalized_indices = [] for i in indices: if i < 0: i = i + d if i < 0 or i > d - 1: raise ValueError(f"Index {i} out of bounds for tensor or length {d}.") normalized_indices.append(i) return normalized_indices
[docs]def subset_model( model: Model, objective_weights: Tensor, outcome_constraints: Optional[Tuple[Tensor, Tensor]] = None, ) -> Tuple[Model, Tensor, Optional[Tuple[Tensor, Tensor]]]: """Subset a botorch model to the outputs used in the optimization. Args: model: A BoTorch Model. If the model does not implement the `subset_outputs` method, this function is a null-op and returns the input arguments. objective_weights: The objective is to maximize a weighted sum of the columns of f(x). These are the weights. outcome_constraints: A tuple of (A, b). For k outcome constraints and m outputs at f(x), A is (k x m) and b is (k x 1) such that A f(x) <= b. (Not used by single task models) Returns: A three-tuple of model, objective_weights, and outcome_constraints, all subset to only those outputs that appear in either the objective weights or the outcome constraints. """ nonzero = objective_weights != 0 if outcome_constraints is not None: A, _ = outcome_constraints nonzero = nonzero | torch.any(A != 0, dim=0) idcs = torch.arange(nonzero.size(0))[nonzero].tolist() if len(idcs) == model.num_outputs: # if we use all model outputs, just return the inputs return model, objective_weights, outcome_constraints elif len(idcs) > model.num_outputs: raise RuntimeError( "Model size inconsistency. Tryting to subset a model with " f"{model.num_outputs} outputs to {len(idcs)} outputs" ) try: model = model.subset_output(idcs=idcs) objective_weights = objective_weights[nonzero] if outcome_constraints is not None: A, b = outcome_constraints outcome_constraints = A[:, nonzero], b except NotImplementedError: pass return model, objective_weights, outcome_constraints
def _to_inequality_constraints( linear_constraints: Optional[Tuple[Tensor, Tensor]] = None ) -> Optional[List[Tuple[Tensor, Tensor, float]]]: if linear_constraints is not None: A, b = linear_constraints inequality_constraints = [] k, d = A.shape for i in range(k): indicies = A[i, :].nonzero().squeeze() coefficients = -A[i, indicies] rhs = -b[i, 0] inequality_constraints.append((indicies, coefficients, rhs)) else: inequality_constraints = None return inequality_constraints
[docs]def sample_simplex(dim: int) -> Tensor: """Sample uniformly from a dim-simplex.""" return botorch_sampling.sample_simplex(dim, dtype=torch.double).squeeze()
[docs]def sample_hypersphere_positive_quadrant(dim: int) -> Tensor: """Sample uniformly from the positive quadrant of a dim-sphere.""" return torch.abs( botorch_sampling.sample_hypersphere(dim, dtype=torch.double).squeeze() )
[docs]def tensor_callable_to_array_callable( tensor_func: Callable[[Tensor], Tensor], device: torch.device ) -> Callable[[np.ndarray], np.ndarray]: """"transfer a tensor callable to an array callable""" # TODO: move this reuseable function and its equivalent reverse functions # to some utils files def array_func(x: np.ndarray) -> np.ndarray: return tensor_func(torch.from_numpy(x).to(device)).detach().numpy() return array_func