Source code for ax.core.metric

#!/usr/bin/env python3
# Copyright (c) Meta Platforms, Inc. and affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.

from __future__ import annotations

from dataclasses import dataclass
from logging import Logger

from typing import (
    Any,
    Dict,
    Iterable,
    List,
    Mapping,
    Optional,
    Tuple,
    Type,
    TYPE_CHECKING,
)

from ax.core.data import Data
from ax.utils.common.base import SortableBase
from ax.utils.common.logger import get_logger
from ax.utils.common.result import Err, Ok, Result
from ax.utils.common.serialization import SerializationMixin

if TYPE_CHECKING:  # pragma: no cover
    # import as module to make sphinx-autodoc-typehints happy
    from ax import core  # noqa F401


[docs]@dataclass(frozen=True) class MetricFetchE: message: str exception: Optional[Exception]
MetricFetchResult = Result[Data, MetricFetchE] logger: Logger = get_logger(__name__)
[docs]class Metric(SortableBase, SerializationMixin): """Base class for representing metrics. The `fetch_trial_data` method is the essential method to override when subclassing, which specifies how to retrieve a Metric, for a given trial. A Metric must return a Data object, which requires (at minimum) the following: https://ax.dev/api/_modules/ax/core/data.html#Data.required_columns Attributes: lower_is_better: Flag for metrics which should be minimized. properties: Properties specific to a particular metric. """ data_constructor: Type[Data] = Data def __init__( self, name: str, lower_is_better: Optional[bool] = None, properties: Optional[Dict[str, Any]] = None, ) -> None: """Inits Metric. Args: name: Name of metric. lower_is_better: Flag for metrics which should be minimized. properties: Dictionary of this metric's properties """ self._name = name self.lower_is_better = lower_is_better # pyre-fixme[4]: Attribute must be annotated. self.properties = properties or {}
[docs] @classmethod def is_available_while_running(cls) -> bool: """Whether metrics of this class are available while the trial is running. Metrics that are not available while the trial is running are assumed to be available only upon trial completion. For such metrics, data is assumed to never change once the trial is completed. NOTE: If this method returns `False`, data-fetching via `experiment.fetch_data` will return the data cached on the experiment (for the metrics of the given class) whenever its available. Data is cached on experiment when attached via `experiment.attach_data`. """ return False
@property def name(self) -> str: """Get name of metric.""" return self._name @property def fetch_multi_group_by_metric(self) -> Type[Metric]: """Metric class, with which to group this metric in `Experiment._metrics_by_class`, which is used to combine metrics on experiment into groups and then fetch their data via `Metric.fetch_trial_data_multi` for each group. NOTE: By default, this property will just return the class on which it is defined; however, in some cases it is useful to group metrics by their superclass, in which case this property should return that superclass. """ return self.__class__
[docs] def fetch_trial_data( self, trial: core.base_trial.BaseTrial, **kwargs: Any ) -> MetricFetchResult: """Fetch data for one trial.""" raise NotImplementedError( f"Metric {self.name} does not implement data-fetching logic." ) # pragma: no cover
[docs] def fetch_experiment_data( self, experiment: core.experiment.Experiment, **kwargs: Any ) -> Dict[int, MetricFetchResult]: """Fetch this metric's data for an experiment. Returns Dict of trial_index => Result """ return { trial.index: self.fetch_trial_data(trial=trial, **kwargs) for trial in experiment.trials.values() if trial.status.expecting_data }
[docs] @classmethod def fetch_trial_data_multi( cls, trial: core.base_trial.BaseTrial, metrics: Iterable[Metric], **kwargs: Any ) -> Dict[str, MetricFetchResult]: """Fetch multiple metrics data for one trial. Returns Dict of metric_name => Result Default behavior calls `fetch_trial_data` for each metric. Subclasses should override this to trial data computation for multiple metrics. """ return { metric.name: metric.fetch_trial_data(trial=trial, **kwargs) for metric in metrics }
[docs] @classmethod def fetch_experiment_data_multi( cls, experiment: core.experiment.Experiment, metrics: Iterable[Metric], trials: Optional[Iterable[core.base_trial.BaseTrial]] = None, **kwargs: Any, ) -> Dict[int, Dict[str, MetricFetchResult]]: """Fetch multiple metrics data for an experiment. Returns Dict of trial_index => (metric_name => Result) Default behavior calls `fetch_trial_data_multi` for each trial. Subclasses should override to batch data computation across trials + metrics. """ return { trial.index: cls.fetch_trial_data_multi( trial=trial, metrics=metrics, **kwargs ) for trial in (trials if trials is not None else experiment.trials.values()) if trial.status.expecting_data } return cls.data_constructor.from_multiple_data( [ cls.fetch_trial_data_multi(trial, metrics, **kwargs) if trial.status.expecting_data else cls.data_constructor() for trial in (experiment.trials.values() if trials is None else trials) ] )
[docs] @classmethod def lookup_or_fetch_experiment_data_multi( cls, experiment: core.experiment.Experiment, metrics: Iterable[Metric], trials: Optional[Iterable[core.base_trial.BaseTrial]] = None, **kwargs: Any, ) -> Tuple[Dict[int, Dict[str, MetricFetchResult]], bool]: """Fetch or lookup (with fallback to fetching) data for given metrics, depending on whether they are available while running. Return a tuple containing the data, along with a boolean that will be True if new data was fetched, and False if all data was looked up from cache. If metric is available while running, its data can change (and therefore we should always re-fetch it). If metric is available only upon trial completion, its data does not change, so we can look up that data on the experiment and only fetch the data that is not already attached to the experiment. NOTE: If fetching data for a metrics class that is only available upon trial completion, data fetched in this function (data that was not yet available on experiment) will be attached to experiment. """ # If this metric is available while trial is running, just default to # `fetch_experiment_data_multi`. if cls.is_available_while_running(): fetched_data = cls.fetch_experiment_data_multi( experiment=experiment, metrics=metrics, trials=trials, **kwargs ) return fetched_data, True # If this metric is available only upon trial completion, look up data # on experiment and only fetch data that is not already cached. if trials is None: completed_trials = experiment.trials_by_status[ core.base_trial.TrialStatus.COMPLETED ] else: completed_trials = [t for t in trials if t.status.is_completed] if not completed_trials: return {}, False trials_results = {} contains_new_data = False for trial in completed_trials: cached_trial_data = experiment.lookup_data_for_trial( trial_index=trial.index, )[0] cached_metric_names = cached_trial_data.metric_names metrics_to_fetch = [m for m in metrics if m.name not in cached_metric_names] if not metrics_to_fetch: # If all needed data fetched from cache, no need to fetch any other data # for trial. trials_results[trial.index] = cls._wrap_trial_data_multi( data=cached_trial_data ) continue try: fetched_trial_data = cls.fetch_experiment_data_multi( experiment=experiment, metrics=metrics_to_fetch, trials=[trial], **kwargs, )[trial.index] contains_new_data = True except NotImplementedError: # Metric does not implement fetching logic and only uses lookup. fetched_trial_data = {} trials_results[trial.index] = { **cls._wrap_trial_data_multi(data=cached_trial_data), **fetched_trial_data, } return ( { trial_index: { metric_name: results for metric_name, results in results_by_metric_name.items() if metric_name in [metric.name for metric in metrics] } for trial_index, results_by_metric_name in trials_results.items() }, contains_new_data, )
[docs] def clone(self) -> Metric: """Create a copy of this Metric.""" cls = type(self) return cls( **cls.deserialize_init_args(args=cls.serialize_init_args(obj=self)), )
def __repr__(self) -> str: return "{class_name}('{metric_name}')".format( class_name=self.__class__.__name__, metric_name=self.name ) @property def _unique_id(self) -> str: return str(self) @classmethod def _unwrap_experiment_data(cls, results: Mapping[int, MetricFetchResult]) -> Data: oks: List[Ok[Data, MetricFetchE]] = [ result for result in results.values() if isinstance(result, Ok) ] if len(oks) < len(results): errs: List[Err[Data, MetricFetchE]] = [ result for result in results.values() if isinstance(result, Err) ] # TODO[mpolson64] Raise all errors in a group via PEP 654 exceptions = [ err.err.exception if err.err.exception is not None else Exception(err.err.message) for err in errs ] raise exceptions[0] if len(exceptions) == 1 else Exception(exceptions) data = [ok.ok for ok in oks] return ( cls.data_constructor.from_multiple_data(data=data) if len(data) > 0 else cls.data_constructor() ) @classmethod def _unwrap_trial_data_multi( cls, results: Mapping[str, MetricFetchResult], # TODO[mpolson64] Add critical_metric_names to other unwrap methods critical_metric_names: Optional[Iterable[str]] = None, ) -> Data: oks: List[Ok[Data, MetricFetchE]] = [ result for result in results.values() if isinstance(result, Ok) ] if len(oks) < len(results): # If no critical_metric_names supplied all metrics to be treated as # critical critical_metric_names = critical_metric_names or results.keys() # Noncritical Errs should be brought to the user's attention via warnings # but not raise an Exception noncritical_errs: List[Err[Data, MetricFetchE]] = [ result for metric_name, result in results.items() if isinstance(result, Err) and metric_name in critical_metric_names ] for err in noncritical_errs: logger.warning( f"Err encountered while unwrapping MetricFetchResults: {err.err}. " "Metric is not marked critical, ignoring for now." ) critical_errs: List[Err[Data, MetricFetchE]] = [ result for metric_name, result in results.items() if isinstance(result, Err) and metric_name in critical_metric_names ] if len(critical_errs) > 0: # TODO[mpolson64] Raise all errors in a group via PEP 654 exceptions = [ err.err.exception if err.err.exception is not None else Exception(err.err.message) for err in critical_errs ] raise exceptions[0] if len(exceptions) == 1 else Exception(exceptions) data = [ok.ok for ok in oks] return ( cls.data_constructor.from_multiple_data(data=data) if len(data) > 0 else cls.data_constructor() ) @classmethod def _unwrap_experiment_data_multi( cls, results: Mapping[int, Mapping[str, MetricFetchResult]], ) -> Data: flattened = [ result for sublist in results.values() for result in sublist.values() ] oks: List[Ok[Data, MetricFetchE]] = [ result for result in flattened if isinstance(result, Ok) ] if len(oks) < len(flattened): errs: List[Err[Data, MetricFetchE]] = [ result for result in flattened if isinstance(result, Err) ] # TODO[mpolson64] Raise all errors in a group via PEP 654 exceptions = [ err.err.exception if err.err.exception is not None else Exception(err.err.message) for err in errs ] raise exceptions[0] if len(exceptions) == 1 else Exception(exceptions) data = [ok.ok for ok in oks] return ( cls.data_constructor.from_multiple_data(data=data) if len(data) > 0 else cls.data_constructor() ) @classmethod def _wrap_experiment_data(cls, data: Data) -> Dict[int, MetricFetchResult]: return { trial_index: Ok( value=Data( df=data.true_df.loc[data.true_df["trial_index"] == trial_index] ) ) for trial_index in data.true_df["trial_index"] } @classmethod def _wrap_trial_data_multi(cls, data: Data) -> Dict[str, MetricFetchResult]: return { metric_name: Ok( value=Data( df=data.true_df.loc[data.true_df["metric_name"] == metric_name] ) ) for metric_name in data.true_df["metric_name"] } @classmethod def _wrap_experiment_data_multi( cls, data: Data ) -> Dict[int, Dict[str, MetricFetchResult]]: # pyre-fixme[7] return { trial_index: { metric_name: Ok( value=Data( df=data.true_df.loc[ (data.true_df["trial_index"] == trial_index) & (data.true_df["metric_name"] == metric_name) ] ) ) for metric_name in data.true_df["metric_name"] } for trial_index in data.true_df["trial_index"] }