Source code for ax.runners.single_running_trial_mixin
#!/usr/bin/env python3
# Copyright (c) Meta Platforms, Inc. and affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
# pyre-strict
from collections import defaultdict
from typing import Dict, Iterable, Set
from ax.core.base_trial import BaseTrial, TrialStatus
class SingleRunningTrialMixin:
    """Mixin for Runners that only ever have one trial running.

    Provides a minimal ``poll_trial_status`` implementation in which only
    the latest running trial (the highest index among the experiment's
    ``running_trial_indices``) is still reported as RUNNING. Any other trial
    currently marked RUNNING is reported as COMPLETED.
    """

    def poll_trial_status(
        self, trials: Iterable[BaseTrial]
    ) -> Dict[TrialStatus, Set[int]]:
        """Perform a single poll of the given trials and group them by status.

        Required for runners used with Ax ``Scheduler``.

        NOTE: This performs exactly one poll; it does not wait between
        polling calls while trials are running.

        Args:
            trials: Trials to poll.

        Returns:
            A dictionary mapping each TrialStatus to the set of indices of
            the polled trials reported with that status. Trials whose current
            status is ABANDONED, FAILED or COMPLETED are left out. A RUNNING
            trial whose index is smaller than the largest running trial index
            of the experiment is reported as COMPLETED; every other trial is
            reported under its current status. An empty ``trials`` iterable
            yields an empty dictionary.
        """
        # Materialize once: the input may be a one-shot iterable, and we need
        # both the first element and a full pass over all elements.
        polled_trials = list(trials)
        if not polled_trials:
            return {}

        # The running indices are read from the first trial's experiment.
        running_indices = polled_trials[0].experiment.running_trial_indices
        # -1 is below any index compared here, so with no running trials
        # nothing is re-labelled as COMPLETED.
        latest_running_index = (
            max(running_indices) if len(running_indices) != 0 else -1
        )

        skipped_statuses = (
            TrialStatus.ABANDONED,
            TrialStatus.FAILED,
            TrialStatus.COMPLETED,
        )
        indices_by_status: Dict[TrialStatus, Set[int]] = {}
        for trial in polled_trials:
            reported_status = trial.status
            if reported_status in skipped_statuses:
                continue
            # Only the latest running trial keeps RUNNING; earlier running
            # trials are treated as finished.
            if (
                reported_status == TrialStatus.RUNNING
                and trial.index < latest_running_index
            ):
                reported_status = TrialStatus.COMPLETED
            indices_by_status.setdefault(reported_status, set()).add(trial.index)
        return indices_by_status