Source code for ax.plot.helper

#!/usr/bin/env python3
# Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved.

import math
from collections import Counter
from typing import Any, Dict, List, Optional, Set, Tuple, Union

import numpy as np
from ax.core.generator_run import GeneratorRun
from ax.core.observation import ObservationFeatures
from ax.core.parameter import ChoiceParameter, FixedParameter, RangeParameter
from ax.core.types import TParameterization
from ax.modelbridge.base import ModelBridge
from ax.modelbridge.transforms.ivw import IVW
from ax.plot.base import DECIMALS, PlotData, PlotInSampleArm, PlotOutOfSampleArm, Z
from ax.utils.common.logger import get_logger


# Module-level logger shared by the plotting helpers in this file.
logger = get_logger(name="PlotHelper")

# Typing alias: un-merged observation records, one dictionary per observation
# and metric, e.g.
# {'metric_name': 'likes', 'arm_name': '0_0', 'mean': 1., 'sem': 0.1}
RawData = List[Dict[str, Union[str, float]]]

# Typing alias: optional mapping from generator run name to generator run.
TNullableGeneratorRunsDict = Optional[Dict[str, GeneratorRun]]


def _format_dict(param_dict: TParameterization, name: str = "Parameterization") -> str:
    """Render a dictionary as an HTML snippet suitable for hover labels.

    Args:
        param_dict: Dictionary whose items should be rendered.
        name: Label used to introduce the rendered items.

    Returns: an HTML string with one ``key: value`` pair per line, or a short
        notice when the dictionary holds ten or more items.
    """
    n_items = len(param_dict)
    if n_items < 10:
        rows = ["{}: {}".format(key, value) for key, value in param_dict.items()]
        return "<br><em>{}:</em><br>{}".format(name, "<br>".join(rows))
    # Too many entries to show comfortably on hover; report the count instead.
    return "{} has too many items to render on hover ({}).".format(name, n_items)


def _wrap_metric(metric_name: str) -> str:
    """Break a metric name onto separate lines at every "::" separator.

    Args:
        metric_name: metric name.

    Returns: the metric name with each "::" replaced by an HTML line break;
        names without "::" are returned unchanged.
    """
    return metric_name.replace("::", "<br>")


def _format_CI(estimate: float, sd: float, relative: bool, zval: float = Z) -> str:
    """Build the text of a confidence interval around a point estimate.

    Args:
        estimate: point estimate.
        sd: standard deviation of point estimate.
        relative: if True, '%' is appended to both bounds.
        zval: z-value associated with desired CI (e.g. 1.96 for 95% CIs)

    Returns: the interval rendered as "[lower, upper]", each bound shown with
        DECIMALS digits after the decimal point.
    """
    number_spec = ".{}f".format(DECIMALS)
    suffix = "%" if relative else ""
    lower = format(estimate - zval * sd, number_spec)
    upper = format(estimate + zval * sd, number_spec)
    return "[{}{}, {}{}]".format(lower, suffix, upper, suffix)


def arm_name_to_tuple(arm_name: str) -> Union[Tuple[int, int], Tuple[int]]:
    """Convert an arm name into a tuple of integers.

    A name made of exactly two "_"-separated integer parts (e.g. "3_12")
    becomes the pair of those integers. Any other name maps to ``(0,)``.

    Args:
        arm_name: Name of the arm.

    Returns:
        ``(int, int)`` for names of the form "<int>_<int>", otherwise ``(0,)``.
    """
    tup = arm_name.split("_")
    if len(tup) == 2:
        try:
            return (int(tup[0]), int(tup[1]))
        except ValueError:
            # One of the two parts is not an integer.
            return (0,)
    return (0,)
def resize_subtitles(figure: Dict[str, Any], size: int) -> Dict[str, Any]:
    """Set the font size of every annotation in a figure's layout.

    The figure is modified in place and also returned.

    Args:
        figure: Figure dictionary with annotations stored under
            ``figure["layout"]["annotations"]``; each annotation must have a
            "font" dictionary.
        size: Font size to apply to each annotation.

    Returns:
        The same figure object that was passed in.
    """
    for ant in figure["layout"]["annotations"]:
        ant["font"].update(size=size)
    return figure
