Source code for ax.storage.sqa_store.encoder

#!/usr/bin/env python3
# Copyright (c) Facebook, Inc. and its affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.

from enum import Enum
from typing import Any, Dict, List, Optional, Tuple, Type, cast

from ax.core.arm import Arm
from ax.core.base import Base
from ax.core.base_trial import BaseTrial
from ax.core.batch_trial import AbandonedArm, BatchTrial
from ax.core.data import Data
from ax.core.experiment import Experiment
from ax.core.generator_run import GeneratorRun, GeneratorRunType
from ax.core.metric import Metric
from ax.core.multi_type_experiment import MultiTypeExperiment
from ax.core.objective import Objective
from ax.core.optimization_config import OptimizationConfig
from ax.core.outcome_constraint import OutcomeConstraint
from ax.core.parameter import ChoiceParameter, FixedParameter, Parameter, RangeParameter
from ax.core.parameter_constraint import (
    OrderConstraint,
    ParameterConstraint,
    SumConstraint,
)
from ax.core.runner import Runner
from ax.core.search_space import SearchSpace
from ax.core.simple_experiment import SimpleExperiment
from ax.core.trial import Trial
from ax.exceptions.storage import SQAEncodeError
from ax.modelbridge.generation_strategy import GenerationStrategy
from ax.storage.json_store.encoder import object_to_json
from ax.storage.metric_registry import METRIC_REGISTRY
from ax.storage.runner_registry import RUNNER_REGISTRY
from ax.storage.sqa_store.sqa_classes import (
    SQAAbandonedArm,
    SQAArm,
    SQAData,
    SQAExperiment,
    SQAGenerationStrategy,
    SQAGeneratorRun,
    SQAMetric,
    SQAParameter,
    SQAParameterConstraint,
    SQARunner,
    SQATrial,
)
from ax.storage.sqa_store.sqa_config import SQAConfig
from ax.storage.utils import (
    DomainType,
    MetricIntent,
    ParameterConstraintType,
    get_object_properties,
)
from ax.utils.common.timeutils import current_timestamp_in_millis
from ax.utils.common.typeutils import not_none


