Source code for ax.service.utils.instantiation

#!/usr/bin/env python3
# Copyright (c) Facebook, Inc. and its affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.

from typing import Dict, List, Optional, Union, cast

import numpy as np
from ax.core.arm import Arm
from ax.core.data import Data
from ax.core.experiment import Experiment
from ax.core.metric import Metric
from ax.core.objective import Objective
from ax.core.optimization_config import OptimizationConfig
from ax.core.outcome_constraint import OutcomeConstraint
from ax.core.parameter import (
    PARAMETER_PYTHON_TYPE_MAP,
    ChoiceParameter,
    FixedParameter,
    Parameter,
    ParameterType,
    RangeParameter,
    TParameterType,
)
from ax.core.parameter_constraint import OrderConstraint, ParameterConstraint
from ax.core.search_space import SearchSpace
from ax.core.simple_experiment import DEFAULT_OBJECTIVE_NAME
from ax.core.types import (
    ComparisonOp,
    TEvaluationOutcome,
    TFidelityTrialEvaluation,
    TParameterization,
    TParamValue,
    TTrialEvaluation,
)
from ax.utils.common.logger import get_logger
from ax.utils.common.typeutils import (
    checked_cast,
    checked_cast_to_tuple,
    not_none,
    numpy_type_to_python_type,
)


logger = get_logger(__name__)


"""Utilities for RESTful-like instantiation of Ax classes needed in AxClient."""


TParameterRepresentation = Dict[str, Union[TParamValue, List[TParamValue]]]
PARAM_CLASSES = ["range", "choice", "fixed"]
PARAM_TYPES = {"int": int, "float": float, "bool": bool, "str": str}
COMPARISON_OPS = {"<=": ComparisonOp.LEQ, ">=": ComparisonOp.GEQ}
EXPECTED_KEYS_IN_PARAM_REPR = {
    "name",
    "type",
    "values",
    "bounds",
    "value",
    "value_type",
    "log_scale",
    "target_value",
    "is_fidelity",
    "is_ordered",
    "is_task",
}


def _get_parameter_type(python_type: TParameterType) -> ParameterType:
    for param_type, py_type in PARAMETER_PYTHON_TYPE_MAP.items():
        if py_type is python_type:
            return param_type
    raise ValueError(f"No AE parameter type corresponding to {python_type}.")


def _to_parameter_type(
    vals: List[TParamValue], typ: Optional[str], param_name: str, field_name: str
) -> ParameterType:
    if typ is None:
        typ = type(not_none(vals[0]))
        parameter_type = _get_parameter_type(typ)  # pyre-ignore[6]
        assert all(isinstance(x, typ) for x in vals), (
            f"Values in `{field_name}` not of the same type and no `value_type` was "
            f"explicitly specified; cannot infer value type for parameter {param_name}."
        )
        logger.info(
            f"Inferred value type of {parameter_type} for parameter {param_name}. "
            "If that is not the expected value type, you can explicity specify "
            "'value_type' ('int', 'float', 'bool' or 'str') in parameter dict."
        )
        return parameter_type
    return _get_parameter_type(PARAM_TYPES[typ])  # pyre-ignore[6]


def _make_range_param(
    name: str, representation: TParameterRepresentation, parameter_type: Optional[str]
) -> RangeParameter:
    assert "bounds" in representation, "Bounds are required for range parameters."
    bounds = representation["bounds"]
    assert isinstance(bounds, list) and len(bounds) == 2, (
        f"Cannot parse parameter {name}: for range parameters, json representation "
        "should include a list of two values, lower and upper bounds of the range."
    )
    return RangeParameter(
        name=name,
        parameter_type=_to_parameter_type(bounds, parameter_type, name, "bounds"),
        lower=checked_cast_to_tuple((float, int), bounds[0]),
        upper=checked_cast_to_tuple((float, int), bounds[1]),
        log_scale=checked_cast(bool, representation.get("log_scale", False)),
        is_fidelity=checked_cast(bool, representation.get("is_fidelity", False)),
        target_value=representation.get("target_value", None),  # pyre-ignore[6]
    )


