Source code for vak.eval.parametric_umap

"""Function that evaluates trained models in the parametric UMAP family."""
from __future__ import annotations

import logging
import pathlib
from collections import OrderedDict
from datetime import datetime

import pandas as pd
import pytorch_lightning as lightning
import torch.utils.data

from .. import models, transforms
from ..common import validators
from ..datasets.parametric_umap import ParametricUMAPDataset

logger = logging.getLogger(__name__)


[docs] def eval_parametric_umap_model( model_name: str, model_config: dict, dataset_path: str | pathlib.Path, checkpoint_path: str | pathlib.Path, output_dir: str | pathlib.Path, batch_size: int, num_workers: int, transform_params: dict | None = None, dataset_params: dict | None = None, split: str = "test", device: str | None = None, ) -> None: """Evaluate a trained model. Parameters ---------- model_name : str Model name, must be one of vak.models.registry.MODEL_NAMES. model_config : dict Model configuration in a ``dict``, as loaded from a .toml file, and used by the model method ``from_config``. dataset_path : str, pathlib.Path Path to dataset, e.g., a csv file generated by running ``vak prep``. checkpoint_path : str, pathlib.Path Path to directory with checkpoint files saved by Torch, to reload model output_dir : str, pathlib.Path Path to location where .csv files with evaluation metrics should be saved. batch_size : int Number of samples per batch fed into model. num_workers : int Number of processes to use for parallel loading of data. Argument to torch.DataLoader. Default is 2. transform_params: dict, optional Parameters for data transform. Passed as keyword arguments. Optional, default is None. dataset_params: dict, optional Parameters for dataset. Passed as keyword arguments. Optional, default is None. split : str Split of dataset on which model should be evaluated. One of {'train', 'val', 'test'}. Default is 'test'. device : str Device on which to work with model + data. Defaults to 'cuda' if torch.cuda.is_available is True. """ # ---- pre-conditions ---------------------------------------------------------------------------------------------- for path, path_name in zip( (checkpoint_path,), ("checkpoint_path",), ): if path is not None: # because `spect_scaler_path` is optional if not validators.is_a_file(path): raise FileNotFoundError( f"value for ``{path_name}`` not recognized as a file: {path}" ) dataset_path = pathlib.Path(dataset_path) if not dataset_path.exists() or not dataset_path.is_dir(): raise NotADirectoryError( f"`dataset_path` not found or not recognized as a directory: {dataset_path}" ) logger.info( f"Loading metadata from dataset path: {dataset_path}", ) if not validators.is_a_directory(output_dir): raise NotADirectoryError( f"value for ``output_dir`` not recognized as a directory: {output_dir}" ) # ---- get time for .csv file -------------------------------------------------------------------------------------- timenow = datetime.now().strftime("%y%m%d_%H%M%S") # ---------------- load data for evaluation ------------------------------------------------------------------------ if transform_params is None: transform_params = {} item_transform = transforms.defaults.get_default_transform( model_name, "eval", transform_params ) if dataset_params is None: dataset_params = {} val_dataset = ParametricUMAPDataset.from_dataset_path( dataset_path=dataset_path, split=split, transform=item_transform, **dataset_params, ) val_loader = torch.utils.data.DataLoader( dataset=val_dataset, shuffle=False, batch_size=batch_size, num_workers=num_workers, ) # ---------------- do the actual evaluating ------------------------------------------------------------------------ model = models.get( model_name, model_config, input_shape=val_dataset.shape, ) logger.info(f"running evaluation for model: {model_name}") model.load_state_dict_from_path(checkpoint_path) # TODO: use accelerator parameter, https://github.com/vocalpy/vak/issues/691 if device == "cuda": accelerator = "gpu" else: accelerator = "auto" trainer_logger = lightning.loggers.TensorBoardLogger(save_dir=output_dir) trainer = lightning.Trainer(accelerator=accelerator, logger=trainer_logger) # TODO: check for hasattr(model, test_step) and if so run test # below, [0] because validate returns list of dicts, length of no. of val loaders metric_vals = trainer.validate(model, dataloaders=val_loader)[0] for metric_name, metric_val in metric_vals.items(): logger.info(f"{metric_name}: {metric_val:0.5f}") # create a "DataFrame" with just one row which we will save as a csv; # the idea is to be able to concatenate csvs from multiple runs of eval row = OrderedDict( [ ("model_name", model_name), ("checkpoint_path", checkpoint_path), ("dataset_path", dataset_path), ] ) # order metrics by name to be extra sure they will be consistent across runs row.update(sorted([(k, v) for k, v in metric_vals.items()])) # pass index into dataframe, needed when using all scalar values (a single row) # throw away index below when saving to avoid extra column eval_df = pd.DataFrame(row, index=[0]) eval_csv_path = output_dir.joinpath(f"eval_{model_name}_{timenow}.csv") logger.info(f"saving csv with evaluation metrics at: {eval_csv_path}") eval_df.to_csv( eval_csv_path, index=False ) # index is False to avoid having "Unnamed: 0" column when loading