Main package#
Subpackages#
- Main model package
- Submodules
- Main Model module
CheckpointNotFoundError
EvaluationError
LogStepError
ModelBase
ModelBase.__init__()
ModelBase.checkpoint()
ModelBase.config_parser()
ModelBase.create_model()
ModelBase.current_epoch
ModelBase.epoch_len
ModelBase.eval_itr
ModelBase.evaluate()
ModelBase.evaluation_functions()
ModelBase.load_checkpoint()
ModelBase.log_itr
ModelBase.make_dataloaders()
ModelBase.save_dict()
ModelBase.train()
ModelBase.train_stats
ModelBase.uid
TrainPlateauError
- Model Wrapper module
ModelWrapper
ModelWrapper.__init__()
ModelWrapper.apply_loss()
ModelWrapper.aux_metrics()
ModelWrapper.checkpoint()
ModelWrapper.config_parser()
ModelWrapper.create_model()
ModelWrapper.create_optimizer()
ModelWrapper.create_scaler()
ModelWrapper.create_scheduler()
ModelWrapper.eval()
ModelWrapper.evaluate()
ModelWrapper.evaluation_functions()
ModelWrapper.load_checkpoint()
ModelWrapper.log()
ModelWrapper.log_step()
ModelWrapper.make_dataloader_test()
ModelWrapper.make_dataloader_train()
ModelWrapper.make_dataloader_val()
ModelWrapper.make_dataloaders()
ModelWrapper.mock_train()
ModelWrapper.model_step()
ModelWrapper.reset_optimizer_scheduler()
ModelWrapper.save_dict()
ModelWrapper.status_message()
ModelWrapper.to_device()
ModelWrapper.total_steps
ModelWrapper.train()
ModelWrapper.train_loop()
ModelWrapper.train_step()
ModelWrapper.update_status()
ModelWrapper.validation_loop()
- Module contents
Submodules#
Model Configuration module#
- class ablator.main.configs.ModelConfig(*args, **kwargs)[source]#
Bases:
ConfigBase
Model configuration. When initializing a model, the config is passed to the model constructor.
- config_class#
alias of
ModelConfig
- class ablator.main.configs.ParallelConfig(*args, **kwargs)[source]#
Bases:
RunConfig
Parallel training configuration.
- Attributes:
- total_trials: int
total number of trials.
- concurrent_trials: int
number of trials to run concurrently.
- search_space: Dict[SearchSpace]
search space for hyperparameter search, e.g.
{"train_config.optimizer_config.arguments.lr": SearchSpace(value_range=[0, 10], value_type="int"),}
- optim_metrics: Dict[Optim]
metrics to optimize, e.g.
{"val_loss": "min"}
- search_algo: SearchAlgo = SearchAlgo.tpe
type of search algorithm.
- ignore_invalid_params: bool = False
whether to ignore invalid parameters when sampling.
- remote_config: Optional[RemoteConfig] = None
remote storage configuration.
- gcp_config: Optional[GcpConfig] = None
gcp configuration.
- config_class#
alias of
ParallelConfig
- class ablator.main.configs.RunConfig(*args, **kwargs)[source]#
Bases:
ConfigBase
Base configuration for running an experiment.
- Attributes:
- experiment_dir: Optional[str] = None
location to store experiment artifacts.
- random_seed: Optional[int] = None
random seed.
- train_config: TrainConfig
training configuration (see TrainConfig for more details).
- model_config: ModelConfig
model configuration (see ModelConfig for more details).
- keep_n_checkpoints: int = 3
number of latest checkpoints to keep.
- tensorboard: bool = True
whether to use tensorboardLogger.
- amp: bool = True
whether to use automatic mixed precision when running on gpu.
- device: str = “cuda” or “cpu”
device to run on.
- verbose: Literal[“console”, “tqdm”, “silent”] = “console”
verbosity level.
- eval_subsample: float = 1
fraction of the dataset to use for evaluation.
- metrics_n_batches: int = 32
max number of batches stored in every tag (train, eval, test) for evaluation.
- metrics_mb_limit: int = 100
max number of megabytes stored in every tag (train, eval, test) for evaluation.
- early_stopping_iter: Optional[int] = None
The maximum allowed difference between the current iteration and the last iteration with the best metric before applying early stopping. Early stopping will be triggered if the difference (current_itr - best_itr) exceeds early_stopping_iter. If set to None, early stopping will not be applied.
- eval_epoch: float = 1
The epoch interval between two evaluations.
- log_epoch: float = 1
The epoch interval between two logging steps.
- init_chkpt: Optional[str] = None
path to a checkpoint to initialize the model with.
- warm_up_epochs: float = 0
number of epochs marked as warm up epochs.
- divergence_factor: float = 100
if cur_loss / best_loss > divergence_factor, the model is considered to have diverged.
- property uid: str#
Get the unique identifier for the configuration object.
- Returns:
- str
The unique identifier for the configuration object.
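The early-stopping rule described for early_stopping_iter can be written as a one-line comparison. The sketch below is an illustration of that rule only. The function should_stop_early is a hypothetical helper, not part of the library.

```python
from typing import Optional


def should_stop_early(
    current_itr: int, best_itr: int, early_stopping_iter: Optional[int]
) -> bool:
    """Return True when the gap since the best iteration exceeds the limit."""
    if early_stopping_iter is None:
        # None disables early stopping.
        return False
    return (current_itr - best_itr) > early_stopping_iter


# 50 iterations without improvement against a limit of 40.
print(should_stop_early(current_itr=150, best_itr=100, early_stopping_iter=40))
```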
- class ablator.main.configs.SearchSpace(*args, **kwargs)[source]#
Bases:
ConfigBase
Search space configuration.
- config_class#
alias of
SearchSpace
- class ablator.main.configs.TrainConfig(*args, **kwargs)[source]#
Bases:
ConfigBase
Training configuration.
- Attributes:
- dataset: str
dataset name. May be used in custom dataset loader functions.
- batch_size: int
batch size.
- epochs: int
number of epochs to train.
- optimizer_config: OptimizerConfig
optimizer configuration (see OptimizerConfig for more details).
- scheduler_config: Optional[SchedulerConfig]
scheduler configuration (see SchedulerConfig for more details).
- rand_weights_init: bool = True
whether to initialize model weights randomly.
- config_class#
alias of
TrainConfig
Multi-process Trainer module#
- class ablator.main.mp.ParallelTrainer(*args, run_config: ParallelConfig, **kwargs)[source]#
Bases:
ProtoTrainer
A class for parallelizing the training of models with different configurations using ray. The metrics of these models are used by optuna to tune hyperparameters, and are also logged to optuna storage.
- Attributes:
- run_config: ParallelConfig
Running configuration for parallel training.
- device: str
The device to use for training.
- experiment_dir: Path
The directory that stores experiment information (optuna storage, experiment state database).
- logger: FileLogger
The logger that writes messages to a file and prints them to the console.
- experiment_state: ExperimentState
This attribute manages optuna trials.
- total_trials: int
Number of trials to run.
- gpu_mem_bottleneck: int
The minimum memory capacity of all available gpus.
- cpu: float
The number of cpus used per trial.
- gpu: float
The number of gpus used per trial.
- total_mem_usage: int
Total amount of memory usage.
- __init__(*args, run_config: ParallelConfig, **kwargs)[source]#
Initialize ParallelTrainer using config from run_config.
- Parameters:
- run_config: ParallelConfig
The runtime configuration for this trainer.
- *args: tuple
Extra arguments used for ProtoTrainer.
- **kwargs: dict, optional
Extra arguments to ProtoTrainer, this can be {'wrapper': ModelWrapper}.
- evaluate()[source]#
Evaluate model performance in trials that are completed, using evaluation functions defined in the model wrapper. Evaluation results will be logged to the console and log files in the experiment directory. This method also synchronizes the experiment directory to Google cloud storage and remote servers.
- launch(working_directory: str, auxilary_modules: list[module] | None = None, ray_head_address: str | None = 'auto', resume: bool = False)[source]#
Set up and launch the parallel training and tuning process. This includes:
- preparing the ray cluster for running optuna trials to tune hyperparameters.
- if available, synchronizing Google Cloud storage buckets to the working directory defined in the runtime configuration.
- initializing optuna trials and adding them to optuna storage and the experiment state database for tracking training progress (or retrieving existing trials from optuna storage).
The trials initialized (or retrieved), experiment_state.pending_trials, will be pushed to ray nodes so they can be executed in parallel. After all trials have finished and progress is recorded in sqlite databases in the working directory, these changes will be synchronized back to the GCP nodes via the rsync_up() method.
- Parameters:
- working_directory: str
The working directory that stores the code and modules that will be used by ray.
- auxilary_modules: list[types.ModuleType], None
A list of modules to be used as the ray cluster's working environment.
- ray_head_address: str, default='auto'
Ray cluster address.
- resume: bool, default=False
Whether to resume training the model from existing checkpoints and existing experiment state.
- ablator.main.mp.parse_metrics(optim_direction: list[str], metrics: dict[str, float] | None)[source]#
Parse metrics to be optimized.
- Parameters:
- optim_direction: list[str]
The metrics to be optimized, defined in the ParallelConfig.
- metrics: dict[str, float]
The metrics returned after a ray job finishes.
- Returns:
- dict[str, float]
A dictionary of metric names and their corresponding metric values.
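One plausible reading of the description above is that the function keeps only the metrics named in optim_direction. The sketch below illustrates that reading. The name select_optim_metrics and the filtering rule are assumptions made for illustration, not the library's confirmed implementation.

```python
from typing import Optional


def select_optim_metrics(
    optim_direction: list[str], metrics: Optional[dict[str, float]]
) -> Optional[dict[str, float]]:
    """Keep only the metrics that are being optimized (assumed behavior)."""
    if metrics is None:
        # A job that returned no metrics has nothing to parse.
        return None
    return {name: value for name, value in metrics.items() if name in optim_direction}


reported = {"val_loss": 0.42, "val_acc": 0.91, "lr": 0.001}
print(select_optim_metrics(["val_loss"], reported))
```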
- ablator.main.mp.parse_rsync_paths(rsynced_folder: Path | str, root_folder: Path | str | None = None) dict[str, pathlib.Path | str] [source]#
Parse the experiment directory that is kept in sync with remote servers (Google Cloud storage, other remote nodes) and the root folder.
- Parameters:
- rsynced_folder: Path, str
The experiment directory that is kept in sync with remote servers.
- root_folder: Path, str, None, default=None
The root folder that contains all experiment directories.
- Returns:
- dict[str, Path]
A dictionary with 2 keys, local_path and remote_path, which specify the local directory and the remote path that will be kept in sync.
- ablator.main.mp.train_main_remote(model: ModelWrapper, run_config: ParallelConfig, mp_logger: FileLogger, root_dir: Path, fault_tollerant: bool = True, crash_exceptions_types: list[type] | None = None, resume: bool = False, clean_reset: bool = False) tuple[ablator.main.configs.ParallelConfig, dict[str, float] | None, ablator.main.state.TrialState] [source]#
The trial job that will be executed remotely at a ray node. This is where model training happens. In addition, the experiment directory will be synchronized to Google Cloud storage and remote nodes. Synchronization is done via the rsync_up() methods of GcpConfig and RemoteConfig. Refer to the documentation of these 2 classes for more details.
- Parameters:
- model: ModelWrapper
The ModelWrapper that is used to train a model.
- run_config: ParallelConfig
Runtime configuration for this trial.
- mp_logger: FileLogger
The file logger that’s used to log training progress.
- root_dir: Path
The root directory that stores experiment states (experiment directory).
- fault_tollerant: bool, optional, default=True
Whether to tolerate crashes, i.e. to cease execution when the ray job crashes.
- crash_exceptions_types: list[type], None, optional, default=None
Types of exceptions that are considered as crashes.
- resume: bool, default=False
Whether to resume training the model from existing checkpoints and existing experiment state.
- clean_reset: bool, default=False
Whether to remove the model directory when CheckpointNotFoundError is raised.
- Returns:
- ParallelConfig
Running configuration of the trial.
- dict[str, float], None
If an exception is raised (except for LossDivergedError and TrainPlateauError), this will be None. Otherwise, this will be a dictionary of metrics.
- TrialState
A TrialState object indicating the state of the trial job:
If LossDivergedError or TrainPlateauError is raised while training, the returned state will be TrialState.PRUNED_POOR_PERFORMANCE.
If DuplicateRunError, RuntimeError (with message 'CUDA out of memory'), or CheckpointNotFoundError (with clean_reset=True) is raised while training, the returned state will be TrialState.RECOVERABLE_ERROR.
If other types of error or CheckpointNotFoundError (with clean_reset=False) is raised, the returned state will be TrialState.FAIL.
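The mapping from raised exceptions to returned trial states can be sketched as a small classifier. This is a self-contained illustration: the exception classes and the TrialState enum below are hypothetical stand-ins that only share names with the library's types, and classify_failure is a made-up helper, not part of ablator.

```python
from enum import Enum


class TrialState(Enum):
    """Stand-in holding only the states named in the return description."""

    FAIL = "FAIL"
    PRUNED_POOR_PERFORMANCE = "PRUNED_POOR_PERFORMANCE"
    RECOVERABLE_ERROR = "RECOVERABLE_ERROR"


# Hypothetical stand-ins for the library's exception types.
class LossDivergedError(Exception):
    pass


class TrainPlateauError(Exception):
    pass


class DuplicateRunError(Exception):
    pass


class CheckpointNotFoundError(Exception):
    pass


def classify_failure(error: Exception, clean_reset: bool = False) -> TrialState:
    """Map an exception raised during training to a trial state."""
    if isinstance(error, (LossDivergedError, TrainPlateauError)):
        return TrialState.PRUNED_POOR_PERFORMANCE
    if isinstance(error, DuplicateRunError):
        return TrialState.RECOVERABLE_ERROR
    if isinstance(error, RuntimeError) and "CUDA out of memory" in str(error):
        return TrialState.RECOVERABLE_ERROR
    if isinstance(error, CheckpointNotFoundError) and clean_reset:
        return TrialState.RECOVERABLE_ERROR
    # Everything else, including CheckpointNotFoundError without clean_reset.
    return TrialState.FAIL


print(classify_failure(RuntimeError("CUDA out of memory")))
print(classify_failure(CheckpointNotFoundError(), clean_reset=False))
```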
Prototype Trainer module#
- class ablator.main.proto.ProtoTrainer(wrapper: ModelWrapper, run_config: RunConfig)[source]#
Bases:
object
Manages resources for Prototyping.
- Raises:
- RuntimeError
If experiment directory is not defined in the running configuration.
- Attributes:
- wrapper: ModelWrapper
The main model wrapper.
- run_config: RunConfig
Running configuration for the model.
- __init__(wrapper: ModelWrapper, run_config: RunConfig)[source]#
Initialize model wrapper and running configuration for the model.
- Parameters:
- wrapper: ModelWrapper
The main model wrapper.
- run_config: RunConfig
Running configuration for the model.
- evaluate()[source]#
Run model evaluation on the training results and sync the evaluation results to external logging services (e.g. Google Cloud storage, other remote servers).
- Returns:
- metrics: TrainMetrics
Metrics returned after evaluation.
- launch(debug: bool = False)[source]#
Initialize the data state of the wrapper and train the model inside the wrapper, then sync training results (logged to the experiment directory while training) with external logging services (e.g. Google Cloud storage, other remote servers).
- Parameters:
- debug: bool, default=False
Whether to train the model in debug mode.
- Returns:
- metrics: TrainMetrics
Metrics returned after training.
- pre_train_setup()[source]#
Used to prepare resources to avoid stalling during training or when resources are shared between trainers.
Experiment and Optuna state module#
- class ablator.main.state.OptunaState(storage: str, study_name, optim_metrics: dict[str, ablator.main.configs.Optim], search_algo, search_space: dict[str, ablator.main.configs.SearchSpace])[source]#
Bases:
object
A class to store the state of the Optuna study.
- Attributes:
- optim_metrics: OrderedDict
The ordered dictionary containing the names of the metrics to optimize and their direction (min or max).
- search_space: dict of str to SearchSpace
The search space containing the parameters to sample from.
- optuna_study: optuna.study.Study
The Optuna study object.
- __init__(storage: str, study_name, optim_metrics: dict[str, ablator.main.configs.Optim], search_algo, search_space: dict[str, ablator.main.configs.SearchSpace]) None [source]#
Initialize the Optuna state.
- Parameters:
- storage: str
The path to the database, or a database URL.
- study_name: str
The name of the study.
- optim_metrics: dict[str, Optim]
A dictionary of metric names and their optimization directions (either 'max' or 'min').
- search_algo: SearchAlgo
The search algorithm to use ('random' or 'tpe').
- search_space: dict[str, SearchSpace]
A dictionary of parameter names and their corresponding SearchSpace instances.
- Raises:
- NotImplementedError
If the specified search algorithm is not implemented.
- ValueError
If optim_metrics is None.
Notes
For tuning, add an entry to the search space whose name is the name of the hyperparameter and whose value is the search space, e.g.
search_space = {"train_config.optimizer_config.arguments.lr": SearchSpace(value_range=[0, 0.1], value_type="float")}
- sample_trial()[source]#
Sample a new set of trial parameters.
- Returns:
- Tuple[int, dict[str, Any]]
A tuple of the trial number and a dictionary of parameter names and their corresponding values.
- update_trial(trial_num: int, metrics: dict[str, float] | None, state: TrialState)[source]#
Update the state of a trial when it is completed with metrics.
- Parameters:
- trial_num: int
The trial number.
- metrics: dict[str, float] or None
A dictionary of metric names and their corresponding values, or None if the trial is not complete.
- state: TrialState
The state of the trial.
- Raises:
- RuntimeError
If metrics is None and state is COMPLETE.
- class ablator.main.state.TrialState(value)[source]#
Bases:
IntEnum
An enumeration of possible states for a trial with more pruned states.
- Attributes:
- RUNNING: int
A trial that has been successfully scheduled to run
- COMPLETE: int
Successfully completed trial
- PRUNED: int
Trial pruned because of various reasons
- FAIL: int
Trial that produced an error during execution
- WAITING: int
Trial that has been sampled but is not scheduled to run yet
- PRUNED_INVALID: int
Trial that was pruned during sampling as it was invalid
- PRUNED_DUPLICATE: int
Trial that was sampled but was already present
- PRUNED_POOR_PERFORMANCE: int
Trial that was pruned during execution for poor performance
- RECOVERABLE_ERROR: int
Trial that stopped during execution because of an error it can recover from (e.g. CUDA out of memory)
- RESUME: int
Trial that needs to be resumed
Methods
to_optuna_state: Convert this TrialState to an OptunaTrialState.
- class ablator.main.state.ExperimentState(experiment_dir: Path, config: ParallelConfig, logger: FileLogger | None = None, resume: bool = False)[source]#
Bases:
object
- __init__(experiment_dir: Path, config: ParallelConfig, logger: FileLogger | None = None, resume: bool = False) None [source]#
Initialize the ExperimentState: initialize the databases for storing training states and optuna states, and create trials based on the total number of trials specified in config.
- Parameters:
- experiment_dir: Path
The directory where the experiment data will be stored.
- config: ParallelConfig
The configuration object that defines the experiment settings.
- logger: FileLogger, optional
The logger to use for outputting experiment logs. If not specified, a dummy logger will be used.
- resume: bool, optional
Whether to resume a previously interrupted experiment. Default is False.
- Raises:
- RuntimeError
If the specified search_space parameter is not found in the configuration.
- AssertionError
If config.search_space is empty.
- RuntimeError
If the optuna database already exists and resume is False.
- property n_trials_remaining: int#
All trials are retrieved, since they can be in different states. Unscheduled (pending) trials and trials pruned during sampling are excluded.
- property pruned_errored_trials: list[dict[str, Any]]#
Errored trials cannot be initialized to a configuration, so their kwargs parameters are returned instead.
- sample_trials(n_trials_to_sample: int) list[ablator.main.configs.ParallelConfig] | None [source]#
Sample n trials from the search space and update the database. The number n is the minimum of n_trials_to_sample and n_trials_remaining. n_trials_remaining is total_trials (defined in config) minus the number of trials that have been sampled.
- Parameters:
- n_trials_to_sample: int
The number of trials to sample.
- Returns:
- list[ParallelConfig] | None
The list of sampled trials.
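The number of trials actually drawn follows from simple arithmetic, sketched below. The helper name n_trials_to_draw and its n_sampled argument are hypothetical, and clamping the remaining count at zero is an assumption added so the sketch never returns a negative number.

```python
def n_trials_to_draw(n_trials_to_sample: int, total_trials: int, n_sampled: int) -> int:
    """Return how many trials to sample given the remaining trial budget."""
    # Remaining budget: total_trials minus trials already sampled (clamped, assumption).
    n_trials_remaining = max(total_trials - n_sampled, 0)
    return min(n_trials_to_sample, n_trials_remaining)


# With a budget of 10 trials and 8 already sampled, a request for 5 yields 2.
print(n_trials_to_draw(n_trials_to_sample=5, total_trials=10, n_sampled=8))
```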
- static search_space_dot_path(trial: ParallelConfig) dict[str, Any] [source]#
Returns a dictionary of parameter names and their corresponding values for a given trial.
- Parameters:
- trialParallelConfig
The trial object to get the search space dot paths from.
- Returns:
- dict[str, Any]
A dictionary of parameter names and their corresponding values.
Examples
>>> search_space = {"train_config.optimizer_config.arguments.lr": SearchSpace(value_range=[0, 0.1], value_type="float")}
>>> {"train_config.optimizer_config.arguments.lr": 0.1}
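To make the dot-path idea concrete, the sketch below looks up values in a nested dictionary using dotted keys. It uses a plain dict in place of a ParallelConfig, and the helpers get_by_dot_path and dot_path_values are hypothetical names introduced only for this illustration.

```python
from functools import reduce
from typing import Any


def get_by_dot_path(config: dict[str, Any], dot_path: str) -> Any:
    """Walk a nested dict following the keys of a dotted path."""
    return reduce(lambda node, key: node[key], dot_path.split("."), config)


def dot_path_values(config: dict[str, Any], search_space_keys: list[str]) -> dict[str, Any]:
    """Collect the current value of every search-space path."""
    return {path: get_by_dot_path(config, path) for path in search_space_keys}


# Plain-dict stand-in for a trial configuration.
trial_config = {"train_config": {"optimizer_config": {"arguments": {"lr": 0.1}}}}
print(dot_path_values(trial_config, ["train_config.optimizer_config.arguments.lr"]))
```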
- static tune_trial_str(trial: ParallelConfig) str [source]#
Generate a string representation of a trial object.
- Parameters:
- trialParallelConfig
The trial object to generate a string representation for.
- Returns:
- str
A string representation of the trial object.
- update_trial_state(config_uid: str, metrics: dict[str, float] | None = None, state: TrialState = TrialState.RUNNING) None [source]#
Update the state of a trial in both the Experiment database and tell Optuna.
- Parameters:
- config_uid: str
The uid of the trial to update.
- metrics: dict[str, float] | None, optional
The metrics of the trial, by default None.
- state: TrialState, optional
The state of the trial, by default TrialState.RUNNING.
Examples
>>> experiment.update_trial_state("fje_2211", {"loss": 0.1}, TrialState.COMPLETE)
- ablator.main.state.augment_trial_kwargs(trial_kwargs: dict[str, Any], augmentation: dict[str, Any]) dict[str, Any] [source]#
Augment the trial_kwargs with additional key-value pairs specified in the augmentation dictionary.
- Parameters:
- trial_kwargs: dict
The dictionary containing the key-value pairs to be augmented.
- augmentation: dict
The dictionary containing the additional key-value pairs.
- Returns:
- dict
The augmented dictionary.
Examples
>>> trial_kwargs = {'a': 1, 'b': 2}
>>> augmentation = {'c': 3, 'd.e': 4}
>>> augment_trial_kwargs(trial_kwargs, augmentation)
{'a': 1, 'b': 2, 'c': 3, 'd': {'e': 4}}
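The behavior in the example, where a dotted key such as 'd.e' expands into a nested dictionary, can be reproduced with a short sketch. The function augment_kwargs_sketch is a hypothetical re-implementation written to match the example, not the library's code. It assumes intermediate keys are either absent or already dictionaries.

```python
import copy
from typing import Any


def augment_kwargs_sketch(
    trial_kwargs: dict[str, Any], augmentation: dict[str, Any]
) -> dict[str, Any]:
    """Merge augmentation into a copy of trial_kwargs, expanding dotted keys."""
    result = copy.deepcopy(trial_kwargs)
    for dotted_key, value in augmentation.items():
        *parents, leaf = dotted_key.split(".")
        node = result
        for key in parents:
            # Create intermediate dictionaries as needed.
            node = node.setdefault(key, {})
        node[leaf] = value
    return result


print(augment_kwargs_sketch({"a": 1, "b": 2}, {"c": 3, "d.e": 4}))
```

Copying the input first keeps the caller's dictionary unchanged. Whether the library mutates its argument is not stated in the text above.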
- ablator.main.state.parse_metrics(metric_directions: dict[str, ablator.main.configs.Optim], metrics: dict[str, float]) dict[str, float] [source]#
Convert metrics to an ordered dictionary of float values using their direction (minimize or maximize).
- Parameters:
- metric_directions: dict
The ordered dictionary containing the directions of the metrics (minimize or maximize).
- metrics: dict
The dictionary containing the metric values.
- Returns:
- OrderedDict
The ordered dictionary of metric values converted to float using their direction.
Examples
>>> metric_directions = OrderedDict([('a', 'max'), ('b', 'min')])
>>> metrics = {'a': 1, 'b': None}
>>> parse_metrics(metric_directions, metrics)
OrderedDict([('a', 1.0), ('b', inf)])
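The example shows a missing value for a minimized metric becoming inf. A minimal sketch of that conversion follows. The function parse_metrics_sketch is hypothetical, directions are plain strings as in the example rather than Optim values, and mapping a missing maximized metric to -inf is an assumption made by symmetry that the text above does not confirm.

```python
from collections import OrderedDict
from typing import Optional


def parse_metrics_sketch(
    metric_directions: "OrderedDict[str, str]",
    metrics: dict[str, Optional[float]],
) -> "OrderedDict[str, float]":
    """Cast metrics to float, replacing missing values with the worst value."""
    parsed: "OrderedDict[str, float]" = OrderedDict()
    for name, direction in metric_directions.items():
        value = metrics.get(name)
        if value is None:
            # Worst value for the direction: inf for 'min', -inf for 'max' (assumed).
            parsed[name] = float("inf") if direction == "min" else float("-inf")
        else:
            parsed[name] = float(value)
    return parsed


directions = OrderedDict([("a", "max"), ("b", "min")])
print(parse_metrics_sketch(directions, {"a": 1, "b": None}))
```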
- ablator.main.state.sample_trial_params(optuna_trial: Trial, search_space: dict[str, ablator.main.configs.SearchSpace]) dict[str, Any] [source]#
Sample parameter values from the search space for a given Optuna trial.
- Parameters:
- optuna_trial: optuna.Trial
The Optuna trial object.
- search_space: dict of str to SearchSpace
The search space containing the parameters to sample from.
- Returns:
- dict of str to any
The dictionary containing the sampled parameter values.
- Raises:
- ValueError
If the search space contains an invalid SearchSpace object.
Examples
>>> optuna_trial = self.optuna_study.ask()
>>> search_space = {'x': SearchSpace(value_type=SearchType.numerical, value_range=(0.0, 1.0)),
...     'y': SearchSpace(categorical_values=['a', 'b']),
...     'z': SearchSpace(value_type=SearchType.integer, value_range=(1, 10))}
>>> sample_trial_params(optuna_trial, search_space)
{'x': 0.030961748695615783, 'y': 'a', 'z': 9}
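The dispatch from a search-space entry to a sampling call can be sketched without optuna installed. Everything below is a stand-in: SpaceSketch is a hypothetical simplification of SearchSpace that uses string value types, and RandomTrialSketch only mimics the suggest_float, suggest_int, and suggest_categorical methods that optuna trials expose, backed by Python's random module.

```python
import random
from dataclasses import dataclass
from typing import Any, Optional, Sequence


@dataclass
class SpaceSketch:
    """Hypothetical stand-in for SearchSpace; not the library class."""

    value_range: Optional[tuple] = None
    value_type: str = "float"
    categorical_values: Optional[Sequence[Any]] = None


class RandomTrialSketch:
    """Stand-in trial exposing optuna-style suggest_* methods."""

    def __init__(self, seed: int = 0) -> None:
        self._rng = random.Random(seed)

    def suggest_float(self, name: str, low: float, high: float) -> float:
        return self._rng.uniform(low, high)

    def suggest_int(self, name: str, low: int, high: int) -> int:
        return self._rng.randint(low, high)

    def suggest_categorical(self, name: str, choices: Sequence[Any]) -> Any:
        return self._rng.choice(list(choices))


def sample_params_sketch(trial: Any, search_space: dict[str, SpaceSketch]) -> dict[str, Any]:
    """Sample one value per parameter, raising ValueError for invalid entries."""
    params: dict[str, Any] = {}
    for name, space in search_space.items():
        if space.categorical_values is not None:
            params[name] = trial.suggest_categorical(name, space.categorical_values)
        elif space.value_range is not None and space.value_type == "int":
            low, high = space.value_range
            params[name] = trial.suggest_int(name, int(low), int(high))
        elif space.value_range is not None and space.value_type == "float":
            low, high = space.value_range
            params[name] = trial.suggest_float(name, float(low), float(high))
        else:
            raise ValueError(f"Invalid search space for {name!r}: {space}")
    return params


space = {
    "x": SpaceSketch(value_range=(0.0, 1.0), value_type="float"),
    "y": SpaceSketch(categorical_values=["a", "b"]),
    "z": SpaceSketch(value_range=(1, 10), value_type="int"),
}
print(sample_params_sketch(RandomTrialSketch(seed=0), space))
```

Because the stand-in trial uses the same method names as an optuna trial, a real trial obtained from a study could in principle be passed to the same function, though that has not been checked against the library here.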