def _filter_dict(
    param_dict: TParameterization, subset_keys: List[str]
) -> TParameterization:
    """Filter a dictionary to keys present in a given list.

    Args:
        param_dict: Dictionary to filter.
        subset_keys: Keys to keep.

    Returns:
        A new dictionary holding only the items of `param_dict` whose key is
        in `subset_keys`; the original insertion order is kept.
    """
    # Build the lookup once rather than scanning the list for every key.
    allowed = set(subset_keys)
    return {k: v for k, v in param_dict.items() if k in allowed}


def _get_in_sample_arms(
    model: ModelBridge,
    metric_names: Set[str],
    fixed_features: Optional[ObservationFeatures] = None,
) -> Tuple[Dict[str, PlotInSampleArm], RawData, Dict[str, TParameterization]]:
    """Get in-sample arms from a model with observed and predicted values
    for specified metrics.

    Returns a PlotInSampleArm object in which repeated observations are merged
    with IVW, and a RawData object in which every observation is listed.

    Fixed features input can be used to override fields of the insample arms
    when making model predictions.

    Args:
        model: An instance of the model bridge.
        metric_names: Restrict observed and predicted values to these metrics.
            Must be a set of names; metrics not in it are dropped.
        fixed_features: If given, applied to the features of each in-design
            arm (via `update_features`) before the model prediction is made.

    Returns:
        A tuple containing

        - Map from arm name to PlotInSampleArm.
        - List of the data for each observation like::

            {'metric_name': 'likes', 'arm_name': '0_0', 'mean': 1., 'sem': 0.1}

        - Map from arm name to parameters

    Raises:
        ValueError: If an observation has no arm name.
    """
    observations = model.get_training_data()
    # Calculate raw data
    raw_data = []
    cond_name_to_parameters = {}
    for obs in observations:
        cond_name_to_parameters[obs.arm_name] = obs.features.parameters
        for j, metric_name in enumerate(obs.data.metric_names):
            if metric_name in metric_names:
                raw_data.append(
                    {
                        "metric_name": metric_name,
                        "arm_name": obs.arm_name,
                        "mean": obs.data.means[j],
                        "sem": np.sqrt(obs.data.covariance[j, j]),
                    }
                )

    # Check that we have one ObservationFeatures per arm name since we
    # key by arm name and the model is not Multi-task.
    # If "TrialAsTask" is present, one of the arms is also chosen.
    if ("TrialAsTask" not in model.transforms.keys()) and (
        len(cond_name_to_parameters) != len(observations)
    ):
        logger.error(
            "Have observations of arms with different features but same"
            " name. Arbitrary one will be plotted."
        )

    # Merge multiple measurements within each Observation with IVW to get
    # un-modeled prediction
    t = IVW(None, [], [])
    obs_data = t.transform_observation_data([obs.data for obs in observations], [])
    # Start filling in plot data
    in_sample_plot: Dict[str, PlotInSampleArm] = {}
    for i, obs in enumerate(observations):
        if obs.arm_name is None:
            raise ValueError("Observation must have arm name for plotting.")

        # Extract raw measurement
        obs_y = {}
        obs_se = {}
        # Use the IVW data, not obs.data
        for j, metric_name in enumerate(obs_data[i].metric_names):
            if metric_name in metric_names:
                obs_y[metric_name] = obs_data[i].means[j]
                obs_se[metric_name] = np.sqrt(obs_data[i].covariance[j, j])
        # Make a prediction.
        if model.training_in_design[i]:
            features = obs.features
            if fixed_features is not None:
                features.update_features(fixed_features)
            pred_y, pred_se = _predict_at_point(model, features, metric_names)
        else:
            # Use raw data for out-of-design points
            pred_y = obs_y
            pred_se = obs_se
        # pyre-fixme[6]: Expected `str` for 1st param but got `Optional[str]`.
        in_sample_plot[obs.arm_name] = PlotInSampleArm(
            # pyre-fixme[6]: Expected `str` for 1st param but got `Optional[str]`.
            name=obs.arm_name,
            y=obs_y,
            se=obs_se,
            parameters=obs.features.parameters,
            y_hat=pred_y,
            se_hat=pred_se,
            context_stratum=None,
        )
    return in_sample_plot, raw_data, cond_name_to_parameters


