Source code for metintos.optiflow

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import cv2
from scipy.interpolate import RectBivariateSpline
from metintos.utils import *


[docs]class NormalizedWrappedInterpolant(object): """2D interpolant with natural index coordinates :var array: 2D array to interpolate :type array: array_like :var i: normalized coordinates along the first axis :type i: array_like :var j: normalized coordinates along the second axis :type j: array_like :var interpolant: wrapped interpolant :type interpolant: R² -> R mapping """
[docs] def __init__(self, array, interpolant_generator=None): """ :param array: 2D array to interpolate :type array: array_like :param interpolant_generator: wrapped interpolant :type interpolant_generator: R² -> (R² -> R) mapping :returns: None """ if interpolant_generator is None: interpolant_generator = RectBivariateSpline self.shape = array.shape self.array = array self.i = np.linspace(0, self.shape[0] - 1, self.shape[0]) self.j = np.linspace(0, self.shape[1] - 1, self.shape[1]) self.interpolant = interpolant_generator(self.i, self.j, array)
[docs] def __call__(self, *args, **kwargs): """ :param args: :type args: :param kwargs: :type kwargs: :returns: """ return self.interpolant(*args, **kwargs)
[docs] def ev(self, *args, **kwargs): """ :param args: :type args: :param kwargs: :type kwargs: :returns: """ return self.interpolant.ev(*args, **kwargs)
[docs]class TwoFrameInterpolant(object): """Performs optical flow or linear interpolation between two consecutive frames :var start_frame: Matrix representation of the variable at the start of the temporal interpolation domain :type start_frame: 2D array :var end_frame: Matrix representation of the variable at the end of the temporal interpolation :type end_frame: 2D array :var flow_calculator: Callable that computes the optical flow between two frames / arrays :type flow_calculator: (R² x R²) -> R² mapping :var spatial_interp: Callable that generates a 2D interpolant of a frame array with normalized coordinates :type spatial_interp: R² -> (R² -> R) :var flow: Optical flow between the start and end frames :type flow: 2D array (R²) """
[docs] def __init__(self, start_frame, end_frame, flow_calculator=None, spatial_interp=None, initial_flow=None): """ :param start_frame: 2D array representation of the variable at the start of the temporal interpolation domain :type start_frame: array_like :param end_frame: 2D array representation of the variable at the end of the temporal interpolation :type end_frame: array_like :param flow_calculator: Callable that computes the optical flow between two frames / arrays :type flow_calculator: (R² x R²) -> R² mapping :param spatial_interp: Callable that generates a 2D interpolant of a frame array with normalized coordinates :type spatial_interp: R² -> (R² -> R) :returns: None """ if spatial_interp is None: spatial_interp = RectBivariateSpline if flow_calculator is None: flow_calculator = FarnebackFlow() if start_frame.shape != end_frame.shape: raise ValueError(f"start_frame and end_frame must have the same shape; instead, they have shapes " f"{start_frame.shape} and {end_frame.shape}") if len(start_frame.shape) != 2: raise ValueError(f"Input frames with shape {start_frame.shape} are not 2D") self.start_frame = start_frame self.end_frame = end_frame self.shape = self.start_frame.shape self.flow_calculator = flow_calculator self.spatial_interp = spatial_interp self.flow = self.flow_calculator(self.start_frame, self.end_frame, initial_flow) r_initial_flow = None if initial_flow is None else -initial_flow self.flow_r = - self.flow_calculator(self.end_frame, self.start_frame, r_initial_flow) self.I_start = NormalizedWrappedInterpolant(start_frame, spatial_interp) self.I_end = NormalizedWrappedInterpolant(end_frame, spatial_interp) self.ii, self.jj = np.meshgrid(self.I_start.i, self.I_start.j, indexing='ij') self.i_max = self.shape[0] - 1 self.j_max = self.shape[1] - 1
[docs] def __call__(self, t): """ :param t: Fraction of the time domain between the start and the end frame. Must fulfill 0 <= t <= 1 :type t: float :returns: Flow-interpolated array at t :rtype: array_like """ if t < 0.0 or t > 1.0: raise ValueError("The temporal fraction for optical flow interpolation must lie between 0 and 1") ii_end = np.clip(self.ii + self.flow[:, :, 1] * (1 - t), 0, self.i_max) jj_end = np.clip(self.jj + self.flow[:, :, 0] * (1 - t), 0, self.j_max) ii_start = np.clip(self.ii - self.flow[:, :, 1] * t, 0, self.i_max) jj_start = np.clip(self.jj - self.flow[:, :, 0] * t, 0, self.j_max) start_translated = self.I_start(ii_start.flatten(), jj_start.flatten(), grid=False).reshape(self.shape) end_translated = self.I_end(ii_end.flatten(), jj_end.flatten(), grid=False).reshape(self.shape) return t * end_translated + (1 - t) * start_translated
[docs]class MultiFrameInterpolant(object): """Performs optical flow or linear interpolation between several consecutive, equally spaced frames :var interpolants: List of pairwise interpolants :type interpolants: List[TwoFrameInterpolant] """
[docs] def __init__(self, frames, flow_calculator=None, spatial_interp=None, t_axis=None, initial_flows=None): """ :param frames: List of 2D arrays :type frames: list[array_like] :param flow_calculator: Callable that computes the optical flow between two frames / arrays :type flow_calculator: (R² x R²) -> R² mapping :param spatial_interp: Callable that generates a 2D interpolant of a frame array with normalized coordinates :type spatial_interp: R² -> (R² -> R) :param t_axis: (optional) array containing the timestamps of t :type t_axis: array_like(dtype='timedelta64') :param initial_flows: List of arrays of shape N x M x 2 representing initial guesses for the optical flows :type initial_flows: list[array_like] :returns: None """ assert len(frames) > 1 if initial_flows is None: initial_flows = [None for _ in range(len(frames) - 1)] self.frames = frames self.t_axis = t_axis self.interpolants = [TwoFrameInterpolant(prev_frame, next_frame, flow_calculator, spatial_interp, flow) for prev_frame, next_frame, flow in zip(frames[:-1], frames[1:], initial_flows)]
[docs] def __call__(self, t): """ :param t: Fraction of the time domain between the start and the end frame. Must fulfill 0 <= t <= n_frames :type t: float :returns: Flow-interpolated array at t :rtype: array_like """ if t < 0.0 or t > len(self.frames) - 1: raise ValueError("The temporal fraction must lie between 0 and n - 1, where n is the number of frames") i = int(t) t_frac = t - i return self.interpolants[i](t_frac)
[docs] def eval_at_t(self, t, check_nans=True): """ :param t: Union[np.datetime64, np.timedelta64] :type t: :param check_nans: if True, replace any nans in the input with zeros :type check_nans: bool :returns: Flow-interpolated array at timestamp t :rtype: array_like """ upper_bound = self.t_axis[-1] lower_bound = self.t_axis[0] if self.t_axis is None: raise ValueError(f"The current {self.__class__} instance was not initialized with a temporal axis") if t < lower_bound: raise ValueError(f"The timestamp {t} is below the lower temporal axis bound of {lower_bound}") if t > upper_bound: raise ValueError(f"The timestamp {t} is below the upper temporal axis bound of {upper_bound}") idx_low, idx_high = get_bracketing_indexes(self.t_axis, t) t_high = self.t_axis[idx_high] t_low = self.t_axis[idx_low] frac = float(t - t_low) / float(t_high - t_low) assert 0 <= frac assert 1 >= frac output = self.interpolants[idx_low](frac) if check_nans: assert not np.isnan(output.sum()) return output
[docs]class FarnebackFlow(object): """Optical flow calculator using the algorithm of Farneback (2003) :var parameters: arguments passed to the Farneback algorithm :type parameters: dict """
[docs] def __init__(self, **kwargs): """Optical flow calculator using the algorithm of Farneback (2003) :param kwargs: arguments passed to the Farneback algorithm :type kwargs: dict :returns: None """ self.parameters = {'iterations': 2, 'levels': 9, 'poly_n': 10, 'poly_sigma': 1.934, 'pyr_scale': 0.85, 'scale': 25.0, 'winsize': 77, 'flags': cv2.OPTFLOW_FARNEBACK_GAUSSIAN } # self.parameters = {'pyr_scale': 0.5, # 'levels': 5, # 'winsize': 100, # 'iterations': 4, # 'poly_n': 9, # 'poly_sigma': 1.6, # 'flags': cv2.OPTFLOW_FARNEBACK_GAUSSIAN, # 'scale': 1e5} self.parameters.update(kwargs) for key in ('levels', 'winsize', 'iterations', 'poly_n'): self.parameters[key] = int(self.parameters[key]) self.scale = self.parameters['scale'] del self.parameters['scale']
[docs] def __call__(self, start_frame, end_frame, initial_flow=None, check_nans=True): """Returns the computed optical flow :param start_frame: 2D array (N x M) representing the start frame :type start_frame: array_like :param end_frame: 2D array (N x M) representing the end frame :type end_frame: array_like :param initial_flow: (optional) N x M x 2 array to initialize the flow :type initial_flow: array_like or None :param start_frame: 2D array (N x M) representing the start frame :type start_frame: array_like :param check_nans: (optional) if True, replace any nans in the input with zeros :type check_nans: bool :returns: optical flow between the frames :rtype: array_like """ flow = cv2.calcOpticalFlowFarneback(self.scale * start_frame, self.scale * end_frame, initial_flow, **self.parameters) if check_nans: assert not np.isnan(flow.sum()) return flow
class DummyFlow(object): def __init__(self): pass def __call__(self, start_frame, end_frame, initial_flow=None): return np.zeros(start_frame.shape + (2,))