Source code for ax.modelbridge.cross_validation

#!/usr/bin/env python3
# Copyright (c) Meta Platforms, Inc. and affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.

from abc import ABC, abstractmethod
from collections import defaultdict
from enum import Enum
from functools import partial
from numbers import Number
from typing import Any, Callable, Dict, Iterable, List, NamedTuple, Optional, Set, Tuple

import numpy as np
from ax.core.observation import Observation, ObservationData
from ax.core.optimization_config import OptimizationConfig
from ax.modelbridge.base import ModelBridge
from ax.utils.common.logger import get_logger
from scipy.stats import fisher_exact, norm, pearsonr, spearmanr

# pyre-fixme[5]: Global expression must be annotated.
logger = get_logger(__name__)

CVDiagnostics = Dict[str, Dict[str, float]]

MEAN_PREDICTION_CI = "Mean prediction CI"
MAPE = "MAPE"
TOTAL_RAW_EFFECT = "Total raw effect"
CORRELATION_COEFFICIENT = "Correlation coefficient"
RANK_CORRELATION = "Rank correlation"
FISHER_EXACT_TEST_P = "Fisher exact test p"
LOG_LIKELIHOOD = "Log likelihood"


[docs]class CVResult(NamedTuple): """Container for cross validation results.""" observed: Observation predicted: ObservationData
[docs]class AssessModelFitResult(NamedTuple): """Container for model fit assessment results""" good_fit_metrics_to_fisher_score: Dict[str, float] bad_fit_metrics_to_fisher_score: Dict[str, float]
[docs]def cross_validate( model: ModelBridge, folds: int = -1, # pyre-fixme[24]: Generic type `Callable` expects 2 type parameters. test_selector: Optional[Callable] = None, ) -> List[CVResult]: """Cross validation for model predictions. Splits the model's training data into train/test folds and makes out-of-sample predictions on the test folds. Train/test splits are made based on arm names, so that repeated observations of a arm will always be in the train or test set together. The test set can be limited to a specific set of observations by passing in a test_selector callable. This function should take in an Observation and return a boolean indiciating if it should be used in the test set or not. For example, we can limit the test set to arms with trial 0 with test_selector = lambda obs: obs.features.trial_index == 0 If not provided, all observations will be available for the test set. Args: model: Fitted model (ModelBridge) to cross validate. folds: Number of folds. Use -1 for leave-one-out, otherwise will be k-fold. test_selector: Function for selecting observations for the test set. Returns: A CVResult for each observation in the training data. """ # Get in-design training points training_data = [ obs for i, obs in enumerate(model.get_training_data()) if model.training_in_design[i] ] arm_names = {obs.arm_name for obs in training_data} n = len(arm_names) if folds > n: raise ValueError(f"Training data only has {n} arms, which is less than folds") elif n == 0: raise ValueError( f"{model.__class__.__name__} has no training data. Either it has been " "incorrectly initialized or should not be cross validated." ) elif folds < 2 and folds != -1: raise ValueError("Folds must be -1 for LOO, or > 1.") elif folds == -1: folds = n arm_names_rnd = np.array(list(arm_names)) np.random.shuffle(arm_names_rnd) result = [] for train_names, test_names in _gen_train_test_split( folds=folds, arm_names=arm_names_rnd ): # Construct train/test data cv_training_data = [] cv_test_data = [] cv_test_points = [] for obs in training_data: if obs.arm_name in train_names: cv_training_data.append(obs) elif obs.arm_name in test_names and ( test_selector is None or test_selector(obs) ): cv_test_points.append(obs.features) cv_test_data.append(obs) if len(cv_test_points) == 0: continue # Make the prediction cv_test_predictions = model.cross_validate( cv_training_data=cv_training_data, cv_test_points=cv_test_points ) # Form CVResult objects for i, obs in enumerate(cv_test_data): result.append(CVResult(observed=obs, predicted=cv_test_predictions[i])) return result
[docs]def cross_validate_by_trial(model: ModelBridge, trial: int = -1) -> List[CVResult]: """Cross validation for model predictions on a particular trial. Uses all of the data up until the specified trial to predict each of the arms that was launched in that trial. Defaults to the last trial. Args: model: Fitted model (ModelBridge) to cross validate. trial: Trial for which predictions are evaluated. Returns: A CVResult for each observation in the training data. """ # Get in-design training points training_data = [ obs for i, obs in enumerate(model.get_training_data()) if model.training_in_design[i] ] all_trials = { int(d.features.trial_index) for d in training_data if d.features.trial_index is not None } if len(all_trials) < 2: raise ValueError(f"Training data has fewer than 2 trials ({all_trials})") if trial < 0: trial = max(all_trials) elif trial not in all_trials: raise ValueError(f"Trial {trial} not found in training data") # Construct train/test data cv_training_data = [] cv_test_data = [] cv_test_points = [] for obs in training_data: if obs.features.trial_index is None: continue elif obs.features.trial_index < trial: cv_training_data.append(obs) elif obs.features.trial_index == trial: cv_test_points.append(obs.features) cv_test_data.append(obs) # Make the prediction cv_test_predictions = model.cross_validate( cv_training_data=cv_training_data, cv_test_points=cv_test_points ) # Form CVResult objects result = [ CVResult(observed=obs, predicted=cv_test_predictions[i]) for i, obs in enumerate(cv_test_data) ] return result
[docs]def compute_diagnostics(result: List[CVResult]) -> CVDiagnostics: """Computes diagnostics for given cross validation results. It provides a dictionary with values for the following diagnostics, for each metric: - 'Mean prediction CI': the average width of the CIs at each of the CV predictions, relative to the observed mean. - 'MAPE': mean absolute percentage error of the estimated mean relative to the observed mean. - 'Total raw effect': the multiple change from the smallest observed mean to the largest observed mean, i.e. `(max - min) / min`. - 'Correlation coefficient': the Pearson correlation of the estimated and observed means. - 'Rank correlation': the Spearman correlation of the estimated and observed means. - 'Fisher exact test p': we test if the model is able to distinguish the bottom half of the observations from the top half, using Fisher's exact test and the observed/estimated means. A low p value indicates that the model has some ability to identify good arms. A high p value indicates that the model cannot identify arms better than chance, or that the observations are too noisy to be able to tell. Each of these is returned as a dictionary from metric name to value for that metric. Args: result: Output of cross_validate Returns: A dictionary keyed by diagnostic name with results as described above. """ # Extract per-metric outcomes from CVResults. y_obs = defaultdict(list) y_pred = defaultdict(list) se_pred = defaultdict(list) for res in result: for j, metric_name in enumerate(res.observed.data.metric_names): y_obs[metric_name].append(res.observed.data.means[j]) # Find the matching prediction k = res.predicted.metric_names.index(metric_name) y_pred[metric_name].append(res.predicted.means[k]) se_pred[metric_name].append(np.sqrt(res.predicted.covariance[k, k])) diagnostic_fns = { MEAN_PREDICTION_CI: _mean_prediction_ci, MAPE: _mape, TOTAL_RAW_EFFECT: _total_raw_effect, CORRELATION_COEFFICIENT: _correlation_coefficient, RANK_CORRELATION: _rank_correlation, FISHER_EXACT_TEST_P: _fisher_exact_test_p, LOG_LIKELIHOOD: _log_likelihood, } diagnostics: Dict[str, Dict[str, float]] = defaultdict(dict) # Get all per-metric diagnostics. for metric_name in y_obs: for name, fn in diagnostic_fns.items(): diagnostics[name][metric_name] = fn( y_obs=np.array(y_obs[metric_name]), y_pred=np.array(y_pred[metric_name]), se_pred=np.array(se_pred[metric_name]), ) return diagnostics
[docs]def assess_model_fit( diagnostics: CVDiagnostics, significance_level: float = 0.1, ) -> AssessModelFitResult: """Assess model fit for given diagnostics results. It determines if a model fit is good or bad based on Fisher exact test p Args: diagnostics: Output of compute_diagnostics Returns: Two dictionaries, one for good metrics, one for bad metrics, each mapping metric name to p-value """ good_fit_metrics_to_fisher_score: Dict[str, float] = {} bad_fit_metrics_to_fisher_score: Dict[str, float] = {} for metric, score in diagnostics[FISHER_EXACT_TEST_P].items(): if score > significance_level: bad_fit_metrics_to_fisher_score[metric] = score else: good_fit_metrics_to_fisher_score[metric] = score if len(bad_fit_metrics_to_fisher_score) > 0: logger.warning( "{} {} {} unable to be reliably fit.".format( ("Metrics" if len(bad_fit_metrics_to_fisher_score) > 1 else "Metric"), (" , ".join(bad_fit_metrics_to_fisher_score.keys())), ("were" if len(bad_fit_metrics_to_fisher_score) > 1 else "was"), ) ) return AssessModelFitResult( good_fit_metrics_to_fisher_score=good_fit_metrics_to_fisher_score, bad_fit_metrics_to_fisher_score=bad_fit_metrics_to_fisher_score, )
[docs]def has_good_opt_config_model_fit( optimization_config: OptimizationConfig, assess_model_fit_result: AssessModelFitResult, ) -> bool: """Assess model fit for given diagnostics results across the optimization config metrics Bad fit criteria: Any objective metrics are poorly fit based on the Fisher exact test p (see assess_model_fit()) TODO[]: Incl. outcome constraints in assessment Args: optimization_config: Objective/Outcome constraint metrics to assess diagnostics: Output of compute_diagnostics Returns: Two dictionaries, one for good metrics, one for bad metrics, each mapping metric name to p-value """ # Bad fit criteria: Any objective metrics are poorly fit # TODO[]: Incl. outcome constraints in assessment has_good_opt_config_fit = all( (m.name in assess_model_fit_result.good_fit_metrics_to_fisher_score) for m in optimization_config.objective.metrics ) return has_good_opt_config_fit
def _gen_train_test_split( folds: int, arm_names: np.ndarray ) -> Iterable[Tuple[Set[str], Set[str]]]: """Return train/test splits of arm names. Args: folds: Number of folds to return arm_names: Array of arm names Returns: Yields (train, test) tuple of arm names. """ n = len(arm_names) test_size = n // folds # The size of all test sets but the last final_size = test_size + (n - folds * test_size) # Grab the leftovers for fold in range(folds): # We will take the test set from the back of the array. # Roll the list of arm names to get a fresh test set arm_names = np.roll(arm_names, test_size) n_test = test_size if fold < folds - 1 else final_size yield set(arm_names[:-n_test]), set(arm_names[-n_test:]) def _mean_prediction_ci( y_obs: np.ndarray, y_pred: np.ndarray, se_pred: np.ndarray ) -> float: # Pyre does not allow float * np.ndarray. return float(np.mean(1.96 * 2 * se_pred / np.abs(y_obs))) def _log_likelihood( y_obs: np.ndarray, y_pred: np.ndarray, se_pred: np.ndarray ) -> float: return float(np.sum(norm.logpdf(y_obs, loc=y_pred, scale=se_pred))) def _mape(y_obs: np.ndarray, y_pred: np.ndarray, se_pred: np.ndarray) -> float: return float(np.mean(np.abs((y_pred - y_obs) / y_obs))) def _total_raw_effect( y_obs: np.ndarray, y_pred: np.ndarray, se_pred: np.ndarray ) -> float: min_y_obs = np.min(y_obs) return float((np.max(y_obs) - min_y_obs) / min_y_obs) def _correlation_coefficient( y_obs: np.ndarray, y_pred: np.ndarray, se_pred: np.ndarray ) -> float: with np.errstate(invalid="ignore"): rho, _ = pearsonr(y_pred, y_obs) return float(rho) def _rank_correlation( y_obs: np.ndarray, y_pred: np.ndarray, se_pred: np.ndarray ) -> float: with np.errstate(invalid="ignore"): rho, _ = spearmanr(y_pred, y_obs) return float(rho) def _fisher_exact_test_p( y_obs: np.ndarray, y_pred: np.ndarray, se_pred: np.ndarray ) -> float: n_half = len(y_obs) // 2 top_obs = y_obs.argsort(axis=0)[-n_half:] top_est = y_pred.argsort(axis=0)[-n_half:] # Construct contingency table tp = len(set(top_est).intersection(top_obs)) fp = n_half - tp fn = n_half - tp tn = (len(y_obs) - n_half) - (n_half - tp) table = np.array([[tp, fp], [fn, tn]]) # Compute the test statistic _, p = fisher_exact(table, alternative="greater") return float(p)
[docs]class BestModelSelector(ABC):
[docs] @abstractmethod def best_diagnostic(self, diagnostics: List[CVDiagnostics]) -> int: """ Return the index of the best diagnostic. """ pass # pragma: no cover
[docs]class CallableEnum(Enum): # pyre-fixme[3]: Return annotation cannot be `Any`. def __call__(self, *args: Optional[Any], **kwargs: Optional[Any]) -> Any: return self.value(*args, **kwargs)
[docs]class MetricAggregation(CallableEnum): MEAN: Callable[[Iterable[Number]], Number] = partial(np.mean)
[docs]class DiagnosticCriterion(CallableEnum): MIN: Callable[[Iterable[Number]], Number] = partial(np.amin)
[docs]class SingleDiagnosticBestModelSelector(BestModelSelector): """Choose the best model using a single cross-validation diagnostic. The input is a list of CVDiagnostics, each corresponding to one model. The specified diagnostic is extracted from each of the CVDiagnostics, its values (each of which corresponds to a separate metric) are aggregated with the aggregation function, the best one is determined with the criterion, and the index of the best diagnostic result is returned. Example: :: s = SingleDiagnosticBestModelSelector( diagnostic = 'Fisher exact test p', criterion = DiagnosticCriterion.MIN, metric_aggregation = MetricAggregation.MEAN, ) best_diagnostic_index = s.best_diagnostic(diagnostics) Args: diagnostic (str): The name of the diagnostic to use, which should be a key in CVDiagnostic. metric_aggregation (MetricAggregation): Callable applied to the values of the diagnostic for a single model to produce a single number. criterion (DiagnosticCriterion): Callable used to determine which of the (aggregated) diagnostics is the best. Returns: int: index of the selected best diagnostic. """ def __init__( self, diagnostic: str, metric_aggregation: MetricAggregation, criterion: DiagnosticCriterion, ) -> None: self.diagnostic = diagnostic self.metric_aggregation = metric_aggregation self.criterion = criterion
[docs] def best_diagnostic(self, diagnostics: List[CVDiagnostics]) -> int: aggregated_diagnostic_values = [ self.metric_aggregation(list(d[self.diagnostic].values())) for d in diagnostics ] best_diagnostic = self.criterion(aggregated_diagnostic_values) return aggregated_diagnostic_values.index(best_diagnostic)