Source code for aniposelib.cameras

import os
import cv2
import numpy as np
from copy import copy
from scipy.sparse import lil_matrix, dok_matrix
from scipy.linalg import inv
from scipy import optimize
from scipy import signal
from numba import jit
from collections import defaultdict, Counter
import toml
import itertools
from tqdm import trange
from pprint import pprint
import time

from .boards import merge_rows, extract_points, \
    extract_rtvecs, get_video_params
from .utils import get_initial_extrinsics, make_M, get_rtvec, \
    get_connections

def build_triangulate_syseq(point, camera_mat):
    import jax.numpy as jnp

    eqs = jnp.expand_dims(point, axis=-1) * jnp.expand_dims(camera_mat[2], axis=0)
    eqs = eqs - camera_mat[0:2]

    return eqs

def triangulate_simple(points, camera_mats, valid):
    import jax
    import jax.numpy as jnp

    A = jax.vmap(build_triangulate_syseq)(points, camera_mats)
    A = jnp.where(jnp.expand_dims(valid, axis=(1, 2)), A, 0)
    A = jnp.reshape(A, (-1, 4))
    _, _, vh = jnp.linalg.svd(A, full_matrices=True)
    p3d = vh[-1]
    p3d = p3d[:3] / p3d[3]

    return p3d


def get_error_dict(errors_full, min_points=10):
    n_cams = errors_full.shape[0]
    errors_norm = np.linalg.norm(errors_full, axis=2)

    good = ~np.isnan(errors_full[:, :, 0])

    error_dict = dict()

    for i in range(n_cams):
        for j in range(i+1, n_cams):
            subset = good[i] & good[j]
            err_subset = errors_norm[:, subset][[i, j]]
            err_subset_mean = np.mean(err_subset, axis=0)
            if np.sum(subset) > min_points:
                percents = np.percentile(err_subset_mean, [15, 75])
                # percents = np.percentile(err_subset, [25, 75])
                error_dict[(i, j)] = (err_subset.shape[1], percents)
    return error_dict

def check_errors(cgroup, imgp):
    p3ds = cgroup.triangulate(imgp)
    errors_full = cgroup.reprojection_error(p3ds, imgp, mean=False)
    return get_error_dict(errors_full)

def subset_extra(extra, ixs):
    if extra is None:
        return None

    new_extra = {
        'objp': extra['objp'][ixs],
        'ids': extra['ids'][ixs],
        'rvecs': extra['rvecs'][:, ixs],
        'tvecs': extra['tvecs'][:, ixs]
    }
    return new_extra

def resample_points_extra(imgp, extra, n_samp=25):
    n_cams, n_points, _ = imgp.shape
    ids = remap_ids(extra['ids'])
    n_ids = np.max(ids)+1
    good = ~np.isnan(imgp[:, :, 0])
    ixs = np.arange(n_points)

    cam_counts = np.zeros((n_ids, n_cams), dtype='int64')
    for idnum in range(n_ids):
        cam_counts[idnum] = np.sum(good[:, ids == idnum], axis=1)
    cam_counts_random = cam_counts + np.random.random(size=cam_counts.shape)
    best_boards = np.argsort(-cam_counts_random, axis=0)

    cam_totals = np.zeros(n_cams, dtype='int64')

    include = set()
    for cam_num in range(n_cams):
        for board_id in best_boards[:, cam_num]:
            include.update(ixs[ids == board_id])
            cam_totals += cam_counts[board_id]
            if cam_totals[cam_num] >= n_samp or \
               cam_counts_random[board_id, cam_num] < 1:
                break

    final_ixs = sorted(include)
    newp = imgp[:, final_ixs]
    extra = subset_extra(extra, final_ixs)
    return newp, extra

def resample_points(imgp, extra=None, n_samp=25):
    # if extra is not None:
    #     return resample_points_extra(imgp, extra, n_samp)

    n_cams = imgp.shape[0]
    good = ~np.isnan(imgp[:, :, 0])
    ixs = np.arange(imgp.shape[1])

    num_cams = np.sum(~np.isnan(imgp[:, :, 0]), axis=0)

    include = set()

    for i in range(n_cams):
        for j in range(i+1, n_cams):
            subset = good[i] & good[j]
            n_good = np.sum(subset)
            if n_good > 0:
                ## pick points, prioritizing points seen by more cameras
                arr = np.copy(num_cams[subset]).astype('float64')
                arr += np.random.random(size=arr.shape)
                picked_ix = np.argsort(-arr)[:n_samp]
                picked = ixs[subset][picked_ix]
                include.update(picked)

    final_ixs = sorted(include)
    newp = imgp[:, final_ixs]
    extra = subset_extra(extra, final_ixs)
    return newp, extra

def medfilt_data(values, size=15):
    padsize = size+5
    vpad = np.pad(values, (padsize, padsize), mode='reflect')
    vpadf = signal.medfilt(vpad, kernel_size=size)
    return vpadf[padsize:-padsize]

def nan_helper(y):
    return np.isnan(y), lambda z: z.nonzero()[0]

def interpolate_data(vals):
    nans, ix = nan_helper(vals)
    out = np.copy(vals)
    try:
        out[nans] = np.interp(ix(nans), ix(~nans), vals[~nans])
    except ValueError:
        out[:] = 0
    return out

def remap_ids(ids):
    unique_ids = np.unique(ids)
    ids_out = np.copy(ids)
    for i, num in enumerate(unique_ids):
        ids_out[ids == num] = i
    return ids_out

def transform_points(points, rvecs, tvecs):
    """Rotate points by given rotation vectors and translate.
    Rodrigues' rotation formula is used.
    """
    theta = np.linalg.norm(rvecs, axis=1)[:, np.newaxis]
    with np.errstate(invalid='ignore'):
        v = rvecs / theta
        v = np.nan_to_num(v)
    dot = np.sum(points * v, axis=1)[:, np.newaxis]
    cos_theta = np.cos(theta)
    sin_theta = np.sin(theta)

    rotated = cos_theta * points + \
        sin_theta * np.cross(v, points) + \
        dot * (1 - cos_theta) * v

    return rotated + tvecs


