Source code for ax.models.model_utils

#!/usr/bin/env python3
# Copyright (c) Meta Platforms, Inc. and affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.

# pyre-strict

from __future__ import annotations

import itertools
import warnings
from collections.abc import Callable, Mapping
from typing import Protocol, Sequence, Union

import numpy as np
import numpy.typing as npt
import torch
from ax.core.search_space import SearchSpaceDigest
from ax.exceptions.core import SearchSpaceExhausted, UnsupportedError
from ax.models.types import TConfig
from ax.utils.common.typeutils import checked_cast
from botorch.acquisition.risk_measures import RiskMeasureMCObjective
from botorch.exceptions.warnings import OptimizationWarning
from torch import Tensor


# pyre-fixme[24]: Generic type `np.ndarray` expects 2 type parameters.
Tensoray = Union[torch.Tensor, np.ndarray]


[docs] class TorchModelLike(Protocol): """A protocol that stands in for ``TorchModel`` like objects that have a ``predict`` method. """
[docs] def predict(self, X: Tensor) -> tuple[Tensor, Tensor]: """Predicts outcomes given an input tensor. Args: X: A ``n x d`` tensor of input parameters. Returns: Tensor: The predicted posterior mean as an ``n x o``-dim tensor. Tensor: The predicted posterior covariance as a ``n x o x o``-dim tensor. """ ...
DEFAULT_MAX_RS_DRAWS = 10000
[docs] def rejection_sample( gen_unconstrained: Callable[ [int, int, npt.NDArray, dict[int, float] | None], npt.NDArray ], n: int, d: int, tunable_feature_indices: npt.NDArray, linear_constraints: tuple[npt.NDArray, npt.NDArray] | None = None, deduplicate: bool = False, max_draws: int | None = None, fixed_features: dict[int, float] | None = None, rounding_func: Callable[[npt.NDArray], npt.NDArray] | None = None, existing_points: npt.NDArray | None = None, ) -> tuple[npt.NDArray, int]: """Rejection sample in parameter space. Parameter space is typically [0, 1] for all tunable parameters. Models must implement a `gen_unconstrained` method in order to support rejection sampling via this utility. Args: gen_unconstrained: A callable that generates unconstrained points in the parameter space. This is typically the `_gen_unconstrained` method of a `RandomModel`. n: Number of samples to generate. d: Dimensionality of the parameter space. tunable_feature_indices: Indices of the tunable features in the parameter space. linear_constraints: A tuple of (A, b). For k linear constraints on d-dimensional x, A is (k x d) and b is (k x 1) such that A x <= b. deduplicate: If true, reject points that are duplicates of previously generated points. The points are deduplicated after applying the rounding function. max_draws: Maximum number of attemped draws before giving up. fixed_features: A map {feature_index: value} for features that should be fixed to a particular value during generation. rounding_func: A function that rounds an optimization result appropriately (e.g., according to `round-trip` transformations). existing_points: A set of previously generated points to use for deduplication. These should be provided in the parameter space model operates in. """ # We need to perform the round trip transformation on our generated point # in order to deduplicate in the original search space. # The transformation is applied above. if deduplicate and rounding_func is None: raise ValueError("Rounding function must be provided for deduplication.") # Rejection sample with parameter constraints. points = np.zeros((n, d)) attempted_draws = 0 successful_draws = 0 if max_draws is None: max_draws = DEFAULT_MAX_RS_DRAWS while successful_draws < n and attempted_draws <= max_draws: # _gen_unconstrained returns points including fixed features. # pyre-ignore: Anonymous function w/ named args. point = gen_unconstrained( n=1, d=d, tunable_feature_indices=tunable_feature_indices, fixed_features=fixed_features, )[0] # Check parameter constraints, always in raw transformed space. if linear_constraints is not None: all_constraints_satisfied, _ = check_param_constraints( linear_constraints=linear_constraints, point=point ) else: all_constraints_satisfied = True if all_constraints_satisfied: # Apply the rounding function if the point satisfies the linear constraints. if rounding_func is not None: # NOTE: This could still fail rounding with a logger warning. But this # should be rare since the point is feasible in the continuous space. point = rounding_func(point) # Deduplicate: don't add the same point twice. duplicate = False if deduplicate: if existing_points is not None: prev_points = np.vstack( [points[:successful_draws, :], existing_points] ) else: prev_points = points[:successful_draws, :] duplicate = check_duplicate(point=point, points=prev_points) # Add point if valid. if not duplicate: points[successful_draws] = point successful_draws += 1 attempted_draws += 1 if successful_draws < n: # Only possible if attempted_draws >= max_draws. raise SearchSpaceExhausted( f"Rejection sampling error (specified maximum draws ({max_draws}) exhausted" f", without finding sufficiently many ({n}) candidates). This likely means " "that there are no new points left in the search space." ) else: return (points, attempted_draws)
[docs] def check_duplicate(point: npt.NDArray, points: npt.NDArray) -> bool: """Check if a point exists in another array. Args: point: Newly generated point to check. points: Points previously generated. Returns: True if the point is contained in points, else False """ for p in points: if np.array_equal(p, point): return True return False
[docs] def add_fixed_features( tunable_points: npt.NDArray, d: int, fixed_features: dict[int, float] | None, tunable_feature_indices: npt.NDArray, ) -> npt.NDArray: """Add fixed features to points in tunable space. Args: tunable_points: Points in tunable space. d: Dimension of parameter space. fixed_features: A map {feature_index: value} for features that should be fixed to a particular value during generation. tunable_feature_indices: Parameter indices (in d) which are tunable. Returns: points: Points in the full d-dimensional space, defined by bounds. """ n = np.shape(tunable_points)[0] points = np.zeros((n, d)) points[:, tunable_feature_indices] = tunable_points if fixed_features: fixed_feature_indices = np.array(list(fixed_features.keys())) fixed_values = np.tile(list(fixed_features.values()), (n, 1)) points[:, fixed_feature_indices] = fixed_values return points
[docs] def check_param_constraints( linear_constraints: tuple[npt.NDArray, npt.NDArray], point: npt.NDArray, ) -> tuple[bool, npt.NDArray]: """Check if a point satisfies parameter constraints. Args: linear_constraints: A tuple of (A, b). For k linear constraints on d-dimensional x, A is (k x d) and b is (k x 1) such that A x <= b. point: A candidate point in d-dimensional space, as a (1 x d) matrix. Returns: 2-element tuple containing - Flag that is True if all constraints are satisfied by the point. - Indices of constraints which are violated by the point. """ constraints_satisfied = ( linear_constraints[0] @ np.expand_dims(point, axis=1) <= linear_constraints[1] ) if np.all(constraints_satisfied): return True, np.array([]) else: return (False, np.where(constraints_satisfied == False)[0]) # noqa: E712
[docs] def tunable_feature_indices( bounds: list[tuple[float, float]], fixed_features: dict[int, float] | None = None, ) -> npt.NDArray: """Get the feature indices of tunable features. Args: bounds: A list of (lower, upper) tuples for each column of X. fixed_features: A map {feature_index: value} for features that should be fixed to a particular value during generation. Returns: The indices of tunable features. """ fixed_feature_indices = list(fixed_features.keys()) if fixed_features else [] feature_indices = np.arange(len(bounds)) return np.delete(feature_indices, fixed_feature_indices)
[docs] def validate_bounds( bounds: list[tuple[float, float]], fixed_feature_indices: npt.NDArray, ) -> None: """Ensure the requested space is [0,1]^d. Args: bounds: A list of d (lower, upper) tuples for each column of X. fixed_feature_indices: Indices of features which are fixed at a particular value. """ for feature_idx, bound in enumerate(bounds): # Bounds for fixed features are not unit-transformed. if feature_idx in fixed_feature_indices: continue if bound[0] != 0 or bound[1] != 1: raise ValueError( "This generator operates on [0,1]^d. Please make use " "of the UnitX transform in the ModelBridge, and ensure " "task features are fixed." )
[docs] def best_observed_point( model: TorchModelLike, bounds: list[tuple[float, float]], objective_weights: Tensoray | None, outcome_constraints: tuple[Tensoray, Tensoray] | None = None, linear_constraints: tuple[Tensoray, Tensoray] | None = None, fixed_features: dict[int, float] | None = None, risk_measure: RiskMeasureMCObjective | None = None, options: TConfig | None = None, ) -> Tensoray | None: """Select the best point that has been observed. Implements two approaches to selecting the best point. For both approaches, only points that satisfy parameter space constraints (bounds, linear_constraints, fixed_features) will be returned. Points must also be observed for all objective and constraint outcomes. Returned points may violate outcome constraints, depending on the method below. 1: Select the point that maximizes the expected utility (objective_weights^T posterior_objective_means - baseline) * Prob(feasible) Here baseline should be selected so that at least one point has positive utility. It can be specified in the options dict, otherwise min (objective_weights^T posterior_objective_means) will be used, where the min is over observed points. 2: Select the best-objective point that is feasible with at least probability p. The following quantities may be specified in the options dict: - best_point_method: 'max_utility' (default) or 'feasible_threshold' to select between the two approaches described above. - utility_baseline: Value for the baseline used in max_utility approach. If not provided, defaults to min objective value. - probability_threshold: Threshold for the feasible_threshold approach. Defaults to p=0.95. - feasibility_mc_samples: Number of MC samples used for estimating the probability of feasibility (defaults 10k). Args: model: A Torch model or Surrogate. bounds: A list of (lower, upper) tuples for each feature. objective_weights: The objective is to maximize a weighted sum of the columns of f(x). These are the weights. outcome_constraints: A tuple of (A, b). For k outcome constraints and m outputs at f(x), A is (k x m) and b is (k x 1) such that A f(x) <= b. linear_constraints: A tuple of (A, b). For k linear constraints on d-dimensional x, A is (k x d) and b is (k x 1) such that A x <= b. fixed_features: A map {feature_index: value} for features that should be fixed to a particular value in the best point. risk_measure: An optional risk measure for reporting best robust point. options: A config dictionary with settings described above. Returns: A d-array of the best point, or None if no feasible point exists. """ if not hasattr(model, "Xs"): raise ValueError(f"Model must store training data Xs, but {model} does not.") best_point_and_value = best_in_sample_point( Xs=model.Xs, # pyre-ignore[16]: Presence of attr. checked above. model=model, bounds=bounds, objective_weights=objective_weights, outcome_constraints=outcome_constraints, linear_constraints=linear_constraints, fixed_features=fixed_features, risk_measure=risk_measure, options=options, ) return None if best_point_and_value is None else best_point_and_value[0]
[docs] def best_in_sample_point( Xs: list[torch.Tensor] | list[npt.NDArray], model: TorchModelLike, bounds: list[tuple[float, float]], objective_weights: Tensoray | None, outcome_constraints: tuple[Tensoray, Tensoray] | None = None, linear_constraints: tuple[Tensoray, Tensoray] | None = None, fixed_features: dict[int, float] | None = None, risk_measure: RiskMeasureMCObjective | None = None, options: TConfig | None = None, ) -> tuple[Tensoray, float] | None: """Select the best point that has been observed. Implements two approaches to selecting the best point. For both approaches, only points that satisfy parameter space constraints (bounds, linear_constraints, fixed_features) will be returned. Points must also be observed for all objective and constraint outcomes. Returned points may violate outcome constraints, depending on the method below. 1: Select the point that maximizes the expected utility (objective_weights^T posterior_objective_means - baseline) * Prob(feasible) Here baseline should be selected so that at least one point has positive utility. It can be specified in the options dict, otherwise min (objective_weights^T posterior_objective_means) will be used, where the min is over observed points. 2: Select the best-objective point that is feasible with at least probability p. The following quantities may be specified in the options dict: - best_point_method: 'max_utility' (default) or 'feasible_threshold' to select between the two approaches described above. - utility_baseline: Value for the baseline used in max_utility approach. If not provided, defaults to min objective value. - probability_threshold: Threshold for the feasible_threshold approach. Defaults to p=0.95. - feasibility_mc_samples: Number of MC samples used for estimating the probability of feasibility (defaults 10k). Args: Xs: Training data for the points, among which to select the best. model: A Torch model or Surrogate. bounds: A list of (lower, upper) tuples for each feature. objective_weights: The objective is to maximize a weighted sum of the columns of f(x). These are the weights. outcome_constraints: A tuple of (A, b). For k outcome constraints and m outputs at f(x), A is (k x m) and b is (k x 1) such that A f(x) <= b. linear_constraints: A tuple of (A, b). For k linear constraints on d-dimensional x, A is (k x d) and b is (k x 1) such that A x <= b. fixed_features: A map {feature_index: value} for features that should be fixed to a particular value in the best point. risk_measure: An optional risk measure for reporting best robust point. options: A config dictionary with settings described above. Returns: A two-element tuple or None if no feasible point exist. In tuple: - d-array of the best point, - utility at the best point. """ if risk_measure is not None: # TODO[T131759268]: We need to apply the risk measure. Instead of doing obj_w @, # we could use `get_botorch_objective_and_transform` to get the objective # then apply it, though we also need to decide how to deal with constraints. raise NotImplementedError # Parse options if options is None: options = {} method: str = options.get("best_point_method", "max_utility") B: float | None = options.get("utility_baseline", None) threshold: float = options.get("probability_threshold", 0.95) nsamp: int = options.get("feasibility_mc_samples", 10000) # Get points observed for all objective and constraint outcomes if objective_weights is None: return None objective_weights_np = checked_cast(np.ndarray, as_array(objective_weights)) X_obs = get_observed( Xs=Xs, objective_weights=objective_weights, outcome_constraints=outcome_constraints, ) # Filter to those that satisfy constraints. X_obs = filter_constraints_and_fixed_features( X=X_obs, bounds=bounds, linear_constraints=linear_constraints, fixed_features=fixed_features, ) if len(X_obs) == 0: # No feasible points return None # Predict objective and P(feas) at these points for Torch models. if isinstance(Xs[0], torch.Tensor): # pyre-fixme[16]: Item `ndarray` of `Union[ndarray[typing.Any, typing.Any], # Tensor]` has no attribute `detach`. X_obs = X_obs.detach().clone() f, cov = as_array(model.predict(X_obs)) obj = objective_weights_np @ f.transpose() pfeas = np.ones_like(obj) if outcome_constraints is not None: A, b = as_array(outcome_constraints) # (m x j) and (m x 1) # Use Monte Carlo to compute pfeas, to properly handle covariance # across outcomes. for i, _ in enumerate(X_obs): z = np.random.multivariate_normal( mean=f[i, :], cov=cov[i, :, :], size=nsamp ) # (nsamp x j) pfeas[i] = (A @ z.transpose() <= b).all(axis=0).mean() # Identify best point if method == "feasible_threshold": utility = obj utility[pfeas < threshold] = -np.inf elif method == "max_utility": if B is None: B = obj.min() utility = (obj - B) * pfeas else: # pragma: no cover raise UnsupportedError(f"Unknown best point method {method}.") i = np.argmax(utility) if utility[i] == -np.inf: return None else: return X_obs[i, :], utility[i]
[docs] def as_array( x: Tensoray | tuple[Tensoray, ...], ) -> npt.NDArray | tuple[npt.NDArray, ...]: """Convert every item in a tuple of tensors/arrays into an array. Args: x: A tensor, array, or a tuple of potentially mixed tensors and arrays. Returns: x, with everything converted to array. """ if isinstance(x, tuple): return tuple(as_array(x_i) for x_i in x) # pyre-ignore elif isinstance(x, np.ndarray): return x elif torch.is_tensor(x): return x.detach().cpu().double().numpy() else: raise ValueError("Input to as_array must be numpy array or torch tensor")
[docs] def get_observed( Xs: list[torch.Tensor] | list[npt.NDArray], objective_weights: Tensoray, outcome_constraints: tuple[Tensoray, Tensoray] | None = None, # pyre-fixme[7]: Expected `Union[ndarray[typing.Any, typing.Any], Tensor]` but got # implicit return value of `None`. ) -> Tensoray: """Filter points to those that are observed for objective outcomes and outcomes that show up in outcome_constraints (if there are any). Args: Xs: A list of m (k_i x d) feature matrices X. Number of rows k_i can vary from i=1,...,m. objective_weights: The objective is to maximize a weighted sum of the columns of f(x). These are the weights. outcome_constraints: A tuple of (A, b). For k outcome constraints and m outputs at f(x), A is (k x m) and b is (k x 1) such that A f(x) <= b. Returns: Points observed for all objective outcomes and outcome constraints. """ objective_weights_np = as_array(objective_weights) used_outcomes: set[int] = set(np.where(objective_weights_np != 0)[0]) if len(used_outcomes) == 0: raise ValueError("At least one objective weight must be non-zero") if outcome_constraints is not None: used_outcomes = used_outcomes.union( np.where(as_array(outcome_constraints)[0] != 0)[1] ) outcome_list = list(used_outcomes) X_obs_set = {tuple(float(x_i) for x_i in x) for x in Xs[outcome_list[0]]} for _, idx in enumerate(outcome_list, start=1): X_obs_set = X_obs_set.intersection( {tuple(float(x_i) for x_i in x) for x in Xs[idx]} ) if isinstance(Xs[0], np.ndarray): # pyre-fixme[6]: For 2nd param expected `Union[None, Dict[str, Tuple[typing.A... return np.array(list(X_obs_set), dtype=Xs[0].dtype) # (n x d) if isinstance(Xs[0], torch.Tensor): # pyre-fixme[6]: For 3rd param expected `Optional[_C.dtype]` but got # `Union[np.dtype, _C.dtype]`. # pyre-fixme[16]: Item `ndarray` of `Union[ndarray[typing.Any, typing.Any], # Tensor]` has no attribute `device`. return torch.tensor(list(X_obs_set), device=Xs[0].device, dtype=Xs[0].dtype)
[docs] def filter_constraints_and_fixed_features( X: Tensoray, bounds: list[tuple[float, float]], linear_constraints: tuple[Tensoray, Tensoray] | None = None, fixed_features: dict[int, float] | None = None, ) -> Tensoray: """Filter points to those that satisfy bounds, linear_constraints, and fixed_features. Args: X: An tensor or array of points. bounds: A list of (lower, upper) tuples for each feature. linear_constraints: A tuple of (A, b). For k linear constraints on d-dimensional x, A is (k x d) and b is (k x 1) such that A x <= b. fixed_features: A map {feature_index: value} for features that should be fixed to a particular value in the best point. Returns: Feasible points. """ if len(X) == 0: # if there are no points, nothing to filter return X X_np = X if isinstance(X, torch.Tensor): X_np = X.cpu().numpy() feas = np.ones(X_np.shape[0], dtype=bool) # (n) for i, b in enumerate(bounds): # pyre-fixme[6]: For 1st argument expected `Tensor` but got # `Union[ndarray[Any, dtype[Any]], Tensor]`. feas &= (X_np[:, i] >= b[0]) & (X_np[:, i] <= b[1]) if linear_constraints is not None: A, b = as_array(linear_constraints) # (m x d) and (m x 1) # pyre-fixme[20]: Call `torch._C.TensorBase.transpose` expects argument `dim0`. feas &= (A @ X_np.transpose() <= b).all(axis=0) if fixed_features is not None: for idx, val in fixed_features.items(): feas &= X_np[:, idx] == val X_feas = X_np[feas, :] if isinstance(X, torch.Tensor): return torch.from_numpy(X_feas).to(device=X.device, dtype=X.dtype) else: return X_feas
[docs] def mk_discrete_choices( ssd: SearchSpaceDigest, fixed_features: Mapping[int, float] | None = None, ) -> Mapping[int, Sequence[float]]: discrete_choices = ssd.discrete_choices # Add in fixed features. if fixed_features is not None: # Note: if any discrete features are fixed we won't enumerate those. discrete_choices = { **discrete_choices, **{k: [v] for k, v in fixed_features.items()}, } return discrete_choices
[docs] def enumerate_discrete_combinations( discrete_choices: Mapping[int, Sequence[float]], ) -> list[dict[int, float]]: n_combos = np.prod([len(v) for v in discrete_choices.values()]) if n_combos > 50: warnings.warn( f"Enumerating {n_combos} combinations of discrete parameter values " "while optimizing over a mixed search space. This can be very slow.", OptimizationWarning, stacklevel=2, ) fixed_features_list = [ dict(zip(discrete_choices.keys(), c)) for c in itertools.product(*discrete_choices.values()) ] return fixed_features_list
[docs] def all_ordinal_features_are_integer_valued( ssd: SearchSpaceDigest, ) -> bool: """Check if all ordinal features are integer-valued. Args: ssd: A SearchSpaceDigest. Returns: True if all ordinal features are integer-valued, False otherwise. """ for feature_idx in ssd.ordinal_features: choices = ssd.discrete_choices[feature_idx] int_choices = [int(c) for c in choices] if choices != int_choices: return False return True