def _make_choice_param(
    name: str, representation: TParameterRepresentation, parameter_type: Optional[str]
) -> ChoiceParameter:
    assert "values" in representation, "Values are required for choice parameters."
    values = representation["values"]
    assert isinstance(values, list) and len(values) > 1, (
        f"Cannot parse parameter {name}: for choice parameters, json representation"
        " should include a list of two or more values."
    )
    return ChoiceParameter(
        name=name,
        parameter_type=_to_parameter_type(values, parameter_type, name, "values"),
        values=values,
        is_ordered=checked_cast(bool, representation.get("is_ordered", False)),
        is_fidelity=checked_cast(bool, representation.get("is_fidelity", False)),
        is_task=checked_cast(bool, representation.get("is_task", False)),
        target_value=representation.get("target_value", None),  # pyre-ignore[6]
    )


def _make_fixed_param(
    name: str, representation: Dict[str, TParamValue], parameter_type: Optional[str]
) -> FixedParameter:
    assert "value" in representation, "Value is required for fixed parameters."
    value = representation["value"]
    assert type(value) in PARAM_TYPES.values(), (
        f"Cannot parse fixed parameter {name}: for fixed parameters, json "
        "representation should include a single value."
    )
    return FixedParameter(
        name=name,
        parameter_type=_get_parameter_type(type(value))  # pyre-ignore[6]
        if parameter_type is None
        else _get_parameter_type(PARAM_TYPES[parameter_type]),  # pyre-ignore[6]
        value=value,
        is_fidelity=checked_cast(bool, representation.get("is_fidelity", False)),
        target_value=representation.get("target_value", None),
    )


