Source code for ax.service.utils.report_utils

#!/usr/bin/env python3
# Copyright (c) Facebook, Inc. and its affiliates.
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.

from collections import defaultdict
from logging import Logger
from typing import Any, Dict, List, Optional

import numpy as np
import pandas as pd
import plotly.graph_objects as go
from ax.core.abstract_data import AbstractDataFrameData
from ax.core.experiment import Experiment
from ax.core.generator_run import GeneratorRunType
from ax.core.metric import Metric
from ax.core.multi_type_experiment import MultiTypeExperiment
from ax.core.objective import MultiObjective, ScalarizedObjective
from ax.core.search_space import SearchSpace
from ax.core.trial import BaseTrial, Trial
from ax.modelbridge import ModelBridge
from ax.modelbridge.cross_validation import cross_validate
from ax.plot.contour import interact_contour_plotly
from ax.plot.diagnostic import interact_cross_validation_plotly
from ax.plot.slice import plot_slice_plotly
from ax.plot.trace import optimization_trace_single_method_plotly
from ax.utils.common.logger import get_logger
from ax.utils.common.typeutils import checked_cast, not_none

logger: Logger = get_logger(__name__)

# pyre-ignore[11]: Annotation `go.Figure` is not defined as a type.
def _get_cross_validation_plot(model: ModelBridge) -> go.Figure:
    cv = cross_validate(model)
    return interact_cross_validation_plotly(cv)

