Source code for lightning_pose.data.dali

"""Data pipelines based on efficient video reading by nvidia dali package."""

import os
from typing import Literal

import numpy as np
import nvidia.dali.fn as fn
import nvidia.dali.types as types
import torch
from nvidia.dali import pipeline_def
from nvidia.dali.plugin.pytorch import DALIGenericIterator, LastBatchPolicy
from omegaconf import DictConfig

from lightning_pose.data import _IMAGENET_MEAN, _IMAGENET_STD
from lightning_pose.data.datatypes import MultiviewUnlabeledBatchDict, UnlabeledBatchDict
from lightning_pose.data.utils import count_frames

# restrict the public API so that sphinx-autoapidoc ignores the imported names
__all__ = [
    "video_pipe",
    "LitDaliWrapper",
    "PrepareDALI",
]


# cannot typecheck due to way pipeline_def decorator consumes additional args
@pipeline_def
def video_pipe(
    filenames: list[list[str]] | list[str] | str,
    resize_dims: list[int] | None = None,
    random_shuffle: bool = False,
    sequence_length: int = 16,
    pad_sequences: bool = True,
    initial_fill: int = 16,
    normalization_mean: list[float] = _IMAGENET_MEAN,
    normalization_std: list[float] = _IMAGENET_STD,
    name: str = "reader",
    step: int = 1,
    pad_last_batch: bool = False,
    imgaug: str = "default",
    skip_vfr_check: bool = True,
    # arguments consumed by decorator:
    # batch_size,
    # num_threads,
    # device_id
) -> tuple:
    """Generic video reader pipeline that loads videos, resizes, augments, and normalizes.

    Args:
        filenames: absolute path(s) of video files to feed through the pipeline; a single
            path, a list of paths (one view), or a list of lists of paths (outer list over
            views, inner list over videos/sessions)
        resize_dims: [height, width] to resize raw frames; required when `imgaug` is "dlc" or
            "dlc-top-down" because the augmentation center is derived from it
        random_shuffle: True to grab random batches of frames from videos; False to
            sequential read
        sequence_length: number of frames to load per sequence
        pad_sequences: allows creation of incomplete sequences if there is an insufficient
            number of frames at the very end of the video
        initial_fill: size of the buffer that is used for random shuffling
        normalization_mean: mean values in (0, 1) to subtract from each channel
        normalization_std: standard deviation values to divide each channel by
        name: reader name prefix; the reader of view `i` is named `f"{name}_{i}"`
        step: number of frames to advance on each read
        pad_last_batch: forwarded unchanged to `fn.readers.video`
        imgaug: string identifying which imgaug pipeline to use; "dlc" and "dlc-top-down"
            apply geometric and photometric augmentations, any other value (e.g. "default")
            applies none
        skip_vfr_check: don't check for variable frame rates, can throw errors with small diffs

    Note:
        `batch_size`, `num_threads` and `device_id` are not part of this signature; they are
        consumed by the `pipeline_def` decorator. Callers in this module also pass `seed` the
        same way (presumably the random seed used when `random_shuffle` is True).

    Returns:
        Flat tuple of DALI outputs, grouped by kind and ordered by view within each group:
        frames for every view, then the data augmentation matrix for every view (returned so
        that geometric transforms can be undone; the placeholder `np.array([-1])` when no
        geometric transform was applied), then the shape of the raw video for every view (used
        for bbox). The decorated function yields a pipeline object to be fed to
        DALIGenericIterator.

    Raises:
        ValueError: if `filenames` is empty or not a str / list of str / list of lists, or if
            a "dlc" augmentation is requested without `resize_dims`.

    """
    # turn all inputs into a list of list of strings to be most general
    # first list: over views (might only be one)
    # second list: over videos/sessions
    if isinstance(filenames, str):
        filenames = [[filenames]]
    elif isinstance(filenames, list) and filenames and isinstance(filenames[0], str):
        filenames = [filenames]
    if not (isinstance(filenames, list) and filenames and isinstance(filenames[0], list)):
        raise ValueError(
            "`filenames` must be a path, a non-empty list of paths, "
            "or a non-empty list of lists of paths"
        )

    apply_dlc_aug = imgaug in ("dlc", "dlc-top-down")
    if apply_dlc_aug and not resize_dims:
        # the rotation/scale center below is computed from `resize_dims`
        raise ValueError(f"`resize_dims` must be provided when imgaug='{imgaug}'")

    # loop over views (can be only one)
    frames_list = []
    transform_list = []
    orig_size_list = []
    for view_idx, view_filenames in enumerate(filenames):
        video = fn.readers.video(
            device="gpu",
            filenames=view_filenames,
            random_shuffle=random_shuffle,
            sequence_length=sequence_length,
            step=step,
            pad_sequences=pad_sequences,
            initial_fill=initial_fill,
            normalized=False,
            name=f"{name}_{view_idx}",
            dtype=types.DALIDataType.FLOAT,
            pad_last_batch=pad_last_batch,  # Important for context loaders
            file_list_include_preceding_frame=True,  # to get rid of dali warnings
            skip_vfr_check=skip_vfr_check,
        )
        # record the raw shape before resizing so downstream code can build bboxes
        orig_size = fn.shapes(video)
        if resize_dims:
            video = fn.resize(video, size=resize_dims)

        if apply_dlc_aug:
            # NOTE(review): center is built as (resize_dims[0] / 2, resize_dims[1] / 2), i.e.
            # (height / 2, width / 2) per the docstring; confirm this matches the coordinate
            # order DALI expects for non-square frames
            center = (resize_dims[0] / 2, resize_dims[1] / 2)
            # rotate + scale
            angle = fn.random.uniform(range=(-10, 10))
            transform = fn.transforms.rotation(angle=angle, center=center)
            scale = fn.random.uniform(range=(0.8, 1.2), shape=2)
            transform = fn.transforms.scale(transform, scale=scale, center=center)
            video = fn.warp_affine(video, matrix=transform, fill_value=0, inverse_map=False)
            # brightness contrast:
            contrast = fn.random.uniform(range=(0.75, 1.25))
            brightness = fn.random.uniform(range=(0.75, 1.25))
            video = fn.brightness_contrast(video, brightness=brightness, contrast=contrast)
            # shot noise
            factor = fn.random.uniform(range=(0.0, 10.0))
            video = fn.noise.shot(video, factor=factor)
            # jpeg compression
            # quality = fn.random.uniform(range=(50, 100), dtype=types.INT32)
            # video = fn.jpeg_compression_distortion(video, quality=quality)
        else:
            # choose arbitrary scalar (rather than a matrix) so that downstream operations know
            # there is no geometric transforms to undo
            transform = np.array([-1])

        # video pixel range is [0, 255]; transform it to [0, 1].
        # happens naturally in the torchvision transform to tensor.
        video = video / 255.0
        # permute dimensions and normalize to imagenet statistics
        frames = fn.crop_mirror_normalize(
            video,
            output_layout="FCHW",
            mean=normalization_mean,
            std=normalization_std,
        )

        frames_list.append(frames)
        transform_list.append(transform)
        orig_size_list.append(orig_size)

    return (*frames_list, *transform_list, *orig_size_list)


