Source code for ax.early_stopping.utils
#!/usr/bin/env python3
# Copyright (c) Meta Platforms, Inc. and affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
# pyre-strict
from collections import defaultdict
from logging import Logger
from typing import Dict, List, Optional, Tuple
import pandas as pd
from ax.core.base_trial import TrialStatus
from ax.core.experiment import Experiment
from ax.core.map_data import MapData
from ax.utils.common.logger import get_logger
from pyre_extensions import assert_is_instance
logger: Logger = get_logger(__name__)


def align_partial_results(
df: pd.DataFrame,
progr_key: str, # progression key
metrics: List[str],
interpolation: str = "slinear",
do_forward_fill: bool = False,
# TODO: Allow normalizing progr_key (e.g. subtract min time stamp)
) -> Tuple[Dict[str, pd.DataFrame], Dict[str, pd.DataFrame]]:
"""Helper function to align partial results with heterogeneous index
Args:
df: The DataFrame containing the raw data (in long format).
progr_key: The key of the column indexing progression (such as
the number of training examples, timestamps, etc.).
metrics: The names of the metrics to consider.
interpolation: The interpolation method used to fill missing values
(if applicable). See `pandas.DataFrame.interpolate` for
available options. Limit area is `inside`.
        do_forward_fill: If True, performs a forward fill after interpolation.
            This is useful for scalarizing learning curves when some data
            is missing. For instance, suppose we obtain a curve for task_1
            over progressions [a, b] and for task_2 over progressions [c, d],
            where b < c. Forward filling task_1's curve past b is one way
            to make the two curves comparable.

    Returns:
        A two-tuple of dicts, each mapping the provided metric names to a
        DataFrame of index-normalized and interpolated values: the first
        contains the means, the second the sems (empty if the input
        contains no sem data).
"""
missing_metrics = set(metrics) - set(df["metric_name"])
if missing_metrics:
raise ValueError(f"Metrics {missing_metrics} not found in input dataframe")
# select relevant metrics
df = df[df["metric_name"].isin(metrics)]
# log some information about raw data
for m in metrics:
df_m = df[df["metric_name"] == m]
if len(df_m) > 0:
logger.debug(
f"Metric {m} raw data has observations from "
f"{df_m[progr_key].min()} to {df_m[progr_key].max()}."
)
else:
logger.info(f"No data from metric {m} yet.")
# drop arm names (assumes 1:1 map between trial indices and arm names)
df = df.drop("arm_name", axis=1)
# remove duplicates (same trial, metric, progr_key), which can happen
# if the same progression is erroneously reported more than once
df = df.drop_duplicates(
subset=["trial_index", "metric_name", progr_key], keep="first"
)
# set multi-index over trial, metric, and progression key
df = df.set_index(["trial_index", "metric_name", progr_key])
# sort index
df = df.sort_index()
# drop sem if all NaN (assumes presence of sem column)
has_sem = not df["sem"].isnull().all()
if not has_sem:
df = df.drop("sem", axis=1)
# create the common index that every map result will be re-indexed w.r.t.
index_union = df.index.levels[2].unique()
# loop through (trial, metric) combos and align data
dfs_mean = defaultdict(list)
dfs_sem = defaultdict(list)
for tidx in df.index.levels[0]: # this could be slow if there are many trials
for metric in df.index.levels[1]:
# grab trial+metric sub-df and reindex to common index
df_ridx = df.loc[(tidx, metric)].reindex(index_union)
# interpolate / fill missing results
# TODO: Allow passing of additional kwargs to `interpolate`
# TODO: Allow using an arbitrary prediction model for this instead
try:
df_interp = df_ridx.interpolate(
method=interpolation, limit_area="inside"
)
if do_forward_fill:
# do forward fill (with valid observations) to handle instances
# where one task only has data for early progressions
                    df_interp = df_interp.ffill()
except ValueError as e:
df_interp = df_ridx
logger.info(
f"Got exception `{e}` during interpolation. "
"Using uninterpolated values instead."
)
            # rename column to trial index and append results
dfs_mean[metric].append(df_interp["mean"].rename(tidx))
if has_sem:
dfs_sem[metric].append(df_interp["sem"].rename(tidx))
# combine results into output dataframes
dfs_mean = {metric: pd.concat(dfs, axis=1) for metric, dfs in dfs_mean.items()}
dfs_sem = {metric: pd.concat(dfs, axis=1) for metric, dfs in dfs_sem.items()}
return dfs_mean, dfs_sem
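
# Illustrative usage sketch for `align_partial_results` (not part of the
# module; the "epoch" progression key and the data values below are
# assumptions made for the example). Given raw long-format data with a
# per-epoch "loss" metric:
#
#     raw = pd.DataFrame(
#         {
#             "trial_index": [0, 0, 1, 1],
#             "arm_name": ["0_0", "0_0", "1_0", "1_0"],
#             "metric_name": ["loss"] * 4,
#             "epoch": [1, 3, 2, 3],
#             "mean": [0.9, 0.7, 0.8, 0.6],
#             "sem": [float("nan")] * 4,
#         }
#     )
#     means, sems = align_partial_results(
#         df=raw, progr_key="epoch", metrics=["loss"]
#     )
#
# `means["loss"]` is then a DataFrame indexed by the union of observed
# epochs {1, 2, 3} with one column per trial: trial 0's missing epoch-2
# value is interpolated (slinear), while trial 1's epoch-1 value stays NaN
# because `limit_area="inside"` never extrapolates. Since all sems are NaN,
# `sems` is an empty dict.
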


def estimate_early_stopping_savings(
experiment: Experiment,
map_key: Optional[str] = None,
) -> float:
"""Estimate resource savings due to early stopping by considering
COMPLETED and EARLY_STOPPED trials. First, use the mean of final
progressions of the set completed trials as a benchmark for the
length of a single trial. The savings is then estimated as:
resource_savings =
1 - actual_resource_usage / (num_trials * length of single trial)
Args:
experiment: The experiment.
map_key: The map_key to use when computing resource savings.

    Returns:
        The estimated resource savings as a fraction of total resource usage
        (e.g., a savings of 0.11 indicates the experiment would have used
        roughly 11% more resources without early stopping).
"""
map_data = assert_is_instance(experiment.lookup_data(), MapData)
    # If no map_key is provided, default to the first map_key in the
    # experiment's MapData; if there is none, there is nothing to estimate.
if map_key is not None:
step_key = map_key
elif len(map_data.map_key_infos) > 0:
step_key = map_data.map_key_infos[0].key
else:
        return 0.0
# Get final number of steps of each trial
trial_resources = (
map_data.map_df[["trial_index", step_key]]
.groupby("trial_index")
.max()
.reset_index()
)
early_stopped_trial_idcs = experiment.trial_indices_by_status[
TrialStatus.EARLY_STOPPED
]
completed_trial_idcs = experiment.trial_indices_by_status[TrialStatus.COMPLETED]
# Assume that any early stopped trial would have had the mean number of steps of
# the completed trials
mean_completed_trial_resources = trial_resources[
trial_resources["trial_index"].isin(completed_trial_idcs)
][step_key].mean()
    # Calculate the steps saved per early-stopped trial. If savings are
    # estimated to be negative, assume no savings
stopped_trial_resources = trial_resources[
trial_resources["trial_index"].isin(early_stopped_trial_idcs)
][step_key]
saved_trial_resources = (
mean_completed_trial_resources - stopped_trial_resources
).clip(0)
    # Return the ratio of the total saved resources to the total resources
    # actually used
    return saved_trial_resources.sum() / trial_resources[step_key].sum()
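
# Worked example (illustrative numbers, not from the source): suppose two
# COMPLETED trials each ran to step 100 and one EARLY_STOPPED trial stopped
# at step 40. The benchmark trial length is mean(100, 100) = 100, so the
# stopped trial is assumed to have saved 100 - 40 = 60 steps. The total
# steps actually used are 100 + 100 + 40 = 240, giving estimated savings of
# 60 / 240 = 0.25: without early stopping the experiment would have needed
# roughly 25% more resources (300 steps instead of 240).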