def _predict_at_point(
    model: ModelBridge, obsf: ObservationFeatures, metric_names: Set[str]
) -> Tuple[Dict[str, float], Dict[str, float]]:
    """Make a prediction at a point.

    Returns mean and standard deviation in format expected by plotting.

    Args:
        model: ModelBridge
        obsf: ObservationFeatures for which to predict
        metric_names: Limit predictions to these metrics.

    Returns:
        A tuple containing

        - Map from metric name to prediction.
        - Map from metric name to standard error.
    """
    y_hat = {}
    se_hat = {}
    f_pred, cov_pred = model.predict([obsf])
    for metric_name in f_pred:
        if metric_name in metric_names:
            # A single point was passed in, so take element 0 of each result.
            y_hat[metric_name] = f_pred[metric_name][0]
            se_hat[metric_name] = np.sqrt(cov_pred[metric_name][metric_name][0])
    return y_hat, se_hat


def _get_out_of_sample_arms(
    model: ModelBridge,
    generator_runs_dict: Dict[str, GeneratorRun],
    metric_names: Set[str],
    fixed_features: Optional[ObservationFeatures] = None,
) -> Dict[str, Dict[str, PlotOutOfSampleArm]]:
    """Get out-of-sample predictions from a model given a dict of generator runs.

    Fixed features input can be used to override fields of the candidate arms
    when making model predictions.

    Args:
        model: The model.
        generator_runs_dict: a mapping from generator run name to generator run.
        metric_names: metrics to include in the plot.
        fixed_features: If given, applied to each candidate arm's features
            (via `update_features`) before the model prediction is made.

    Returns:
        A mapping from name to a mapping from arm name to plot.
    """
    out_of_sample_plot: Dict[str, Dict[str, PlotOutOfSampleArm]] = {}
    for generator_run_name, generator_run in generator_runs_dict.items():
        out_of_sample_plot[generator_run_name] = {}
        for arm in generator_run.arms:
            # This assumes context is None
            obsf = ObservationFeatures.from_arm(arm)
            if fixed_features is not None:
                obsf.update_features(fixed_features)

            # Make a prediction
            try:
                pred_y, pred_se = _predict_at_point(model, obsf, metric_names)
            except Exception:
                # Check if it is an out-of-design arm.
                if not model.model_space.check_membership(obsf.parameters):
                    # Skip this point
                    continue
                else:
                    # It should have worked
                    raise
            arm_name = arm.name_or_short_signature
            out_of_sample_plot[generator_run_name][arm_name] = PlotOutOfSampleArm(
                name=arm_name,
                parameters=obsf.parameters,
                y_hat=pred_y,
                se_hat=pred_se,
                context_stratum=None,
            )
    return out_of_sample_plot
