Metrics package#

Submodules#

Main Metrics module#

exception ablator.modules.metrics.main.LossDivergedError[source]#

Bases: Exception

class ablator.modules.metrics.main.TrainMetrics(*args, batch_limit=30, memory_limit=100000000.0, evaluation_functions: dict[str, collections.abc.Callable] | None = None, moving_average_limit=3000, tags: list[str] | None = None, static_aux_metrics: dict[str, Any] | None = None, moving_aux_metrics: Iterable[str] | None = None)[source]#

Bases: object

Stores and manages predictions and calculates metrics from custom evaluation functions. It makes batch updates, manages memory limits, applies the evaluation functions, and provides cached or online updates of the train loss.

__init__(*args, batch_limit=30, memory_limit=100000000.0, evaluation_functions: dict[str, collections.abc.Callable] | None = None, moving_average_limit=3000, tags: list[str] | None = None, static_aux_metrics: dict[str, Any] | None = None, moving_aux_metrics: Iterable[str] | None = None)[source]#

Initialize the train metrics settings

Parameters:
batch_limit : int, optional

Maximum number of batches to keep for every category of data (specified by tags), so that only the latest batch_limit batches are stored for each category. Default is 30.

memory_limit : int, optional

Maximum memory (in bytes) of batches to keep for every category of data (specified by tags). Every time this limit is exceeded, batch_limit is reduced by 1. Default is 1e8.

evaluation_functions : dict[str, Callable], optional

A dictionary whose keys are evaluation function names and whose values are callable evaluation functions, e.g. mean, sum. The argument names of each Callable must match the names of the prediction batches that the model returns. So if the model prediction over a batch looks like {"preds": <batch of predictions>, "labels": <batch of predicted labels>}, then the callable's arguments should be preds and labels, e.g. evaluation_functions={"mean": lambda preds, labels: np.mean(preds) + np.mean(labels)}. Default is None.

moving_average_limit : int, optional

The maximum number of values stored for each moving average metric. Default is 3000.

tags : list[str], optional

A list of tags that separate prediction results into categories. A sample use case is to categorize different sets of data (train, evaluation, test sets), e.g. tags=["train", "val"]. Tags are combined with evaluation function names and moving auxiliary metric names to create metrics. For example, if evaluation_functions.keys() = ["mean"] and moving_aux_metrics = ["loss"], then the metrics that will be tracked are train_mean, train_loss, val_mean, val_loss. Default is ["train"].

static_aux_metrics : dict[str, ty.Any], optional

A dictionary of static metrics, i.e. metrics that start from an initial value and are updated manually, such as learning rate, best loss, total steps, etc. Keys are static metric names and values are their initial values. Default is None.

moving_aux_metrics : Iterable[str], optional

A list of metrics that are tracked through their moving average, such as loss. Default is None.

Examples

Initialize an object of TrainMetrics:

>>> import numpy as np
>>> from ablator.modules.metrics.main import TrainMetrics
>>> train_metrics = TrainMetrics(
...     batch_limit=30,
...     memory_limit=None,
...     evaluation_functions={"mean": lambda x: np.mean(x)},
...     moving_average_limit=100,
...     tags=["train", "val"],
...     static_aux_metrics={"lr": 1.0},
...     moving_aux_metrics={"loss"},
... )
>>> train_metrics.to_dict() # metrics are np.nan until they are updated
{
    "train_mean": np.nan, "train_loss": np.nan,
    "val_mean": np.nan, "val_loss": np.nan,
    "lr": 1.0
}
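The naming rule described for tags can be sketched in plain Python. This is a minimal illustration rather than ablator's implementation: the helper name metric_names is hypothetical, and the ordering of the result simply mirrors the example output above.

```python
from itertools import product


def metric_names(tags, evaluation_function_names, moving_aux_metrics, static_aux_metrics):
    """Hypothetical helper: compose metric names as described for TrainMetrics."""
    # Every tag is paired with every evaluation function and moving aux metric.
    per_tag = [*evaluation_function_names, *moving_aux_metrics]
    moving = [f"{tag}_{name}" for tag, name in product(tags, per_tag)]
    # Static metrics are not tied to a tag, so their names are kept as they are.
    return moving + list(static_aux_metrics)


names = metric_names(["train", "val"], ["mean"], ["loss"], {"lr": 1.0})
print(names)
```

Under these assumptions, two tags with one evaluation function and one moving auxiliary metric give four tagged metrics plus the untagged lr.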
append_batch(*args, tag, **kwargs)[source]#

Appends a batch of predictions to a specific set.

Parameters:
tag : str

A tag that specifies which set of predictions to append the batch to.

**kwargs : dict

A dictionary of key-value pairs, where each key is a type of prediction (e.g. predictions, labels) and each value is a batch of prediction values. The keys passed in **kwargs must match the argument names of the Callables given in evaluation_functions when the TrainMetrics object was initialized.

Raises:
AssertionError

If any positional arguments are passed, or if the provided tag is not a defined metric category.

Notes

Positional arguments are rejected because it is easy to mix up the order of pred, labels and tags.

Examples

>>> import numpy as np
>>> from ablator.modules.metrics.main import TrainMetrics
>>> train_metrics = TrainMetrics(
...     batch_limit=30,
...     memory_limit=None,
...     evaluation_functions={"mean": lambda labels: np.mean(labels)},
...     moving_average_limit=100,
...     tags=["train", "val"],
...     static_aux_metrics={"lr": 1.0},
...     moving_aux_metrics={"loss"},
... )
>>> train_metrics.append_batch(labels=np.array([100]), tag="train")
>>> train_metrics.append_batch(labels=np.array([0] * 3), tag="train")
>>> train_metrics.append_batch(labels=np.array([50]), tag="val")
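The keyword-only rule can be illustrated with a small stand-in function. This is a sketch of the documented checks only, not ablator's code: the function below, its known_tags parameter, and its return value are assumptions made for the illustration.

```python
def append_batch(*args, tag, known_tags=("train", "val"), **kwargs):
    """Hypothetical stand-in that only mirrors the documented argument checks."""
    # Positional batches are refused so that pred, labels and tag cannot be swapped.
    assert len(args) == 0, "Pass batches as keyword arguments, e.g. labels=..."
    # The tag must be one of the categories defined at initialization.
    assert tag in known_tags, f"Undefined tag '{tag}'."
    return tag, sorted(kwargs)


print(append_batch(labels=[100], tag="train"))

rejected = None
try:
    append_batch([100], tag="train")  # positional batch is rejected
except AssertionError as err:
    rejected = str(err)
print(rejected)
```

Because tag is declared after *args, Python itself forces it to be passed by keyword; the assertions cover the remaining two cases.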
evaluate(tag, reset=True, update_ma=True)[source]#

Apply evaluation_functions to a set of predictions specified by tag argument. Possibly update the moving averages (only those associated with evaluation functions, not moving auxiliary metrics) with the evaluated results, or reset the predictions.

Parameters:
tag : str

A tag that specifies which set of predictions to evaluate.

reset : bool, optional

A flag that indicates whether to reset the predictions to empty after evaluation. Default is True.

update_ma : bool, optional

A flag that indicates whether to update the moving averages after evaluation. Default is True.

Returns:
metrics : dict

A dictionary of metric values calculated from the predictions.

Examples

>>> import numpy as np
>>> from ablator.modules.metrics.main import TrainMetrics
>>> train_metrics = TrainMetrics(
...     batch_limit=30,
...     memory_limit=None,
...     evaluation_functions={"mean": lambda pred: np.mean(pred)},
...     moving_average_limit=100,
...     tags=["train", "val"],
...     static_aux_metrics={"lr": 1.0},
...     moving_aux_metrics={"loss"},
... )
>>> train_metrics.append_batch(pred=np.array([100]), tag="val")
>>> # val_mean is updated to the mean among batch mean values: (100 / 1) / 1 = 100.0
>>> train_metrics.evaluate("val", reset=False, update_ma=True)
>>> train_metrics.append_batch(pred=np.array([0] * 3), tag="val")

In the following example, the current evaluation result is (100 + 0 + 0 + 0) / 4 = 25, which is returned by evaluate(). Since update_ma=True, val_mean is updated to (100.0 + 25) / 2 = 62.5, which we can see with to_dict().

>>> train_metrics.evaluate("val", reset=True, update_ma=True)
{'mean': 25.0}
>>> train_metrics.to_dict()
{'val_mean': 62.5}
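The arithmetic in the example above can be reproduced with the standard library alone. This is a sketch under the assumption that evaluate() applies the function to all stored predictions and that the moving average is the plain mean of past evaluation results; the names stored_batches and evaluation_history are hypothetical.

```python
from statistics import fmean

stored_batches = []       # predictions kept for the "val" tag
evaluation_history = []   # past evaluation results feeding the moving average


def evaluate(reset):
    # Flatten every stored batch and apply the "mean" evaluation function.
    flat = [value for batch in stored_batches for value in batch]
    result = fmean(flat)
    evaluation_history.append(result)
    if reset:
        stored_batches.clear()
    return result


stored_batches.append([100])
first = evaluate(reset=False)          # 100 / 1
stored_batches.append([0, 0, 0])
second = evaluate(reset=True)          # (100 + 0 + 0 + 0) / 4
val_mean = fmean(evaluation_history)   # (100.0 + 25.0) / 2
print(first, second, val_mean)
```

Note that the second evaluation still sees the first batch because the first call used reset=False.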
reset(tag: str)[source]#

Reset to empty all prediction sequences (e.g predictions, labels) in a set of predictions specified by tag argument.

Parameters:
tag : str

A tag that specifies which set of predictions to reset.

Examples

>>> import numpy as np
>>> from ablator.modules.metrics.main import TrainMetrics
>>> train_metrics = TrainMetrics(
...     batch_limit=30,
...     memory_limit=None,
...     evaluation_functions={"sum": lambda pred: np.sum(pred)},
...     moving_average_limit=100,
...     tags=["train", "val"],
...     static_aux_metrics={"lr": 1.0},
...     moving_aux_metrics={"loss"},
... )
>>> train_metrics.append_batch(pred=np.array([1] * 3), tag="train")    # e.g. add 3 predictions, all of class 1
>>> train_metrics.reset(tag="train")
to_dict()[source]#

Get all metrics, i.e. moving aux metrics, moving evaluation metrics, and static aux metrics. Note that each moving metric is reported as an average over all previous batches. A metric is np.nan if it has never been updated.

Examples

>>> import numpy as np
>>> from ablator.modules.metrics.main import TrainMetrics
>>> train_metrics = TrainMetrics(
...     batch_limit=30,
...     memory_limit=None,
...     evaluation_functions={"mean": lambda preds: np.mean(preds)},
...     moving_average_limit=100,
...     tags=["train", "val"],
...     static_aux_metrics={"lr": 0.75},
...     moving_aux_metrics={"loss"},
... )
>>> train_metrics.append_batch(preds=np.array([100]), tag="val")
>>> train_metrics.evaluate("val", reset=False, update_ma=True)
>>> train_metrics.to_dict()
{
    'train_mean': np.nan, 'train_loss': np.nan,
    'val_mean': 100.0, 'val_loss': np.nan,
    'lr': 0.75
}
>>> train_metrics.append_batch(preds=np.array([0] * 3), tag="val")
>>> train_metrics.evaluate("val", reset=True, update_ma=True)
>>> train_metrics.to_dict()
{
    'train_mean': np.nan, 'train_loss': np.nan,
    'val_mean': 62.5, 'val_loss': np.nan,
    'lr': 0.75
}
update_ma_metrics(metric_dict: dict[str, Any], tag: str)[source]#

Keeps the moving average aux metrics updated with new values from metric_dict by appending the new metric values to their collections of metric results. A sample use case: after finishing a training iteration, add the training loss to the loss moving average collection on the tag train, i.e. the train set.

Parameters:
metric_dict : dict[str, ty.Any]

A dictionary containing the moving average metric values to update.

tag : str

A tag that specifies which set of predictions the metric values belong to.

Raises:
AssertionError:

If metric_dict has metrics that are not in moving_aux_metrics.

Examples

>>> import numpy as np
>>> from ablator.modules.metrics.main import TrainMetrics
>>> train_metrics = TrainMetrics(
...     batch_limit=30,
...     memory_limit=None,
...     evaluation_functions={"sum": lambda x: np.sum(x)},
...     moving_average_limit=100,
...     tags=["train", "val"],
...     static_aux_metrics={"lr": 1.0},
...     moving_aux_metrics={"loss"},
... )
>>> train_metrics.to_dict()
{
    "train_sum": np.nan, "train_loss": np.nan,
    "val_sum": np.nan, "val_loss": np.nan,
    "lr": 1.0
}
>>> train_metrics.update_ma_metrics({"loss": 0.35}, tag="val")
>>> train_metrics.to_dict()
{
    "train_sum": np.nan, "train_loss": np.nan,
    "val_sum": np.nan, "val_loss": 0.35,
    "lr": 1.0
}
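How a bounded moving average behaves once several values have been added can be sketched as follows. This is an illustration only: it assumes that the oldest values are dropped once moving_average_limit is reached and that the reported metric is the plain mean of the stored values, and it uses collections.deque rather than ablator's own store.

```python
from collections import deque
from statistics import fmean

moving_average_limit = 3  # small limit chosen for illustration
val_loss = deque(maxlen=moving_average_limit)

for loss in [1.0, 2.0, 3.0, 4.0]:
    val_loss.append(loss)  # the oldest value is dropped once the limit is reached

# An empty collection is reported as NaN, matching the "never updated" case.
reported = fmean(val_loss) if val_loss else float("nan")
print(list(val_loss), reported)
```

With a limit of 3, the first loss no longer contributes after the fourth update, so the reported value follows recent training behaviour rather than the full history.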
update_static_metrics(metric_dict: dict[str, Any])[source]#

Update static metrics with the values in metric_dict.

Parameters:
metric_dict : dict[str, ty.Any]

A dictionary containing the static metrics values to update.

Raises:
AssertionError:

If metric_dict has metrics that are not in static_aux_attributes.

Notes

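metric_dict does not need to contain every static metric; it may hold any subset of static_aux_attributes. The check is one-directional: keys in metric_dict that are missing from static_aux_attributes raise an error, while static metrics that are absent from metric_dict are simply not updated.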

Examples

>>> import numpy as np
>>> from ablator.modules.metrics.main import TrainMetrics
>>> train_metrics = TrainMetrics(
...     batch_limit=30,
...     memory_limit=None,
...     evaluation_functions={"mean": lambda x: np.mean(x)},
...     moving_average_limit=100,
...     tags=["train"],
...     static_aux_metrics={"lr": 1.0},
...     moving_aux_metrics={"loss"},
... )
>>> train_metrics.to_dict()
{
    "train_mean": np.nan, "train_loss": np.nan,
    "lr": 1.0
}
>>> train_metrics.update_static_metrics({"lr": 0.3})
>>> train_metrics.to_dict()
{
    "train_mean": np.nan, "train_loss": np.nan,
    "lr": 0.3
}
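The one-directional key check documented under Raises can be sketched with a set difference. The function update_static and the best_loss and momentum metrics below are hypothetical and only serve the illustration; this is not ablator's implementation.

```python
def update_static(static_metrics, metric_dict):
    """Hypothetical stand-in for the documented static-metric update."""
    # Only keys that were never declared as static metrics are an error.
    unknown = set(metric_dict) - set(static_metrics)
    assert not unknown, f"Unknown static metrics: {sorted(unknown)}"
    static_metrics.update(metric_dict)
    return static_metrics


static = {"lr": 1.0, "best_loss": float("inf")}
update_static(static, {"lr": 0.3})  # a subset of the declared metrics is fine

unknown_rejected = False
try:
    update_static(static, {"momentum": 0.9})  # never declared, so rejected
except AssertionError:
    unknown_rejected = True
print(static, unknown_rejected)
```

Checking before updating means a rejected call leaves the existing values untouched.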

Metric Stores module#

class ablator.modules.metrics.stores.ArrayStore(batch_limit: int = 30, memory_limit: int | None = 100000000)[source]#

Bases: Sequence

Base class for manipulations (storing, getting, resetting) of batches of values.

__init__(batch_limit: int = 30, memory_limit: int | None = 100000000)[source]#

Initialize the storage settings.

Parameters:
batch_limit : int, optional

The maximum number of batches of values to store for this single store. Default is 30.

memory_limit : int or None, optional

The maximum memory allowed for all values in bytes. Default is 1e8.

Examples

>>> from ablator.modules.metrics.stores import ArrayStore
>>> array_store = ArrayStore(
...     batch_limit=50,
...     memory_limit=1000
... )
append(val: ndarray | float | int)[source]#

Appends a batch of values, or a single value, constrained on the limits. If after appending a new batch, batch_limit is exceeded, only batch_limit number of latest batches is kept. If memory limit is exceeded, batch_limit will be reduced.

Parameters:
val : np.ndarray or float or int

The data to append; either a batch of data or a scalar.

Raises:
AssertionError:

If the appended value is not a numpy array, an integer, or a float.

Examples

The following example shows a case where the batch limit is exceeded (100 values/batches are appended while only 10 are allowed):

>>> from ablator.modules.metrics.stores import ArrayStore
>>> array_store = ArrayStore(
...     batch_limit=10,
...     memory_limit=1000
... )
>>> for i in range(100):
...     array_store.append(int(i))
>>> array_store.arr
[90, 91, 92, 93, 94, 95, 96, 97, 98, 99]
>>> array_store.limit
10

This example shows a case where the memory limit is exceeded. As soon as the 5th value is appended, the memory of the list is 104 bytes (> 100), so batch_limit is set to the current length of the store (5) reduced by 1, which equals 4. From then on, only 4 values/batches are allowed.

>>> array_store = ArrayStore(
...     batch_limit=10,
...     memory_limit=100
... )
>>> for i in range(100):
...     array_store.append(int(i))
>>> array_store.arr
[96, 97, 98, 99]
>>> array_store.limit
4
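Both limits can be mimicked with a short pure-Python class. This is a sketch of the behaviour described above, not ablator's code: the class BoundedStore is hypothetical, and it assumes a fixed cost of 24 bytes per stored value instead of measuring real memory, so the exact point at which the limit triggers is illustrative.

```python
class BoundedStore:
    """Hypothetical stand-in for the documented ArrayStore limits."""

    def __init__(self, batch_limit=30, memory_limit=None, item_bytes=24):
        self.arr = []
        self.limit = batch_limit
        self.memory_limit = memory_limit
        self.item_bytes = item_bytes  # assumed fixed cost per stored value

    def append(self, val):
        self.arr.append(val)
        used = len(self.arr) * self.item_bytes
        if self.memory_limit is not None and used > self.memory_limit:
            # Memory exceeded: the limit becomes the current length minus one.
            self.limit = len(self.arr) - 1
        # Keep only the latest `limit` values.
        self.arr = self.arr[-self.limit:] if self.limit > 0 else []


roomy = BoundedStore(batch_limit=10, memory_limit=1000)
tight = BoundedStore(batch_limit=10, memory_limit=100)
for i in range(100):
    roomy.append(i)
    tight.append(i)

print(roomy.arr, roomy.limit)
print(tight.arr, tight.limit)
```

Under the assumed per-value cost, the fifth value pushes the tight store over 100 bytes, after which it settles at four values, as in the documented example.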
get() ndarray[source]#

Returns a flattened array of values.

Examples

>>> import numpy as np
>>> from ablator.modules.metrics.stores import ArrayStore
>>> array_store = ArrayStore(
...     batch_limit=10,
...     memory_limit=1000
... )
>>> for i in range(100):
...     array_store.append(np.array([int(i)]))
>>> array_store.get()
[[90 91 92 93 94 95 96 97 98 99]]
reset()[source]#

Reset list of values to empty.

Examples

>>> from ablator.modules.metrics.stores import ArrayStore
>>> array_store = ArrayStore(
...     batch_limit=10,
...     memory_limit=1000
... )
>>> for i in range(100):
...     array_store.append(int(i))
>>> array_store.arr
[90, 91, 92, 93, 94, 95, 96, 97, 98, 99]
>>> array_store.reset()
>>> array_store.arr
[]
class ablator.modules.metrics.stores.MovingAverage(batch_limit: int = 30, memory_limit: int | None = 100000000)[source]#

Bases: ArrayStore

This class is used to store moving average metrics

append(val: ndarray | Tensor | float | int)[source]#

Appends a batch of values, or a single value, constrained on the limits.

Parameters:
val : np.ndarray or torch.Tensor or float or int

The data to be appended.

Raises:
ValueError:

If the appended value is not of a required type, or if val is not a scalar.

Examples

>>> import numpy as np
>>> from ablator.modules.metrics.stores import MovingAverage
>>> ma_store = MovingAverage()
>>> for i in range(100):
...     ma_store.append(np.array([int(i)]))
>>> ma_store.arr
[70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85,
86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99]
class ablator.modules.metrics.stores.PredictionStore(batch_limit: int = 30, memory_limit: int = 100000000, moving_average_limit: int = 3000, evaluation_functions: dict[str, collections.abc.Callable] | None = None)[source]#

Bases: object

A class for storing prediction scores. This allows for evaluating prediction results using evaluation functions

__init__(batch_limit: int = 30, memory_limit: int = 100000000, moving_average_limit: int = 3000, evaluation_functions: dict[str, collections.abc.Callable] | None = None)[source]#

Initialize the storage settings.

Parameters:
batch_limit : int, optional

Maximum number of batches to keep for each array store corresponding to each category of prediction outputs (e.g. preds, labels), so that only the latest batch_limit batches are stored per array store. Default is 30.

memory_limit : int or None, optional

Maximum memory (in bytes) of batches to keep for each array store corresponding to each category of prediction outputs (e.g. preds, labels). Default is 1e8.

moving_average_limit : int, optional

The maximum number of values stored for each moving average metric. Default is 3000.

evaluation_functions : dict[str, Callable], optional

A dictionary whose keys are evaluation function names and whose values are callable evaluation functions, e.g. mean, sum. The argument names of each Callable must match the names of the prediction batches that the model returns. So if the model prediction over a batch looks like {"preds": <batch of predictions>, "labels": <batch of predicted labels>}, then the callable's arguments should be preds and labels, e.g. evaluation_functions={"mean": lambda preds, labels: np.mean(preds) + np.mean(labels)}. Default is None.

Examples

>>> import numpy as np
>>> from ablator.modules.metrics.stores import PredictionStore
>>> pred_store = PredictionStore(
...     batch_limit=10,
...     memory_limit=1000,
...     moving_average_limit=1000,
...     evaluation_functions={"mean": lambda x: np.mean(x)}
... )
append(**batches: dict[str, numpy.ndarray])[source]#

Appends batches of values, constrained on the limits.

Parameters:
**batches : dict[str, np.ndarray]

A dictionary of key-value pairs, where each key is a type of prediction (e.g. predictions, labels) and each value is a batch of prediction values. The keys passed in **batches must match the argument names of the Callables given in evaluation_functions when the PredictionStore object was initialized.

Raises:
AssertionError

If passed keys do not match arguments in evaluation functions, or when batches among the keys are different in size.

Examples

>>> import numpy as np
>>> from ablator.modules.metrics.stores import PredictionStore
>>> pred_store = PredictionStore(
...     batch_limit=10,
...     memory_limit=1000,
...     moving_average_limit=1000,
...     evaluation_functions={"mean": lambda preds, labels: np.mean(preds) + np.mean(labels)}
... )
>>> pred_store.append(preds=np.array([4,3,0]), labels=np.array([5,1,1]))
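One way such a check could be written is with inspect.signature, which exposes the parameter names of a callable. This is a sketch under stated assumptions, not ablator's implementation: the helper check_batches is hypothetical, and it assumes that the batch keys must equal the function's parameter names exactly.

```python
import inspect


def check_batches(evaluation_functions, **batches):
    """Hypothetical validation mirroring the documented AssertionError cases."""
    for name, fn in evaluation_functions.items():
        expected = set(inspect.signature(fn).parameters)
        # Batch keys must line up with the evaluation function's arguments.
        assert expected == set(batches), (
            f"'{name}' expects {sorted(expected)}, got {sorted(batches)}"
        )
    # Every key must carry a batch of the same size.
    sizes = {len(batch) for batch in batches.values()}
    assert len(sizes) == 1, f"Batches differ in size: {sorted(sizes)}"
    return True


functions = {"mean": lambda preds, labels: sum(preds) + sum(labels)}
ok = check_batches(functions, preds=[4, 3, 0], labels=[5, 1, 1])

mismatch = False
try:
    check_batches(functions, pred=[4, 3, 0], labels=[5, 1, 1])  # "pred" is a typo
except AssertionError:
    mismatch = True
print(ok, mismatch)
```

Validating at append time surfaces a misspelled key immediately instead of at the next evaluation.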
evaluate() dict[str, float][source]#

Apply evaluation_functions to predictions sets, e.g preds, labels.

Returns:
metrics : dict

A dictionary of metric values calculated from different sets of predictions.

Raises:
AssertionError

If passed keys do not match arguments in evaluation functions.

ValueError

If evaluation result is not a numeric scalar.

Examples

>>> import numpy as np
>>> from ablator.modules.metrics.stores import PredictionStore
>>> pred_store = PredictionStore(
...     batch_limit=30,
...     evaluation_functions={"mean": lambda preds, labels: np.mean(preds) + np.mean(labels)},
...     moving_average_limit=100
... )
>>> pred_store.append(preds=np.array([4,3,0]), labels=np.array([5,1,3]))
>>> pred_store.evaluate()
{'mean': 5.333333333333334}
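The evaluation step can be sketched without NumPy to show where the two documented errors may come from. This is an illustration only: the function evaluate and the dictionary layout of store are hypothetical, and Python lists stand in for arrays.

```python
from numbers import Real
from statistics import fmean


def evaluate(evaluation_functions, store):
    """Hypothetical evaluation over stored batches keyed by prediction type."""
    # Merge the batches stored under each key into one flat sequence.
    flat = {key: [v for batch in batches for v in batch] for key, batches in store.items()}
    metrics = {}
    for name, fn in evaluation_functions.items():
        value = fn(**flat)  # keys are passed as keyword arguments
        if isinstance(value, bool) or not isinstance(value, Real):
            raise ValueError(f"'{name}' must return a numeric scalar.")
        metrics[name] = float(value)
    return metrics


store = {"preds": [[4, 3, 0]], "labels": [[5, 1, 3]]}
functions = {"mean": lambda preds, labels: fmean(preds) + fmean(labels)}
metrics = evaluate(functions, store)
print(metrics)
```

Here the result is 7/3 + 3, which agrees with the value shown in the example above up to floating point rounding.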
reset()[source]#

Reset to empty all prediction sequences (e.g predictions, labels).

Examples

>>> import numpy as np
>>> from ablator.modules.metrics.stores import PredictionStore
>>> pred_store = PredictionStore(
...     batch_limit=30,
...     memory_limit=None,
...     evaluation_functions={"sum": lambda preds, labels: np.sum(preds) + np.sum(labels)},
...     moving_average_limit=100
... )
>>> pred_store.append(preds=np.array([4,3,0]), labels=np.array([5,1,3]))
>>> pred_store.reset()

Module contents#