class Camera:
    def __init__(self,
                 matrix=np.eye(3),
                 dist=np.zeros(5),
                 size=None,
                 rvec=np.zeros(3),
                 tvec=np.zeros(3),
                 name=None,
                 extra_dist=False):

        self.set_camera_matrix(matrix)
        self.set_distortions(dist)
        self.set_size(size)
        self.set_rotation(rvec)
        self.set_translation(tvec)
        self.set_name(name)
        self.extra_dist = extra_dist

    def get_dict(self):
        return {
            'name': self.get_name(),
            'size': list(self.get_size()),
            'matrix': self.get_camera_matrix().tolist(),
            'distortions': self.get_distortions().tolist(),
            'rotation': self.get_rotation().tolist(),
            'translation': self.get_translation().tolist(),
        }

    def load_dict(self, d):
        self.set_camera_matrix(d['matrix'])
        self.set_rotation(d['rotation'])
        self.set_translation(d['translation'])
        self.set_distortions(d['distortions'])
        self.set_name(d['name'])
        self.set_size(d['size'])

    def from_dict(d):
        cam = Camera()
        cam.load_dict(d)
        return cam

    def get_camera_matrix(self):
        return self.matrix

    def get_distortions(self):
        return self.dist

    def set_camera_matrix(self, matrix):
        self.matrix = np.array(matrix, dtype='float64')

    def set_focal_length(self, fx, fy=None):
        if fy is None:
            fy = fx
        self.matrix[0, 0] = fx
        self.matrix[1, 1] = fy

    def get_focal_length(self, both=False):
        fx = self.matrix[0, 0]
        fy = self.matrix[1, 1]
        if both:
            return (fx, fy)
        else:
            return (fx + fy) / 2.0

    def set_distortions(self, dist):
        self.dist = np.array(dist, dtype='float64').ravel()

    def zero_distortions(self):
        self.dist = self.dist * 0

    def set_rotation(self, rvec):
        self.rvec = np.array(rvec, dtype='float64').ravel()

    def get_rotation(self):
        return self.rvec

    def set_translation(self, tvec):
        self.tvec = np.array(tvec, dtype='float64').ravel()

    def get_translation(self):
        return self.tvec

    def get_extrinsics_mat(self):
        return make_M(self.rvec, self.tvec)

    def get_name(self):
        return self.name

    def set_name(self, name):
        self.name = str(name)

    def set_size(self, size):
        """set size as (width, height)"""
        self.size = size

    def get_size(self):
        """get size as (width, height)"""
        return self.size

    def resize_camera(self, scale):
        """resize the camera by scale factor, updating intrinsics to match"""
        size = self.get_size()
        new_size = size[0] * scale, size[1] * scale
        matrix = self.get_camera_matrix()
        new_matrix = matrix * scale
        new_matrix[2, 2] = 1
        self.set_size(new_size)
        self.set_camera_matrix(new_matrix)

    def get_params(self, only_extrinsics=False):
        if only_extrinsics:
            params = np.zeros(6, dtype='float64')
        else:
            params = np.zeros(8 + self.extra_dist, dtype='float64')
        params[0:3] = self.get_rotation()
        params[3:6] = self.get_translation()
        if only_extrinsics:
            return params
        params[6] = self.get_focal_length()
        dist = self.get_distortions()
        params[7] = dist[0]
        if self.extra_dist:
            params[8] = dist[1]
        return params

    def set_params(self, params, only_extrinsics=False):
        self.set_rotation(params[0:3])
        self.set_translation(params[3:6])
        if only_extrinsics:
            return

        self.set_focal_length(params[6])

        dist = np.zeros(5, dtype='float64')
        dist[0] = params[7]
        if self.extra_dist:
            dist[1] = params[8]
        self.set_distortions(dist)

    def distort_points(self, points):
        shape = points.shape
        points = points.reshape(-1, 1, 2)
        new_points = np.dstack([points, np.ones((points.shape[0], 1, 1))])
        out, _ = cv2.projectPoints(new_points, np.zeros(3), np.zeros(3),
                                   self.matrix.astype('float64'),
                                   self.dist.astype('float64'))
        return out.reshape(shape)

    def undistort_points(self, points):
        shape = points.shape
        points = points.reshape(-1, 1, 2)
        out = cv2.undistortPoints(points,
                                  self.matrix.astype('float64'),
                                  self.dist.astype('float64'))
        return out.reshape(shape)

    def project(self, points):
        points = points.reshape(-1, 1, 3)
        out, _ = cv2.projectPoints(points, self.rvec, self.tvec,
                                   self.matrix.astype('float64'),
                                   self.dist.astype('float64'))
        return out

    def reprojection_error(self, p3d, p2d):
        proj = self.project(p3d).reshape(p2d.shape)
        return p2d - proj

    def copy(self):
        return \
            Camera(matrix=self.get_camera_matrix().copy(),
                   dist=self.get_distortions().copy(),
                   size=self.get_size(),
                   rvec=self.get_rotation().copy(),
                   tvec=self.get_translation().copy(),
                   name=self.get_name(),
                   extra_dist=self.extra_dist)

class FisheyeCamera(Camera):
    def __init__(self,
                 matrix=np.eye(3),
                 dist=np.zeros(4),
                 size=None,
                 rvec=np.zeros(3),
                 tvec=np.zeros(3),
                 name=None,
                 extra_dist=False):
        self.set_camera_matrix(matrix)
        self.set_distortions(dist)
        self.set_size(size)
        self.set_rotation(rvec)
        self.set_translation(tvec)
        self.set_name(name)
        self.extra_dist = extra_dist

    def from_dict(d):
        cam = FisheyeCamera()
        cam.load_dict(d)
        return cam

    def get_dict(self):
        d = super().get_dict()
        d['fisheye'] = True
        return d

    def distort_points(self, points):
        shape = points.shape
        points = points.reshape(-1, 1, 2)
        new_points = np.dstack([points, np.ones((points.shape[0], 1, 1))])
        out, _ = cv2.fisheye.projectPoints(new_points,
                                           np.zeros(3), np.zeros(3),
                                           self.matrix.astype('float64'),
                                           self.dist.astype('float64'))
        return out.reshape(shape)

    def undistort_points(self, points):
        shape = points.shape
        points = points.reshape(-1, 1, 2)
        out = cv2.fisheye.undistortPoints(points.astype('float64'),
                                          self.matrix.astype('float64'),
                                          self.dist.astype('float64'))
        return out.reshape(shape)

    def project(self, points):
        points = points.reshape(-1, 1, 3)
        out, _ = cv2.fisheye.projectPoints(points,
                                           self.rvec, self.tvec,
                                           self.matrix.astype('float64'),
                                           self.dist.astype('float64'))
        return out

    def set_params(self, params, only_extrinsics):
        self.set_rotation(params[0:3])
        self.set_translation(params[3:6])

        if only_extrinsics:
            return

        self.set_focal_length(params[6])

        dist = np.zeros(4, dtype='float64')
        dist[0] = params[7]
        if self.extra_dist:
            dist[1] = params[8]
        # dist[2] = params[9]
        # dist[3] = params[10]
        self.set_distortions(dist)

    def get_params(self, only_extrinsics=False):
        if only_extrinsics:
            params = np.zeros(6, dtype='float64')
        else:
            params = np.zeros(8+self.extra_dist, dtype='float64')
        params[0:3] = self.get_rotation()
        params[3:6] = self.get_translation()
        if only_extrinsics:
            return params
        params[6] = self.get_focal_length()
        dist = self.get_distortions()
        params[7] = dist[0]
        if self.extra_dist:
            params[8] = dist[1]
        # params[9] = dist[2]
        # params[10] = dist[3]
        return params

    def copy(self):
        return FisheyeCamera(
            matrix=self.get_camera_matrix().copy(),
            dist=self.get_distortions().copy(),
            size=self.get_size(),
            rvec=self.get_rotation().copy(),
            tvec=self.get_translation().copy(),
            name=self.get_name(),
            extra_dist=self.extra_dist)