def get_plot_data(
    model: ModelBridge,
    generator_runs_dict: TNullableGeneratorRunsDict,
    metric_names: Optional[Set[str]] = None,
    fixed_features: Optional[ObservationFeatures] = None,
) -> Tuple[PlotData, RawData, Dict[str, TParameterization]]:
    """Format data object with metrics for in-sample and out-of-sample arms.

    Calculate both observed and predicted metrics for in-sample arms.
    Calculate predicted metrics for out-of-sample arms passed via the
    `generator_runs_dict` argument.

    In PlotData, in-sample observations are merged with IVW. In RawData, they
    are left un-merged and given as a list of dictionaries, one for each
    observation and having keys 'arm_name', 'mean', and 'sem'.

    Args:
        model: The model.
        generator_runs_dict: a mapping from generator run name to generator
            run. None is treated the same as an empty mapping (no
            out-of-sample arms).
        metric_names: Restrict predictions to this set. If None, all metrics
            in the model will be returned.
        fixed_features: Fixed features to use when making model predictions.

    Returns:
        A tuple containing

        - PlotData object with in-sample and out-of-sample predictions.
        - List of observations like::

            {'metric_name': 'likes', 'arm_name': '0_1', 'mean': 1., 'sem': 0.1}.

        - Mapping from arm name to parameters.
    """
    metrics_plot = model.metric_names if metric_names is None else metric_names
    in_sample_plot, raw_data, cond_name_to_parameters = _get_in_sample_arms(
        model=model, metric_names=metrics_plot, fixed_features=fixed_features
    )
    out_of_sample_plot = _get_out_of_sample_arms(
        model=model,
        # The helper iterates over `.items()`, so it needs a real mapping.
        generator_runs_dict={} if generator_runs_dict is None else generator_runs_dict,
        metric_names=metrics_plot,
        fixed_features=fixed_features,
    )
    # pyre-fixme[16]: `Optional` has no attribute `arm_name`.
    status_quo_name = None if model.status_quo is None else model.status_quo.arm_name
    plot_data = PlotData(
        metrics=list(metrics_plot),
        in_sample=in_sample_plot,
        out_of_sample=out_of_sample_plot,
        status_quo_name=status_quo_name,
    )
    return plot_data, raw_data, cond_name_to_parameters
def get_range_parameter(model: ModelBridge, param_name: str) -> RangeParameter:
    """
    Get the range parameter with the given name from the model.

    Throws if parameter doesn't exist or is not a range parameter.

    Args:
        model: The model.
        param_name: The name of the RangeParameter to be found.

    Returns:
        The RangeParameter named `param_name`.

    Raises:
        ValueError: If the model space has no parameter named `param_name`,
            or if that parameter is not a RangeParameter.
    """
    range_param = model.model_space.parameters.get(param_name)
    if range_param is None:
        raise ValueError(f"Parameter `{param_name}` does not exist.")
    if not isinstance(range_param, RangeParameter):
        raise ValueError(f"{param_name} is not a RangeParameter")
    return range_param
def get_range_parameters(model: ModelBridge) -> List[RangeParameter]:
    """
    Get a list of range parameters from a model.

    Parameters of any other kind in the model space are skipped.

    Args:
        model: The model.

    Returns:
        List of RangeParameters, in the iteration order of the model space's
        parameters.
    """
    return [
        parameter
        for parameter in model.model_space.parameters.values()
        if isinstance(parameter, RangeParameter)
    ]
def get_grid_for_parameter(parameter: RangeParameter, density: int) -> np.ndarray:
    """Get a grid of points along the range of the parameter.

    Will be a log-scale grid if parameter is log scale.

    Args:
        parameter: Parameter for which to generate grid.
        density: Number of points in the grid.

    Returns:
        Array of `density` points from `parameter.lower` to `parameter.upper`
        inclusive, evenly spaced in log10 space for a log-scale parameter and
        evenly spaced in linear space otherwise.
    """
    is_log = parameter.log_scale
    if is_log:
        # Space the points evenly in the exponent, then map back.
        grid = np.linspace(
            np.log10(parameter.lower), np.log10(parameter.upper), density
        )
        grid = 10 ** grid
    else:
        grid = np.linspace(parameter.lower, parameter.upper, density)
    return grid
