"""PCA class to assist with computing PCA losses."""
import warnings
from typing import Literal
import numpy as np
import torch
from omegaconf import ListConfig
from sklearn.decomposition import PCA
from sklearn.decomposition._pca import _infer_dimension
from sklearn.utils._array_api import _convert_to_numpy, get_namespace
from sklearn.utils.extmath import stable_cumsum, svd_flip
from torchtyping import TensorType
from typeguard import typechecked
from lightning_pose.data.datamodules import BaseDataModule, UnlabeledDataModule
from lightning_pose.data.datasets import MultiviewHeatmapDataset
from lightning_pose.data.utils import DataExtractor
from lightning_pose.losses.helpers import EmpiricalEpsilon, convert_dict_values_to_tensors
# to ignore imports for sphix-autoapidoc
__all__ = [
"KeypointPCA",
"ComponentChooser",
"format_multiview_data_for_pca",
]
[docs]
class KeypointPCA(object):
"""Class to collect data from a dataloader and compute PCA params."""
[docs]
def __init__(
self,
loss_type: Literal["pca_singleview", "pca_multiview"],
data_module: UnlabeledDataModule | BaseDataModule,
components_to_keep: int | float | None = 0.99,
empirical_epsilon_percentile: float = 90.0,
mirrored_column_matches: ListConfig | list | None = None,
columns_for_singleview_pca: ListConfig | list | None = None,
device: Literal["cuda", "cpu"] | torch.device = "cpu",
centering_method: Literal["mean", "median"] | None = None,
) -> None:
self.loss_type = loss_type
self.data_module = data_module
self.components_to_keep = components_to_keep
self.empirical_epsilon_percentile = empirical_epsilon_percentile
# check if this is a mirrored or true multiview dataset
# the former will be a list of lists, the latter a list of ints
if mirrored_column_matches is not None and isinstance(mirrored_column_matches[0], int):
if not isinstance(data_module.dataset, MultiviewHeatmapDataset):
raise ValueError(
"cfg.data.mirrored_column_matches must contain a list of indices for each "
"mirrored view"
)
num_views = len(data_module.dataset.view_names)
num_keypoints = data_module.dataset.num_keypoints / num_views # keypoints per view
mirrored_column_matches = [
(v * num_keypoints + np.array(mirrored_column_matches, dtype=int)).tolist()
for v in range(num_views)
]
self.mirrored_column_matches = mirrored_column_matches
self.columns_for_singleview_pca = columns_for_singleview_pca
self.pca_object = None
self.device = device
self.centering_method = centering_method
def _get_data(self) -> None:
self.data_arr, _ = DataExtractor(
data_module=self.data_module, cond="train", extract_images=False,
remove_augmentations=True,
)()
def _multiview_format(
self, data_arr: TensorType["num_original_samples", "num_original_dims"]
) -> TensorType["num_original_samples_times_num_selected_keypoints", "two_times_num_views"]:
# original shape = (batch, 2 * num_keypoints) where `num_keypoints` includes
# keypoints views from multiple views.
data_arr = data_arr.reshape(data_arr.shape[0], data_arr.shape[1] // 2, 2)
# shape = (batch_size, num_keypoints, 2)
data_arr = format_multiview_data_for_pca(
data_arr=data_arr,
mirrored_column_matches=self.mirrored_column_matches,
) # shape = (batch_size * num_keypoints, views * 2)
return data_arr
def _singleview_format(
self, data_arr: TensorType["num_original_samples", "num_original_dims"]
) -> (
TensorType["num_original_samples", "num_selected_dims"]
| TensorType["num_original_samples", "num_original_dims"]
):
# original shape = (batch, 2 * num_keypoints)
# reshape to (batch, num_keypoints, 2) to easily select columns
## [1,2,3,4]
## [[1,2]],[3,4]]
data_arr = data_arr.reshape(data_arr.shape[0], data_arr.shape[1] // 2, 2)
# optionally choose a subset of the keypoints for the singleview pca
if self.columns_for_singleview_pca is not None:
data_arr = data_arr[:, np.array(self.columns_for_singleview_pca), :]
if self.centering_method is not None:
if self.centering_method == "mean":
center = data_arr.mean(dim=1, keepdim=True)
elif self.centering_method == "median":
# When there are an even # of keypoints, Tensor.quantile averages
# while Tensor.median is non-deterministic.
center = data_arr.quantile(dim=1, q=0.5, keepdim=True)
else:
raise NotImplementedError(f"centering_method: {self.centering_method}")
data_arr = data_arr - center
# reshape back to (batch, num_selected_keypoints * 2)
data_arr = data_arr.reshape(data_arr.shape[0], data_arr.shape[1] * 2)
return data_arr
def _format_data(
self, data_arr: TensorType["num_original_samples", "num_original_dims"]
) -> TensorType:
# Union[
# TensorType["num_original_samples", "num_selected_dims"], # singleview filtered
# TensorType[
# "num_original_samples", "num_original_dims"
# ], # singleview unfiltered
# TensorType[
# "num_original_samples_times_num_selected_keypoints", "two_times_num_views"
# ]], # multiview
if self.loss_type == "pca_multiview":
return self._multiview_format(data_arr=data_arr)
elif self.loss_type == "pca_singleview":
return self._singleview_format(data_arr=data_arr)
else:
raise NotImplementedError
def _check_data(self) -> None:
# ensure we have more rows than columns after doing nan filtering
if self.data_arr.shape[0] < self.data_arr.shape[1]:
raise ValueError(
f"cannot fit PCA with {self.data_arr.shape[0]} samples < {self.data_arr.shape[1]} "
"observation dimensions"
)
def _fit_pca(self) -> None:
# fit PCA with the full number of comps on the cleaned-up data array.
# note self.data_arr is a tensor but sklearn's PCA function is fine with it
self.pca_object = NaNPCA(svd_solver="covariance_eigh")
self.pca_object.fit(X=self.data_arr)
def _choose_n_components(self) -> None:
if self.loss_type == "pca_multiview":
# all views can be explained by 3 (x,y,z) coords
# ignore self.components_to_keep_argument
self._n_components_kept = 3
if self._n_components_kept != self.components_to_keep:
warnings.warn(
f"for {self.loss_type} loss, you specified {self.components_to_keep} "
f"components_to_keep, but we will instead keep {self._n_components_kept} "
f"components"
)
elif self.loss_type == "pca_singleview":
if self.pca_object is not None:
self._n_components_kept = ComponentChooser(
fitted_pca_object=self.pca_object,
components_to_keep=self.components_to_keep,
).__call__()
assert isinstance(self._n_components_kept, int)
[docs]
def pca_prints(self) -> None:
# call after we've fitted a pca object and selected how many components to keep
pca_prints(self.pca_object, self.loss_type, self._n_components_kept)
def _set_parameter_dict(self) -> None:
self.parameters = { # dict with same keys as loss_param_dict
"mean": self.pca_object.mean_,
"kept_eigenvectors": self.pca_object.components_[:self._n_components_kept],
"discarded_eigenvectors": self.pca_object.components_[self._n_components_kept:],
}
self.parameters = convert_dict_values_to_tensors(self.parameters, self.device)
self.parameters["epsilon"] = EmpiricalEpsilon(
percentile=self.empirical_epsilon_percentile
)(loss=self.compute_reprojection_error())
[docs]
def reproject(
self, data_arr: TensorType["num_samples", "sample_dim"] | None = None
) -> TensorType["num_samples", "sample_dim"]:
"""Reproject a data array using the fixed pca parameters.
This transformation is implemented as in scikit-learn
https://github.com/scikit-learn/scikit-learn/blob/37ac6788c/sklearn/decomposition/_base.py#L125
"""
# assert that datashape is valid
if data_arr is None:
data_arr = self.data_arr.to(self.device)
evecs = self.parameters["kept_eigenvectors"]
mean = self.parameters["mean"].unsqueeze(0)
# assert that the observation dimension is equal for all objects
assert data_arr.shape[1] == evecs.shape[1] and evecs.shape[1] == mean.shape[1]
# verify that data array has observations divisible by 2 (corresponding to (x,y) coords)
assert data_arr.shape[1] % 2 == 0
# transform data into low-d space as in scikit learn's _BasePCA.transform()
# https://github.com/scikit-learn/scikit-learn/blob/37ac6788c9504ee409b75e5e24ff7d86c90c2ffb/sklearn/decomposition/_base.py#L97
centered_data = data_arr - mean
low_d_projection = centered_data @ evecs.T
# project back up to observation space, as in scikit learn's _BasePCA.inverse_transform()
# https://github.com/scikit-learn/scikit-learn/blob/37ac6788c9504ee409b75e5e24ff7d86c90c2ffb/sklearn/decomposition/_base.py#L125
reprojection = low_d_projection @ evecs + mean
return reprojection
[docs]
def compute_reprojection_error(
self, data_arr: TensorType["num_samples", "sample_dim"] | None = None
) -> TensorType["num_samples", "sample_dim_over_two"]:
"""returns error per 2D keypoint"""
if data_arr is None:
data_arr = self.data_arr.to(self.device)
reprojection = self.reproject(data_arr=data_arr)
diff = data_arr - reprojection
# reshape to get a per-keypoint differences:
diff_arr_per_keypoint = diff.reshape(diff.shape[0], diff.shape[1] // 2, 2)
# compute the prediction error for each 2D bodypart
reprojection_loss = torch.linalg.norm(diff_arr_per_keypoint, dim=2)
return reprojection_loss
[docs]
def __call__(self) -> None:
# save training data in self.data_arr
self._get_data()
# modify self.data_arr in the case of multiview pca, else keep the same
self.data_arr = self._format_data(data_arr=self.data_arr)
# check that we have more observations than observation-dimensions
self._check_data()
self._fit_pca()
self._choose_n_components()
self.pca_prints()
# save all the meaningful quantities
self._set_parameter_dict()
class NaNPCA(PCA):
def __init__(
self,
n_components: int | None = None,
*,
copy: bool = True,
whiten: bool = False,
svd_solver: str = "covariance_eigh",
tol: float = 0.0,
iterated_power: str = "auto",
n_oversamples: int = 10,
power_iteration_normalizer: str = "auto",
random_state: int | None = None,
) -> None:
# force solver to be "covariance_eigh"
super().__init__(
n_components=n_components,
copy=copy,
whiten=whiten,
svd_solver="covariance_eigh",
tol=tol,
iterated_power=iterated_power,
n_oversamples=n_oversamples,
power_iteration_normalizer=power_iteration_normalizer,
random_state=random_state,
)
def _fit(self, X: np.ndarray) -> tuple:
"""Dispatch to the right submethod depending on the chosen solver.
This is a modification of the sklearn fit function, with data validation
removed since we want to include NaNs.
"""
xp, is_array_api_compliant = get_namespace(X)
# Validate the data, without ever forcing a copy as any solver that
# supports sparse input data and the `covariance_eigh` solver are
# written in a way to avoid the need for any inplace modification of
# the input data contrary to the other solvers.
# The copy will happen
# later, only if needed, once the solver negotiation below is done.
# X = self._validate_data(
# X,
# dtype=[xp.float64, xp.float32],
# force_writeable=True,
# accept_sparse=("csr", "csc"),
# ensure_2d=True,
# copy=False,
# )
self._fit_svd_solver = self.svd_solver
assert self._fit_svd_solver == "covariance_eigh"
if self.n_components is None:
if self._fit_svd_solver != "arpack":
n_components = min(X.shape)
else:
n_components = min(X.shape) - 1
else:
n_components = self.n_components
# Call fit for full SVD
return self._fit_full(X, n_components, xp, is_array_api_compliant)
def _fit_full(self, X, n_components, xp, is_array_api_compliant):
"""Fit the model by computing full SVD on X.
This is a modification of the sklearn function that now allows for NaNs in the inputs.
"""
n_samples, n_features = X.shape
if n_components == "mle":
if n_samples < n_features:
raise ValueError(
"n_components='mle' is only supported if n_samples >= n_features"
)
elif not 0 <= n_components <= min(n_samples, n_features):
raise ValueError(
f"n_components={n_components} must be between 0 and "
f"min(n_samples, n_features)={min(n_samples, n_features)} with "
f"svd_solver={self._fit_svd_solver!r}"
)
self.mean_ = np.nanmean(X, axis=0)
# When X is a scipy sparse matrix, self.mean_ is a numpy matrix, so we need
# to transform it to a 1D array. Note that this is not the case when X
# is a scipy sparse array.
self.mean_ = xp.reshape(xp.asarray(self.mean_), (-1,))
# BEGIN ORIGINAL ELSE BLOCK
assert self._fit_svd_solver == "covariance_eigh"
x_is_centered = False
# -- original computation of covariance matrix
# C = X.T @ X
# C -= (
# n_samples
# * xp.reshape(self.mean_, (-1, 1))
# * xp.reshape(self.mean_, (1, -1))
# )
# C /= n_samples - 1
# -- masked computation of covariance matrix
C = np.ma.cov(np.ma.masked_invalid(X), rowvar=False).data
# -- picking up original sklearn pca code here
eigenvals, eigenvecs = xp.linalg.eigh(C)
# When X is a scipy sparse matrix, the following two datastructures
# are returned as instances of the soft-deprecated numpy.matrix
# class. Note that this problem does not occur when X is a scipy
# sparse array (or another other kind of supported array).
# TODO: remove the following two lines when scikit-learn only
# depends on scipy versions that no longer support scipy.sparse
# matrices.
eigenvals = xp.reshape(xp.asarray(eigenvals), (-1,))
eigenvecs = xp.asarray(eigenvecs)
eigenvals = xp.flip(eigenvals, axis=0)
eigenvecs = xp.flip(eigenvecs, axis=1)
# The covariance matrix C is positive semi-definite by
# construction. However, the eigenvalues returned by xp.linalg.eigh
# can be slightly negative due to numerical errors. This would be
# an issue for the subsequent sqrt, hence the manual clipping.
eigenvals[eigenvals < 0.0] = 0.0
explained_variance_ = eigenvals
# Re-construct SVD of centered X indirectly and make it consistent
# with the other solvers.
S = xp.sqrt(eigenvals * (n_samples - 1))
Vt = eigenvecs.T
U = None
# END ORIGINAL ELSE BLOCK
# flip eigenvectors' sign to enforce deterministic output
U, Vt = svd_flip(U, Vt, u_based_decision=False)
components_ = Vt
# Get variance explained by singular values
total_var = xp.sum(explained_variance_)
explained_variance_ratio_ = explained_variance_ / total_var
singular_values_ = xp.asarray(S, copy=True) # Store the singular values.
# Postprocess the number of components required
if n_components == "mle":
n_components = _infer_dimension(explained_variance_, n_samples)
elif 0 < n_components < 1.0:
# number of components for which the cumulated explained
# variance percentage is superior to the desired threshold
# side='right' ensures that number of features selected
# their variance is always greater than n_components float
# passed. More discussion in issue: #15669
if is_array_api_compliant:
# Convert to numpy as xp.cumsum and xp.searchsorted are not
# part of the Array API standard yet:
#
# https://github.com/data-apis/array-api/issues/597
# https://github.com/data-apis/array-api/issues/688
#
# Furthermore, it's not always safe to call them for namespaces
# that already implement them: for instance as
# cupy.searchsorted does not accept a float as second argument.
explained_variance_ratio_np = _convert_to_numpy(
explained_variance_ratio_, xp=xp
)
else:
explained_variance_ratio_np = explained_variance_ratio_
ratio_cumsum = stable_cumsum(explained_variance_ratio_np)
n_components = np.searchsorted(ratio_cumsum, n_components, side="right") + 1
# Compute noise covariance using Probabilistic PCA model
# The sigma2 maximum likelihood (cf. eq. 12.46)
if n_components < min(n_features, n_samples):
self.noise_variance_ = xp.mean(explained_variance_[n_components:])
else:
self.noise_variance_ = 0.0
self.n_samples_ = n_samples
self.n_components_ = n_components
# Assign a copy of the result of the truncation of the components in
# order to:
# - release the memory used by the discarded components,
# - ensure that the kept components are allocated contiguously in
# memory to make the transform method faster by leveraging cache
# locality.
self.components_ = xp.asarray(components_[:n_components, :], copy=True)
# We do the same for the other arrays for the sake of consistency.
self.explained_variance_ = xp.asarray(
explained_variance_[:n_components], copy=True
)
self.explained_variance_ratio_ = xp.asarray(
explained_variance_ratio_[:n_components], copy=True
)
self.singular_values_ = xp.asarray(singular_values_[:n_components], copy=True)
return U, S, Vt, X, x_is_centered, xp
[docs]
@typechecked
class ComponentChooser:
"""Determine the number of PCA components to keep."""
[docs]
def __init__(
self,
fitted_pca_object: PCA,
components_to_keep: int | float | None,
) -> None:
self.fitted_pca_object = fitted_pca_object
# can be either a float indicating proportion of explained variance, or an
# integer specifying the number of components
self.components_to_keep = components_to_keep
self._check_components_to_keep()
# TODO: I dislike the confusing names (components_to_keep vs min_variance_explained)
@property
def cumsum_explained_variance(self) -> np.ndarray:
return np.cumsum(self.fitted_pca_object.explained_variance_ratio_)
def _check_components_to_keep(self) -> None:
# if int, ensure it's not too big
if type(self.components_to_keep) is int:
if self.components_to_keep > self.fitted_pca_object.n_components_:
raise ValueError(
f"components_to_keep was set to {self.components_to_keep}, exceeding the "
f"maximum value of {self.fitted_pca_object.n_components_} observation dims"
)
# if float, ensure a proportion between 0.0-1.0
elif type(self.components_to_keep) is float:
if self.components_to_keep < 0.0 or self.components_to_keep > 1.0:
raise ValueError(
f"components_to_keep was set to {self.components_to_keep} while it has to be "
f"between 0.0 and 1.0"
)
def _find_first_threshold_cross(self) -> int:
# find the index of the first element above a min_variance_explained threshold
assert type(
self.components_to_keep is float
) # i.e., threshold crossing doesn't make sense with integer components_to_keep
if self.components_to_keep != 1.0:
components_to_keep = int(
np.where(self.cumsum_explained_variance >= self.components_to_keep)[0][0]
)
# cumsum is a d - 1 dimensional vector where the 0th element is the sum of the
# 0th and 1st element of the d dimensional vector it is summing over
return components_to_keep + 1
else: # if we want to keep all components, we need to return the number of components
# we do this because there's an issue with == 1.0 in the cumsum_explained_variance
return len(self.fitted_pca_object.explained_variance_)
[docs]
def __call__(self) -> int:
if type(self.components_to_keep) is int:
# return integer as is
return self.components_to_keep
elif type(self.components_to_keep) is float:
# find that integer that crosses the minimum explained variance
return self._find_first_threshold_cross()
@typechecked
def pca_prints(pca: PCA, condition: str, components_to_keep: int) -> None:
print("Results of running PCA ({}) on keypoints:".format(condition))
print(
"Kept {}/{} components, and found:".format(
components_to_keep, pca.n_components_
)
)
evr = np.round(pca.explained_variance_ratio_, 3)
print("Explained variance ratio: {}".format(evr))
tev = np.round(np.sum(pca.explained_variance_ratio_[:components_to_keep]), 3)
print("Variance explained by {} components: {}".format(components_to_keep, tev))
# @typechecked