Scheduler
_We recommend reading through the "Building Blocks of Ax" tutorial before getting started with the `Scheduler`, as using it in this tutorial will require an Ax `Experiment` and an understanding of the experiment's subcomponents like the search space and the runner._

`Scheduler` and external systems for trial evaluation

`Scheduler` is a closed-loop manager class in Ax that continuously deploys trial runs to an arbitrary external system in an asynchronous fashion, polls their status from that system, and leverages known trial results to generate more trials.
Key features of the `Scheduler`:
- `Experiment` for optimization setup (an optimization config with metrics, a search space, a runner for trial evaluations),
- `GenerationStrategy` for flexible specification of an optimization algorithm used to generate new trials to run.

This scheme summarizes how the scheduler interacts with any external system used to run trial evaluations:
An example of an 'external system' running trial evaluations could be a remote server executing scheduled jobs, a subprocess conducting ML training runs, an engine running physics simulations, etc. For the sake of example here, let us assume a dummy external system with the following client:
from typing import Any, Dict, NamedTuple, List, Union
from time import time
from random import randint

from ax.core.base_trial import TrialStatus
from ax.utils.measurement.synthetic_functions import branin


class MockJob(NamedTuple):
    """Dummy class to represent a job scheduled on `MockJobQueue`."""

    id: int
    parameters: Dict[str, Union[str, float, int, bool]]


class MockJobQueueClient:
    """Dummy class to represent a job queue where the Ax `Scheduler` will
    deploy trial evaluation runs during optimization.
    """

    jobs: Dict[int, MockJob] = {}  # Keyed by integer job ID.

    def schedule_job_with_parameters(
        self,
        parameters: Dict[str, Union[str, float, int, bool]]
    ) -> int:
        """Schedules an evaluation job with given parameters and returns job ID."""
        # Code to actually schedule the job and produce an ID would go here;
        # using timestamp as dummy ID for this example.
        job_id = int(time())
        self.jobs[job_id] = MockJob(job_id, parameters)
        return job_id

    def get_job_status(self, job_id: int) -> TrialStatus:
        """Get status of the job by a given ID. For simplicity of the example,
        return an Ax `TrialStatus`.
        """
        job = self.jobs[job_id]  # Raises `KeyError` for an unknown job ID.
        # Instead of randomizing trial status, code to check actual job status
        # would go here.
        if randint(0, 3) > 0:
            return TrialStatus.COMPLETED
        return TrialStatus.RUNNING

    def get_outcome_value_for_completed_job(self, job_id: int) -> Dict[str, float]:
        """Get evaluation results for a given completed job."""
        job = self.jobs[job_id]
        # In a real external system, this would retrieve real relevant outcomes and
        # not a synthetic function value.
        return {
            "branin": branin(job.parameters.get("x1"), job.parameters.get("x2"))
        }


MOCK_JOB_QUEUE_CLIENT = MockJobQueueClient()


def get_mock_job_queue_client() -> MockJobQueueClient:
    """Obtain the singleton job queue instance."""
    return MOCK_JOB_QUEUE_CLIENT
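Note that the mock client uses `int(time())` as a dummy job ID, so two jobs scheduled within the same second would receive the same ID, and the later job would overwrite the earlier one in `jobs`. A minimal sketch of one way around this, assuming a single-process client; `CountingJobQueue` is a hypothetical name used only for illustration and is not part of Ax:

```python
from itertools import count
from typing import Dict, Union

ParameterValues = Dict[str, Union[str, float, int, bool]]


class CountingJobQueue:
    """Hypothetical stand-in for the mock client, with collision-free job IDs."""

    def __init__(self) -> None:
        self.jobs: Dict[int, ParameterValues] = {}
        # Yields 0, 1, 2, ... on successive `next` calls.
        self._next_id = count()

    def schedule_job_with_parameters(self, parameters: ParameterValues) -> int:
        # Each call gets a fresh ID, even if calls happen within the same second.
        job_id = next(self._next_id)
        self.jobs[job_id] = parameters
        return job_id


queue = CountingJobQueue()
first = queue.schedule_job_with_parameters({"x1": 0.0, "x2": 1.0})
second = queue.schedule_job_with_parameters({"x1": 2.0, "x2": 3.0})
print(first, second, len(queue.jobs))
```

A real external system would typically return its own job identifier, in which case that identifier is what belongs in the run metadata.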
As mentioned above, using a `Scheduler` requires a fully set up experiment with metrics and a runner. Refer to the "Building Blocks of Ax" tutorial to learn more about those components, as here we assume familiarity with them.
The following runner and metric set up interactions between the `Scheduler` and the mock external system we assume:
from ax.core.runner import Runner
from ax.core.base_trial import BaseTrial
from ax.core.trial import Trial


class MockJobRunner(Runner):  # Deploys trials to external system.

    def run(self, trial: BaseTrial) -> Dict[str, Any]:
        """Deploys a trial and returns dict of run metadata."""
        if not isinstance(trial, Trial):
            raise ValueError("This runner only handles `Trial`.")

        mock_job_queue = get_mock_job_queue_client()
        job_id = mock_job_queue.schedule_job_with_parameters(
            parameters=trial.arm.parameters
        )
        # This run metadata will be attached to trial as `trial.run_metadata`
        # by the base `Scheduler`.
        return {"job_id": job_id}
import pandas as pd

from ax.core.metric import Metric
from ax.core.base_trial import BaseTrial
from ax.core.data import Data


class BraninForMockJobMetric(Metric):  # Pulls data for trial from external system.

    def fetch_trial_data(self, trial: BaseTrial) -> Data:
        """Obtains data for a given trial by fetching it from the mock job queue."""
        if not isinstance(trial, Trial):
            raise ValueError("This metric only handles `Trial`.")

        mock_job_queue = get_mock_job_queue_client()

        # Here we leverage the "job_id" metadata created by `MockJobRunner.run`.
        branin_data = mock_job_queue.get_outcome_value_for_completed_job(
            job_id=trial.run_metadata.get("job_id")
        )
        df_dict = {
            "trial_index": trial.index,
            "metric_name": "branin",
            "arm_name": trial.arm.name,
            "mean": branin_data.get("branin"),
            # Can be set to 0.0 if function is known to be noiseless
            # or to an actual value when SEM is known. Setting SEM to
            # `None` results in Ax assuming unknown noise and inferring
            # noise level from data.
            "sem": None,
        }
        return Data(df=pd.DataFrame.from_records([df_dict]))
Now we can set up the experiment using the runner and metric we defined. This experiment will have a single-objective optimization config, minimizing the Branin function, and the search space that corresponds to that function.
from ax import *


def make_branin_experiment_with_runner_and_metric() -> Experiment:
    parameters = [
        RangeParameter(
            name="x1",
            parameter_type=ParameterType.FLOAT,
            lower=-5,
            upper=10,
        ),
        RangeParameter(
            name="x2",
            parameter_type=ParameterType.FLOAT,
            lower=0,
            upper=15,
        ),
    ]

    objective = Objective(metric=BraninForMockJobMetric(name="branin"), minimize=True)

    return Experiment(
        name="branin_test_experiment",
        search_space=SearchSpace(parameters=parameters),
        optimization_config=OptimizationConfig(objective=objective),
        runner=MockJobRunner(),
        is_test=True,  # Marking this experiment as a test experiment.
    )


experiment = make_branin_experiment_with_runner_and_metric()
from typing import Dict, Set
from random import randint
from collections import defaultdict

from ax.service.scheduler import Scheduler, SchedulerOptions, TrialStatus


class MockJobQueueScheduler(Scheduler):

    def poll_trial_status(self) -> Dict[TrialStatus, Set[int]]:
        """Queries the external system to compute a mapping from trial statuses
        to a set of indices of trials that are currently in that status. Only needs
        to query for trials that are currently running but can query for all
        trials too.
        """
        status_dict = defaultdict(set)
        for trial in self.running_trials:  # `running_trials` is exposed on base `Scheduler`
            mock_job_queue = get_mock_job_queue_client()
            status = mock_job_queue.get_job_status(job_id=trial.run_metadata.get("job_id"))
            status_dict[status].add(trial.index)

        return status_dict
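To make the return shape of `poll_trial_status` concrete, here is a small sketch that groups trial indices by status in the same way, without requiring Ax. The `Status` enum is a hypothetical stand-in for `TrialStatus`, and `group_by_status` is an illustrative helper, not an Ax function:

```python
from collections import defaultdict
from enum import Enum
from typing import Dict, Set


class Status(Enum):
    """Hypothetical stand-in for Ax's `TrialStatus`."""

    RUNNING = 0
    COMPLETED = 1


def group_by_status(statuses: Dict[int, Status]) -> Dict[Status, Set[int]]:
    """Invert a trial-index -> status mapping into status -> set of indices."""
    grouped = defaultdict(set)
    for trial_index, status in statuses.items():
        grouped[status].add(trial_index)
    return dict(grouped)


# Pretend the external system reported these statuses for trials 0, 1 and 2.
polled = {0: Status.COMPLETED, 1: Status.RUNNING, 2: Status.COMPLETED}
result = group_by_status(polled)
print(result)
```

Statuses that no trial is currently in simply do not appear as keys in the mapping.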
A `Scheduler` also requires an Ax `GenerationStrategy` specifying the algorithm to use for the optimization. Here we use the `choose_generation_strategy` utility that auto-picks a generation strategy based on the search space properties. To construct a custom generation strategy instead, refer to the "Generation Strategy" tutorial.
Importantly, a generation strategy in Ax limits allowed parallelism levels for each generation step it contains. If you would like the `Scheduler` to ensure parallelism limitations, set `max_parallelism` on each generation step in your generation strategy.
from ax.modelbridge.dispatch_utils import choose_generation_strategy

generation_strategy = choose_generation_strategy(
    search_space=experiment.search_space,
    max_parallelism_cap=3,
)
[INFO 06-30 21:31:06] ax.modelbridge.dispatch_utils: Using GPEI (Bayesian optimization) since there are more continuous parameters than there are categories for the unordered categorical parameters.
[INFO 06-30 21:31:06] ax.modelbridge.dispatch_utils: Using Bayesian Optimization generation strategy: GenerationStrategy(name='Sobol+GPEI', steps=[Sobol for 5 trials, GPEI for subsequent trials]). Iterations after 5 will take longer to generate due to model-fitting.
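As a rough illustration of what a parallelism limit means for deployment, the sketch below computes how many new trials could be started at a given moment. It assumes the simplest possible rule (never exceed the limit on concurrently running trials); `num_trials_to_deploy` is a hypothetical helper written for this illustration and is not how Ax implements the check internally:

```python
def num_trials_to_deploy(max_parallelism: int, num_running: int, num_remaining: int) -> int:
    """Number of trials that can be started now without exceeding the cap.

    max_parallelism: limit on trials running at the same time.
    num_running: trials currently running.
    num_remaining: trials still to be run in this optimization.
    """
    # Slots left under the cap; never negative even if the cap is exceeded.
    free_slots = max(max_parallelism - num_running, 0)
    return min(free_slots, num_remaining)


print(num_trials_to_deploy(max_parallelism=3, num_running=0, num_remaining=6))
print(num_trials_to_deploy(max_parallelism=3, num_running=2, num_remaining=6))
print(num_trials_to_deploy(max_parallelism=3, num_running=3, num_remaining=6))
```

With a cap of 3, nothing new can be deployed while three trials are running; the scheduler has to wait for at least one of them to finish.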
Now we have all the components needed to start the scheduler:
scheduler = MockJobQueueScheduler(
    experiment=experiment,
    generation_strategy=generation_strategy,
    options=SchedulerOptions(),
)
[INFO 06-30 21:31:06] MockJobQueueScheduler: `Scheduler` requires experiment to have immutable search space and optimization config. Setting property immutable_search_space_and_opt_config to `True` on experiment.
Once the `Scheduler` instance is set up, the user can call `run_n_trials` as many times as needed, and each call will add up to the specified `max_trials` trials to the experiment. The number of trials actually run might be less than `max_trials` if the optimization concludes earlier (e.g. there are no more points in the search space).
scheduler.run_n_trials(max_trials=3)
[INFO 06-30 21:31:06] MockJobQueueScheduler: Running trials [0]...
[INFO 06-30 21:31:07] MockJobQueueScheduler: Running trials [1]...
[INFO 06-30 21:31:08] MockJobQueueScheduler: Running trials [2]...
[INFO 06-30 21:31:09] MockJobQueueScheduler: Retrieved COMPLETED trials: {1}.
[INFO 06-30 21:31:09] MockJobQueueScheduler: Done submitting trials, waiting for remaining 2 running trials...
[INFO 06-30 21:31:09] ax.core.experiment: No trials are in a state expecting data. Returning empty data
[INFO 06-30 21:31:09] MockJobQueueScheduler: Retrieved COMPLETED trials: {0, 2}.
OptimizationResult()
We can examine experiment
to see that it now has three trials:
from ax.service.utils.report_utils import exp_to_df
exp_to_df(experiment)
| | branin | trial_index | arm_name | x1 | x2 | trial_status | generator_model |
|---|---|---|---|---|---|---|---|
| 0 | 200.996201 | 0 | 0_0 | 6.695751 | 14.633260 | COMPLETED | Sobol |
| 1 | 191.377605 | 1 | 1_0 | 5.950573 | 14.230081 | COMPLETED | Sobol |
| 2 | 97.219351 | 2 | 2_0 | -2.403462 | 0.859512 | COMPLETED | Sobol |
Now we can run run_n_trials
again to add three more trials to the experiment.
scheduler.run_n_trials(max_trials=3)
[INFO 06-30 21:31:09] MockJobQueueScheduler: Running trials [3]...
[INFO 06-30 21:31:10] MockJobQueueScheduler: Running trials [4]...
[INFO 06-30 21:31:12] MockJobQueueScheduler: Running trials [5]...
[INFO 06-30 21:31:13] ax.core.experiment: No trials are in a state expecting data. Returning empty data
[INFO 06-30 21:31:13] MockJobQueueScheduler: Retrieved COMPLETED trials: {3, 4, 5}.
OptimizationResult()
Examining the experiment, we now see 6 trials, one of which is produced by Bayesian optimization (GPEI):
exp_to_df(experiment)
| | branin | trial_index | arm_name | x1 | x2 | trial_status | generator_model |
|---|---|---|---|---|---|---|---|
| 0 | 200.996201 | 0 | 0_0 | 6.695751 | 14.633260 | COMPLETED | Sobol |
| 1 | 191.377605 | 1 | 1_0 | 5.950573 | 14.230081 | COMPLETED | Sobol |
| 2 | 97.219351 | 2 | 2_0 | -2.403462 | 0.859512 | COMPLETED | Sobol |
| 3 | 55.271749 | 3 | 3_0 | 8.509632 | 8.961485 | COMPLETED | Sobol |
| 4 | 39.863092 | 4 | 4_0 | -1.780897 | 3.599136 | COMPLETED | Sobol |
| 5 | 5.804770 | 5 | 5_0 | 3.082360 | 0.000000 | COMPLETED | GPEI |
For each call to `run_n_trials`, one can specify a timeout; if `run_n_trials` has been running for too long without finishing its `max_trials`, the operation will exit gracefully:
scheduler.run_n_trials(max_trials=3, timeout_hours=0.00001)
[INFO 06-30 21:31:13] MockJobQueueScheduler: Running trials [6]...
[ERROR 06-30 21:31:13] MockJobQueueScheduler: Optimization timed out (timeout hours: 1e-05)!
[INFO 06-30 21:31:13] MockJobQueueScheduler: `should_abort` is `True`, not running more trials.
[INFO 06-30 21:31:13] MockJobQueueScheduler: Waiting for completed trials (for 1 sec, currently running trials: 1).
[INFO 06-30 21:31:14] MockJobQueueScheduler: Waiting for completed trials (for 1.5 sec, currently running trials: 1).
[INFO 06-30 21:31:16] ax.core.experiment: No trials are in a state expecting data. Returning empty data
[INFO 06-30 21:31:16] MockJobQueueScheduler: Retrieved COMPLETED trials: {6}.
[ERROR 06-30 21:31:16] MockJobQueueScheduler: Optimization timed out (timeout hours: 1e-05)!
OptimizationResult()
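Because the timeout is given in hours, the value `0.00001` used above corresponds to only a few hundredths of a second, which is why the run times out right after deploying its first trial. A small sketch of the unit conversion; the two helper functions are hypothetical conveniences for this illustration, not part of Ax:

```python
SECONDS_PER_HOUR = 3600.0


def hours_to_seconds(hours: float) -> float:
    """Convert a timeout expressed in hours to seconds."""
    return hours * SECONDS_PER_HOUR


def seconds_to_hours(seconds: float) -> float:
    """Convert a desired timeout in seconds to the hours value to pass."""
    return seconds / SECONDS_PER_HOUR


# The timeout used in the example above: roughly 0.036 seconds.
print(hours_to_seconds(0.00001))

# A 90-second timeout, expressed in hours as `timeout_hours` expects.
print(seconds_to_hours(90))
```

For instance, a 90-second limit would be passed as `timeout_hours=seconds_to_hours(90)`.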
When a scheduler is SQL-enabled, it will automatically save all updates it makes to the experiment in the course of the optimization. The experiment can then be resumed in the event of a crash or after a pause.
To store the state of the optimization in an SQL backend, first follow the setup instructions on the Ax website. Having set up the SQL backend, pass `DBSettings` to the `Scheduler` on instantiation (note that the SQLAlchemy dependency will have to be installed; for installation, refer to the optional dependencies on the Ax website):
from ax.storage.sqa_store.structs import DBSettings

# URL is of the form "dialect+driver://username:password@host:port/database".
# Instead of URL, can provide a `creator function`; can specify custom encoders/decoders if necessary.
db_settings = DBSettings(url="postgresql+psycopg2://sarah:c82i94d@localhost:5432/foobar")
stored_experiment = make_branin_experiment_with_runner_and_metric()
generation_strategy = choose_generation_strategy(search_space=stored_experiment.search_space)

scheduler_with_storage = MockJobQueueScheduler(
    experiment=stored_experiment,
    generation_strategy=generation_strategy,
    options=SchedulerOptions(),
    db_settings=db_settings,
)
[INFO 06-30 21:31:16] ax.modelbridge.dispatch_utils: Using GPEI (Bayesian optimization) since there are more continuous parameters than there are categories for the unordered categorical parameters.
[INFO 06-30 21:31:16] ax.modelbridge.dispatch_utils: Using Bayesian Optimization generation strategy: GenerationStrategy(name='Sobol+GPEI', steps=[Sobol for 5 trials, GPEI for subsequent trials]). Iterations after 5 will take longer to generate due to model-fitting.
[INFO 06-30 21:31:16] MockJobQueueScheduler: `Scheduler` requires experiment to have immutable search space and optimization config. Setting property immutable_search_space_and_opt_config to `True` on experiment.
[Traceback omitted: when this page was generated, no database was reachable at the example URL, so this cell ended in an `OperationalError` raised through SQLAlchemy and psycopg2 while resolving the database host name. Substitute the URL of your own database to run it.]
stored_experiment.name
To resume a stored experiment:
reloaded_experiment_scheduler = MockJobQueueScheduler.from_stored_experiment(
    experiment_name='branin_test_experiment',
    options=SchedulerOptions(),
    # `DBSettings` are also required here so scheduler has access to the
    # database, from which it needs to load the experiment.
    db_settings=db_settings,
)
With the newly reloaded experiment, the Scheduler
can continue the optimization:
reloaded_experiment_scheduler.run_n_trials(max_trials=3)
`Scheduler` exposes many options to configure the exact settings of the closed-loop optimization to perform. A few notable ones are:
- `trial_type`: currently only `Trial` and not `BatchTrial` is supported, but support for `BatchTrial`-s will follow,
- `tolerated_trial_failure_rate` and `min_failed_trials_for_failure_rate_check`: together these two settings control how the scheduler monitors the failure rate among trial runs it deploys. Once `min_failed_trials_for_failure_rate_check` trials have failed, the scheduler will start checking whether the ratio of failed to total trials is greater than `tolerated_trial_failure_rate`, and if it is, the scheduler will exit the optimization with a `FailureRateExceededError`,
- `ttl_seconds_for_trials`: sometimes a failure in a trial run means that it will be difficult to query its status (e.g. due to a crash). If this setting is specified, the Ax `Experiment` will automatically mark trials that have been running for too long (more than their 'time-to-live' (TTL) seconds) as failed,
- `run_trials_in_batches`: if `True`, the scheduler will attempt to run trials not by calling `Scheduler.run_trial` in a loop, but by calling `Scheduler.run_trials` on all ready-to-deploy trials at once. This can save compute in cases where the deployment operation has a large overhead. Note that using this option successfully will require your scheduler subclass to implement `MySchedulerSubclass.run_trials` and `MySchedulerSubclass.poll_available_capacity`.

The rest of the options are described in the docstring below:
print(SchedulerOptions.__doc__)
Settings for a scheduler instance.

Attributes:
    trial_type: Type of trials (1-arm ``Trial`` or multi-arm ``Batch Trial``) that will be deployed using the scheduler. Defaults to 1-arm `Trial`. NOTE: use ``BatchTrial`` only if need to evaluate multiple arms *together*, e.g. in an A/B-test influenced by data nonstationarity. For cases where just deploying multiple arms at once is beneficial but the trials are evaluated *independently*, implement ``run_trials`` method in scheduler subclass, to deploy multiple 1-arm trials at the same time.
    total_trials: Limit on number of trials a given ``Scheduler`` should run. If no stopping criteria are implemented on a given scheduler, exhaustion of this number of trials will be used as default stopping criterion in ``Scheduler.run_all_trials``. Required to be non-null if using ``Scheduler.run_all_trials`` (not required for ``Scheduler.run_n_trials``).
    tolerated_trial_failure_rate: Fraction of trials in this optimization that are allowed to fail without the whole optimization ending. Expects value between 0 and 1. NOTE: Failure rate checks begin once min_failed_trials_for_failure_rate_check trials have failed; after that point if the ratio of failed trials to total trials ran so far exceeds the failure rate, the optimization will halt.
    min_failed_trials_for_failure_rate_check: The minimum number of trials that must fail in `Scheduler` in order to start checking failure rate.
    log_filepath: File, to which to write optimization logs.
    logging_level: Minimum level of logging statements to log, defaults to ``logging.INFO``.
    ttl_seconds_for_trials: Optional TTL for all trials created within this ``Scheduler``, in seconds. Trials that remain ``RUNNING`` for more than their TTL seconds will be marked ``FAILED`` once the TTL elapses and may be re-suggested by the Ax optimization models.
    init_seconds_between_polls: Initial wait between rounds of polling, in seconds. Relevant if using the default wait-for-completed-runs functionality of the base ``Scheduler`` (if ``wait_for_completed_trials_and_report_results`` is not overridden). With the default waiting, every time a poll returns that no trial evaluations completed, wait time will increase; once some completed trial evaluations are found, it will reset back to this value. Specify 0 to not introduce any wait between polls.
    min_seconds_before_poll: Minimum number of seconds between beginning to run a trial and the first poll to check trial status.
    seconds_between_polls_backoff_factor: The rate at which the poll interval increases.
    run_trials_in_batches: If True and ``poll_available_capacity`` is implemented to return non-null results, trials will be dispatched in groups via `run_trials` instead of one-by-one via ``run_trial``. This allows to save time, IO calls or computation in cases where dispatching trials in groups is more efficient then sequential deployment. The size of the groups will be determined as the minimum of ``self.poll_available_capacity()`` and the number of generator runs that the generation strategy is able to produce without more data or reaching its allowed max paralellism limit.
    early_stopping_strategy: A ``BaseEarlyStoppingStrategy`` that determines whether a trial should be stopped given the current state of the experiment. Used in ``should_stop_trials_early``.
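Two of the behaviors described in this docstring can be sketched in a few lines of plain Python: the failure-rate check and the growing wait between polls. Both functions below are hypothetical illustrations, not Ax code. They assume a strict "greater than" comparison for the failure rate and a multiplicative backoff, which is consistent with the "for 1 sec" and "for 1.5 sec" waits in the timeout log earlier but is still an assumption about the internals:

```python
from typing import List


def failure_rate_exceeded(
    num_failed: int,
    num_total: int,
    tolerated_rate: float,
    min_failed_for_check: int,
) -> bool:
    """Return True if the optimization should halt due to failed trials."""
    # The check only starts once enough trials have failed.
    if num_total == 0 or num_failed < min_failed_for_check:
        return False
    return num_failed / num_total > tolerated_rate


def poll_waits(
    init_seconds: float,
    backoff_factor: float,
    polls_found_completed: List[bool],
) -> List[float]:
    """Wait used before each poll, given whether each poll found completed trials."""
    waits = []
    wait = init_seconds
    for found_completed in polls_found_completed:
        waits.append(wait)
        # Reset after a successful poll; otherwise back off.
        wait = init_seconds if found_completed else wait * backoff_factor
    return waits


print(failure_rate_exceeded(2, 10, tolerated_rate=0.5, min_failed_for_check=5))
print(failure_rate_exceeded(6, 10, tolerated_rate=0.5, min_failed_for_check=5))
print(poll_waits(1.0, 1.5, [False, False, True, False]))
```

In the first call too few trials have failed for the check to apply; in the second, 6 failures out of 10 exceed the tolerated rate of 0.5.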
The `Scheduler` can report the optimization result to an external system each time there are new completed trials if the user-implemented subclass implements `MySchedulerSubclass.report_results` to do so. For example, the following method:
class MySchedulerSubclass(Scheduler):
    ...

    def report_results(self):
        # `write_to_external_database` is a placeholder for user-provided code.
        write_to_external_database(len(self.experiment.trials))
        return (True, {})  # Returns optimization success status and optional dict of outputs.
could be used to record the number of trials in the experiment so far in an external database.

Since `report_results` is an instance method, it has access to `self.experiment` and `self.generation_strategy`, which contain all the information about the state of the optimization thus far.
`run_trials_and_yield_results` generator method

In some systems it's beneficial to have greater control over `Scheduler.run_n_trials` instead of just starting it and needing to wait for it to run all the way to completion before having access to its output. For this purpose, the `Scheduler` implements a generator method `run_trials_and_yield_results`, which yields the output of `Scheduler.report_results` each time there are new completed trials and can be used like so:
class ResultReportingScheduler(MockJobQueueScheduler):

    def report_results(self):
        return True, {
            "trials so far": len(self.experiment.trials),
            "currently producing trials from generation step": self.generation_strategy._curr.model_name,
            "running trials": [t.index for t in self.running_trials],
        }


experiment = make_branin_experiment_with_runner_and_metric()
scheduler = ResultReportingScheduler(
    experiment=experiment,
    generation_strategy=choose_generation_strategy(
        search_space=experiment.search_space,
        max_parallelism_cap=3,
    ),
    options=SchedulerOptions(),
)

for reported_result in scheduler.run_trials_and_yield_results(max_trials=6):
    print("Reported result: ", reported_result)
[INFO 06-14 09:38:12] ax.modelbridge.dispatch_utils: Using Bayesian Optimization generation strategy: GenerationStrategy(name='Sobol+GPEI', steps=[Sobol for 5 trials, GPEI for subsequent trials]). Iterations after 5 will take longer to generate due to model-fitting.
[INFO 06-14 09:38:12] ResultReportingScheduler: `Scheduler` requires experiment to have immutable search space and optimization config. Setting property immutable_search_space_and_opt_config to `True` on experiment.
[INFO 06-14 09:38:12] ResultReportingScheduler: Running trials [0]...
[INFO 06-14 09:38:13] ResultReportingScheduler: Running trials [1]...
[INFO 06-14 09:38:13] ResultReportingScheduler: Running trials [2]...
[INFO 06-14 09:38:14] ResultReportingScheduler: Generated all trials that can be generated currently. Max parallelism currently reached.
[INFO 06-14 09:38:14] ax.core.experiment: No trials are in a state expecting data. Returning empty data
[INFO 06-14 09:38:14] ResultReportingScheduler: Retrieved COMPLETED trials: {0, 1, 2}.
[INFO 06-14 09:38:14] ResultReportingScheduler: Running trials [3]...
Reported result: {'trials so far': 3, 'currently producing trials from generation step': 'Sobol', 'running trials': []}
[INFO 06-14 09:38:15] ResultReportingScheduler: Running trials [4]...
[INFO 06-14 09:38:17] ResultReportingScheduler: Running trials [5]...
[INFO 06-14 09:38:18] ResultReportingScheduler: Retrieved COMPLETED trials: {4, 5}.
[INFO 06-14 09:38:18] ResultReportingScheduler: Done submitting trials, waiting for remaining 1 running trials...
[INFO 06-14 09:38:18] ax.core.experiment: No trials are in a state expecting data. Returning empty data
[INFO 06-14 09:38:18] ResultReportingScheduler: Retrieved COMPLETED trials: {3}.
Reported result: {'trials so far': 6, 'currently producing trials from generation step': 'GPEI', 'running trials': [3]}
Reported result: {'trials so far': 6, 'currently producing trials from generation step': 'GPEI', 'running trials': []}
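The general pattern behind such a generator method can be shown with a toy loop that needs no Ax at all: the producer yields a report whenever a poll finds newly completed trials, and the consumer can act on each report as it arrives. `run_and_yield` and its input format are hypothetical and only illustrate the control flow, not the `Scheduler` implementation:

```python
from typing import Any, Dict, Iterator, List


def run_and_yield(completions_per_poll: List[List[int]]) -> Iterator[Dict[str, Any]]:
    """Toy loop: yield a report after every poll in which trials completed."""
    completed: List[int] = []
    for newly_completed in completions_per_poll:
        if not newly_completed:
            continue  # Nothing new; a real loop would wait and poll again.
        completed.extend(newly_completed)
        yield {"completed so far": len(completed), "new": newly_completed}


# Each inner list holds the trial indices found completed by one poll.
reports = []
for report in run_and_yield([[0, 1, 2], [], [4, 5], [3]]):
    reports.append(report)
    print("Reported result:", report)
```

Because the consumer drives the generator, it could also `break` out of the loop after any report, for example once a result is good enough.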