[docs]def parameter_from_json( representation: Dict[str, Union[TParamValue, List[TParamValue]]] ) -> Parameter: """Instantiate a parameter from JSON representation.""" if "parameter_type" in representation: raise ValueError( "'parameter_type' is not an expected key in parameter dictionary. " "If you are looking to specify the type of values that this " "parameter should take, use 'value_type' (expects 'int', 'float', " "'str' or 'bool')." ) unexpected_keys = set(representation.keys()) - EXPECTED_KEYS_IN_PARAM_REPR if unexpected_keys: raise ValueError( f"Unexpected keys {unexpected_keys} in parameter representation." f"Exhaustive set of expected keys: {EXPECTED_KEYS_IN_PARAM_REPR}." ) name = representation["name"] assert isinstance(name, str), "Parameter name must be a string." parameter_class = representation["type"] assert ( isinstance(parameter_class, str) and parameter_class in PARAM_CLASSES ), "Type in parameter JSON representation must be `range`, `choice`, or `fixed`." parameter_type = representation.get("value_type", None) if parameter_type is not None: assert isinstance(parameter_type, str) and parameter_type in PARAM_TYPES, ( "Value type in parameter JSON representation must be 'int', 'float', " "'bool' or 'str'." ) if parameter_class == "range": return _make_range_param( name=name, representation=representation, parameter_type=parameter_type ) if parameter_class == "choice": return _make_choice_param( name=name, representation=representation, parameter_type=parameter_type ) if parameter_class == "fixed": assert not any(isinstance(val, list) for val in representation.values()) return _make_fixed_param( name=name, representation=cast(Dict[str, TParamValue], representation), parameter_type=parameter_type, ) else: raise ValueError( # pragma: no cover (this is unreachable) f"Unrecognized parameter type {parameter_class}." )
[docs]def constraint_from_str( representation: str, parameters: Dict[str, Parameter] ) -> ParameterConstraint: """Parse string representation of a parameter constraint.""" tokens = representation.split() parameter_names = parameters.keys() order_const = len(tokens) == 3 and tokens[1] in COMPARISON_OPS sum_const = ( len(tokens) >= 5 and len(tokens) % 2 == 1 and tokens[-2] in COMPARISON_OPS ) if not (order_const or sum_const): raise ValueError( "Parameter constraint should be of form <parameter_name> >= " "<other_parameter_name> for order constraints or `<parameter_name> " "+ <other_parameter_name> >= x, where any number of terms can be " "added and `x` is a float bound. Acceptable comparison operators " 'are ">=" and "<=".' ) if len(tokens) == 3: # Case "x1 >= x2" => order constraint. left, right = tokens[0], tokens[2] assert left in parameter_names, f"Parameter {left} not in {parameter_names}." assert right in parameter_names, f"Parameter {right} not in {parameter_names}." return ( OrderConstraint( lower_parameter=parameters[left], upper_parameter=parameters[right] ) if COMPARISON_OPS[tokens[1]] is ComparisonOp.LEQ else OrderConstraint( lower_parameter=parameters[right], upper_parameter=parameters[left] ) ) try: # Case "x1 - 2*x2 + x3 >= 2" => parameter constraint. bound = float(tokens[-1]) except ValueError: raise ValueError(f"Bound for the constraint must be a number; got {tokens[-1]}") if any(token[0] == "*" or token[-1] == "*" for token in tokens): raise ValueError( "A linear constraint should be the form a*x + b*y - c*z <= d" ", where a,b,c,d are float constants and x,y,z are parameters. " "There should be no space in each term around the operator * while " "there should be a single space around each operator +, -, <= and >=." ) parameter_weight = {} comparison_multiplier = ( 1.0 if COMPARISON_OPS[tokens[-2]] is ComparisonOp.LEQ else -1.0 ) operator_sign = 1.0 # Determines whether the operator is + or - for idx, token in enumerate(tokens[:-2]): if idx % 2 == 0: split_token = token.split("*") parameter = "" # Initializing the parameter multiplier = 1.0 # Initializing the multiplier if len(split_token) == 2: # There is a non-unit multiplier try: multiplier = float(split_token[0]) except ValueError: raise ValueError( f"Multiplier should be float; got {split_token[0]}" ) parameter = split_token[1] elif len(split_token) == 1: # The multiplier is either -1 or 1 parameter = split_token[0] if parameter[0] == "-": # The multiplier is -1 parameter = parameter[1:] multiplier = -1.0 else: multiplier = 1.0 assert ( parameter in parameter_names ), f"Parameter {parameter} not in {parameter_names}." parameter_weight[parameter] = operator_sign * multiplier else: assert ( token == "+" or token == "-" ), f"Expected a mixed constraint, found operator {token}." operator_sign = 1.0 if token == "+" else -1.0 return ParameterConstraint( constraint_dict={ p: comparison_multiplier * parameter_weight[p] for p in parameter_weight }, bound=comparison_multiplier * bound, )
[docs]def outcome_constraint_from_str(representation: str) -> OutcomeConstraint: """Parse string representation of an outcome constraint.""" tokens = representation.split() assert len(tokens) == 3 and tokens[1] in COMPARISON_OPS, ( "Outcome constraint should be of form `metric_name >= x`, where x is a " "float bound and comparison operator is >= or <=." ) op = COMPARISON_OPS[tokens[1]] rel = False try: bound_repr = tokens[2] if bound_repr[-1] == "%": rel = True bound_repr = bound_repr[:-1] bound = float(bound_repr) except ValueError: raise ValueError("Outcome constraint bound should be a float.") return OutcomeConstraint(Metric(name=tokens[0]), op=op, bound=bound, relative=rel)
[docs]def make_experiment( parameters: List[TParameterRepresentation], name: Optional[str] = None, objective_name: Optional[str] = None, minimize: bool = False, parameter_constraints: Optional[List[str]] = None, outcome_constraints: Optional[List[str]] = None, status_quo: Optional[TParameterization] = None, experiment_type: Optional[str] = None, ) -> Experiment: """Instantiation wrapper that allows for creation of SimpleExperiment without importing or instantiating any Ax classes.""" exp_parameters: List[Parameter] = [parameter_from_json(p) for p in parameters] status_quo_arm = None if status_quo is None else Arm(parameters=status_quo) parameter_map = {p.name: p for p in exp_parameters} ocs = [outcome_constraint_from_str(c) for c in (outcome_constraints or [])] if status_quo_arm is None and any(oc.relative for oc in ocs): raise ValueError("Must set status_quo to have relative outcome constraints.") return Experiment( name=name, search_space=SearchSpace( parameters=exp_parameters, parameter_constraints=None if parameter_constraints is None else [constraint_from_str(c, parameter_map) for c in parameter_constraints], ), optimization_config=OptimizationConfig( objective=Objective( metric=Metric( name=objective_name or DEFAULT_OBJECTIVE_NAME, lower_is_better=minimize, ), minimize=minimize, ), outcome_constraints=ocs, ), status_quo=status_quo_arm, experiment_type=experiment_type, )
[docs]def raw_data_to_evaluation( raw_data: TEvaluationOutcome, objective_name: str, start_time: Optional[int] = None, end_time: Optional[int] = None, ) -> TEvaluationOutcome: """Format the trial evaluation data to a standard `TTrialEvaluation` (mapping from metric names to a tuple of mean and SEM) representation, or to a TFidelityTrialEvaluation. Note: this function expects raw_data to be data for a `Trial`, not a `BatchedTrial`. """ if isinstance(raw_data, dict): if any(isinstance(x, dict) for x in raw_data.values()): # pragma: no cover raise ValueError("Raw data is expected to be just for one arm.") for metric_name, dat in raw_data.items(): if not isinstance(dat, tuple): if not isinstance(dat, (float, int)): raise ValueError( "Raw data for an arm is expected to either be a tuple of " "numerical mean and SEM or just a numerical mean." f"Got: {dat} for metric '{metric_name}'." ) raw_data[metric_name] = (float(dat), None) return raw_data elif isinstance(raw_data, list): return raw_data elif isinstance(raw_data, tuple): return {objective_name: raw_data} elif isinstance(raw_data, (float, int)): return {objective_name: (raw_data, None)} elif isinstance(raw_data, (np.float32, np.float64, np.int32, np.int64)): return {objective_name: (numpy_type_to_python_type(raw_data), None)} else: raise ValueError( "Raw data has an invalid type. The data must either be in the form " "of a dictionary of metric names to mean, sem tuples, " "or a single mean, sem tuple, or a single mean." )
[docs]def data_from_evaluations( evaluations: Dict[str, TEvaluationOutcome], trial_index: int, sample_sizes: Dict[str, int], start_time: Optional[int] = None, end_time: Optional[int] = None, ) -> Data: """Transforms evaluations into Ax Data. Each evaluation is either a trial evaluation: {metric_name -> (mean, SEM)} or a fidelity trial evaluation for multi-fidelity optimizations: [(fidelities, {metric_name -> (mean, SEM)})]. Args: evalutions: Mapping from arm name to evaluation. trial_index: Index of the trial, for which the evaluations are. sample_sizes: Number of samples collected for each arm, may be empty if unavailable. start_time: Optional start time of run of the trial that produced this data, in milliseconds. end_time: Optional end time of run of the trial that produced this data, in milliseconds. """ if all(isinstance(evaluations[x], dict) for x in evaluations.keys()): # All evaluations are no-fidelity evaluations. data = Data.from_evaluations( evaluations=cast(Dict[str, TTrialEvaluation], evaluations), trial_index=trial_index, sample_sizes=sample_sizes, start_time=start_time, end_time=end_time, ) elif all(isinstance(evaluations[x], list) for x in evaluations.keys()): # All evaluations are with-fidelity evaluations. data = Data.from_fidelity_evaluations( evaluations=cast(Dict[str, TFidelityTrialEvaluation], evaluations), trial_index=trial_index, sample_sizes=sample_sizes, start_time=start_time, end_time=end_time, ) else: raise ValueError( # pragma: no cover "Evaluations included a mixture of no-fidelity and with-fidelity " "evaluations, which is not currently supported." ) return data