class LitDaliWrapper(DALIGenericIterator):
    """Wrapper around a DALI pipeline to get batches for ptl."""

    def __init__(
        self,
        *args,
        eval_mode: Literal["train", "predict"],
        num_iters: int = 1,
        do_context: bool = False,
        **kwargs
    ) -> None:
        """Wrapper around DALIGenericIterator to get batches for pl.

        Args:
            *args: positional arguments forwarded to `DALIGenericIterator`
            eval_mode: "train" or "predict"; stored on the instance
            num_iters: number of enumerations of dataloader (should be computed outside for
                now; should be fixed by lightning/dali teams); returned by `len()`
            do_context: whether model/loader use 5-frame context or not
            **kwargs: keyword arguments forwarded to `DALIGenericIterator`

        """
        # attributes are assigned before the parent constructor runs, as in the original code
        self.num_iters = num_iters
        self.do_context = do_context
        self.eval_mode = eval_mode
        self.batch_sampler = 1  # hack to get around DALI-ptl issue
        # call parent
        super().__init__(*args, **kwargs)

    def __len__(self) -> int:
        """Return the externally supplied number of iterations."""
        return self.num_iters

    @staticmethod
    def _frame_height_width(frame_size) -> tuple:
        """Extract (height, width) from a DALI shape output for the first batch element.

        The shape is interpreted as channels-last (seq_len, H, W, C) when its final entry is
        3, and as channels-first (seq_len, C, H, W) otherwise.
        """
        if frame_size[0, -1] == 3:
            # order is seq_len,H,W,C
            return frame_size[0, 1], frame_size[0, 2]
        # order is seq_len,C,H,W
        return frame_size[0, 2], frame_size[0, 3]

    @staticmethod
    def _dali_output_to_tensors(
        batch: list
    ) -> UnlabeledBatchDict | MultiviewUnlabeledBatchDict:
        """Convert the raw DALI iterator output into a (multiview) unlabeled batch dict.

        Only the first element of `batch` and the first entry along the leading batch
        dimension of the frames are used (the pipelines in this module use batch_size=1).
        A pipeline with exactly three outputs is treated as single view; anything else is
        treated as multiview, with outputs told apart by the substrings "transforms" and
        "frame_size" in their names.
        """
        outputs = batch[0]

        if len(outputs) == 3:  # single view pipeline
            # shape (sequence_length, 3, H, W)
            frames = outputs["frames"][0, :, :, :, :]
            # shape (1,) or (2, 3)
            transforms = outputs["transforms"][0]
            height, width = LitDaliWrapper._frame_height_width(outputs["frame_size"])
            # one [0, 0, height, width] row per frame in the sequence
            bbox = torch.tensor([0, 0, height, width], device=frames.device).repeat(
                (frames.shape[0], 1))
            return UnlabeledBatchDict(
                frames=frames, transforms=transforms, bbox=bbox, is_multiview=False,
            )

        # multiview pipeline
        frame_keys = [
            key for key in outputs if "transforms" not in key and "frame_size" not in key
        ]
        transform_keys = [key for key in outputs if "transforms" in key]
        size_keys = [key for key in outputs if "frame_size" in key]

        # final shape: ("seq_len", "num_views", "RGB":3, "image_height", "image_width")
        frames = torch.stack([outputs[key][0, :, :, :, :] for key in frame_keys], dim=1)
        # NOTE(review): unlike the single-view branch, the per-view transforms are stacked
        # without indexing the leading batch dimension; confirm the resulting shape against
        # downstream consumers
        transforms = torch.stack([outputs[key] for key in transform_keys], dim=0)
        # final shape: ("seq_len", "num_views * xyhw")
        # height/width use the same layout detection as the single-view branch
        per_view_bbox = []
        for key in size_keys:
            height, width = LitDaliWrapper._frame_height_width(outputs[key])
            per_view_bbox.append(torch.tensor([0, 0, height, width], device=frames.device))
        bbox = torch.cat(per_view_bbox, dim=0).repeat(frames.shape[0], 1)
        return MultiviewUnlabeledBatchDict(
            frames=frames, transforms=transforms, bbox=bbox, is_multiview=True,
        )

    def __next__(self) -> UnlabeledBatchDict | MultiviewUnlabeledBatchDict:
        """Fetch the next DALI batch and convert it to a batch dict."""
        batch = super().__next__()
        return self._dali_output_to_tensors(batch=batch)