class CameraGroup:
[docs] def __init__(self, cameras, metadata={}): self.cameras = cameras self.metadata = metadata
def subset_cameras(self, indices): cams = [self.cameras[ix].copy() for ix in indices] return CameraGroup(cams, self.metadata) def subset_cameras_names(self, names): cur_names = self.get_names() cur_names_dict = dict(zip(cur_names, range(len(cur_names)))) indices = [] for name in names: if name not in cur_names_dict: raise IndexError( "name {} not part of camera names: {}".format( name, cur_names )) indices.append(cur_names_dict[name]) return self.subset_cameras(indices) def project(self, points): """Given an Nx3 array of points, this returns an CxNx2 array of 2D points, where C is the number of cameras""" points = points.reshape(-1, 1, 3) n_points = points.shape[0] n_cams = len(self.cameras) out = np.empty((n_cams, n_points, 2), dtype='float64') for cnum, cam in enumerate(self.cameras): out[cnum] = cam.project(points).reshape(n_points, 2) return out def triangulate(self, points, undistort=True, progress=False, fast=False, disable_64bit=False): """Given an CxNx2 array, this returns an Nx3 array of points, where N is the number of points and C is the number of cameras""" import jax import jax.numpy as jnp if (not disable_64bit) and (os.getenv("ANIPOSE_DISABLE_JAX_X64", "0").lower() in ("0", "no", "false")): jax.config.update("jax_enable_x64", True) assert points.shape[0] == len(self.cameras), \ "Invalid points shape, first dim should be equal to" \ " number of cameras ({}), but shape is {}".format( len(self.cameras), points.shape ) one_point = False if len(points.shape) == 2: points = points.reshape(-1, 1, 2) one_point = True if undistort: new_points = np.empty(points.shape) for cnum, cam in enumerate(self.cameras): # must copy in order to satisfy opencv underneath sub = np.copy(points[cnum]) new_points[cnum] = cam.undistort_points(sub) points = new_points n_cams, n_points, _ = points.shape if fast: cam_Rt_mats = np.array([cam.get_extrinsics_mat()[:3] for cam in self.cameras]) p3d_allview_withnan = [] for j1, j2 in itertools.combinations(range(n_cams), 2): pts1, pts2 = points[j1], points[j2] Rt1, Rt2 = cam_Rt_mats[j1], cam_Rt_mats[j2] tri = cv2.triangulatePoints(Rt1, Rt2, pts1.T, pts2.T) tri = tri[:3]/tri[3] p3d_allview_withnan.append(tri.T) p3d_allview_withnan = np.array(p3d_allview_withnan) out = np.nanmedian(p3d_allview_withnan, axis=0) else: # Determine batch size and prepare for batching batch_size = min(100000, n_points) num_batches = (n_points + batch_size - 1) // batch_size # Get camera matrices once since these don't need batching cam_mats = jnp.stack([cam.get_extrinsics_mat() for cam in self.cameras]) # Initialize output array out = np.full((n_points, 3), np.nan) # Get vectorized triangulate function vtriangulate = jax.vmap(jax.jit(triangulate_simple), in_axes=(1, None, 1)) # Process in batches for batch_idx in range(num_batches): start_idx = batch_idx * batch_size end_idx = min((batch_idx + 1) * batch_size, n_points) current_batch_size = end_idx - start_idx # Extract current batch batch_points = points[:, start_idx:end_idx].copy() # Convert to JAX arrays and prepare for triangulation batch_points = jnp.array(batch_points) batch_good = ~jnp.isnan(batch_points[:, :, 0]) batch_is_good = jnp.expand_dims(jnp.sum(batch_good, axis=0) >= 2, axis=-1) # Triangulate batch_out = vtriangulate(batch_points, cam_mats, batch_good) # Filter points based on successfull triangulation batch_out = jnp.where(batch_is_good[:current_batch_size], batch_out, jnp.nan) # Store in the output array out[start_idx:end_idx] = np.array(batch_out) if one_point: out = out[0] return out def triangulate_possible(self, points, undistort=True, min_cams=2, progress=False, threshold=0.5): """Given an CxNxPx2 array, this returns an Nx3 array of points by triangulating all possible points and picking the ones with best reprojection error where: C: number of cameras N: number of points P: number of possible options per point """ assert points.shape[0] == len(self.cameras), \ "Invalid points shape, first dim should be equal to" \ " number of cameras ({}), but shape is {}".format( len(self.cameras), points.shape ) n_cams, n_points, n_possible, _ = points.shape cam_nums, point_nums, possible_nums = np.where( ~np.isnan(points[:, :, :, 0])) all_iters = defaultdict(dict) for cam_num, point_num, possible_num in zip(cam_nums, point_nums, possible_nums): if cam_num not in all_iters[point_num]: all_iters[point_num][cam_num] = [] all_iters[point_num][cam_num].append((cam_num, possible_num)) for point_num in all_iters.keys(): for cam_num in all_iters[point_num].keys(): all_iters[point_num][cam_num].append(None) out = np.full((n_points, 3), np.nan, dtype='float64') picked_vals = np.zeros((n_cams, n_points, n_possible), dtype='bool') errors = np.zeros(n_points, dtype='float64') points_2d = np.full((n_cams, n_points, 2), np.nan, dtype='float64') if progress: iterator = trange(n_points, ncols=70) else: iterator = range(n_points) for point_ix in iterator: best_point = None best_error = 200 n_cams_max = len(all_iters[point_ix]) for picked in itertools.product(*all_iters[point_ix].values()): picked = [p for p in picked if p is not None] if len(picked) < min_cams and len(picked) != n_cams_max: continue cnums = [p[0] for p in picked] xnums = [p[1] for p in picked] pts = points[cnums, point_ix, xnums] cc = self.subset_cameras(cnums) p3d = cc.triangulate(pts, undistort=undistort) err = cc.reprojection_error(p3d, pts, mean=True) if err < best_error: best_point = { 'error': err, 'point': p3d[:3], 'points': pts, 'picked': picked, 'joint_ix': point_ix } best_error = err if best_error < threshold: break if best_point is not None: out[point_ix] = best_point['point'] picked = best_point['picked'] cnums = [p[0] for p in picked] xnums = [p[1] for p in picked] picked_vals[cnums, point_ix, xnums] = True errors[point_ix] = best_point['error'] points_2d[cnums, point_ix] = best_point['points'] return out, picked_vals, points_2d, errors def triangulate_ransac(self, points, undistort=True, min_cams=2, progress=False): """Given an CxNx2 array, this returns an Nx3 array of points, where N is the number of points and C is the number of cameras""" assert points.shape[0] == len(self.cameras), \ "Invalid points shape, first dim should be equal to" \ " number of cameras ({}), but shape is {}".format( len(self.cameras), points.shape ) n_cams, n_points, _ = points.shape points_ransac = points.reshape(n_cams, n_points, 1, 2) return self.triangulate_possible(points_ransac, undistort=undistort, min_cams=min_cams, progress=progress) @jit(parallel=True, forceobj=True) def reprojection_error(self, p3ds, p2ds, mean=False): """Given an Nx3 array of 3D points and an CxNx2 array of 2D points, where N is the number of points and C is the number of cameras, this returns an CxNx2 array of errors. Optionally mean=True, this averages the errors and returns array of length N of errors""" one_point = False if len(p3ds.shape) == 1 and len(p2ds.shape) == 2: p3ds = p3ds.reshape(1, 3) p2ds = p2ds.reshape(-1, 1, 2) one_point = True n_cams, n_points, _ = p2ds.shape assert p3ds.shape == (n_points, 3), \ "shapes of 2D and 3D points are not consistent: " \ "2D={}, 3D={}".format(p2ds.shape, p3ds.shape) errors = np.empty((n_cams, n_points, 2)) for cnum, cam in enumerate(self.cameras): errors[cnum] = cam.reprojection_error(p3ds, p2ds[cnum]) if mean: errors_norm = np.linalg.norm(errors, axis=2) good = ~np.isnan(errors_norm) errors_norm[~good] = 0 denom = np.sum(good, axis=0).astype('float64') denom[denom < 1.5] = np.nan errors = np.sum(errors_norm, axis=0) / denom if one_point: if mean: errors = float(errors[0]) else: errors = errors.reshape(-1, 2) return errors def bundle_adjust_iter(self, p2ds, extra=None, n_iters=6, start_mu=15, end_mu=1, max_nfev=200, ftol=1e-4, n_samp_iter=200, n_samp_full=1000, error_threshold=0.3, only_extrinsics=False, verbose=False): """Given an CxNx2 array of 2D points, where N is the number of points and C is the number of cameras, this performs iterative bundle adjustsment to fine-tune the parameters of the cameras. That is, it performs bundle adjustment multiple times, adjusting the weights given to points to reduce the influence of outliers. This is inspired by the algorithm for Fast Global Registration by Zhou, Park, and Koltun """ assert p2ds.shape[0] == len(self.cameras), \ "Invalid points shape, first dim should be equal to" \ " number of cameras ({}), but shape is {}".format( len(self.cameras), p2ds.shape ) p2ds_full = p2ds extra_full = extra p2ds, extra = resample_points(p2ds_full, extra_full, n_samp=n_samp_full) error = self.average_error(p2ds, median=True) if verbose: print('error: ', error) mus = np.exp(np.linspace(np.log(start_mu), np.log(end_mu), num=n_iters)) if verbose: print('n_samples: {}'.format(n_samp_iter)) for i in range(n_iters): p2ds, extra = resample_points(p2ds_full, extra_full, n_samp=n_samp_full) p3ds = self.triangulate(p2ds) errors_full = self.reprojection_error(p3ds, p2ds, mean=False) errors_norm = self.reprojection_error(p3ds, p2ds, mean=True) error_dict = get_error_dict(errors_full) max_error = 0 min_error = 0 for k, v in error_dict.items(): num, percents = v max_error = max(percents[-1], max_error) min_error = max(percents[0], min_error) mu = max(min(max_error, mus[i]), min_error) good = errors_norm < mu extra_good = subset_extra(extra, good) p2ds_samp, extra_samp = resample_points( p2ds[:, good], extra_good, n_samp=n_samp_iter) error = np.median(errors_norm) if error < error_threshold: break if verbose: pprint(error_dict) print('error: {:.2f}, mu: {:.1f}, ratio: {:.3f}'.format(error, mu, np.mean(good))) self.bundle_adjust(p2ds_samp, extra_samp, loss='linear', ftol=ftol, max_nfev=max_nfev, only_extrinsics=only_extrinsics, verbose=verbose) p2ds, extra = resample_points(p2ds_full, extra_full, n_samp=n_samp_full) p3ds = self.triangulate(p2ds) errors_full = self.reprojection_error(p3ds, p2ds, mean=False) errors_norm = self.reprojection_error(p3ds, p2ds, mean=True) error_dict = get_error_dict(errors_full) if verbose: pprint(error_dict) max_error = 0 min_error = 0 for k, v in error_dict.items(): num, percents = v max_error = max(percents[-1], max_error) min_error = max(percents[0], min_error) mu = max(max(max_error, end_mu), min_error) good = errors_norm < mu extra_good = subset_extra(extra, good) self.bundle_adjust(p2ds[:, good], extra_good, loss='linear', ftol=ftol, max_nfev=max(200, max_nfev), only_extrinsics=only_extrinsics, verbose=verbose) error = self.average_error(p2ds, median=True) p3ds = self.triangulate(p2ds) errors_full = self.reprojection_error(p3ds, p2ds, mean=False) error_dict = get_error_dict(errors_full) if verbose: pprint(error_dict) if verbose: print('error: ', error) return error def bundle_adjust(self, p2ds, extra=None, loss='linear', threshold=50, ftol=1e-4, max_nfev=1000, weights=None, start_params=None, only_extrinsics=False, verbose=True): """Given an CxNx2 array of 2D points, where N is the number of points and C is the number of cameras, this performs bundle adjustsment to fine-tune the parameters of the cameras""" assert p2ds.shape[0] == len(self.cameras), \ "Invalid points shape, first dim should be equal to" \ " number of cameras ({}), but shape is {}".format( len(self.cameras), p2ds.shape ) if extra is not None: extra['ids_map'] = remap_ids(extra['ids']) x0, n_cam_params = self._initialize_params_bundle(p2ds, extra, only_extrinsics) if start_params is not None: x0 = start_params # n_cam_params = len(self.cameras[0].get_params(only_extrinsics)) error_fun = self._error_fun_bundle jac_sparse = self._jac_sparsity_bundle(p2ds, n_cam_params, extra) f_scale = threshold opt = optimize.least_squares(error_fun, x0, jac_sparsity=jac_sparse, f_scale=f_scale, x_scale='jac', loss=loss, ftol=ftol, method='trf', tr_solver='lsmr', verbose=2 * verbose, max_nfev=max_nfev, args=(p2ds, n_cam_params, extra, only_extrinsics)) best_params = opt.x for i, cam in enumerate(self.cameras): a = i * n_cam_params b = (i + 1) * n_cam_params cam.set_params(best_params[a:b], only_extrinsics) error = self.average_error(p2ds) return error @jit(parallel=True, forceobj=True) def _error_fun_bundle(self, params, p2ds, n_cam_params, extra, only_extrinsics): """Error function for bundle adjustment""" good = ~np.isnan(p2ds) n_cams = len(self.cameras) for i in range(n_cams): cam = self.cameras[i] a = i * n_cam_params b = (i + 1) * n_cam_params cam.set_params(params[a:b], only_extrinsics) n_cams = len(self.cameras) sub = n_cam_params * n_cams n3d = p2ds.shape[1] * 3 p3ds_test = params[sub:sub+n3d].reshape(-1, 3) errors = self.reprojection_error(p3ds_test, p2ds) errors_reproj = errors[good] if extra is not None: ids = extra['ids_map'] objp = extra['objp'] min_scale = np.min(objp[objp > 0]) n_boards = int(np.max(ids)) + 1 a = sub+n3d rvecs = params[a:a+n_boards*3].reshape(-1, 3) tvecs = params[a+n_boards*3:a+n_boards*6].reshape(-1, 3) expected = transform_points(objp, rvecs[ids], tvecs[ids]) errors_obj = 2 * (p3ds_test - expected).ravel() / min_scale else: errors_obj = np.array([]) return np.hstack([errors_reproj, errors_obj]) def _jac_sparsity_bundle(self, p2ds, n_cam_params, extra): """Given an CxNx2 array of 2D points, where N is the number of points and C is the number of cameras, compute the sparsity structure of the jacobian for bundle adjustment""" point_indices = np.zeros(p2ds.shape, dtype='int32') cam_indices = np.zeros(p2ds.shape, dtype='int32') for i in range(p2ds.shape[1]): point_indices[:, i] = i for j in range(p2ds.shape[0]): cam_indices[j] = j good = ~np.isnan(p2ds) if extra is not None: ids = extra['ids_map'] n_boards = int(np.max(ids)) + 1 total_board_params = n_boards * (3 + 3) # rvecs + tvecs else: n_boards = 0 total_board_params = 0 n_cams = p2ds.shape[0] n_points = p2ds.shape[1] total_params_reproj = n_cams * n_cam_params + n_points * 3 n_params = total_params_reproj + total_board_params n_good_values = np.sum(good) if extra is not None: n_errors = n_good_values + n_points * 3 else: n_errors = n_good_values A_sparse = dok_matrix((n_errors, n_params), dtype='int16') cam_indices_good = cam_indices[good] point_indices_good = point_indices[good] # -- reprojection error -- ix = np.arange(n_good_values) ## update camera params based on point error for i in range(n_cam_params): A_sparse[ix, cam_indices_good * n_cam_params + i] = 1 ## update point position based on point error for i in range(3): A_sparse[ix, n_cams * n_cam_params + point_indices_good * 3 + i] = 1 # -- match for the object points-- if extra is not None: point_ix = np.arange(n_points) ## update all the camera parameters # A_sparse[n_good_values:n_good_values+n_points*3, # 0:n_cams*n_cam_params] = 1 ## update board rotation and translation based on error from expected for i in range(3): for j in range(3): A_sparse[n_good_values + point_ix*3 + i, total_params_reproj + ids*3 + j] = 1 A_sparse[n_good_values + point_ix*3 + i, total_params_reproj + n_boards*3 + ids*3 + j] = 1 ## update point position based on error from expected for i in range(3): A_sparse[n_good_values + point_ix*3 + i, n_cams*n_cam_params + point_ix*3 + i] = 1 return A_sparse def _initialize_params_bundle(self, p2ds, extra, only_extrinsics): """Given an CxNx2 array of 2D points, where N is the number of points and C is the number of cameras, initializes the parameters for bundle adjustment""" cam_params = np.hstack([cam.get_params(only_extrinsics) for cam in self.cameras]) n_cam_params = len(cam_params) // len(self.cameras) total_cam_params = len(cam_params) n_cams, n_points, _ = p2ds.shape assert n_cams == len(self.cameras), \ "number of cameras in CameraGroup does not " \ "match number of cameras in 2D points given" p3ds = self.triangulate(p2ds) if extra is not None: ids = extra['ids_map'] n_boards = int(np.max(ids[~np.isnan(ids)])) + 1 total_board_params = n_boards * (3 + 3) # rvecs + tvecs # initialize to 0 rvecs = np.zeros((n_boards, 3), dtype='float64') tvecs = np.zeros((n_boards, 3), dtype='float64') if 'rvecs' in extra and 'tvecs' in extra: rvecs_all = extra['rvecs'] tvecs_all = extra['tvecs'] for board_num in range(n_boards): point_id = np.where(ids == board_num)[0][0] cam_ids_possible = np.where(~np.isnan(p2ds[:, point_id, 0]))[0] cam_id = np.random.choice(cam_ids_possible) M_cam = self.cameras[cam_id].get_extrinsics_mat() M_board_cam = make_M(rvecs_all[cam_id, point_id], tvecs_all[cam_id, point_id]) M_board = np.matmul(inv(M_cam), M_board_cam) rvec, tvec = get_rtvec(M_board) rvecs[board_num] = rvec tvecs[board_num] = tvec else: total_board_params = 0 x0 = np.zeros(total_cam_params + p3ds.size + total_board_params) x0[:total_cam_params] = cam_params x0[total_cam_params:total_cam_params+p3ds.size] = p3ds.ravel() if extra is not None: start_board = total_cam_params+p3ds.size x0[start_board:start_board + n_boards*3] = rvecs.ravel() x0[start_board + n_boards*3:start_board + n_boards*6] = \ tvecs.ravel() return x0, n_cam_params def optim_points(self, points, p3ds, constraints=[], constraints_weak=[], scale_smooth=4, scale_length=2, scale_length_weak=0.5, reproj_error_threshold=15, reproj_loss='soft_l1', n_deriv_smooth=1, scores=None, verbose=False, n_fixed=0): """ Take in an array of 2D points of shape CxNxJx2, an array of 3D points of shape NxJx3, and an array of constraints of shape Kx2, where C: number of camera N: number of frames J: number of joints K: number of constraints This function creates an optimized array of 3D points of shape NxJx3. Example constraints: constraints = [[0, 1], [1, 2], [2, 3]] (meaning that lengths of segments 0->1, 1->2, 2->3 are all constant) """ assert points.shape[0] == len(self.cameras), \ "Invalid points shape, first dim should be equal to" \ " number of cameras ({}), but shape is {}".format( len(self.cameras), points.shape ) n_cams, n_frames, n_joints, _ = points.shape constraints = np.array(constraints) constraints_weak = np.array(constraints_weak) p3ds_intp = np.apply_along_axis(interpolate_data, 0, p3ds) p3ds_med = np.apply_along_axis(medfilt_data, 0, p3ds_intp, size=7) default_smooth = 1.0/np.mean(np.abs(np.diff(p3ds_med, axis=0))) scale_smooth_full = scale_smooth * default_smooth t1 = time.time() x0 = self._initialize_params_triangulation( p3ds_intp, constraints, constraints_weak) x0[~np.isfinite(x0)] = 0 if n_fixed > 0: p3ds_fixed = p3ds_intp[:n_fixed] else: p3ds_fixed = None jac = self._jac_sparsity_triangulation( points, constraints, constraints_weak, n_deriv_smooth) opt2 = optimize.least_squares(self._error_fun_triangulation, x0=x0, jac_sparsity=jac, loss='linear', ftol=1e-3, verbose=2*verbose, args=(points, constraints, constraints_weak, scores, scale_smooth_full, scale_length, scale_length_weak, reproj_error_threshold, reproj_loss, n_deriv_smooth, p3ds_fixed)) p3ds_new2 = opt2.x[:p3ds.size].reshape(p3ds.shape) if n_fixed > 0: p3ds_new2 = np.vstack([p3ds_fixed, p3ds_new2[n_fixed:]]) t2 = time.time() if verbose: print('optimization took {:.2f} seconds'.format(t2 - t1)) return p3ds_new2 def optim_points_possible(self, points, p3ds, constraints=[], constraints_weak=[], scale_smooth=4, scale_length=2, scale_length_weak=0.5, reproj_error_threshold=15, reproj_loss='soft_l1', n_deriv_smooth=1, scores=None, verbose=False): """ Take in an array of 2D points of shape CxNxJxPx2, an array of 3D points of shape NxJx3, and an array of constraints of shape Kx2, where C: number of camera N: number of frames J: number of joints P: number of possible options per point K: number of constraints This function creates an optimized array of 3D points of shape NxJx3. Example constraints: constraints = [[0, 1], [1, 2], [2, 3]] (meaning that lengths of segments 0->1, 1->2, 2->3 are all constant) """ assert points.shape[0] == len(self.cameras), \ "Invalid points shape, first dim should be equal to" \ " number of cameras ({}), but shape is {}".format( len(self.cameras), points.shape ) n_cams, n_frames, n_joints, n_possible, _ = points.shape constraints = np.array(constraints) constraints_weak = np.array(constraints_weak) p3ds_intp = np.apply_along_axis(interpolate_data, 0, p3ds) p3ds_med = np.apply_along_axis(medfilt_data, 0, p3ds_intp, size=7) default_smooth = 1.0/np.mean(np.abs(np.diff(p3ds_med, axis=0))) scale_smooth_full = scale_smooth * default_smooth t1 = time.time() x0 = self._initialize_params_triangulation_possible( p3ds_intp, points, constraints=constraints, constraints_weak=constraints_weak) print('getting jacobian...') jac = self._jac_sparsity_triangulation_possible( points, constraints=constraints, constraints_weak=constraints_weak, n_deriv_smooth=n_deriv_smooth) beta = 5 print('starting optimization...') opt2 = optimize.least_squares(self._error_fun_triangulation_possible, x0=x0, jac_sparsity=jac, loss='linear', ftol=1e-3, verbose=2*verbose, args=(points, beta, constraints, constraints_weak, scores, scale_smooth_full, scale_length, scale_length_weak, reproj_error_threshold, reproj_loss, n_deriv_smooth)) params = opt2.x p3ds_new2 = params[:p3ds.size].reshape(p3ds.shape) bad = np.isnan(points[:, :, :, :, 0]) all_bad = np.all(bad, axis=3) n_params_norm = p3ds.size + len(constraints) + len(constraints_weak) alphas = np.zeros((n_cams, n_frames, n_joints, n_possible), dtype='float64') alphas[~bad] = params[n_params_norm:] alphas_exp = np.exp(beta * alphas) alphas_exp[bad] = 0 alphas_sum = np.sum(alphas_exp, axis=3) alphas_sum[all_bad] = 1 alphas_norm = alphas_exp / alphas_sum[:, :, :, None] alphas_norm[bad] = np.nan t2 = time.time() if verbose: print('optimization took {:.2f} seconds'.format(t2 - t1)) return p3ds_new2, alphas_norm def triangulate_optim(self, points, init_ransac=False, init_progress=False, **kwargs): """ Take in an array of 2D points of shape CxNxJx2, and an array of constraints of shape Kx2, where C: number of camera N: number of frames J: number of joints K: number of constraints This function creates an optimized array of 3D points of shape NxJx3. Example constraints: constraints = [[0, 1], [1, 2], [2, 3]] (meaning that lengths of segments 0->1, 1->2, 2->3 are all constant) """ assert points.shape[0] == len(self.cameras), \ "Invalid points shape, first dim should be equal to" \ " number of cameras ({}), but shape is {}".format( len(self.cameras), points.shape ) n_cams, n_frames, n_joints, _ = points.shape # constraints = np.array(constraints) # constraints_weak = np.array(constraints_weak) points_shaped = points.reshape(n_cams, n_frames*n_joints, 2) if init_ransac: p3ds, picked, p2ds, errors = self.triangulate_ransac(points_shaped, progress=init_progress) points = p2ds.reshape(points.shape) else: p3ds = self.triangulate(points_shaped, progress=init_progress) p3ds = p3ds.reshape((n_frames, n_joints, 3)) c = np.isfinite(p3ds[:, :, 0]) if np.sum(c) < 20: print("warning: not enough 3D points to run optimization") return p3ds return self.optim_points(points, p3ds, **kwargs) @jit(forceobj=True, parallel=True) def _error_fun_triangulation(self, params, p2ds, constraints=[], constraints_weak=[], scores=None, scale_smooth=10000, scale_length=1, scale_length_weak=0.2, reproj_error_threshold=100, reproj_loss='soft_l1', n_deriv_smooth=1, p3ds_fixed=None): n_cams, n_frames, n_joints, _ = p2ds.shape n_3d = n_frames * n_joints * 3 n_constraints = len(constraints) n_constraints_weak = len(constraints_weak) # load params p3ds = params[:n_3d].reshape((n_frames, n_joints, 3)) joint_lengths = np.array(params[n_3d:n_3d+n_constraints]) joint_lengths_weak = np.array(params[n_3d+n_constraints:]) ## if fixed points, first n_fixed parameter points are ignored ## and replacement points are put in ## this way we can keep rest of code the same, especially _jac_sparsity_triangulation if p3ds_fixed is not None: n_fixed = p3ds_fixed.shape[0] p3ds = np.vstack([p3ds_fixed, p3ds[n_fixed:]]) # reprojection errors p3ds_flat = p3ds.reshape(-1, 3) p2ds_flat = p2ds.reshape((n_cams, -1, 2)) errors = self.reprojection_error(p3ds_flat, p2ds_flat) if scores is not None: scores_flat = scores.reshape((n_cams, -1)) errors = errors * scores_flat[:, :, None] errors_reproj = errors[~np.isnan(p2ds_flat)] rp = reproj_error_threshold errors_reproj = np.abs(errors_reproj) if reproj_loss == 'huber': bad = errors_reproj > rp errors_reproj[bad] = rp*(2*np.sqrt(errors_reproj[bad]/rp) - 1) elif reproj_loss == 'linear': pass elif reproj_loss == 'soft_l1': errors_reproj = rp*2*(np.sqrt(1+errors_reproj/rp)-1) # temporal constraint errors_smooth = np.diff(p3ds, n=n_deriv_smooth, axis=0).ravel() * scale_smooth # joint length constraint errors_lengths = np.empty((n_constraints, n_frames), dtype='float64') for cix, (a, b) in enumerate(constraints): lengths = np.linalg.norm(p3ds[:, a] - p3ds[:, b], axis=1) expected = joint_lengths[cix] errors_lengths[cix] = 100*(lengths - expected)/expected errors_lengths = errors_lengths.ravel() * scale_length errors_lengths_weak = np.empty((n_constraints_weak, n_frames), dtype='float64') for cix, (a, b) in enumerate(constraints_weak): lengths = np.linalg.norm(p3ds[:, a] - p3ds[:, b], axis=1) expected = joint_lengths_weak[cix] errors_lengths_weak[cix] = 100*(lengths - expected)/expected errors_lengths_weak = errors_lengths_weak.ravel() * scale_length_weak return np.hstack([errors_reproj, errors_smooth, errors_lengths, errors_lengths_weak]) def _error_fun_triangulation_possible(self, params, p2ds, beta=2, constraints=[], constraints_weak=[], *args): # extract alphas from end of params # soft argmax for picking the appropriate points from p2ds # pass the points to error_fun_triangulate_possible for residuals # add errors to keep the alphas in check # return all the errors n_cams, n_frames, n_joints, n_possible, _ = p2ds.shape n_3d = n_frames*n_joints*3 n_constraints = len(constraints) n_constraints_weak = len(constraints_weak) n_params_norm = n_3d+n_constraints+n_constraints_weak # load params bad = np.isnan(p2ds[:, :, :, :, 0]) all_bad = np.all(bad, axis=3) alphas = np.zeros((n_cams, n_frames, n_joints, n_possible), dtype='float64') alphas[~bad] = params[n_params_norm:] params_rest = np.array(params[:n_params_norm]) # get normalized alphas alphas_exp = np.exp(beta * alphas) alphas_exp[bad] = 0 alphas_sum = np.sum(alphas_exp, axis=3) alphas_sum[all_bad] = 1 alphas_norm = alphas_exp / alphas_sum[:, :, :, None] # extract the 2D points using soft argmax p2ds_test = np.copy(p2ds) p2ds_test[bad] = 0 p2ds_adj = np.sum(alphas_norm[:, :, :, :, None] * p2ds_test, axis=3) p2ds_adj[all_bad] = np.nan errors = self._error_fun_triangulation(params_rest, p2ds_adj, constraints, constraints_weak, *args) alphas_test = alphas_norm[~all_bad] errors_alphas = (1 - np.std(alphas_test, axis=1)) * 10 return np.hstack([errors, errors_alphas]) def _initialize_params_triangulation(self, p3ds, constraints=[], constraints_weak=[]): joint_lengths = np.empty(len(constraints), dtype='float64') joint_lengths_weak = np.empty(len(constraints_weak), dtype='float64') for cix, (a, b) in enumerate(constraints): lengths = np.linalg.norm(p3ds[:, a] - p3ds[:, b], axis=1) joint_lengths[cix] = np.median(lengths) for cix, (a, b) in enumerate(constraints_weak): lengths = np.linalg.norm(p3ds[:, a] - p3ds[:, b], axis=1) joint_lengths_weak[cix] = np.median(lengths) all_lengths = np.hstack([joint_lengths, joint_lengths_weak]) med = np.median(all_lengths) if med == 0: med = 1e-3 mad = np.median(np.abs(all_lengths - med)) joint_lengths[joint_lengths == 0] = med joint_lengths_weak[joint_lengths_weak == 0] = med joint_lengths[joint_lengths > med+mad*5] = med joint_lengths_weak[joint_lengths_weak > med+mad*5] = med return np.hstack([p3ds.ravel(), joint_lengths, joint_lengths_weak]) def _initialize_params_triangulation_possible(self, p3ds, p2ds, **kwargs): # initialize params using above function # initialize alphas to 1 for first one and 0 for other possible n_cams, n_frames, n_joints, n_possible, _ = p2ds.shape good = ~np.isnan(p2ds[:, :, :, :, 0]) alphas = np.zeros((n_cams, n_frames, n_joints, n_possible), dtype='float64') alphas[:, :, :, 0] = 0 params = self._initialize_params_triangulation(p3ds, **kwargs) params_full = np.hstack([params, alphas[good]]) return params_full def _jac_sparsity_triangulation(self, p2ds, constraints=[], constraints_weak=[], n_deriv_smooth=1): n_cams, n_frames, n_joints, _ = p2ds.shape n_constraints = len(constraints) n_constraints_weak = len(constraints_weak) p2ds_flat = p2ds.reshape((n_cams, -1, 2)) point_indices = np.zeros(p2ds_flat.shape, dtype='int32') for i in range(p2ds_flat.shape[1]): point_indices[:, i] = i point_indices_3d = np.arange(n_frames*n_joints)\ .reshape((n_frames, n_joints)) good = ~np.isnan(p2ds_flat) n_errors_reproj = np.sum(good) n_errors_smooth = (n_frames-n_deriv_smooth) * n_joints * 3 n_errors_lengths = n_constraints * n_frames n_errors_lengths_weak = n_constraints_weak * n_frames n_errors = n_errors_reproj + n_errors_smooth + \ n_errors_lengths + n_errors_lengths_weak n_3d = n_frames*n_joints*3 n_params = n_3d + n_constraints + n_constraints_weak point_indices_good = point_indices[good] A_sparse = dok_matrix((n_errors, n_params), dtype='int16') # constraints for reprojection errors ix_reproj = np.arange(n_errors_reproj) for k in range(3): A_sparse[ix_reproj, point_indices_good * 3 + k] = 1 # sparse constraints for smoothness in time frames = np.arange(n_frames-n_deriv_smooth) for j in range(n_joints): for n in range(n_deriv_smooth+1): pa = point_indices_3d[frames, j] pb = point_indices_3d[frames+n, j] for k in range(3): A_sparse[n_errors_reproj + pa*3 + k, pb*3 + k] = 1 ## -- strong constraints -- # joint lengths should change with joint lengths errors start = n_errors_reproj + n_errors_smooth frames = np.arange(n_frames) for cix, (a, b) in enumerate(constraints): A_sparse[start + cix*n_frames + frames, n_3d+cix] = 1 # points should change accordingly to match joint lengths too frames = np.arange(n_frames) for cix, (a, b) in enumerate(constraints): pa = point_indices_3d[frames, a] pb = point_indices_3d[frames, b] for k in range(3): A_sparse[start + cix*n_frames + frames, pa*3 + k] = 1 A_sparse[start + cix*n_frames + frames, pb*3 + k] = 1 ## -- weak constraints -- # joint lengths should change with joint lengths errors start = n_errors_reproj + n_errors_smooth + n_errors_lengths frames = np.arange(n_frames) for cix, (a, b) in enumerate(constraints_weak): A_sparse[start + cix*n_frames + frames, n_3d + n_constraints + cix] = 1 # points should change accordingly to match joint lengths too frames = np.arange(n_frames) for cix, (a, b) in enumerate(constraints_weak): pa = point_indices_3d[frames, a] pb = point_indices_3d[frames, b] for k in range(3): A_sparse[start + cix*n_frames + frames, pa*3 + k] = 1 A_sparse[start + cix*n_frames + frames, pb*3 + k] = 1 return A_sparse def _jac_sparsity_triangulation_possible(self, p2ds_full, **kwargs): # initialize sparse jacobian using above function # extend to include alphas from parameters ## TODO: this initialization is really slow for some reason n_cams, n_frames, n_joints, n_possible, _ = p2ds_full.shape good_full = ~np.isnan(p2ds_full[:, :, :, :, 0]) any_good = np.any(good_full, axis=3) n_alphas = np.sum(good_full) n_errors_alphas = np.sum(any_good) p2ds = p2ds_full[:, :, :, 0] A_sparse = self._jac_sparsity_triangulation(p2ds, **kwargs) n_errors, n_params = A_sparse.shape B_sparse = dok_matrix((n_errors + n_errors_alphas, n_params + n_alphas), dtype='int16') for r, c in zip(*A_sparse.nonzero()): B_sparse[r, c] = A_sparse[r, c] point_indices_2d = np.arange(n_cams*n_frames*n_joints)\ .reshape(n_cams, n_frames, n_joints) point_indices_2d_rep = np.repeat(point_indices_2d[:, :, :, None], 2, axis=3) point_indices_2d_good = point_indices_2d_rep[~np.isnan(p2ds)] point_indices_good = point_indices_2d[any_good] alpha_indices = np.zeros((n_cams, n_frames, n_joints, n_possible), dtype='int64') for pnum in range(n_possible): alpha_indices[:, :, :, pnum] = point_indices_2d alpha_indices_good = alpha_indices[good_full] # alphas should change according to the reprojection error for each corresponding point point_indices_2d_good_find = defaultdict(list) for ix, p in enumerate(point_indices_2d_good): point_indices_2d_good_find[p].append(ix) for ix, alpha_index in enumerate(alpha_indices_good): B_sparse[point_indices_2d_good_find[alpha_index], n_params + ix] = 1 # alphas should change according to the alpha errors point_indices_good_find = dict() for ix, p in enumerate(point_indices_good): point_indices_good_find[p] = ix for ix, alpha_index in enumerate(alpha_indices_good): if alpha_index in point_indices_good_find: err_ix = n_errors + point_indices_good_find[alpha_index] B_sparse[err_ix, n_params + ix] = 1 return B_sparse def copy(self): cameras = [cam.copy() for cam in self.cameras] metadata = copy(self.metadata) return CameraGroup(cameras, metadata) def set_rotations(self, rvecs): for cam, rvec in zip(self.cameras, rvecs): cam.set_rotation(rvec) def set_translations(self, tvecs): for cam, tvec in zip(self.cameras, tvecs): cam.set_translation(tvec) def get_rotations(self): rvecs = [] for cam in self.cameras: rvec = cam.get_rotation() rvecs.append(rvec) return np.array(rvecs) def get_translations(self): tvecs = [] for cam in self.cameras: tvec = cam.get_translation() tvecs.append(tvec) return np.array(tvecs) def get_names(self): return [cam.get_name() for cam in self.cameras] def set_names(self, names): for cam, name in zip(self.cameras, names): cam.set_name(name) def average_error(self, p2ds, median=False): p3ds = self.triangulate(p2ds) errors = self.reprojection_error(p3ds, p2ds, mean=True) if median: return np.median(errors) else: return np.mean(errors) def calibrate_rows(self, all_rows, board, init_intrinsics=True, init_extrinsics=True, verbose=True, min_corners_intrinsic=9, **kwargs): assert len(all_rows) == len(self.cameras), \ "Number of camera detections does not match number of cameras" for rows, camera in zip(all_rows, self.cameras): size = camera.get_size() assert size is not None, \ "Camera with name {} has no specified frame size".format(camera.get_name()) if init_intrinsics: objp, imgp = board.get_all_calibration_points(rows) mixed = [(o, i) for (o, i) in zip(objp, imgp) if len(o) >= min_corners_intrinsic] objp, imgp = zip(*mixed) matrix = cv2.initCameraMatrix2D(objp, imgp, tuple(size)) camera.set_camera_matrix(matrix.copy()) camera.zero_distortions() print(self.get_dicts()) for i, (row, cam) in enumerate(zip(all_rows, self.cameras)): all_rows[i] = board.estimate_pose_rows(cam, row) new_rows = [[r for r in rows if r['ids'].size >= 8] for rows in all_rows] merged = merge_rows(new_rows) imgp, extra = extract_points(merged, board, min_cameras=2) if init_extrinsics: rtvecs = extract_rtvecs(merged) if verbose: pprint(get_connections(rtvecs, self.get_names())) rvecs, tvecs = get_initial_extrinsics(rtvecs, self.get_names()) self.set_rotations(rvecs) self.set_translations(tvecs) error = self.bundle_adjust_iter(imgp, extra, verbose=verbose, **kwargs) return error def get_rows_videos(self, videos, board, verbose=True): all_rows = [] for cix, (cam, cam_videos) in enumerate(zip(self.cameras, videos)): rows_cam = [] for vnum, vidname in enumerate(cam_videos): if verbose: print(vidname) try: rows = board.detect_video(vidname, prefix=vnum, progress=verbose) except Exception as e: print("WARNING: board detection failed for video {}".format(vidname)) print(e) rows = [] if verbose: print("{} boards detected".format(len(rows))) rows_cam.extend(rows) all_rows.append(rows_cam) return all_rows def set_camera_sizes_videos(self, videos): for cix, (cam, cam_videos) in enumerate(zip(self.cameras, videos)): rows_cam = [] for vnum, vidname in enumerate(cam_videos): try: params = get_video_params(vidname) size = (params['width'], params['height']) cam.set_size(size) except Exception as e: print("WARNING: camera size detection failed for video {}".format(vidname)) print(e) def calibrate_videos(self, videos, board, init_intrinsics=True, init_extrinsics=True, verbose=True, min_corners_intrinsic=9, **kwargs): """Takes as input a list of list of video filenames, one list of each camera. Also takes a board which specifies what should be detected in the videos""" all_rows = self.get_rows_videos(videos, board, verbose=verbose) if init_extrinsics: self.set_camera_sizes_videos(videos) error = self.calibrate_rows(all_rows, board, init_intrinsics=init_intrinsics, init_extrinsics=init_extrinsics, verbose=verbose, min_corners_intrinsic=min_corners_intrinsic, **kwargs) return error, all_rows def get_dicts(self): out = [] for cam in self.cameras: out.append(cam.get_dict()) return out def from_dicts(arr): cameras = [] for d in arr: if 'fisheye' in d and d['fisheye']: cam = FisheyeCamera.from_dict(d) else: cam = Camera.from_dict(d) cameras.append(cam) return CameraGroup(cameras) def from_names(names, fisheye=False): cameras = [] for name in names: if fisheye: cam = FisheyeCamera(name=name) else: cam = Camera(name=name) cameras.append(cam) return CameraGroup(cameras) def load_dicts(self, arr): for cam, d in zip(self.cameras, arr): cam.load_dict(d) def dump(self, fname): dicts = self.get_dicts() names = ['cam_{}'.format(i) for i in range(len(dicts))] master_dict = dict(zip(names, dicts)) master_dict['metadata'] = self.metadata with open(fname, 'w') as f: toml.dump(master_dict, f) def load(fname): master_dict = toml.load(fname) keys = sorted(master_dict.keys()) items = [master_dict[k] for k in keys if k != 'metadata'] cgroup = CameraGroup.from_dicts(items) if 'metadata' in master_dict: cgroup.metadata = master_dict['metadata'] return cgroup def resize_cameras(self, scale): for cam in self.cameras: cam.resize_camera(scale)