def _get_objective_trace_plot(
    experiment: Experiment,
    metric_name: str,
    model_transitions: List[int],
    optimization_direction: Optional[str] = None,
) -> Optional[go.Figure]:
    best_objectives = np.array([experiment.fetch_data().df["mean"]])
    return optimization_trace_single_method_plotly(
        title="Best objective found vs. # of iterations",

def _get_objective_v_param_plot(
    search_space: SearchSpace,
    model: ModelBridge,
    metric_name: str,
    trials: Dict[int, BaseTrial],
) -> Optional[go.Figure]:
    range_params = list(search_space.range_parameters.keys())
    if len(range_params) == 1:
        # individual parameter slice plot
        output_slice_plot = plot_slice_plotly(
                str(t.index): not_none(checked_cast(Trial, t).generator_run)
                for t in trials.values()
        return output_slice_plot
    if len(range_params) > 1:
        # contour plot
        output_contour_plot = interact_contour_plotly(
        return output_contour_plot
    # if search space contains no range params
        "_get_objective_v_param_plot requires a search space with at least one "
        "RangeParameter. Returning None."
    return None

def _get_suffix(input_str: str, delim: str = ".", n_chunks: int = 1) -> str:
    return delim.join(input_str.split(delim)[-n_chunks:])

def _get_shortest_unique_suffix_dict(
    input_str_list: List[str], delim: str = "."
) -> Dict[str, str]:
    """Maps a list of strings to their shortest unique suffixes

    Maps all original strings to the smallest number of chunks, as specified by
    delim, that are not a suffix of any other original string. If the original
    string was a suffix of another string, map it to its unaltered self.

        input_str_list: a list of strings to create the suffix mapping for
        delim: the delimiter used to split up the strings into meaningful chunks

        dict: A dict with the original strings as keys and their abbreviations as

    # all input strings must be unique
    assert len(input_str_list) == len(set(input_str_list))
    if delim == "":
        raise ValueError("delim must be a non-empty string.")
    suffix_dict = defaultdict(list)
    # initialize suffix_dict with last chunk
    for istr in input_str_list:
        suffix_dict[_get_suffix(istr, delim=delim, n_chunks=1)].append(istr)
    max_chunks = max(len(istr.split(delim)) for istr in input_str_list)
    if max_chunks == 1:
        return {istr: istr for istr in input_str_list}
    # the upper range of this loop is `max_chunks + 2` because:
    #     - `i` needs to take the value of `max_chunks`, hence one +1
    #     - the contents of the loop are run one more time to check if `all_unique`,
    #           hence the other +1
    for i in range(2, max_chunks + 2):
        new_dict = defaultdict(list)
        all_unique = True
        for suffix, suffix_str_list in suffix_dict.items():
            if len(suffix_str_list) > 1:
                all_unique = False
                for istr in suffix_str_list:
                    new_dict[_get_suffix(istr, delim=delim, n_chunks=i)].append(istr)
                new_dict[suffix] = suffix_str_list
        if all_unique:
            if len(set(input_str_list)) != len(suffix_dict.keys()):
            return {
                suffix_str_list[0]: suffix
                for suffix, suffix_str_list in suffix_dict.items()
        suffix_dict = new_dict
    # If this function has not yet exited, some input strings still share a suffix.
    # This is not expected, but in this case, the function will return the identity
    # mapping, i.e., a dict with the original strings as both keys and values.
        "Something went wrong. Returning dictionary with original strings as keys and "
    return {istr: istr for istr in input_str_list}

[docs]def get_standard_plots( experiment: Experiment, model: Optional[ModelBridge], model_transitions: Optional[List[int]] = None, ) -> List[go.Figure]: """Extract standard plots for single-objective optimization. Extracts a list of plots from an ``Experiment`` and ``ModelBridge`` of general interest to an Ax user. Currently not supported are - TODO: multi-objective optimization - TODO: ChoiceParameter plots Args: - experiment: The ``Experiment`` from which to obtain standard plots. - model: The ``ModelBridge`` used to suggest trial parameters. - data: If specified, data, to which to fit the model before generating plots. - model_transitions: The arm numbers at which shifts in generation_strategy occur. Returns: - a plot of objective value vs. trial index, to show experiment progression - a plot of objective value vs. range parameter values, only included if the model associated with generation_strategy can create predictions. This consists of: - a plot_slice plot if the search space contains one range parameter - an interact_contour plot if the search space contains multiple range parameters """ objective = not_none(experiment.optimization_config).objective if isinstance(objective, MultiObjective): logger.warning( "get_standard_plots does not currently support MultiObjective " "optimization experiments. Returning an empty list." ) return [] if isinstance(objective, ScalarizedObjective): logger.warning( "get_standard_plots does not currently support ScalarizedObjective " "optimization experiments. Returning an empty list." ) return [] if experiment.fetch_data().df.empty:"Experiment {experiment} does not yet have data, nothing to plot.") return [] output_plot_list = [] output_plot_list.append( _get_objective_trace_plot( experiment=experiment, metric_name=not_none(experiment.optimization_config), model_transitions=model_transitions if model_transitions is not None else [], optimization_direction=( "minimize" if not_none(experiment.optimization_config).objective.minimize else "maximize" ), ) ) # Objective vs. parameter plot requires a `Model`, so add it only if model # is alrady available. In cases where initially custom trials are attached, # model might not yet be set on the generation strategy. if model: # TODO: Check if model can predict in favor of try/catch. try: output_plot_list.append( _get_objective_v_param_plot( search_space=experiment.search_space, model=model, metric_name=not_none( experiment.optimization_config ), trials=experiment.trials, ) ) output_plot_list.append(_get_cross_validation_plot(model)) except NotImplementedError: # Model does not implement `predict` method. pass return [plot for plot in output_plot_list if plot is not None]
def _merge_trials_dict_with_df( df: pd.DataFrame, trials_dict: Dict[int, Any], column_name: str ) -> None: """Add a column ``column_name`` to a DataFrame ``df`` containing a column ``trial_index``. Each value of the new column is given by the element of ``trials_dict`` indexed by ``trial_index``. Args: df: Pandas DataFrame with column ``trial_index``, to be appended with a new column. trials_dict: Dict mapping each ``trial_index`` to a value. The new column of df will be populated with the value corresponding with the ``trial_index`` of each row. column_name: Name of the column to be appended to ``df``. """ if "trial_index" not in df.columns: raise ValueError("df must have trial_index column") if any(trials_dict.values()): # field present for any trial if not all(trials_dict.values()): # not present for all trials logger.warning( f"Column {column_name} missing for some trials. " "Filling with None when missing." ) df[column_name] = [trials_dict[trial_index] for trial_index in df.trial_index] else: logger.warning( f"Column {column_name} missing for all trials. " "Not appending column." ) def _get_generation_method_str(trial: BaseTrial) -> str: generation_methods = { not_none(generator_run._model_key) for generator_run in trial.generator_runs if generator_run._model_key is not None } # add "Manual" if any generator_runs are manual if any( generator_run.generator_run_type == for generator_run in trial.generator_runs ): generation_methods.add("Manual") return ", ".join(generation_methods) if generation_methods else "Unknown" def _merge_results_if_no_duplicates( arms_df: pd.DataFrame, data: AbstractDataFrameData, key_components: List[str], deduplicate_on_map_keys: bool, metrics: List[Metric], ): """Formats ``data.df`` and merges it with ``arms_df`` if all of the following are True: - ``data.df`` is not empty - ``data.df`` contains columns corresponding to ``key_components`` - after any formatting, ``data.df`` contains no duplicates of the column ``results_key_col`` If ``deduplicate_on_map_keys is True``, this function also deduplicates rows of ``data.df``, keeping the row with the largest ``map_keys`` value for each ``key_component * metric_name``. Else, any ``map_keys`` are added to ``results_key_call``. """ results = data.df # pyre-ignore[16]: data has no attribute `map_keys`. map_keys = data.map_keys if hasattr(data, "map_keys") and data.map_keys else [] if len(results.index) == 0: f"No results present for the specified metrics `{metrics}`. " "Returning arm parameters and metadata only." ) return arms_df if not all(col in results.columns for col in key_components): logger.warn( f"At least one of key columns `{key_components}` not present in results df " f"`{results}`. Returning arm parameters and metadata only." ) return arms_df # prepare results for merge key_vals = results[key_components[0]].astype("str") for key_component in key_components[1:]: key_vals += results[key_component].astype("str") results_key_col = "-".join(key_components) # Deduplicate if data has `map_keys` and therefore came from a `MapMetric`, # keeping the row with the largest `map_keys` value for each `results_key_col` if len(map_keys) > 0: if deduplicate_on_map_keys: results = results.sort_values(map_keys).drop_duplicates( key_components + ["metric_name"], keep="last" ) else: for map_key in map_keys: key_vals += results[map_key].astype(str) results_key_col = "-".join(map_keys + key_components) elif deduplicate_on_map_keys: logger.warn( "Ignoring user-specified `deduplicate_on_map_keys = True` since " "`exp.fetch_data().map_keys` is empty or does not exist. Check " "that at least one element of `metrics` (or `exp.metrics` if " "`metrics is None`) inherits from `MapMetric`." ) results[results_key_col] = key_vals # Don't return results if duplicates remain if any(results.duplicated(subset=[results_key_col, "metric_name"])): logger.warn( "Experimental results dataframe contains multiple rows with the same " f"keys {results_key_col}. Returning dataframe without results." ) return arms_df metric_vals = results.pivot( index=results_key_col, columns="metric_name", values="mean" ).reset_index() # dedupe results by key_components metadata = results[key_components + map_keys + [results_key_col]].drop_duplicates() metrics_df = pd.merge(metric_vals, metadata, on=results_key_col) # drop synthetic key column metrics_df = metrics_df.drop(results_key_col, axis=1) # merge and return return pd.merge(metrics_df, arms_df, on=key_components, how="outer")
[docs]def exp_to_df( exp: Experiment, metrics: Optional[List[Metric]] = None, run_metadata_fields: Optional[List[str]] = None, trial_properties_fields: Optional[List[str]] = None, deduplicate_on_map_keys: bool = True, **kwargs: Any, ) -> pd.DataFrame: """Transforms an experiment to a DataFrame with rows keyed by trial_index and arm_name, metrics pivoted into one row. If the pivot results in more than one row per arm (or one row per ``arm * map_keys`` combination if ``map_keys`` are present), results are omitted and warning is produced. Only supports ``Experiment``. Transforms an ``Experiment`` into a ``pd.DataFrame``. Args: exp: An ``Experiment`` that may have pending trials. metrics: Override list of metrics to return. Return all metrics if ``None``. run_metadata_fields: fields to extract from ``trial.run_metadata`` for trial in ``experiment.trials``. If there are multiple arms per trial, these fields will be replicated across the arms of a trial. trial_properties_fields: fields to extract from ``trial._properties`` for trial in ``experiment.trials``. If there are multiple arms per trial, these fields will be replicated across the arms of a trial. Output columns names will be prepended with ``"trial_properties_"``. deduplicate_on_map_keys: Whether each ``trial_index * arm_name * metric`` combination should correspond to one row in the df output by this function. If ``True``, for each such combination, keep the row of maximum ``map_keys`` column(s) values. Note that if ``map_keys`` is a list, its order may affect which row is kept. If ``False``, keep rows for all unique combinations of ``arm * map_keys``. **kwargs: Custom named arguments, useful for passing complex objects from call-site to the `fetch_data` callback. Returns: DataFrame: A dataframe of inputs, metadata and metrics by trial and arm (and ``map_keys``, if present). If no trials are available, returns an empty dataframe. If no metric ouputs are available, returns a dataframe of inputs and metadata. """ # Accept Experiment and SimpleExperiment if isinstance(exp, MultiTypeExperiment): raise ValueError("Cannot transform MultiTypeExperiments to DataFrames.") key_components = ["trial_index", "arm_name"] # Get each trial-arm with parameters arms_df = pd.DataFrame() for trial_index, trial in exp.trials.items(): for arm in trial.arms: arms_df = arms_df.append( {"arm_name":, "trial_index": trial_index, **arm.parameters}, ignore_index=True, ) # Fetch results; in case arms_df is empty, return empty results (legacy behavior) data = exp.fetch_data(metrics, **kwargs) results = data.df if len(arms_df.index) == 0: if len(results.index) != 0: raise ValueError( "exp.fetch_data().df returned more rows than there are experimental " "arms. This is an inconsistent experimental state. Please report to " "Ax support." ) return results # Create key column from key_components arms_df["trial_index"] = arms_df["trial_index"].astype(int) # Add trial status trials = exp.trials.items() trial_to_status = {index: for index, trial in trials} _merge_trials_dict_with_df( df=arms_df, trials_dict=trial_to_status, column_name="trial_status" ) # Add generation_method, accounting for the generic case that generator_runs is of # arbitrary length. Repeated methods within a trial are condensed via `set` and an # empty set will yield "Unknown" as the method. trial_to_generation_method = { trial_index: _get_generation_method_str(trial) for trial_index, trial in trials } _merge_trials_dict_with_df( df=arms_df, trials_dict=trial_to_generation_method, column_name="generation_method", ) # Add any trial properties fields to arms_df if trial_properties_fields is not None: # add trial._properties fields for field in trial_properties_fields: trial_to_properties_field = { trial_index: ( trial._properties[field] if field in trial._properties else None ) for trial_index, trial in trials } _merge_trials_dict_with_df( df=arms_df, trials_dict=trial_to_properties_field, column_name="trial_properties_" + field, ) # Add any run_metadata fields to arms_df if run_metadata_fields is not None: # add run_metadata fields for field in run_metadata_fields: trial_to_metadata_field = { trial_index: ( trial.run_metadata[field] if field in trial.run_metadata else None ) for trial_index, trial in trials } _merge_trials_dict_with_df( df=arms_df, trials_dict=trial_to_metadata_field, column_name=field, ) exp_df = _merge_results_if_no_duplicates( arms_df=arms_df, data=data, key_components=key_components, deduplicate_on_map_keys=deduplicate_on_map_keys, metrics=metrics or list(exp.metrics.values()), ) return not_none(not_none(exp_df).sort_values(["arm_name"]))
[docs]def get_best_trial( exp: Experiment, additional_metrics: Optional[List[Metric]] = None, run_metadata_fields: Optional[List[str]] = None, **kwargs: Any, ) -> Optional[pd.DataFrame]: """Finds the optimal trial given an experiment, based on raw objective value. Returns a 1-row dataframe. Should match the row of ``exp_to_df`` with the best raw objective value, given the same arguments. Args: exp: An Experiment that may have pending trials. additional_metrics: List of metrics to return in addition to the objective metric. Return all metrics if None. run_metadata_fields: fields to extract from trial.run_metadata for trial in experiment.trials. If there are multiple arms per trial, these fields will be replicated across the arms of a trial. **kwargs: Custom named arguments, useful for passing complex objects from call-site to the `fetch_data` callback. Returns: DataFrame: A dataframe of inputs and metrics of the optimal trial. """ objective = not_none(exp.optimization_config).objective if isinstance(objective, MultiObjective): logger.warning( "No best trial is available for `MultiObjective` optimization. " "Returning None for best trial." ) return None if isinstance(objective, ScalarizedObjective): logger.warning( "No best trial is available for `ScalarizedObjective` optimization. " "Returning None for best trial." ) return None if (additional_metrics is not None) and ( objective.metric not in additional_metrics ): additional_metrics.append(objective.metric) trials_df = exp_to_df( exp=exp, metrics=additional_metrics, run_metadata_fields=run_metadata_fields, **kwargs, ) if len(trials_df.index) == 0: logger.warning("`exp_to_df` returned 0 trials. Returning None for best trial.") return None metric_name = minimize = objective.minimize if metric_name not in trials_df.columns: logger.warning( f"`exp_to_df` did not have data for metric {metric_name}. " "Returning None for best trial." ) return None metric_optimum = ( trials_df[metric_name].min() if minimize else trials_df[metric_name].max() ) return pd.DataFrame(trials_df[trials_df[metric_name] == metric_optimum].head(1))