#!/usr/bin/env python3
# Copyright (c) Meta Platforms, Inc. and affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
# pyre-strict
from __future__ import annotations
import itertools
import warnings
from collections.abc import Callable, Mapping
from typing import Protocol, Sequence, Union
import numpy as np
import numpy.typing as npt
import torch
from ax.core.search_space import SearchSpaceDigest
from ax.exceptions.core import SearchSpaceExhausted, UnsupportedError
from ax.models.types import TConfig
from ax.utils.common.typeutils import checked_cast
from botorch.acquisition.risk_measures import RiskMeasureMCObjective
from botorch.exceptions.warnings import OptimizationWarning
from torch import Tensor
# pyre-fixme[24]: Generic type `np.ndarray` expects 2 type parameters.
Tensoray = Union[torch.Tensor, np.ndarray]
[docs]
class TorchModelLike(Protocol):
"""A protocol that stands in for ``TorchModel`` like objects that
have a ``predict`` method.
"""
[docs]
def predict(self, X: Tensor) -> tuple[Tensor, Tensor]:
"""Predicts outcomes given an input tensor.
Args:
X: A ``n x d`` tensor of input parameters.
Returns:
Tensor: The predicted posterior mean as an ``n x o``-dim tensor.
Tensor: The predicted posterior covariance as a ``n x o x o``-dim tensor.
"""
...
DEFAULT_MAX_RS_DRAWS = 10000
[docs]
def rejection_sample(
gen_unconstrained: Callable[
[int, int, npt.NDArray, dict[int, float] | None], npt.NDArray
],
n: int,
d: int,
tunable_feature_indices: npt.NDArray,
linear_constraints: tuple[npt.NDArray, npt.NDArray] | None = None,
deduplicate: bool = False,
max_draws: int | None = None,
fixed_features: dict[int, float] | None = None,
rounding_func: Callable[[npt.NDArray], npt.NDArray] | None = None,
existing_points: npt.NDArray | None = None,
) -> tuple[npt.NDArray, int]:
"""Rejection sample in parameter space. Parameter space is typically
[0, 1] for all tunable parameters.
Models must implement a `gen_unconstrained` method in order to support
rejection sampling via this utility.
Args:
gen_unconstrained: A callable that generates unconstrained points in
the parameter space. This is typically the `_gen_unconstrained` method
of a `RandomModel`.
n: Number of samples to generate.
d: Dimensionality of the parameter space.
tunable_feature_indices: Indices of the tunable features in the
parameter space.
linear_constraints: A tuple of (A, b). For k linear constraints on
d-dimensional x, A is (k x d) and b is (k x 1) such that
A x <= b.
deduplicate: If true, reject points that are duplicates of previously
generated points. The points are deduplicated after applying the
rounding function.
max_draws: Maximum number of attemped draws before giving up.
fixed_features: A map {feature_index: value} for features that
should be fixed to a particular value during generation.
rounding_func: A function that rounds an optimization result
appropriately (e.g., according to `round-trip` transformations).
existing_points: A set of previously generated points to use
for deduplication. These should be provided in the parameter
space model operates in.
"""
# We need to perform the round trip transformation on our generated point
# in order to deduplicate in the original search space.
# The transformation is applied above.
if deduplicate and rounding_func is None:
raise ValueError("Rounding function must be provided for deduplication.")
# Rejection sample with parameter constraints.
points = np.zeros((n, d))
attempted_draws = 0
successful_draws = 0
if max_draws is None:
max_draws = DEFAULT_MAX_RS_DRAWS
while successful_draws < n and attempted_draws <= max_draws:
# _gen_unconstrained returns points including fixed features.
# pyre-ignore: Anonymous function w/ named args.
point = gen_unconstrained(
n=1,
d=d,
tunable_feature_indices=tunable_feature_indices,
fixed_features=fixed_features,
)[0]
# Check parameter constraints, always in raw transformed space.
if linear_constraints is not None:
all_constraints_satisfied, _ = check_param_constraints(
linear_constraints=linear_constraints, point=point
)
else:
all_constraints_satisfied = True
if all_constraints_satisfied:
# Apply the rounding function if the point satisfies the linear constraints.
if rounding_func is not None:
# NOTE: This could still fail rounding with a logger warning. But this
# should be rare since the point is feasible in the continuous space.
point = rounding_func(point)
# Deduplicate: don't add the same point twice.
duplicate = False
if deduplicate:
if existing_points is not None:
prev_points = np.vstack(
[points[:successful_draws, :], existing_points]
)
else:
prev_points = points[:successful_draws, :]
duplicate = check_duplicate(point=point, points=prev_points)
# Add point if valid.
if not duplicate:
points[successful_draws] = point
successful_draws += 1
attempted_draws += 1
if successful_draws < n:
# Only possible if attempted_draws >= max_draws.
raise SearchSpaceExhausted(
f"Rejection sampling error (specified maximum draws ({max_draws}) exhausted"
f", without finding sufficiently many ({n}) candidates). This likely means "
"that there are no new points left in the search space."
)
else:
return (points, attempted_draws)
[docs]
def check_duplicate(point: npt.NDArray, points: npt.NDArray) -> bool:
"""Check if a point exists in another array.
Args:
point: Newly generated point to check.
points: Points previously generated.
Returns:
True if the point is contained in points, else False
"""
for p in points:
if np.array_equal(p, point):
return True
return False
[docs]
def add_fixed_features(
tunable_points: npt.NDArray,
d: int,
fixed_features: dict[int, float] | None,
tunable_feature_indices: npt.NDArray,
) -> npt.NDArray:
"""Add fixed features to points in tunable space.
Args:
tunable_points: Points in tunable space.
d: Dimension of parameter space.
fixed_features: A map {feature_index: value} for features that
should be fixed to a particular value during generation.
tunable_feature_indices: Parameter indices (in d) which are tunable.
Returns:
points: Points in the full d-dimensional space, defined by bounds.
"""
n = np.shape(tunable_points)[0]
points = np.zeros((n, d))
points[:, tunable_feature_indices] = tunable_points
if fixed_features:
fixed_feature_indices = np.array(list(fixed_features.keys()))
fixed_values = np.tile(list(fixed_features.values()), (n, 1))
points[:, fixed_feature_indices] = fixed_values
return points
[docs]
def check_param_constraints(
linear_constraints: tuple[npt.NDArray, npt.NDArray],
point: npt.NDArray,
) -> tuple[bool, npt.NDArray]:
"""Check if a point satisfies parameter constraints.
Args:
linear_constraints: A tuple of (A, b). For k linear constraints on
d-dimensional x, A is (k x d) and b is (k x 1) such that
A x <= b.
point: A candidate point in d-dimensional space, as a (1 x d) matrix.
Returns:
2-element tuple containing
- Flag that is True if all constraints are satisfied by the point.
- Indices of constraints which are violated by the point.
"""
constraints_satisfied = (
linear_constraints[0] @ np.expand_dims(point, axis=1) <= linear_constraints[1]
)
if np.all(constraints_satisfied):
return True, np.array([])
else:
return (False, np.where(constraints_satisfied == False)[0]) # noqa: E712
[docs]
def tunable_feature_indices(
bounds: list[tuple[float, float]],
fixed_features: dict[int, float] | None = None,
) -> npt.NDArray:
"""Get the feature indices of tunable features.
Args:
bounds: A list of (lower, upper) tuples for each column of X.
fixed_features: A map {feature_index: value} for features that
should be fixed to a particular value during generation.
Returns:
The indices of tunable features.
"""
fixed_feature_indices = list(fixed_features.keys()) if fixed_features else []
feature_indices = np.arange(len(bounds))
return np.delete(feature_indices, fixed_feature_indices)
[docs]
def validate_bounds(
bounds: list[tuple[float, float]],
fixed_feature_indices: npt.NDArray,
) -> None:
"""Ensure the requested space is [0,1]^d.
Args:
bounds: A list of d (lower, upper) tuples for each column of X.
fixed_feature_indices: Indices of features which are fixed at a
particular value.
"""
for feature_idx, bound in enumerate(bounds):
# Bounds for fixed features are not unit-transformed.
if feature_idx in fixed_feature_indices:
continue
if bound[0] != 0 or bound[1] != 1:
raise ValueError(
"This generator operates on [0,1]^d. Please make use "
"of the UnitX transform in the ModelBridge, and ensure "
"task features are fixed."
)
[docs]
def best_observed_point(
model: TorchModelLike,
bounds: list[tuple[float, float]],
objective_weights: Tensoray | None,
outcome_constraints: tuple[Tensoray, Tensoray] | None = None,
linear_constraints: tuple[Tensoray, Tensoray] | None = None,
fixed_features: dict[int, float] | None = None,
risk_measure: RiskMeasureMCObjective | None = None,
options: TConfig | None = None,
) -> Tensoray | None:
"""Select the best point that has been observed.
Implements two approaches to selecting the best point.
For both approaches, only points that satisfy parameter space constraints
(bounds, linear_constraints, fixed_features) will be returned. Points must
also be observed for all objective and constraint outcomes. Returned
points may violate outcome constraints, depending on the method below.
1: Select the point that maximizes the expected utility
(objective_weights^T posterior_objective_means - baseline) * Prob(feasible)
Here baseline should be selected so that at least one point has positive
utility. It can be specified in the options dict, otherwise
min (objective_weights^T posterior_objective_means)
will be used, where the min is over observed points.
2: Select the best-objective point that is feasible with at least
probability p.
The following quantities may be specified in the options dict:
- best_point_method: 'max_utility' (default) or 'feasible_threshold'
to select between the two approaches described above.
- utility_baseline: Value for the baseline used in max_utility approach. If
not provided, defaults to min objective value.
- probability_threshold: Threshold for the feasible_threshold approach.
Defaults to p=0.95.
- feasibility_mc_samples: Number of MC samples used for estimating the
probability of feasibility (defaults 10k).
Args:
model: A Torch model or Surrogate.
bounds: A list of (lower, upper) tuples for each feature.
objective_weights: The objective is to maximize a weighted sum of
the columns of f(x). These are the weights.
outcome_constraints: A tuple of (A, b). For k outcome constraints
and m outputs at f(x), A is (k x m) and b is (k x 1) such that
A f(x) <= b.
linear_constraints: A tuple of (A, b). For k linear constraints on
d-dimensional x, A is (k x d) and b is (k x 1) such that
A x <= b.
fixed_features: A map {feature_index: value} for features that
should be fixed to a particular value in the best point.
risk_measure: An optional risk measure for reporting best robust point.
options: A config dictionary with settings described above.
Returns:
A d-array of the best point, or None if no feasible point exists.
"""
if not hasattr(model, "Xs"):
raise ValueError(f"Model must store training data Xs, but {model} does not.")
best_point_and_value = best_in_sample_point(
Xs=model.Xs, # pyre-ignore[16]: Presence of attr. checked above.
model=model,
bounds=bounds,
objective_weights=objective_weights,
outcome_constraints=outcome_constraints,
linear_constraints=linear_constraints,
fixed_features=fixed_features,
risk_measure=risk_measure,
options=options,
)
return None if best_point_and_value is None else best_point_and_value[0]
[docs]
def best_in_sample_point(
Xs: list[torch.Tensor] | list[npt.NDArray],
model: TorchModelLike,
bounds: list[tuple[float, float]],
objective_weights: Tensoray | None,
outcome_constraints: tuple[Tensoray, Tensoray] | None = None,
linear_constraints: tuple[Tensoray, Tensoray] | None = None,
fixed_features: dict[int, float] | None = None,
risk_measure: RiskMeasureMCObjective | None = None,
options: TConfig | None = None,
) -> tuple[Tensoray, float] | None:
"""Select the best point that has been observed.
Implements two approaches to selecting the best point.
For both approaches, only points that satisfy parameter space constraints
(bounds, linear_constraints, fixed_features) will be returned. Points must
also be observed for all objective and constraint outcomes. Returned
points may violate outcome constraints, depending on the method below.
1: Select the point that maximizes the expected utility
(objective_weights^T posterior_objective_means - baseline) * Prob(feasible)
Here baseline should be selected so that at least one point has positive
utility. It can be specified in the options dict, otherwise
min (objective_weights^T posterior_objective_means)
will be used, where the min is over observed points.
2: Select the best-objective point that is feasible with at least
probability p.
The following quantities may be specified in the options dict:
- best_point_method: 'max_utility' (default) or 'feasible_threshold'
to select between the two approaches described above.
- utility_baseline: Value for the baseline used in max_utility approach. If
not provided, defaults to min objective value.
- probability_threshold: Threshold for the feasible_threshold approach.
Defaults to p=0.95.
- feasibility_mc_samples: Number of MC samples used for estimating the
probability of feasibility (defaults 10k).
Args:
Xs: Training data for the points, among which to select the best.
model: A Torch model or Surrogate.
bounds: A list of (lower, upper) tuples for each feature.
objective_weights: The objective is to maximize a weighted sum of
the columns of f(x). These are the weights.
outcome_constraints: A tuple of (A, b). For k outcome constraints
and m outputs at f(x), A is (k x m) and b is (k x 1) such that
A f(x) <= b.
linear_constraints: A tuple of (A, b). For k linear constraints on
d-dimensional x, A is (k x d) and b is (k x 1) such that
A x <= b.
fixed_features: A map {feature_index: value} for features that
should be fixed to a particular value in the best point.
risk_measure: An optional risk measure for reporting best robust point.
options: A config dictionary with settings described above.
Returns:
A two-element tuple or None if no feasible point exist. In tuple:
- d-array of the best point,
- utility at the best point.
"""
if risk_measure is not None:
# TODO[T131759268]: We need to apply the risk measure. Instead of doing obj_w @,
# we could use `get_botorch_objective_and_transform` to get the objective
# then apply it, though we also need to decide how to deal with constraints.
raise NotImplementedError
# Parse options
if options is None:
options = {}
method: str = options.get("best_point_method", "max_utility")
B: float | None = options.get("utility_baseline", None)
threshold: float = options.get("probability_threshold", 0.95)
nsamp: int = options.get("feasibility_mc_samples", 10000)
# Get points observed for all objective and constraint outcomes
if objective_weights is None:
return None
objective_weights_np = checked_cast(np.ndarray, as_array(objective_weights))
X_obs = get_observed(
Xs=Xs,
objective_weights=objective_weights,
outcome_constraints=outcome_constraints,
)
# Filter to those that satisfy constraints.
X_obs = filter_constraints_and_fixed_features(
X=X_obs,
bounds=bounds,
linear_constraints=linear_constraints,
fixed_features=fixed_features,
)
if len(X_obs) == 0:
# No feasible points
return None
# Predict objective and P(feas) at these points for Torch models.
if isinstance(Xs[0], torch.Tensor):
# pyre-fixme[16]: Item `ndarray` of `Union[ndarray[typing.Any, typing.Any],
# Tensor]` has no attribute `detach`.
X_obs = X_obs.detach().clone()
f, cov = as_array(model.predict(X_obs))
obj = objective_weights_np @ f.transpose()
pfeas = np.ones_like(obj)
if outcome_constraints is not None:
A, b = as_array(outcome_constraints) # (m x j) and (m x 1)
# Use Monte Carlo to compute pfeas, to properly handle covariance
# across outcomes.
for i, _ in enumerate(X_obs):
z = np.random.multivariate_normal(
mean=f[i, :], cov=cov[i, :, :], size=nsamp
) # (nsamp x j)
pfeas[i] = (A @ z.transpose() <= b).all(axis=0).mean()
# Identify best point
if method == "feasible_threshold":
utility = obj
utility[pfeas < threshold] = -np.inf
elif method == "max_utility":
if B is None:
B = obj.min()
utility = (obj - B) * pfeas
else: # pragma: no cover
raise UnsupportedError(f"Unknown best point method {method}.")
i = np.argmax(utility)
if utility[i] == -np.inf:
return None
else:
return X_obs[i, :], utility[i]
[docs]
def as_array(
x: Tensoray | tuple[Tensoray, ...],
) -> npt.NDArray | tuple[npt.NDArray, ...]:
"""Convert every item in a tuple of tensors/arrays into an array.
Args:
x: A tensor, array, or a tuple of potentially mixed tensors and arrays.
Returns:
x, with everything converted to array.
"""
if isinstance(x, tuple):
return tuple(as_array(x_i) for x_i in x) # pyre-ignore
elif isinstance(x, np.ndarray):
return x
elif torch.is_tensor(x):
return x.detach().cpu().double().numpy()
else:
raise ValueError("Input to as_array must be numpy array or torch tensor")
[docs]
def get_observed(
Xs: list[torch.Tensor] | list[npt.NDArray],
objective_weights: Tensoray,
outcome_constraints: tuple[Tensoray, Tensoray] | None = None,
# pyre-fixme[7]: Expected `Union[ndarray[typing.Any, typing.Any], Tensor]` but got
# implicit return value of `None`.
) -> Tensoray:
"""Filter points to those that are observed for objective outcomes and outcomes
that show up in outcome_constraints (if there are any).
Args:
Xs: A list of m (k_i x d) feature matrices X. Number of rows k_i
can vary from i=1,...,m.
objective_weights: The objective is to maximize a weighted sum of
the columns of f(x). These are the weights.
outcome_constraints: A tuple of (A, b). For k outcome constraints
and m outputs at f(x), A is (k x m) and b is (k x 1) such that
A f(x) <= b.
Returns:
Points observed for all objective outcomes and outcome constraints.
"""
objective_weights_np = as_array(objective_weights)
used_outcomes: set[int] = set(np.where(objective_weights_np != 0)[0])
if len(used_outcomes) == 0:
raise ValueError("At least one objective weight must be non-zero")
if outcome_constraints is not None:
used_outcomes = used_outcomes.union(
np.where(as_array(outcome_constraints)[0] != 0)[1]
)
outcome_list = list(used_outcomes)
X_obs_set = {tuple(float(x_i) for x_i in x) for x in Xs[outcome_list[0]]}
for _, idx in enumerate(outcome_list, start=1):
X_obs_set = X_obs_set.intersection(
{tuple(float(x_i) for x_i in x) for x in Xs[idx]}
)
if isinstance(Xs[0], np.ndarray):
# pyre-fixme[6]: For 2nd param expected `Union[None, Dict[str, Tuple[typing.A...
return np.array(list(X_obs_set), dtype=Xs[0].dtype) # (n x d)
if isinstance(Xs[0], torch.Tensor):
# pyre-fixme[6]: For 3rd param expected `Optional[_C.dtype]` but got
# `Union[np.dtype, _C.dtype]`.
# pyre-fixme[16]: Item `ndarray` of `Union[ndarray[typing.Any, typing.Any],
# Tensor]` has no attribute `device`.
return torch.tensor(list(X_obs_set), device=Xs[0].device, dtype=Xs[0].dtype)
[docs]
def filter_constraints_and_fixed_features(
X: Tensoray,
bounds: list[tuple[float, float]],
linear_constraints: tuple[Tensoray, Tensoray] | None = None,
fixed_features: dict[int, float] | None = None,
) -> Tensoray:
"""Filter points to those that satisfy bounds, linear_constraints, and
fixed_features.
Args:
X: An tensor or array of points.
bounds: A list of (lower, upper) tuples for each feature.
linear_constraints: A tuple of (A, b). For k linear constraints on
d-dimensional x, A is (k x d) and b is (k x 1) such that
A x <= b.
fixed_features: A map {feature_index: value} for features that
should be fixed to a particular value in the best point.
Returns:
Feasible points.
"""
if len(X) == 0: # if there are no points, nothing to filter
return X
X_np = X
if isinstance(X, torch.Tensor):
X_np = X.cpu().numpy()
feas = np.ones(X_np.shape[0], dtype=bool) # (n)
for i, b in enumerate(bounds):
feas &= (X_np[:, i] >= b[0]) & (X_np[:, i] <= b[1])
if linear_constraints is not None:
A, b = as_array(linear_constraints) # (m x d) and (m x 1)
# pyre-fixme[20]: Call `torch._C.TensorBase.transpose` expects argument `dim0`.
feas &= (A @ X_np.transpose() <= b).all(axis=0)
if fixed_features is not None:
for idx, val in fixed_features.items():
feas &= X_np[:, idx] == val
X_feas = X_np[feas, :]
if isinstance(X, torch.Tensor):
return torch.from_numpy(X_feas).to(device=X.device, dtype=X.dtype)
else:
return X_feas
[docs]
def mk_discrete_choices(
ssd: SearchSpaceDigest,
fixed_features: Mapping[int, float] | None = None,
) -> Mapping[int, Sequence[float]]:
discrete_choices = ssd.discrete_choices
# Add in fixed features.
if fixed_features is not None:
# Note: if any discrete features are fixed we won't enumerate those.
discrete_choices = {
**discrete_choices,
**{k: [v] for k, v in fixed_features.items()},
}
return discrete_choices
[docs]
def enumerate_discrete_combinations(
discrete_choices: Mapping[int, Sequence[float]],
) -> list[dict[int, float]]:
n_combos = np.prod([len(v) for v in discrete_choices.values()])
if n_combos > 50:
warnings.warn(
f"Enumerating {n_combos} combinations of discrete parameter values "
"while optimizing over a mixed search space. This can be very slow.",
OptimizationWarning,
stacklevel=2,
)
fixed_features_list = [
dict(zip(discrete_choices.keys(), c))
for c in itertools.product(*discrete_choices.values())
]
return fixed_features_list
[docs]
def all_ordinal_features_are_integer_valued(
ssd: SearchSpaceDigest,
) -> bool:
"""Check if all ordinal features are integer-valued.
Args:
ssd: A SearchSpaceDigest.
Returns:
True if all ordinal features are integer-valued, False otherwise.
"""
for feature_idx in ssd.ordinal_features:
choices = ssd.discrete_choices[feature_idx]
int_choices = [int(c) for c in choices]
if choices != int_choices:
return False
return True