def get_fixed_values(
    model: ModelBridge, slice_values: Optional[Dict[str, Any]] = None
) -> TParameterization:
    """Get fixed values for parameters in a slice plot.

    If there is an in-design status quo, those values will be used. Otherwise,
    the mean of RangeParameters or the mode of ChoiceParameters is used.

    Any value in slice_values will override the above.

    Args:
        model: ModelBridge being used for plotting
        slice_values: Map from parameter name to value at which is should be
            fixed.

    Returns: Map from parameter name to fixed value. The returned dictionary
        is a fresh object; the status quo's own parameters are not modified.
    """
    # Check if status_quo is in design
    if model.status_quo is not None and model.model_space.check_membership(
        # pyre-fixme[16]: `Optional` has no attribute `features`.
        model.status_quo.features.parameters
    ):
        # Copy so that applying slice_values below does not write into the
        # status quo's own parameter dictionary.
        setx = dict(model.status_quo.features.parameters)
    else:
        observations = model.get_training_data()
        setx = {}
        for p_name, parameter in model.model_space.parameters.items():
            if isinstance(parameter, FixedParameter):
                # The value is known up front; observed values are not needed.
                setx[p_name] = parameter.value
                continue
            # Exclude out of design status quo (no parameters)
            vals = [
                obs.features.parameters[p_name]
                for obs in observations
                if (
                    len(obs.features.parameters) > 0
                    and parameter.validate(obs.features.parameters[p_name])
                )
            ]
            if isinstance(parameter, ChoiceParameter):
                setx[p_name] = Counter(vals).most_common(1)[0][0]
            elif isinstance(parameter, RangeParameter):
                setx[p_name] = parameter._cast(np.mean(vals))

    if slice_values is not None:
        # slice_values has type Dictionary[str, Any]
        setx.update(slice_values)
    return setx