class Encoder:
    """Class that contains methods for storing an Ax experiment to SQLAlchemy.

    Instantiate with an instance of Config to customize the functionality.
    For even more flexibility, create a subclass.

    Attributes:
        config: Metadata needed to save and load an experiment to SQLAlchemy.
    """

    def __init__(self, config: SQAConfig) -> None:
        self.config = config

    def get_enum_value(
        self, value: Optional[str], enum: Optional[Enum]
    ) -> Optional[int]:
        """Given an enum name (string) and an enum (of ints), return the
        corresponding enum value. If the name is not present in the enum,
        throw an error.

        Args:
            value: Name of the enum member to look up.
            enum: Enum that is indexed by member name (``enum[value]``).

        Returns:
            The ``.value`` of the matching member, or None if either ``value``
            or ``enum`` is None.

        Raises:
            SQAEncodeError: If ``value`` is not a member name of ``enum``.
        """
        if value is None or enum is None:
            return None

        try:
            return enum[value].value  # pyre-ignore T29651755
        except KeyError as err:
            # Chain explicitly so the failed lookup stays visible as the cause.
            raise SQAEncodeError(
                f"Value {value} is invalid for enum {enum}."
            ) from err

    def experiment_to_sqa(self, experiment: Experiment) -> SQAExperiment:
        """Convert Ax Experiment to SQLAlchemy.

        In addition to creating and storing a new Experiment object, we need to
        create and store copies of the Trials, Metrics, Parameters,
        ParameterConstraints, and Runner owned by this Experiment.

        Args:
            experiment: The Ax experiment to encode.

        Returns:
            An instance of the SQA class registered for ``Experiment`` in
            ``self.config.class_to_sqa_class``.
        """
        optimization_config = experiment.optimization_config
        optimization_metrics = self.optimization_config_to_sqa(optimization_config)

        # Read the optimization config's metrics once instead of once per
        # experiment metric. Metrics that belong to the optimization config are
        # already encoded above, so only the remaining ones are tracking metrics.
        optimization_config_metrics = (
            {} if optimization_config is None else optimization_config.metrics
        )
        tracking_metrics = [
            self.metric_to_sqa(metric)
            for metric in experiment.metrics.values()
            if metric.name not in optimization_config_metrics
        ]

        parameters, parameter_constraints = self.search_space_to_sqa(
            experiment.search_space
        )

        status_quo_name = None
        status_quo_parameters = None
        if experiment.status_quo is not None:
            # pyre-fixme[16]: `Optional` has no attribute `name`.
            status_quo_name = experiment.status_quo.name
            # pyre-fixme[16]: `Optional` has no attribute `parameters`.
            status_quo_parameters = experiment.status_quo.parameters

        trials = [
            self.trial_to_sqa(trial=trial) for trial in experiment.trials.values()
        ]

        experiment_data = []
        for trial_index, data_by_timestamp in experiment.data_by_trial.items():
            for timestamp, data in data_by_timestamp.items():
                experiment_data.append(
                    self.data_to_sqa(
                        data=data, trial_index=trial_index, timestamp=timestamp
                    )
                )

        experiment_type = self.get_enum_value(
            value=experiment.experiment_type, enum=self.config.experiment_type_enum
        )

        properties = {}
        if isinstance(experiment, MultiTypeExperiment):
            properties["subclass"] = "MultiTypeExperiment"
            # One runner is encoded per trial type.
            runners = [
                self.runner_to_sqa(runner, trial_type)
                for trial_type, runner in experiment._trial_type_to_runner.items()
            ]
            # Tracking metrics additionally carry their trial type and, when
            # one is recorded, their canonical name.
            for metric in tracking_metrics:
                metric.trial_type = experiment._metric_to_trial_type[metric.name]
                if metric.name in experiment._metric_to_canonical_name:
                    metric.canonical_name = experiment._metric_to_canonical_name[
                        metric.name
                    ]
        else:
            runners = (
                # pyre-fixme[6]: Expected `Runner` for 1st param but got
                #  `Optional[Runner]`.
                [self.runner_to_sqa(experiment.runner)]
                if experiment.runner is not None
                else []
            )

        if isinstance(experiment, SimpleExperiment):
            properties["subclass"] = "SimpleExperiment"

        # pyre-fixme: Expected `Base` for 1st...yping.Type[Experiment]`.
        experiment_class: Type[SQAExperiment] = self.config.class_to_sqa_class[
            Experiment
        ]
        return experiment_class(  # pyre-ignore[28]
            description=experiment.description,
            is_test=experiment.is_test,
            name=experiment.name,
            status_quo_name=status_quo_name,
            status_quo_parameters=status_quo_parameters,
            time_created=experiment.time_created,
            experiment_type=experiment_type,
            metrics=optimization_metrics + tracking_metrics,
            parameters=parameters,
            parameter_constraints=parameter_constraints,
            trials=trials,
            runners=runners,
            data=experiment_data,
            properties=properties,
            default_trial_type=experiment.default_trial_type,
        )

    def parameter_to_sqa(self, parameter: Parameter) -> SQAParameter:
        """Convert Ax Parameter to SQLAlchemy.

        Args:
            parameter: A ``RangeParameter``, ``ChoiceParameter`` or
                ``FixedParameter``.

        Returns:
            An instance of the SQA class registered for ``Parameter``, with
            ``domain_type`` set according to the parameter's subclass.

        Raises:
            SQAEncodeError: If ``parameter`` is none of the supported subclasses.
        """
        # pyre-fixme: Expected `Base` for 1st...typing.Type[Parameter]`.
        parameter_class: SQAParameter = self.config.class_to_sqa_class[Parameter]
        if isinstance(parameter, RangeParameter):
            # pyre-fixme[29]: `SQAParameter` is not a function.
            return parameter_class(
                name=parameter.name,
                domain_type=DomainType.RANGE,
                parameter_type=parameter.parameter_type,
                lower=float(parameter.lower),
                upper=float(parameter.upper),
                log_scale=parameter.log_scale,
                digits=parameter.digits,
                is_fidelity=parameter.is_fidelity,
                target_value=parameter.target_value,
            )
        elif isinstance(parameter, ChoiceParameter):
            # pyre-fixme[29]: `SQAParameter` is not a function.
            return parameter_class(
                name=parameter.name,
                domain_type=DomainType.CHOICE,
                parameter_type=parameter.parameter_type,
                choice_values=parameter.values,
                is_ordered=parameter.is_ordered,
                is_task=parameter.is_task,
                is_fidelity=parameter.is_fidelity,
                target_value=parameter.target_value,
            )
        elif isinstance(parameter, FixedParameter):
            # pyre-fixme[29]: `SQAParameter` is not a function.
            return parameter_class(
                name=parameter.name,
                domain_type=DomainType.FIXED,
                parameter_type=parameter.parameter_type,
                fixed_value=parameter.value,
                is_fidelity=parameter.is_fidelity,
                target_value=parameter.target_value,
            )
        else:
            raise SQAEncodeError(
                "Cannot encode parameter to SQLAlchemy because parameter's "
                "subclass is invalid."
            )  # pragma: no cover

    def parameter_constraint_to_sqa(
        self, parameter_constraint: ParameterConstraint
    ) -> SQAParameterConstraint:
        """Convert Ax ParameterConstraint to SQLAlchemy.

        Args:
            parameter_constraint: The constraint to encode.

        Returns:
            An instance of the SQA class registered for ``ParameterConstraint``.
            Its ``type`` is ORDER for an ``OrderConstraint``, SUM for a
            ``SumConstraint`` and LINEAR for anything else.
        """
        # pyre-fixme[9]: parameter_constraint_cl... used as type `SQABase`.
        param_constraint_cls: SQAParameterConstraint = self.config.class_to_sqa_class[
            ParameterConstraint
        ]
        # The three kinds are stored identically apart from the type tag.
        if isinstance(parameter_constraint, OrderConstraint):
            constraint_type = ParameterConstraintType.ORDER
        elif isinstance(parameter_constraint, SumConstraint):
            constraint_type = ParameterConstraintType.SUM
        else:
            constraint_type = ParameterConstraintType.LINEAR
        # pyre-fixme[29]: `SQAParameterConstraint` is not a function.
        return param_constraint_cls(
            type=constraint_type,
            constraint_dict=parameter_constraint.constraint_dict,
            bound=parameter_constraint.bound,
        )

    def search_space_to_sqa(
        self, search_space: Optional[SearchSpace]
    ) -> Tuple[List[SQAParameter], List[SQAParameterConstraint]]:
        """Convert Ax SearchSpace to a list of SQLAlchemy Parameters and
        ParameterConstraints.

        Args:
            search_space: The search space to encode, or None.

        Returns:
            A ``(parameters, parameter_constraints)`` tuple; both lists are
            empty when ``search_space`` is None.
        """
        if search_space is None:
            return [], []

        parameters = [
            self.parameter_to_sqa(parameter=parameter)
            for parameter in search_space.parameters.values()
        ]
        parameter_constraints = [
            self.parameter_constraint_to_sqa(parameter_constraint=parameter_constraint)
            for parameter_constraint in search_space.parameter_constraints
        ]
        return parameters, parameter_constraints

    def get_metric_type_and_properties(
        self, metric: Metric
    ) -> Tuple[int, Dict[str, Any]]:
        """Given an Ax Metric, convert its type into a member of MetricType
        enum, and construct a dictionary to be stored in the database
        `properties` json blob.

        Args:
            metric: The metric whose class is looked up in ``METRIC_REGISTRY``.

        Returns:
            A ``(metric_type, properties)`` tuple.

        Raises:
            SQAEncodeError: If the metric's class is not in ``METRIC_REGISTRY``.
        """
        metric_type = METRIC_REGISTRY.get(type(metric))
        if metric_type is None:
            raise SQAEncodeError(
                "Cannot encode metric to SQLAlchemy because metric's "
                "subclass is invalid."
            )  # pragma: no cover

        # name and lower_is_better are stored directly on the metric,
        # so we don't need to include these in the properties blob
        properties = get_object_properties(
            object=metric, exclude_fields=["name", "lower_is_better"]
        )
        return metric_type, properties

    def metric_to_sqa(self, metric: Metric) -> SQAMetric:
        """Convert Ax Metric to SQLAlchemy.

        The resulting SQA metric is stored with the TRACKING intent.

        Raises:
            SQAEncodeError: If the metric's class is not in ``METRIC_REGISTRY``.
        """
        metric_type, properties = self.get_metric_type_and_properties(metric=metric)
        # pyre-fixme: Expected `Base` for 1st...t `typing.Type[Metric]`.
        metric_class: SQAMetric = self.config.class_to_sqa_class[Metric]
        # pyre-fixme[29]: `SQAMetric` is not a function.
        return metric_class(
            name=metric.name,
            metric_type=metric_type,
            intent=MetricIntent.TRACKING,
            properties=properties,
            lower_is_better=metric.lower_is_better,
        )

    def objective_to_sqa(self, objective: Objective) -> SQAMetric:
        """Convert Ax Objective to SQLAlchemy.

        The objective's metric is stored with the OBJECTIVE intent, together
        with the objective's ``minimize`` flag.

        Raises:
            SQAEncodeError: If the metric's class is not in ``METRIC_REGISTRY``.
        """
        metric = objective.metric
        metric_type, properties = self.get_metric_type_and_properties(metric=metric)
        # pyre-fixme: Expected `Base` for 1st...t `typing.Type[Metric]`.
        metric_class: SQAMetric = self.config.class_to_sqa_class[Metric]
        # pyre-fixme[29]: `SQAMetric` is not a function.
        return metric_class(
            name=metric.name,
            metric_type=metric_type,
            intent=MetricIntent.OBJECTIVE,
            minimize=objective.minimize,
            properties=properties,
            lower_is_better=metric.lower_is_better,
        )

    def outcome_constraint_to_sqa(
        self, outcome_constraint: OutcomeConstraint
    ) -> SQAMetric:
        """Convert Ax OutcomeConstraint to SQLAlchemy.

        The constraint's metric is stored with the OUTCOME_CONSTRAINT intent,
        together with the constraint's ``bound``, ``op`` and ``relative``.

        Raises:
            SQAEncodeError: If the metric's class is not in ``METRIC_REGISTRY``.
        """
        metric = outcome_constraint.metric
        metric_type, properties = self.get_metric_type_and_properties(metric=metric)
        # pyre-fixme: Expected `Base` for 1st...t `typing.Type[Metric]`.
        metric_class: SQAMetric = self.config.class_to_sqa_class[Metric]
        # pyre-fixme[29]: `SQAMetric` is not a function.
        return metric_class(
            name=metric.name,
            metric_type=metric_type,
            intent=MetricIntent.OUTCOME_CONSTRAINT,
            bound=outcome_constraint.bound,
            op=outcome_constraint.op,
            relative=outcome_constraint.relative,
            properties=properties,
            lower_is_better=metric.lower_is_better,
        )

    def optimization_config_to_sqa(
        self, optimization_config: Optional[OptimizationConfig]
    ) -> List[SQAMetric]:
        """Convert Ax OptimizationConfig to a list of SQLAlchemy Metrics.

        Args:
            optimization_config: The optimization config to encode, or None.

        Returns:
            The encoded objective followed by the encoded outcome constraints;
            an empty list when ``optimization_config`` is None.
        """
        if optimization_config is None:
            return []

        objective_sqa = self.objective_to_sqa(objective=optimization_config.objective)
        outcome_constraints_sqa = [
            self.outcome_constraint_to_sqa(outcome_constraint=constraint)
            for constraint in optimization_config.outcome_constraints
        ]
        return [objective_sqa] + outcome_constraints_sqa

    def arm_to_sqa(self, arm: Arm, weight: Optional[float] = 1.0) -> SQAArm:
        """Convert Ax Arm to SQLAlchemy.

        Args:
            arm: The arm to encode; its ``parameters`` and ``_name`` are stored.
            weight: Weight stored alongside the arm.
        """
        # pyre-fixme: Expected `Base` for 1st... got `typing.Type[Arm]`.
        arm_class: SQAArm = self.config.class_to_sqa_class[Arm]
        # pyre-fixme[29]: `SQAArm` is not a function.
        return arm_class(parameters=arm.parameters, name=arm._name, weight=weight)

    def abandoned_arm_to_sqa(self, abandoned_arm: AbandonedArm) -> SQAAbandonedArm:
        """Convert Ax AbandonedArm to SQLAlchemy.

        Stores the abandoned arm's name, reason and time.
        """
        # pyre-fixme[9]: abandoned_arm_class is ....sqa_store.db.SQABase]`.
        abandoned_arm_class: SQAAbandonedArm = self.config.class_to_sqa_class[
            # pyre-fixme[6]: Expected `typing.Type[B...ing.Type[AbandonedArm]`.
            AbandonedArm
        ]
        # pyre-fixme[29]: `SQAAbandonedArm` is not a function.
        return abandoned_arm_class(
            name=abandoned_arm.name,
            abandoned_reason=abandoned_arm.reason,
            time_abandoned=abandoned_arm.time,
        )

    def generator_run_to_sqa(
        self, generator_run: GeneratorRun, weight: Optional[float] = None
    ) -> SQAGeneratorRun:
        """Convert Ax GeneratorRun to SQLAlchemy.

        In addition to creating and storing a new GeneratorRun object, we need to
        create and store copies of the Arms, Metrics, Parameters, and
        ParameterConstraints owned by this GeneratorRun.

        Args:
            generator_run: The generator run to encode.
            weight: Weight stored on the generator run itself. This is
                independent of the per-arm weights, which are taken from
                ``generator_run.arm_weights``.

        Raises:
            SQAEncodeError: If the run's ``generator_run_type`` is not a member
                name of ``self.config.generator_run_type_enum``.
        """
        # `arm_weight` is deliberately distinct from the `weight` parameter,
        # which is stored on the generator run further below.
        arms = [
            self.arm_to_sqa(arm=arm, weight=arm_weight)
            for arm, arm_weight in generator_run.arm_weights.items()
        ]

        metrics = self.optimization_config_to_sqa(generator_run.optimization_config)
        parameters, parameter_constraints = self.search_space_to_sqa(
            generator_run.search_space
        )

        best_arm_name = None
        best_arm_parameters = None
        best_arm_predictions = None
        if generator_run.best_arm_predictions is not None:
            # pyre-fixme[16]: `Optional` has no attribute `__getitem__`.
            best_arm = generator_run.best_arm_predictions[0]
            best_arm_predictions = list(generator_run.best_arm_predictions[1])
            best_arm_name = best_arm._name
            best_arm_parameters = best_arm.parameters
        model_predictions = (
            list(generator_run.model_predictions)
            if generator_run.model_predictions is not None
            else None
        )

        generator_run_type = self.get_enum_value(
            value=generator_run.generator_run_type,
            enum=self.config.generator_run_type_enum,
        )

        # pyre-fixme: Expected `Base` for 1st...ing.Type[GeneratorRun]`.
        generator_run_class: SQAGeneratorRun = self.config.class_to_sqa_class[
            GeneratorRun
        ]
        # pyre-fixme[29]: `SQAGeneratorRun` is not a function.
        return generator_run_class(
            arms=arms,
            metrics=metrics,
            parameters=parameters,
            parameter_constraints=parameter_constraints,
            time_created=generator_run.time_created,
            generator_run_type=generator_run_type,
            weight=weight,
            index=generator_run.index,
            fit_time=generator_run.fit_time,
            gen_time=generator_run.gen_time,
            best_arm_name=best_arm_name,
            best_arm_parameters=best_arm_parameters,
            best_arm_predictions=best_arm_predictions,
            model_predictions=model_predictions,
            model_key=generator_run._model_key,
            model_kwargs=object_to_json(generator_run._model_kwargs),
            bridge_kwargs=object_to_json(generator_run._bridge_kwargs),
            gen_metadata=object_to_json(generator_run._gen_metadata),
            model_state_after_gen=object_to_json(generator_run._model_state_after_gen),
        )

    def generation_strategy_to_sqa(
        self, generation_strategy: GenerationStrategy, experiment_id: Optional[int]
    ) -> SQAGenerationStrategy:
        """Convert an Ax `GenerationStrategy` to SQLAlchemy, preserving its state,
        so that the restored generation strategy can be resumed from the point
        at which it was interrupted and stored.

        Args:
            generation_strategy: The generation strategy to encode.
            experiment_id: Value stored as the SQA object's ``experiment_id``.
        """
        # pyre-ignore[9]: Expected Base, but redeclared to `SQAGenerationStrategy`.
        gs_class: SQAGenerationStrategy = self.config.class_to_sqa_class[
            cast(Type[Base], GenerationStrategy)
        ]
        # pyre-fixme[29]: `SQAGenerationStrategy` is not a function.
        return gs_class(
            name=generation_strategy.name,
            steps=object_to_json(generation_strategy._steps),
            generated=generation_strategy._generated,
            observed=generation_strategy._observed,
            curr_index=generation_strategy._curr.index,
            generator_runs=[
                self.generator_run_to_sqa(gr)
                for gr in generation_strategy._generator_runs
            ],
            data=self.data_to_sqa(
                data=generation_strategy._data,
                # Generation strategy data is a compilation of data, so it does
                # not strictly speaking have a timestamp. Setting timestamp to
                # current.
                timestamp=current_timestamp_in_millis(),
                trial_index=None,
            ),
            experiment_id=experiment_id,
        )

    def runner_to_sqa(
        self, runner: Runner, trial_type: Optional[str] = None
    ) -> SQARunner:
        """Convert Ax Runner to SQLAlchemy.

        Args:
            runner: The runner whose class is looked up in ``RUNNER_REGISTRY``.
            trial_type: Trial type stored alongside the runner.

        Raises:
            SQAEncodeError: If the runner's class is not in ``RUNNER_REGISTRY``.
        """
        runner_type = RUNNER_REGISTRY.get(type(runner))
        if runner_type is None:
            raise SQAEncodeError(
                "Cannot encode runner to SQLAlchemy because runner's "
                "subclass is invalid."
            )  # pragma: no cover
        properties = get_object_properties(object=runner)

        # pyre-fixme: Expected `Base` for 1st...t `typing.Type[Runner]`.
        runner_class: SQARunner = self.config.class_to_sqa_class[Runner]
        # pyre-fixme[29]: `SQARunner` is not a function.
        return runner_class(
            runner_type=runner_type, properties=properties, trial_type=trial_type
        )

    def trial_to_sqa(self, trial: BaseTrial) -> SQATrial:
        """Convert Ax Trial to SQLAlchemy.

        In addition to creating and storing a new Trial object, we need to
        create and store the GeneratorRuns and Runner that it owns.

        For a ``BatchTrial``, the abandoned arms and every generator run struct
        are encoded; if the trial has both a status quo and a status-quo weight
        override, an additional generator run of type STATUS_QUO is appended
        and ``status_quo_name`` is recorded. For a ``Trial``, its single
        generator run is encoded when present.

        Args:
            trial: The trial to encode.
        """
        runner = (
            # pyre-fixme[6]: Expected `Runner` for 1st param but got
            #  `Optional[Runner]`.
            self.runner_to_sqa(runner=trial.runner)
            if trial.runner is not None
            else None
        )
        abandoned_arms = []
        generator_runs = []
        status_quo_name = None
        optimize_for_power = None

        if isinstance(trial, BatchTrial):
            abandoned_arms = [
                self.abandoned_arm_to_sqa(abandoned_arm=abandoned_arm)
                for abandoned_arm in trial.abandoned_arms_metadata
            ]
            generator_runs = [
                self.generator_run_to_sqa(
                    generator_run=struct.generator_run, weight=struct.weight
                )
                for struct in trial.generator_run_structs
            ]
            # appease pyre
            trial_status_quo = trial.status_quo
            trial_status_quo_weight_override = trial._status_quo_weight_override
            if (
                trial_status_quo is not None
                and trial_status_quo_weight_override is not None
            ):
                status_quo_generator_run = GeneratorRun(
                    arms=[trial_status_quo],
                    weights=[trial_status_quo_weight_override],
                    type=GeneratorRunType.STATUS_QUO.name,
                )
                # this is a hack necessary to get equality tests passing;
                # otherwise you can encode same object and get two different results
                status_quo_generator_run._time_created = trial.time_created
                generator_runs.append(
                    self.generator_run_to_sqa(generator_run=status_quo_generator_run)
                )
                status_quo_name = trial_status_quo.name
            optimize_for_power = trial.optimize_for_power
        elif isinstance(trial, Trial):
            if trial.generator_run is not None:
                generator_runs = [
                    # pyre-fixme[6]: Expected `GeneratorRun` for 1st param but got
                    #  `Optional[GeneratorRun]`.
                    self.generator_run_to_sqa(generator_run=trial.generator_run)
                ]

        # pyre-fixme: Expected `Base` for 1st...ot `typing.Type[Trial]`.
        trial_class: SQATrial = self.config.class_to_sqa_class[Trial]
        # pyre-fixme[29]: `SQATrial` is not a function.
        return trial_class(
            abandoned_reason=trial.abandoned_reason,
            deployed_name=trial.deployed_name,
            index=trial.index,
            is_batch=isinstance(trial, BatchTrial),
            num_arms_created=trial._num_arms_created,
            optimize_for_power=optimize_for_power,
            run_metadata=trial.run_metadata,
            status=trial.status,
            status_quo_name=status_quo_name,
            time_completed=trial.time_completed,
            time_created=trial.time_created,
            time_staged=trial.time_staged,
            time_run_started=trial.time_run_started,
            trial_type=trial.trial_type,
            abandoned_arms=abandoned_arms,
            generator_runs=generator_runs,
            runner=runner,
        )

    def data_to_sqa(
        self, data: Data, trial_index: Optional[int], timestamp: int
    ) -> SQAData:
        """Convert Ax Data to SQLAlchemy.

        Args:
            data: The data to encode; its dataframe is serialized with
                ``data.df.to_json()``.
            trial_index: Index of the trial the data belongs to, or None.
            timestamp: Value stored as the SQA object's ``time_created``.
        """
        # pyre-fixme: Expected `Base` for 1st...ot `typing.Type[Data]`.
        data_class: SQAData = self.config.class_to_sqa_class[Data]
        # pyre-fixme[29]: `SQAData` is not a function.
        return data_class(
            data_json=data.df.to_json(),
            description=data.description,
            time_created=timestamp,
            trial_index=trial_index,
        )