Main package#

Subpackages#

Submodules#

Model Configuration module#

class ablator.main.configs.ModelConfig(*args, **kwargs)[source]#

Bases: ConfigBase

Model configuration. When initializing a model, the config is passed to the model constructor.

config_class#

alias of ModelConfig
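
Typically ModelConfig is subclassed to declare the hyperparameters a custom model expects. The sketch below is illustrative only: the class name MyModelConfig and its fields are not part of the API, and depending on the ablator version custom configs may additionally require the library's config decorator.

>>> from ablator.main.configs import ModelConfig
>>> class MyModelConfig(ModelConfig):
...     hidden_size: int = 128
...     dropout: float = 0.1
>>> model_config = MyModelConfig(hidden_size=256)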

class ablator.main.configs.Optim(value)[source]#

Bases: Enum

Type of optimization direction.

class ablator.main.configs.ParallelConfig(*args, **kwargs)[source]#

Bases: RunConfig

Parallel training configuration. (A construction example follows the attribute list below.)

Attributes:
total_trials: int

total number of trials.

concurrent_trials: int

number of trials to run concurrently.

search_space: Dict[SearchSpace]

search space for hyperparameter search, e.g. {"train_config.optimizer_config.arguments.lr": SearchSpace(value_range=[0, 10], value_type="int")}

optim_metrics: Dict[Optim]

metrics to optimize, e.g. {"val_loss": "min"}

search_algo: SearchAlgo = SearchAlgo.tpe

type of search algorithm.

ignore_invalid_params: bool = False

whether to ignore invalid parameters when sampling.

remote_config: Optional[RemoteConfig] = None

remote storage configuration.

gcp_config: Optional[GcpConfig] = None

gcp configuration.

config_class#

alias of ParallelConfig
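
For illustration, a sketch of how the attributes above fit together. The train_config and model_config objects are assumed to be pre-built TrainConfig and ModelConfig instances, and the remaining values are placeholders rather than defaults.

>>> from ablator.main.configs import ParallelConfig, SearchSpace
>>> parallel_config = ParallelConfig(
...     train_config=train_config,  # a TrainConfig instance
...     model_config=model_config,  # a ModelConfig instance
...     experiment_dir="/tmp/experiments",
...     total_trials=20,
...     concurrent_trials=2,
...     search_space={
...         "train_config.optimizer_config.arguments.lr": SearchSpace(
...             value_range=[0.001, 0.1], value_type="float"
...         ),
...     },
...     optim_metrics={"val_loss": "min"},
... )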

class ablator.main.configs.RunConfig(*args, **kwargs)[source]#

Bases: ConfigBase

Base configuration for running an experiment.

Attributes:
experiment_dir: Optional[str] = None

location to store experiment artifacts.

random_seed: Optional[int] = None

random seed.

train_config: TrainConfig

training configuration. (check TrainConfig for more details)

model_config: ModelConfig

model configuration. (check ModelConfig for more details)

keep_n_checkpoints: int = 3

number of latest checkpoints to keep.

tensorboard: bool = True

whether to use the TensorBoard logger.

amp: bool = True

whether to use automatic mixed precision when running on gpu.

device: str = "cuda" or "cpu"

device to run on.

verbose: Literal["console", "tqdm", "silent"] = "console"

verbosity level.

eval_subsample: float = 1

fraction of the dataset to use for evaluation.

metrics_n_batches: int = 32

max number of batches stored in every tag (train, eval, test) for evaluation.

metrics_mb_limit: int = 100

max number of megabytes stored in every tag (train, eval, test) for evaluation.

early_stopping_iter: Optional[int] = None

The maximum allowed difference between the current iteration and the last iteration with the best metric before applying early stopping. Early stopping will be triggered if the difference (current_itr - best_itr) exceeds early_stopping_iter. If set to None, early stopping will not be applied.

eval_epoch: float = 1

The epoch interval between two evaluations.

log_epoch: float = 1

The epoch interval between two logging events.

init_chkpt: Optional[str] = None

path to a checkpoint to initialize the model with.

warm_up_epochs: float = 0

number of epochs marked as warm up epochs.

divergence_factor: float = 100

if cur_loss is larger than best_loss by more than a factor of divergence_factor (i.e. cur_loss / best_loss > divergence_factor), the model is considered to have diverged.

config_class#

alias of RunConfig

property uid: str#

Get the unique identifier for the configuration object.

Returns:
str

The unique identifier for the configuration object.
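
For illustration, a sketch of assembling a RunConfig from the attributes above. The train_config and model_config objects are assumed to be pre-built TrainConfig and ModelConfig instances; all other values are placeholders.

>>> from ablator.main.configs import RunConfig
>>> run_config = RunConfig(
...     experiment_dir="/tmp/experiments",
...     train_config=train_config,  # a TrainConfig instance
...     model_config=model_config,  # a ModelConfig instance
...     device="cuda",
...     amp=True,
...     keep_n_checkpoints=3,
... )
>>> run_config.uid  # unique identifier derived from the configuration values
'...'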

class ablator.main.configs.SearchAlgo(value)[source]#

Bases: Enum

Type of search algorithm.

class ablator.main.configs.SearchSpace(*args, **kwargs)[source]#

Bases: ConfigBase

Search space configuration.

__init__(*args, **kwargs) None[source]#
config_class#

alias of SearchSpace
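
The examples on this page use SearchSpace in two forms: a numeric range (value_range with value_type) or a set of categorical values (categorical_values). An illustrative recap with placeholder values:

>>> from ablator.main.configs import SearchSpace
>>> lr_space = SearchSpace(value_range=[0.0, 0.1], value_type="float")
>>> optimizer_space = SearchSpace(categorical_values=["adam", "sgd"])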

class ablator.main.configs.SearchType(value)[source]#

Bases: Enum

Type of search space.

class ablator.main.configs.TrainConfig(*args, **kwargs)[source]#

Bases: ConfigBase

Training configuration.

Attributes:
dataset: str

dataset name; may be used in custom dataset loader functions.

batch_size: int

batch size.

epochs: int

number of epochs to train.

optimizer_config: OptimizerConfig

optimizer configuration. (check OptimizerConfig for more details)

scheduler_config: Optional[SchedulerConfig]

scheduler configuration. (check SchedulerConfig for more details)

rand_weights_init: bool = True

whether to initialize model weights randomly.

config_class#

alias of TrainConfig
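
For illustration, a sketch of a TrainConfig. The optimizer_config object is assumed to be a pre-built OptimizerConfig instance (see OptimizerConfig); the other values are placeholders.

>>> from ablator.main.configs import TrainConfig
>>> train_config = TrainConfig(
...     dataset="cifar10",
...     batch_size=128,
...     epochs=10,
...     optimizer_config=optimizer_config,  # an OptimizerConfig instance
...     scheduler_config=None,
... )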

Multi-process Trainer module#

class ablator.main.mp.ParallelTrainer(*args, run_config: ParallelConfig, **kwargs)[source]#

Bases: ProtoTrainer

A class for parallelizing training of models with different configurations using ray. Metrics from these models are used by Optuna to tune hyperparameters and are also logged to the Optuna storage.

Attributes:
run_config: ParallelConfig

Running configuration for parallel training.

device: str

The device to use for training.

experiment_dir: Path

The directory that stores experiment information (optuna storage, experiment state database).

logger: FileLogger

The logger that writes messages to a file and prints them to the console.

experiment_state: ExperimentState

This attribute manages optuna trials.

total_trials: int

Number of trials to run.

gpu_mem_bottleneck: int

The minimum memory capacity of all available gpus.

cpu: float

The number of CPUs used per trial.

gpu: float

The number of GPUs used per trial.

total_mem_usage: int

Total amount of memory usage.

__init__(*args, run_config: ParallelConfig, **kwargs)[source]#

Initialize ParallelTrainer using config from run_config.

Parameters:
run_config: ParallelConfig

The runtime configuration for this trainer.

*args: tuple

Extra arguments used for ProtoTrainer.

**kwargs: dict, optional

Extra arguments to ProtoTrainer; this can be {'wrapper': ModelWrapper}.

evaluate()[source]#

Evaluate model performance in trials that are completed, using evaluation functions defined in the model wrapper. Evaluation results will be logged to the console and log files in the experiment directory. This method also synchronizes the experiment directory to Google cloud storage and remote servers.

kill_idle()[source]#

Kill any ray processes that are idle.

launch(working_directory: str, auxilary_modules: list[module] | None = None, ray_head_address: str | None = 'auto', resume: bool = False)[source]#

Set up and launch the parallel training and tuning process. This includes:

  • prepare ray cluster for running optuna trials to tune hyperparameters.

  • if available, synchronize Google Cloud storage buckets to working directory defined in runtime configuration.

  • initialize optuna trials and add them to optuna storage and experiment state database for tracking training progress (or retrieve existing trials from optuna storage).

The trials that are initialized (or retrieved), experiment_state.pending_trials, will be pushed to ray nodes so they can be executed in parallel. After all trials have finished and their progress has been recorded in the sqlite databases in the working directory, these changes will be synchronized back to the GCP nodes via the rsync_up() method.

Parameters:
working_directory: str

The working directory that stores codes and modules that will be used by ray.

auxilary_modules: list[types.ModuleType], None

A list of modules to be used in the ray cluster's working environment.

ray_head_address: str, default='auto'

Ray cluster address.

resume: bool, default=False

Whether to resume training the model from existing checkpoints and existing experiment state.
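
Putting __init__ and launch together, an illustrative usage sketch. Here wrapper is assumed to be a ModelWrapper instance, parallel_config a ParallelConfig, and the working directory value is a placeholder.

>>> from ablator.main.mp import ParallelTrainer
>>> trainer = ParallelTrainer(wrapper=wrapper, run_config=parallel_config)
>>> trainer.launch(working_directory=".", ray_head_address="auto", resume=False)
>>> trainer.evaluate()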

sync_down()[source]#

Synchronize content of Google cloud storage to current working directory and to all GCP nodes.

Notes

GCP node names should match the ray node names. Previously run trials may already exist if we are resuming the state, so we first sync down from the remote.

sync_up()[source]#

Synchronize content of current experiment directory to Google cloud storage and other remote servers.

ablator.main.mp.parse_metrics(optim_direction: list[str], metrics: dict[str, float] | None)[source]#

Parse metrics to be optimized.

Parameters:
optim_direction: list[str]

The metrics to be optimized, defined in the ParallelConfig.

metrics: dict[str, float]

The metrics returned after a ray job finishes.

Returns:
dict[str, float]

A dictionary of metric names and their corresponding metric values.

ablator.main.mp.parse_rsync_paths(rsynced_folder: Path | str, root_folder: Path | str | None = None) dict[str, pathlib.Path | str][source]#

Parse the experiment directory that is being synchronized with remote servers (Google cloud storage, other remote nodes) and the root folder.

Parameters:
rsynced_folder: Path, str

The experiment directory that is being synchronized with remote servers.

root_folder: Path, str, None, default=None

The root folder that contains all experiment directories.

Returns:
dict[str, Path]

A dictionary with 2 keys: local_path and remote_path, which specifies the local directory and the remote path that will be in sync.

ablator.main.mp.train_main_remote(model: ModelWrapper, run_config: ParallelConfig, mp_logger: FileLogger, root_dir: Path, fault_tollerant: bool = True, crash_exceptions_types: list[type] | None = None, resume: bool = False, clean_reset: bool = False) tuple[ablator.main.configs.ParallelConfig, dict[str, float] | None, ablator.main.state.TrialState][source]#

The trial job that will be executed remotely at a ray node. This is where model training happens. In addition, the experiment directory will be synchronized to Google Cloud storage and remote nodes. Synchronization is done via the GcpConfig and RemoteConfig rsync_up() methods. Refer to the documentation of these two classes for more details.

Parameters:
model: ModelWrapper

The ModelWrapper that is used to train a model.

run_config: ParallelConfig

Runtime configuration for this trial.

mp_logger: FileLogger

The file logger that is used to log training progress.

root_dir: Path

The root directory that stores experiment states (experiment directory).

fault_tollerant: bool, optional, default=True

Whether to tolerate crashes, i.e. whether to continue execution when the ray job crashes.

crash_exceptions_types: list[type], None, optional, default=None

Types of exceptions that are considered as crashes.

resume: bool, default=False

Whether to resume training the model from existing checkpoints and existing experiment state.

clean_reset: bool, default=False

Whether to remove the model directory when CheckpointNotFoundError is raised.

Returns:
ParallelConfig

Running configuration of the trial.

dict[str, float], None

If an exception is raised (other than LossDivergedError and TrainPlateauError), this will be None. Otherwise, this will be a dictionary of metrics.

TrialState

A TrialState object indicating the state of the trial job:

  • If LossDivergedError or TrainPlateauError is raised while training, returned state will be TrialState.PRUNED_POOR_PERFORMANCE

  • If DuplicateRunError, RuntimeError (with message 'CUDA out of memory'), or CheckpointNotFoundError (with clean_reset=True) is raised while training, returned state will be TrialState.RECOVERABLE_ERROR

  • If other types of error or CheckpointNotFoundError (with clean_reset=False) is raised, returned state will be TrialState.FAIL

Prototype Trainer module#

class ablator.main.proto.ProtoTrainer(wrapper: ModelWrapper, run_config: RunConfig)[source]#

Bases: object

Manages resources for Prototyping.

Raises:
RuntimeError

If experiment directory is not defined in the running configuration.

Attributes:
wrapper: ModelWrapper

The main model wrapper.

run_config: RunConfig

Running configuration for the model.

__init__(wrapper: ModelWrapper, run_config: RunConfig)[source]#

Initialize model wrapper and running configuration for the model.

Parameters:
wrapper: ModelWrapper

The main model wrapper.

run_config: RunConfig

Running configuration for the model.

evaluate()[source]#

Run model evaluation on the training results and sync evaluation results to external logging services (e.g. Google Cloud storage, other remote servers).

Returns:
metrics: TrainMetrics

Metrics returned after evaluation.

launch(debug: bool = False)[source]#

Initialize the data state of the wrapper and train the model inside the wrapper, then sync training results (logged to the experiment directory while training) with external logging services (e.g. Google Cloud storage, other remote servers).

Parameters:
debug: bool, default=False

Whether to train model in debug mode.

Returns:
metrics: TrainMetrics

Metrics returned after training.
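
For orientation, an illustrative end-to-end sketch using the methods documented on this page. Here wrapper is assumed to be a ModelWrapper instance and run_config a RunConfig.

>>> from ablator.main.proto import ProtoTrainer
>>> trainer = ProtoTrainer(wrapper=wrapper, run_config=run_config)
>>> metrics = trainer.launch()
>>> eval_metrics = trainer.evaluate()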

pre_train_setup()[source]#

Used to prepare resources to avoid stalling during training or when resources are shared between trainers.

smoke_test(config=None)[source]#

Run a smoke test training process on the model.

Parameters:
config: RunConfig

Running configuration for the model.

sync()[source]#

Syncs training artifacts with external logging services.

Experiment and Optuna state module#

class ablator.main.state.Base(**kwargs: Any)[source]#

Bases: DeclarativeBase

class ablator.main.state.OptunaState(storage: str, study_name, optim_metrics: dict[str, ablator.main.configs.Optim], search_algo, search_space: dict[str, ablator.main.configs.SearchSpace])[source]#

Bases: object

A class to store the state of the Optuna study.

Attributes:
optim_metrics: OrderedDict

The ordered dictionary containing the names of the metrics to optimize and their direction (min or max).

search_space: dict of str to SearchSpace

The search space containing the parameters to sample from.

optuna_study: optuna.study.Study

The Optuna study object.

__init__(storage: str, study_name, optim_metrics: dict[str, ablator.main.configs.Optim], search_algo, search_space: dict[str, ablator.main.configs.SearchSpace]) None[source]#

Initialize the Optuna state.

Parameters:
storage: str

The path to the database URL or a database URL.

study_name: str

The name of the study.

optim_metrics: dict[str, Optim]

A dictionary of metric names and their optimization directions (either 'max' or 'min').

search_algo: SearchAlgo

The search algorithm to use ('random' or 'tpe').

search_space: dict[str, SearchSpace]

A dictionary of parameter names and their corresponding SearchSpace instances.

Raises:
NotImplementedError

If the specified search algorithm is not implemented.

ValueError

If optim_metrics is None.

Notes

For tuning, add an entry to search_space whose key is the dot-path of the hyperparameter and whose value is the corresponding SearchSpace, e.g. search_space = {"train_config.optimizer_config.arguments.lr": SearchSpace(value_range=[0, 0.1], value_type="float")}.
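
An illustrative construction sketch based on the parameter list above. The storage URL and study name are placeholders, and passing plain strings for search_algo and the optimization direction is an assumption; consult SearchAlgo and Optim for the canonical values.

>>> from ablator.main.configs import SearchSpace
>>> from ablator.main.state import OptunaState
>>> optuna_state = OptunaState(
...     storage="sqlite:///optuna.db",
...     study_name="lr_study",
...     optim_metrics={"val_loss": "min"},
...     search_algo="tpe",
...     search_space={
...         "train_config.optimizer_config.arguments.lr": SearchSpace(
...             value_range=[0, 0.1], value_type="float"
...         ),
...     },
... )
>>> trial_num, params = optuna_state.sample_trial()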

sample_trial()[source]#

Sample a new set of trial parameters.

Returns:
Tuple[int, dict[str, Any]]

A tuple of the trial number and a dictionary of parameter names and their corresponding values.

update_trial(trial_num: int, metrics: dict[str, float] | None, state: TrialState)[source]#

Update the state of a trial when it is completed with metrics.

Parameters:
trial_num: int

The trial number.

metrics: dict[str, float] or None

A dictionary of metric names and their corresponding values, or None if the trial is not complete.

state: TrialState

The state of the trial.

Raises:
RuntimeError

If metrics is None and state is COMPLETE.

class ablator.main.state.Trial(**kwargs)[source]#

Bases: Base

class ablator.main.state.TrialState(value)[source]#

Bases: IntEnum

An enumeration of possible states for a trial, including finer-grained pruned states.

Attributes:
RUNNING: int

A trial that has been successfully scheduled to run.

COMPLETE: int

Successfully completed trial.

PRUNED: int

Trial pruned because of various reasons.

FAIL: int

Trial that produced an error during execution.

WAITING: int

Trial that has been sampled but is not scheduled to run yet.

PRUNED_INVALID: int

Trial that was pruned during sampling as it was invalid.

PRUNED_DUPLICATE: int

Trial that was sampled but was already present.

PRUNED_POOR_PERFORMANCE: int

Trial that was pruned during execution for poor performance.

RECOVERABLE_ERROR: int

Trial that encountered a recoverable error during execution and can be rescheduled.

RESUME: int

Trial that needs to be resumed.

Methods

to_optuna_state: Convert this TrialState to an OptunaTrialState.

to_optuna_state() TrialState | None[source]#

Convert this TrialState to an OptunaTrialState.

Returns:
OptunaTrialState | None:

Corresponding OptunaTrialState or None if the state is not applicable.

class ablator.main.state.ExperimentState(experiment_dir: Path, config: ParallelConfig, logger: FileLogger | None = None, resume: bool = False)[source]#

Bases: object

__init__(experiment_dir: Path, config: ParallelConfig, logger: FileLogger | None = None, resume: bool = False) None[source]#

Initialize the ExperimentState: set up databases for storing training states and optuna states, and create trials based on the total number of trials specified in the config.

Parameters:
experiment_dir: Path

The directory where the experiment data will be stored.

config: ParallelConfig

The configuration object that defines the experiment settings.

logger: FileLogger, optional

The logger to use for outputting experiment logs. If not specified, a dummy logger will be used.

resume: bool, optional

Whether to resume a previously interrupted experiment. Default is False.

Raises:
RuntimeError

If the specified search_space parameter is not found in the configuration.

AssertionError

If config.search_space is empty.

RuntimeError

If the optuna database already exists and resume is False.

property n_trials_remaining: int#

All trials are retrieved, which can include trials at different states. We exclude the unscheduled (pending) trials and the ones that were pruned during sampling.

property pruned_errored_trials: list[dict[str, Any]]#

Errored trials cannot be initialized into a configuration, and as such the kwargs parameters are returned instead.

sample_trials(n_trials_to_sample: int) list[ablator.main.configs.ParallelConfig] | None[source]#

Sample n trials from the search space and update the database. The number n is the minimum of n_trials_to_sample and n_trials_remaining, where n_trials_remaining is total_trials (defined in the config) minus the number of trials that have already been sampled.

Parameters:
n_trials_to_sample: int

The number of trials to sample.

Returns:
list[ParallelConfig] | None

The list of sampled trials.

static search_space_dot_path(trial: ParallelConfig) dict[str, Any][source]#

Returns a dictionary of parameter names and their corresponding values for a given trial.

Parameters:
trial: ParallelConfig

The trial object to get the search space dot paths from.

Returns:
dict[str, Any]

A dictionary of parameter names and their corresponding values.

Examples

>>> search_space = {"train_config.optimizer_config.arguments.lr": SearchSpace(value_range=[0, 0.1], value_type="float")}
>>> ExperimentState.search_space_dot_path(trial)
{"train_config.optimizer_config.arguments.lr": 0.1}
static tune_trial_str(trial: ParallelConfig) str[source]#

Generate a string representation of a trial object.

Parameters:
trial: ParallelConfig

The trial object to generate a string representation for.

Returns:
str

A string representation of the trial object.

update_trial_state(config_uid: str, metrics: dict[str, float] | None = None, state: TrialState = TrialState.RUNNING) None[source]#

Update the state of a trial in the experiment database and notify Optuna.

Parameters:
config_uid: str

The uid of the trial to update.

metrics: dict[str, float] | None, optional

The metrics of the trial, by default None.

state: TrialState, optional

The state of the trial, by default TrialState.RUNNING.

Examples

>>> experiment.update_trial_state("fje_2211", {"loss": 0.1}, TrialState.COMPLETE)
ablator.main.state.augment_trial_kwargs(trial_kwargs: dict[str, Any], augmentation: dict[str, Any]) dict[str, Any][source]#

Augment the trial_kwargs with additional key-value pairs specified in the augmentation dictionary.

Parameters:
trial_kwargs: dict

The dictionary containing the key-value pairs to be augmented.

augmentation: dict

The dictionary containing the additional key-value pairs.

Returns:
dict

The augmented dictionary.

Examples

>>> trial_kwargs = {'a': 1, 'b': 2}
>>> augmentation = {'c': 3, 'd.e': 4}
>>> augment_trial_kwargs(trial_kwargs, augmentation)
{'a': 1, 'b': 2, 'c': 3, 'd': {'e': 4}}
ablator.main.state.parse_metrics(metric_directions: dict[str, ablator.main.configs.Optim], metrics: dict[str, float]) dict[str, float][source]#

Convert metrics to ordered dictionary of float values using their direction (minimize or maximize).

Parameters:
metric_directions: dict

The ordered dictionary containing the directions of the metrics (minimize or maximize).

metrics: dict

The dictionary containing the metric values.

Returns:
OrderedDict

The ordered dictionary of metric values converted to float using their direction.

Examples

>>> metric_directions = OrderedDict([('a', 'max'), ('b', 'min')])
>>> metrics = {'a': 1, 'b': None}
>>> parse_metrics(metric_directions, metrics)
OrderedDict([('a', 1.0), ('b', inf)])
ablator.main.state.sample_trial_params(optuna_trial: Trial, search_space: dict[str, ablator.main.configs.SearchSpace]) dict[str, Any][source]#

Sample parameter values from the search space for a given Optuna trial.

Parameters:
optuna_trial: optuna.Trial

The Optuna trial object.

search_space: dict of str to SearchSpace

The search space containing the parameters to sample from.

Returns:
dict of str to any

The dictionary containing the sampled parameter values.

Raises:
ValueError

If the search space contains an invalid SearchSpace object.

Examples

>>> optuna_trial = self.optuna_study.ask()
>>> search_space = {'x': SearchSpace(value_type=SearchType.numerical, value_range=(0.0, 1.0)),
... 'y': SearchSpace(categorical_values=['a', 'b']),
... 'z': SearchSpace(value_type=SearchType.integer, value_range=(1, 10))}
>>> sample_trial_params(optuna_trial, search_space)
{'x': 0.030961748695615783, 'y': 'a', 'z': 9}

Module contents#