# Utility methods ported from JS
def contour_config_to_trace(config):
    """Build the list of plot traces for a contour plot from a config dict.

    Two contour traces are produced, one for the mean (on axes "x"/"y") and
    one for the standard deviation (on axes "x2"/"y2"), followed by scatter
    traces marking the in-sample arms and the arms of each out-of-sample
    generator run on both sets of axes.

    Args:
        config: Dictionary with the keys "arm_data", "density", "grid_x",
            "grid_y", "f", "lower_is_better", "metric", "rel", "sd", "xvar",
            "yvar", "green_scale", "green_pink_scale" and "blue_scale".
            "f" and "sd" are flat lists that are cut into rows of "density"
            values each; the color scales are sequences of RGB triples.

    Returns:
        List of trace dictionaries.
    """
    # Load from config
    arm_data = config["arm_data"]
    density = config["density"]
    grid_x = config["grid_x"]
    grid_y = config["grid_y"]
    f = config["f"]
    lower_is_better = config["lower_is_better"]
    metric = config["metric"]
    rel = config["rel"]
    sd = config["sd"]
    xvar = config["xvar"]
    yvar = config["yvar"]

    green_scale = config["green_scale"]
    green_pink_scale = config["green_pink_scale"]
    blue_scale = config["blue_scale"]

    # format data
    res = relativize_data(f, sd, rel, arm_data, metric)
    f_final = res[0]
    sd_final = res[1]

    # calculate max of abs(outcome), used for colorscale
    f_absmax = max(abs(min(f_final)), max(f_final))

    # transform to nested array
    f_plt = [f_final[ind : ind + density] for ind in range(0, len(f_final), density)]
    sd_plt = [
        sd_final[ind : ind + density] for ind in range(0, len(sd_final), density)
    ]

    CONTOUR_CONFIG = {
        "autocolorscale": False,
        "autocontour": True,
        "contours": {"coloring": "heatmap"},
        "hoverinfo": "x+y+z",
        "ncontours": int(density / 2),
        "type": "contour",
        "x": grid_x,
        "y": grid_y,
    }

    if rel:
        # `reversed` returns an iterator with no len(); materialize it because
        # the colorscale below needs the number of colors.
        f_scale = (
            list(reversed(green_pink_scale)) if lower_is_better else green_pink_scale
        )
    else:
        f_scale = green_scale

    f_trace = {
        "colorbar": {
            "x": 0.45,
            "y": 0.5,
            "ticksuffix": "%" if rel else "",
            "tickfont": {"size": 8},
        },
        "colorscale": [(i / (len(f_scale) - 1), rgb(v)) for i, v in enumerate(f_scale)],
        "xaxis": "x",
        "yaxis": "y",
        "z": f_plt,
        # zmax and zmin are ignored if zauto is true
        "zauto": not rel,
        "zmax": f_absmax,
        "zmin": -f_absmax,
    }

    sd_trace = {
        "colorbar": {
            "x": 1,
            "y": 0.5,
            "ticksuffix": "%" if rel else "",
            "tickfont": {"size": 8},
        },
        "colorscale": [
            (i / (len(blue_scale) - 1), rgb(v)) for i, v in enumerate(blue_scale)
        ],
        "xaxis": "x2",
        "yaxis": "y2",
        "z": sd_plt,
    }

    f_trace.update(CONTOUR_CONFIG)
    sd_trace.update(CONTOUR_CONFIG)

    # get in-sample arms
    arm_text = list(arm_data["in_sample"].keys())
    arm_x = [
        arm_data["in_sample"][arm_name]["parameters"][xvar] for arm_name in arm_text
    ]
    arm_y = [
        arm_data["in_sample"][arm_name]["parameters"][yvar] for arm_name in arm_text
    ]

    # configs for in-sample arms
    base_in_sample_arm_config = {
        "hoverinfo": "text",
        "legendgroup": "In-sample",
        "marker": {"color": "black", "symbol": 1, "opacity": 0.5},
        "mode": "markers",
        "name": "In-sample",
        "text": arm_text,
        "type": "scatter",
        "x": arm_x,
        "y": arm_y,
    }

    f_in_sample_arm_trace = {"xaxis": "x", "yaxis": "y"}

    sd_in_sample_arm_trace = {"showlegend": False, "xaxis": "x2", "yaxis": "y2"}

    f_in_sample_arm_trace.update(base_in_sample_arm_config)
    sd_in_sample_arm_trace.update(base_in_sample_arm_config)

    traces = [f_trace, sd_trace, f_in_sample_arm_trace, sd_in_sample_arm_trace]

    # iterate over out-of-sample arms
    for i, generator_run_name in enumerate(arm_data["out_of_sample"].keys()):
        symbol = i + 2  # symbols starts from 2 for candidate markers

        ax = []
        ay = []
        atext = []

        for arm_name in arm_data["out_of_sample"][generator_run_name].keys():
            ax.append(
                arm_data["out_of_sample"][generator_run_name][arm_name]["parameters"][
                    xvar
                ]
            )
            ay.append(
                arm_data["out_of_sample"][generator_run_name][arm_name]["parameters"][
                    yvar
                ]
            )
            atext.append("<em>Candidate " + arm_name + "</em>")

        # Marker trace on the mean axes; this one carries the legend entry.
        traces.append(
            {
                "hoverinfo": "text",
                "legendgroup": generator_run_name,
                "marker": {"color": "black", "symbol": symbol, "opacity": 0.5},
                "mode": "markers",
                "name": generator_run_name,
                "text": atext,
                "type": "scatter",
                "xaxis": "x",
                "x": ax,
                "yaxis": "y",
                "y": ay,
            }
        )
        # Same markers on the standard-deviation axes, hidden from the legend.
        traces.append(
            {
                "hoverinfo": "text",
                "legendgroup": generator_run_name,
                "marker": {"color": "black", "symbol": symbol, "opacity": 0.5},
                "mode": "markers",
                "name": "In-sample",
                "showlegend": False,
                "text": atext,
                "type": "scatter",
                "x": ax,
                "xaxis": "x2",
                "y": ay,
                "yaxis": "y2",
            }
        )

    return traces
def axis_range(grid: List[float], is_log: bool) -> List[float]:
    """Compute the [min, max] axis range that spans a grid of values.

    Args:
        grid: Values along the axis.
        is_log: If True, the bounds are returned as base-10 logarithms of the
            smallest and largest grid values.

    Returns:
        Two-element list ``[lower, upper]``.
    """
    if is_log:
        return [math.log10(min(grid)), math.log10(max(grid))]
    else:
        return [min(grid), max(grid)]
