"""Function that evaluates trained models in the parametric UMAP family."""
from __future__ import annotations
import logging
import pathlib
from collections import OrderedDict
from datetime import datetime
import pandas as pd
import pytorch_lightning as lightning
import torch.utils.data
from .. import models, transforms
from ..common import validators
from ..datasets.parametric_umap import ParametricUMAPDataset
logger = logging.getLogger(__name__)
def eval_parametric_umap_model(
model_name: str,
model_config: dict,
dataset_path: str | pathlib.Path,
checkpoint_path: str | pathlib.Path,
output_dir: str | pathlib.Path,
batch_size: int,
num_workers: int,
transform_params: dict | None = None,
dataset_params: dict | None = None,
split: str = "test",
device: str | None = None,
) -> None:
"""Evaluate a trained model.
Parameters
----------
model_name : str
Model name, must be one of vak.models.registry.MODEL_NAMES.
model_config : dict
Model configuration in a ``dict``,
as loaded from a .toml file,
and used by the model method ``from_config``.
dataset_path : str, pathlib.Path
Path to dataset, a directory generated by running ``vak prep``.
checkpoint_path : str, pathlib.Path
Path to a checkpoint file saved by torch, used to reload trained model weights.
output_dir : str, pathlib.Path
Path to location where .csv files with evaluation metrics should be saved.
batch_size : int
Number of samples per batch fed into model.
num_workers : int
Number of processes to use for parallel loading of data.
Argument to ``torch.utils.data.DataLoader``.
transform_params: dict, optional
Parameters for data transform.
Passed as keyword arguments.
Optional, default is None.
dataset_params: dict, optional
Parameters for dataset.
Passed as keyword arguments.
Optional, default is None.
split : str
Split of dataset on which model should be evaluated.
One of {'train', 'val', 'test'}. Default is 'test'.
device : str, optional
Device on which to work with model + data.
Defaults to 'cuda' if ``torch.cuda.is_available()`` returns ``True``.
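
Examples
--------
A minimal, illustrative call; the paths and config below are
hypothetical, and should point to artifacts produced by
``vak prep`` and a previous training run::

    eval_parametric_umap_model(
        model_name="ConvEncoderUMAP",
        model_config=model_config,
        dataset_path="prep/umap_dataset",
        checkpoint_path="results/checkpoints/checkpoint.pt",
        output_dir="results/eval",
        batch_size=64,
        num_workers=2,
    )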
"""
# ---- pre-conditions ----------------------------------------------------------------------------------------------
for path, path_name in zip(
(checkpoint_path,),
("checkpoint_path",),
):
if path is not None:  # skip any optional paths that were not specified
if not validators.is_a_file(path):
raise FileNotFoundError(
f"value for ``{path_name}`` not recognized as a file: {path}"
)
dataset_path = pathlib.Path(dataset_path)
if not dataset_path.exists() or not dataset_path.is_dir():
raise NotADirectoryError(
f"`dataset_path` not found or not recognized as a directory: {dataset_path}"
)
logger.info(
f"Loading metadata from dataset path: {dataset_path}",
)
if not validators.is_a_directory(output_dir):
raise NotADirectoryError(
f"value for ``output_dir`` not recognized as a directory: {output_dir}"
)
output_dir = pathlib.Path(output_dir)  # ensure a Path so we can call ``joinpath`` when saving the csv below
# ---- get time for .csv file --------------------------------------------------------------------------------------
timenow = datetime.now().strftime("%y%m%d_%H%M%S")
# ---------------- load data for evaluation ------------------------------------------------------------------------
if transform_params is None:
transform_params = {}
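# build the default per-item transform for evaluation; ``transform_params`` are passed through as keyword arguments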
item_transform = transforms.defaults.get_default_transform(
model_name, "eval", transform_params
)
if dataset_params is None:
dataset_params = {}
val_dataset = ParametricUMAPDataset.from_dataset_path(
dataset_path=dataset_path,
split=split,
transform=item_transform,
**dataset_params,
)
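# note shuffle=False: we iterate over the split in a fixed order so metrics are deterministic across runs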
val_loader = torch.utils.data.DataLoader(
dataset=val_dataset,
shuffle=False,
batch_size=batch_size,
num_workers=num_workers,
)
# ---------------- do the actual evaluating ------------------------------------------------------------------------
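# instantiate the model from its config; ``input_shape`` comes from the dataset so the network matches the input data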
model = models.get(
model_name,
model_config,
input_shape=val_dataset.shape,
)
logger.info(f"running evaluation for model: {model_name}")
model.load_state_dict_from_path(checkpoint_path)
# TODO: use accelerator parameter, https://github.com/vocalpy/vak/issues/691
if device == "cuda":
accelerator = "gpu"
else:
accelerator = "auto"
trainer_logger = lightning.loggers.TensorBoardLogger(save_dir=output_dir)
trainer = lightning.Trainer(accelerator=accelerator, logger=trainer_logger)
# TODO: check for hasattr(model, test_step) and if so run test
# below, [0] because ``validate`` returns a list of dicts, one per val dataloader
metric_vals = trainer.validate(model, dataloaders=val_loader)[0]
for metric_name, metric_val in metric_vals.items():
logger.info(f"{metric_name}: {metric_val:0.5f}")
# create a "DataFrame" with just one row which we will save as a csv;
# the idea is to be able to concatenate csvs from multiple runs of eval
row = OrderedDict(
[
("model_name", model_name),
("checkpoint_path", checkpoint_path),
("dataset_path", dataset_path),
]
)
# order metrics by name to be extra sure they will be consistent across runs
row.update(sorted(metric_vals.items()))
# pass index into dataframe, needed when using all scalar values (a single row)
# throw away index below when saving to avoid extra column
eval_df = pd.DataFrame(row, index=[0])
eval_csv_path = output_dir.joinpath(f"eval_{model_name}_{timenow}.csv")
logger.info(f"saving csv with evaluation metrics at: {eval_csv_path}")
eval_df.to_csv(
eval_csv_path, index=False
) # index is False to avoid having "Unnamed: 0" column when loading