Source code for ax.benchmark.benchmark_result

# Copyright (c) Meta Platforms, Inc. and affiliates.
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.

from dataclasses import dataclass
from typing import Dict, Iterable, List, Optional, Tuple

import numpy as np
from ax.core.experiment import Experiment
from ax.core.map_data import MapData
from ax.service.utils.best_point_mixin import BestPointMixin
from ax.utils.common.base import Base
from ax.utils.common.typeutils import not_none
from numpy import nanmean, nanquantile, ndarray
from pandas import DataFrame
from scipy.stats import sem

# NOTE: Do not add `from __future__ import annotatations` to this file. Adding
# `annotations` postpones evaluation of types and will break FBLearner's usage of
# `BenchmarkResult` as return type annotation, used for serialization and rendering
# in the UI.

PERCENTILES = 0.1, 0.25, 0.5, 0.75, 0.9

[docs]@dataclass(frozen=True, eq=False) class BenchmarkResult(Base): """The result of a single optimization loop from one (BenchmarkProblem, BenchmarkMethod) pair. More information will be added to the BenchmarkResult as the suite develops. """ name: str seed: int experiment: Experiment # Tracks best point if single-objective problem, max hypervolume if MOO optimization_trace: ndarray score_trace: ndarray fit_time: float gen_time: float
[docs] def optimization_trace_by_progression( self, final_progression_only: bool = False ) -> Tuple[ndarray, ndarray]: # (y-values, x-values) if isinstance(self.experiment.lookup_data(), MapData): by_progression_result = BestPointMixin.get_trace_by_progression( experiment=self.experiment, final_progression_only=final_progression_only, ) # tuple of y-values, x-values optimization_trace_by_progression = ( np.array(by_progression_result[0]), np.array(by_progression_result[1]), ) else: # if not MapData, set this to standard optimization_trace # with a default x-values optimization_trace = np.array( BestPointMixin.get_trace( experiment=self.experiment, ) ) optimization_trace_by_progression = ( optimization_trace, np.arange(optimization_trace.shape[0]), ) return optimization_trace_by_progression
[docs] def progression_trace(self) -> ndarray: """Computes progressions used as a function of trials and also the total progression across all trials.""" experiment = self.experiment optimization_config = not_none(experiment.optimization_config) objective = map_data = experiment.lookup_data() if not isinstance(map_data, MapData): raise ValueError("`get_trace_by_progression` requires MapData.") map_df = map_data.map_df # assume the first map_key is progression map_key = map_data.map_keys[0] map_df = map_df[map_df["metric_name"] == objective] map_df = map_df.sort_values(by=["trial_index", map_key]) map_df = map_df.drop_duplicates(MapData.DEDUPLICATE_BY_COLUMNS, keep="last") return map_df[map_key].to_numpy()
[docs] def total_progression(self) -> float: return self.progression_trace().sum()
[docs]@dataclass(frozen=True, eq=False) class AggregatedBenchmarkResult(Base): """The result of a benchmark test, or series of replications. Scalar data present in the BenchmarkResult is here represented as (mean, sem) pairs. More information will be added to the AggregatedBenchmarkResult as the suite develops. """ name: str results: List[BenchmarkResult] # mean, sem, and quartile columns optimization_trace: DataFrame score_trace: DataFrame # (mean, sem) pairs fit_time: List[float] gen_time: List[float] def __str__(self) -> str: return f"{self.__class__}(name={})"
[docs] @classmethod def from_benchmark_results( cls, results: List[BenchmarkResult], ) -> "AggregatedBenchmarkResult": """Aggregrates a list of BenchmarkResults. For various reasons (timeout, errors, etc.) each BenchmarkResult may have a different number of trials; aggregated traces and statistics are computed with and truncated to the minimum trial count to ensure each replication is included. """ # Extract average wall times and standard errors thereof fit_time, gen_time = map( lambda Ts: [nanmean(Ts), float(sem(Ts, ddof=1, nan_policy="propagate"))], zip(*((res.fit_time, res.gen_time) for res in results)), ) # Compute some statistics for each trace trace_stats = {} for name in ("optimization_trace", "score_trace"): step_data = zip( *(getattr(res, name) for res in results), ) stats = _get_stats( step_data=step_data, percentiles=PERCENTILES, progressions=None ) trace_stats[name] = stats # Return aggregated results return cls( name=results[0].name, results=results, fit_time=fit_time, gen_time=gen_time, **{name: DataFrame(stats) for name, stats in trace_stats.items()}, )
[docs] def optimization_trace_by_progression( self, final_progression_only: bool = False ) -> DataFrame: trace_by_progression_results = [ res.optimization_trace_by_progression( final_progression_only=final_progression_only ) for res in self.results ] step_data = zip( *(res[0] for res in trace_by_progression_results), ) progressions = trace_by_progression_results[0][1] stats = _get_stats( step_data=step_data, percentiles=PERCENTILES, progressions=progressions, ) return DataFrame(stats)
[docs] def progression_trace(self) -> DataFrame: progression_traces = zip(*(res.progression_trace() for res in self.results)) stats = _get_stats( step_data=progression_traces, percentiles=PERCENTILES, progressions=None ) return DataFrame(stats)
[docs] def total_progression(self) -> List[float]: total_progressions = [res.total_progression() for res in self.results] return [ nanmean(total_progressions), sem(total_progressions, ddof=1, nan_policy="propagate"), ]
def _get_stats( # pyre-fixme[24]: Generic type `Iterable` expects 1 type parameter. step_data: Iterable, # pyre-fixme[24]: Generic type `tuple` expects at least 1 type parameter. percentiles: Tuple, progressions: Optional[np.ndarray], # pyre-fixme[24]: Generic type `dict` expects 2 type parameters, use `typing.Dict` # to avoid runtime subscripting errors. ) -> Dict: quantiles = [] stats = {"mean": [], "sem": []} if progressions is not None: stats.update({"progression": []}) for i, step_vals in enumerate(step_data): stats["mean"].append(nanmean(step_vals)) stats["sem"].append(sem(step_vals, ddof=1, nan_policy="propagate")) quantiles.append(nanquantile(step_vals, q=percentiles)) if progressions is not None: stats["progression"].append(progressions[i]) stats.update({f"P{100 * p:.0f}": q for p, q in zip(percentiles, zip(*quantiles))}) return stats