def _relativize(m_t: float, sem_t: float, m_c: float, sem_c: float) -> List[float]:
    """Express a treatment estimate relative to a control estimate.

    The relative change is computed as::

        r_hat = (m_t - m_c) / |m_c| - sem_c^2 * m_t / |m_c|^3

    and its variance as::

        variance = (sem_t^2 + (m_t / m_c * sem_c)^2) / m_c^2

    Args:
        m_t: Mean of the treatment.
        sem_t: Standard error of the treatment mean.
        m_c: Mean of the control. Must be non-zero, since it is used as a
            divisor.
        sem_c: Standard error of the control mean.

    Returns:
        Two-element list ``[r_hat, sqrt(variance)]``, both as fractions (not
        multiplied by 100).
    """
    r_hat = (m_t - m_c) / abs(m_c) - sem_c ** 2 * m_t / abs(m_c) ** 3
    variance = (sem_t ** 2 + (m_t / m_c * sem_c) ** 2) / m_c ** 2
    return [r_hat, math.sqrt(variance)]
def relativize_data(
    f: List[float], sd: List[float], rel: bool, arm_data: Dict[Any, Any], metric: str
) -> List[List[float]]:
    """Optionally convert estimates to percent change relative to status quo.

    Args:
        f: Point estimates.
        sd: Standard deviations matching `f` element by element.
        rel: If False, `f` and `sd` are returned as given. If True, each pair
            is relativized against the status quo arm and scaled by 100.
        arm_data: Dictionary holding "status_quo_name" and, under
            "in_sample", per-arm "y" and "se" maps keyed by metric. Only
            read when `rel` is True.
        metric: Name of the metric to look up for the status quo arm.

    Returns:
        Two-element list ``[f_final, sd_final]``.
    """
    # if relative, extract status quo & compute ratio
    if not rel:
        return [f, sd]

    status_quo = arm_data["in_sample"][arm_data["status_quo_name"]]
    f_sq = status_quo["y"][metric]
    sd_sq = status_quo["se"][metric]

    f_final = []
    sd_final = []
    for f_i, sd_i in zip(f, sd):
        res = _relativize(f_i, sd_i, f_sq, sd_sq)
        # Report relative values as percentages.
        f_final.append(100 * res[0])
        sd_final.append(100 * res[1])
    return [f_final, sd_final]
def rgb(arr: List[int]) -> str:
    """Format a three-element color as an "rgb(r,g,b)" string.

    Args:
        arr: Sequence of exactly three color components.

    Returns:
        String of the form ``rgb(r,g,b)`` with no spaces.
    """
    return "rgb({},{},{})".format(*arr)