class PrepareDALI:
    """All the DALI stuff in one place.

    Big picture: this will initialize the pipes and dataloaders for both training and
    prediction.
    """

    # predict-stage context pipes advance by `sequence_length - 4` frames per read, i.e.
    # consecutive sequences overlap by this many frames
    _CONTEXT_OVERLAP = 4

    def __init__(
        self,
        train_stage: Literal["predict", "train"],
        model_type: Literal["base", "context"],
        filenames: list[str] | list[list[str]],
        resize_dims: list[int],
        dali_config: dict | DictConfig | None = None,
        imgaug: str | None = "default",
        num_threads: int = 1,
    ) -> None:
        """Validate the inputs and precompute the pipeline arguments.

        Args:
            train_stage: "train" or "predict"; selects which pipeline/iterator args are used
            model_type: "base" or "context"; selects which pipeline/iterator args are used
            filenames: list of video paths (single view) or list of such lists (one per view)
            resize_dims: forwarded to the video pipeline as `resize_dims`
            dali_config: config with `base`/`context` sections, each holding `train` and
                `predict` entries, plus an optional `general` section with a `seed`;
                required despite the `None` default
            imgaug: augmentation pipeline name used by the train pipes (predict pipes always
                use "default")
            num_threads: forwarded to the video pipeline as `num_threads`

        Raises:
            ValueError: if `filenames` is not a non-empty list or `dali_config` is None
            FileNotFoundError: if any entry of `filenames` is not an existing file

        """
        if not isinstance(filenames, list) or not filenames:
            raise ValueError("`filenames` must be a non-empty list of video files")

        # determine if we have a multiview pipeline
        self.multiview = isinstance(filenames[0], list)

        # make sure `filenames` is a list (over views) of lists of existing video files
        if isinstance(filenames[0], str):
            filenames = [filenames]
        for view_list in filenames:
            for vid in view_list:
                if not os.path.isfile(vid):
                    raise FileNotFoundError(f"{vid} is not a video file!")

        # fail early with a clear message instead of an AttributeError after frame counting
        if dali_config is None:
            raise ValueError("`dali_config` must be provided")

        self.train_stage = train_stage
        self.model_type = model_type
        self.filenames = filenames
        self.resize_dims = resize_dims
        self.dali_config = dali_config
        self.num_threads = num_threads
        # total number of frames across the videos of the first view only
        self.frame_count = sum(map(count_frames, filenames[0]))
        self._pipe_dict: dict = self._setup_pipe_dict(self.filenames, imgaug)

    @property
    def num_iters(self) -> int:
        """Number of times the data loader should be enumerated.

        Computed from the total frame count and the pipeline args selected by
        `train_stage`/`model_type`.

        Raises:
            ValueError: if the predict-stage context pipe has a non-positive step
            NotImplementedError: for any unsupported model type or step/batch configuration

        """
        pipe_args = self._pipe_dict[self.train_stage][self.model_type]
        seq_len = pipe_args["sequence_length"]

        if self.model_type == "base":
            return int(np.ceil(self.frame_count / seq_len))

        if self.model_type == "context":
            step = pipe_args["step"]
            batch_size = pipe_args["batch_size"]
            if step == 1:
                # 0-5, 1-6, 2-7, 3-8, 4-9 ...
                return int(np.ceil(self.frame_count / batch_size))
            if step == seq_len:
                # taking the floor because during training we don't care about missing the last
                # non-full batch. we prefer having fewer batches but valid.
                return int(np.floor(self.frame_count / (batch_size * seq_len)))
            if batch_size == 1 and step == seq_len - self._CONTEXT_OVERLAP:
                # the case of prediction with a single sequence at a time and internal model
                # reshapes
                if step <= 0:
                    raise ValueError(
                        "step cannot be 0, please modify "
                        "cfg.dali.context.predict.sequence_length to be > 4"
                    )
                # remove the first sequence
                frames_after_first_sequence = self.frame_count - seq_len
                # calculate how many "step"s are needed to get at least to the end
                # count back the first sequence
                return int(np.ceil(frames_after_first_sequence / step)) + 1

        # never fall through and silently return None
        raise NotImplementedError

    def _setup_pipe_dict(
        self,
        filenames: list[str] | list[list[str]],
        imgaug: str,
    ) -> dict[str, dict]:
        """All of the pipeline args in one place.

        Returns a nested dict indexed as `[train_stage][model_type]` whose values are the
        keyword arguments for `video_pipe`.
        """
        # When running with multiple GPUs, the LOCAL_RANK variable correctly
        # contains the DDP Local Rank, which is also the cuda device index.
        device_id = int(os.environ.get("LOCAL_RANK", "0"))

        gen_cfg = self.dali_config.get("general", {"seed": 123456})

        # arguments common to all four pipes
        shared = {
            "filenames": filenames,
            "resize_dims": self.resize_dims,
            "batch_size": 1,
            # Multi-GPU strategy is to have each GPU randomize differently.
            "seed": gen_cfg["seed"] + device_id,
            "num_threads": self.num_threads,
            "device_id": device_id,
        }
        # arguments common to both predict pipes
        predict_only = {
            "random_shuffle": False,
            "name": "reader",
            "pad_sequences": True,
            "imgaug": "default",  # no imgaug when predicting
        }
        # arguments common to both train pipes
        train_only = {
            "random_shuffle": True,
            "imgaug": imgaug,
        }

        base_train_len = self.dali_config["base"]["train"]["sequence_length"]
        base_pred_len = self.dali_config["base"]["predict"]["sequence_length"]
        context_pred_len = self.dali_config["context"]["predict"]["sequence_length"]
        # context train pipe grabs a single sequence of frames, will resize into 5-frame
        # chunks at the representation level inside BaseFeatureExtractor
        # note: reusing the batch size argument
        context_train_len = self.dali_config["context"]["train"]["batch_size"]

        # our floor in `num_iters` should prevent us from getting to the very final batch.
        return {
            "predict": {
                # context (five-frame) model, predict pipe args
                "context": {
                    **shared,
                    **predict_only,
                    "sequence_length": context_pred_len,
                    "step": context_pred_len - self._CONTEXT_OVERLAP,
                },
                # base (vanilla single-frame model), predict pipe args
                "base": {
                    **shared,
                    **predict_only,
                    "sequence_length": base_pred_len,
                    "step": base_pred_len,
                },
            },
            "train": {
                # context (five-frame) model, train pipe args
                "context": {
                    **shared,
                    **train_only,
                    "sequence_length": context_train_len,
                    "step": context_train_len,
                },
                # base (vanilla single-frame model), train pipe args
                "base": {
                    **shared,
                    **train_only,
                    "sequence_length": base_train_len,
                    "step": base_train_len,
                },
            },
        }

    def _get_dali_pipe(self):
        """Return a DALI pipe with predefined args."""
        pipe_args = self._pipe_dict[self.train_stage][self.model_type]
        return video_pipe(**pipe_args)

    def _setup_dali_iterator_args(self) -> dict:
        """Builds args for Lightning iterator.

        If you want to extract more outputs from DALI, e.g., optical flow, you should also add
        this in the "output_map" arg

        Returns a nested dict indexed as `[train_stage][model_type]` whose values are the
        keyword arguments for `LitDaliWrapper`.
        """
        if self.multiview:
            # video pipeline returns a big tuple: frame, transforms, and frame size for each view
            view_ids = range(len(self.filenames))
            output_map = (
                [f"frames_{i}" for i in view_ids]
                + [f"transforms_{i}" for i in view_ids]
                + [f"frame_size_{i}" for i in view_ids]
            )
            # if we have multiple readers, if we select only 1 here there's an error
            reader_name = None
        else:
            output_map = ["frames", "transforms", "frame_size"]
            reader_name = "reader_0"

        # depends only on the selected stage/model type, so compute it once
        num_iters = self.num_iters

        def train_args(do_context: bool) -> dict:
            return {
                "num_iters": num_iters,
                "eval_mode": "train",
                "do_context": do_context,
                "output_map": output_map,
                "last_batch_policy": LastBatchPolicy.PARTIAL,
                "auto_reset": True,
            }

        def predict_args(do_context: bool) -> dict:
            return {
                "num_iters": num_iters,
                "eval_mode": "predict",
                "do_context": do_context,
                "output_map": output_map,
                "last_batch_policy": LastBatchPolicy.FILL,
                "last_batch_padded": False,
                "auto_reset": False,
                "reader_name": reader_name,
            }

        return {
            # base models are single-frame, context models are 5-frame
            "predict": {"context": predict_args(True), "base": predict_args(False)},
            "train": {"context": train_args(True), "base": train_args(False)},
        }

    def __call__(self) -> LitDaliWrapper:
        """Returns a LightningWrapper object."""
        pipe = self._get_dali_pipe()
        args = self._setup_dali_iterator_args()
        return LitDaliWrapper(pipe, **args[self.train_stage][self.model_type])