def slice_config_to_trace(
    arm_data,
    arm_name_to_parameters,
    f,
    fit_data,
    grid,
    metric,
    param,
    rel,
    setx,
    sd,
    is_log,
    visible,
):
    """Build the list of plot traces for a one-parameter slice plot.

    The result holds, in order: a filled band of the prediction +/- 2 standard
    deviations, the predicted mean line, the observed arms with error bars of
    2 standard errors, and one marker trace per out-of-sample generator run.

    Observed and out-of-sample arms are only plotted when they agree with
    `setx` on every parameter other than `param`.

    Args:
        arm_data: Dictionary with "in_sample", "out_of_sample" and
            "status_quo_name" entries.
        arm_name_to_parameters: Map from arm name to that arm's parameters.
        f: Predicted means along `grid`.
        fit_data: Rows with "arm_name", "mean" and "sem" entries, one per
            observation.
        grid: Values of `param` at which `f` and `sd` were evaluated.
        metric: Name of the metric being plotted.
        param: Name of the parameter that varies along the slice.
        rel: If True, values are shown as percent change relative to the
            status quo.
        setx: Map from parameter name to the value at which it is fixed.
        sd: Predicted standard deviations along `grid`.
        is_log: Not used by this function.
        visible: Value stored under "visible" in every returned trace.

    Returns:
        List of trace dictionaries.
    """

    def _on_slice(parameters):
        # True when the arm matches the slice on every parameter but `param`.
        return all(parameters[p] == setx[p] for p in setx if p != param)

    # format data
    res = relativize_data(f, sd, rel, arm_data, metric)
    f_final = res[0]
    sd_final = res[1]

    # get data for standard deviation fill plot
    sd_upper = [f_i + 2 * sd_i for f_i, sd_i in zip(f_final, sd_final)]
    sd_lower = [f_i - 2 * sd_i for f_i, sd_i in zip(f_final, sd_final)]
    # Trace the band as one closed polygon: left to right along the upper
    # bound, then right to left along the lower bound. `list(grid)` keeps
    # `+` a concatenation even if `grid` is a numpy array.
    grid_rev = list(reversed(grid))
    sd_lower_rev = list(reversed(sd_lower))
    sd_x = list(grid) + grid_rev
    sd_y = sd_upper + sd_lower_rev

    # get data for observed arms and error bars
    arm_x = []
    arm_y = []
    arm_sem = []
    for row in fit_data:
        parameters = arm_name_to_parameters[row["arm_name"]]
        if _on_slice(parameters):
            arm_x.append(parameters[param])
            arm_y.append(row["mean"])
            arm_sem.append(row["sem"])

    arm_res = relativize_data(arm_y, arm_sem, rel, arm_data, metric)
    arm_y_final = arm_res[0]
    arm_sem_final = [x * 2 for x in arm_res[1]]

    # create traces
    f_trace = {
        "x": grid,
        "y": f_final,
        "showlegend": False,
        "hoverinfo": "x+y",
        "line": {"color": "rgba(128, 177, 211, 1)"},
        "visible": visible,
    }

    arms_trace = {
        "x": arm_x,
        "y": arm_y_final,
        "mode": "markers",
        "error_y": {
            "type": "data",
            "array": arm_sem_final,
            "visible": True,
            "color": "black",
        },
        "line": {"color": "black"},
        "showlegend": False,
        "hoverinfo": "x+y",
        "visible": visible,
    }

    sd_trace = {
        "x": sd_x,
        "y": sd_y,
        "fill": "toself",
        "fillcolor": "rgba(128, 177, 211, 0.2)",
        "line": {"color": "transparent"},
        "showlegend": False,
        "hoverinfo": "none",
        "visible": visible,
    }

    traces = [sd_trace, f_trace, arms_trace]

    # iterate over out-of-sample arms
    for i, generator_run_name in enumerate(arm_data["out_of_sample"].keys()):
        ax = []
        ay = []
        asem = []
        atext = []

        for arm_name, arm in arm_data["out_of_sample"][generator_run_name].items():
            parameters = arm["parameters"]
            if _on_slice(parameters):
                ax.append(parameters[param])
                ay.append(arm["y_hat"][metric])
                asem.append(arm["se_hat"][metric])
                atext.append("<em>Candidate " + arm_name + "</em>")

        out_of_sample_arm_res = relativize_data(ay, asem, rel, arm_data, metric)
        ay_final = out_of_sample_arm_res[0]
        asem_final = [x * 2 for x in out_of_sample_arm_res[1]]

        traces.append(
            {
                "hoverinfo": "text",
                "legendgroup": generator_run_name,
                "marker": {"color": "black", "symbol": i + 1, "opacity": 0.5},
                "mode": "markers",
                "error_y": {
                    "type": "data",
                    "array": asem_final,
                    "visible": True,
                    "color": "black",
                },
                "name": generator_run_name,
                "text": atext,
                "type": "scatter",
                "xaxis": "x",
                "x": ax,
                "yaxis": "y",
                "y": ay_final,
                "visible": visible,
            }
        )

    return traces