Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optical flow parallelization #73

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cobrawap/pipeline/stage04_wave_detection/Snakefile
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ use rule template as optical_flow with:
script = SCRIPTS / 'optical_flow.py'
params:
params('alpha', 'max_Niter', 'convergence_limit', 'gaussian_sigma',
'derivative_filter', 'use_phases', config=config)
'derivative_filter', 'use_phases', 'n_jobs', 'max_padding_iterations', config=config)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These parameters would also need to be added to the config_template.yaml file.
I think the number of jobs isn't really a parameter that should be set by the user, but rather set automatically by inferring the available resources, for example, by using multiprocessing.cpu_count()

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok we will modify the config_template.yaml file coherently.
It may depend on the specific user and platform.
In addition, multiprocessing.cpu_count() does not differentiate between physical and logical cores. Thus allowing the optical_flow.py script to use all the output of multiprocessing.cpu_count() may not result into computational advantage and may interfere with other processes running.
For this reason we suggest the usage of psutil library and a ratio of psutil.cpu_count(logical = False) output to set the number of parallel processes based on the number of physical cores, in case the user does not set explicitly the n_jobs parameter (which can be set to None as default).

output:
Path('{dir}') / 'optical_flow' / f'optical_flow.{config.NEO_FORMAT}',
output_img = Path('{dir}') / 'optical_flow' / f'optical_flow.{config.PLOT_FORMAT}'
Expand Down
108 changes: 69 additions & 39 deletions cobrawap/pipeline/stage04_wave_detection/scripts/optical_flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
import numpy as np
from copy import copy
import matplotlib.pyplot as plt
from joblib import Parallel, delayed
from tqdm import tqdm
from utils.io_utils import load_neo, write_neo, save_plot
from utils.parse import none_or_str, str_to_bool
from utils.neo_utils import imagesequence_to_analogsignal, analogsignal_to_imagesequence
Expand All @@ -34,6 +36,10 @@
help='Filter kernel to use for calculating spatial derivatives')
CLI.add_argument("--use_phases", nargs='?', type=str_to_bool, default=False,
help='whether to use signal phase instead of amplitude')
CLI.add_argument("--max_padding_iterations", nargs='?', type=int, default=1000,
help='maximum number of padding interpolation iterations')
CLI.add_argument("--n_jobs", nargs='?', type=int, default=1,
help='number of parallel horn_schunck_step executions')

def horn_schunck_step(frame, next_frame, alpha, max_Niter, convergence_limit,
kernelHS, kernelT, kernelX, kernelY,
Expand Down Expand Up @@ -98,60 +104,81 @@ def compute_derivatives(frame, next_frame, kernelX, kernelY,

def horn_schunck(frames, alpha, max_Niter, convergence_limit,
kernelHS, kernelT, kernelX, kernelY,
are_phases=False):
are_phases=False, n_jobs=1, max_padding_iterations=1000):

nan_channels = np.where(np.bitwise_not(np.isfinite(frames[0])))
frames = interpolate_empty_sites(frames, are_phases)

vector_frames = np.zeros(frames.shape, dtype=complex)

for i, frame in enumerate(frames[:-1]):
next_frame = frames[i+1]

vector_frames[i] = horn_schunck_step(frame,
next_frame,
alpha=alpha,
max_Niter=max_Niter,
convergence_limit=convergence_limit,
kernelHS=kernelHS,
kernelT=kernelT,
kernelX=kernelX,
kernelY=kernelY,
are_phases=are_phases)
vector_frames[i][nan_channels] = np.nan + np.nan*1j
frames = interpolate_empty_sites(frames, are_phases=are_phases,
max_iters=max_padding_iterations, n_jobs=n_jobs)

print("Calculating horn schunck steps")
vector_frames = Parallel(n_jobs=n_jobs)(
delayed(horn_schunck_step)
(frames[i],
frames[i+1],
alpha=alpha,
max_Niter=max_Niter,
convergence_limit=convergence_limit,
kernelHS=kernelHS,
kernelT=kernelT,
kernelX=kernelX,
kernelY=kernelY,
are_phases=are_phases)
for i in tqdm(range(len(frames[:-1])), ascii=True))

vector_frames = np.asarray(vector_frames, dtype=complex)
vector_frames[:,nan_channels[0],nan_channels[1]] = np.nan + np.nan*1j
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This formulation does not generalize to possible 1D frames

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the 1D case is represented as a vector-like matrix of shape (dim_x, 1), this notation would still be consistent.
Otherwise if the shape would just be (dim_x), then the script would break here and elsewhere in the code, e.g.

  1. in many parts of the code we explicitly extract the shape:
    dim_y, dim_x = frames[0].shape
  1. in plot_opticalflow function we assume 2D arrays and explicitly extract the shape:
    dim_y, dim_x = vec_frame.shape

And could be not properly defined the algorithm when we compute the derivatives on both the x and y axes:

        fx = phase_conv2d(frame, kernelX) \
           + phase_conv2d(next_frame, kernelX)
        fy = phase_conv2d(frame, kernelY) \
           + phase_conv2d(next_frame, kernelY)


frames[:,nan_channels[0],nan_channels[1]] = np.nan
return vector_frames


def interpolate_empty_sites(frames, are_phases=False):
if np.isfinite(frames).all():
return frames
dim_y, dim_x = frames[0].shape
grid_y, grid_x = np.meshgrid([-1,0,1],[-1,0,1], indexing='ij')

for i, frame in enumerate(frames):
new_frame = copy(frame)
while not np.isfinite(new_frame).all():
y, x = np.where(np.bitwise_not(np.isfinite(new_frame)))
def interpolation_step(frame, grid_y, grid_x, are_phases=False, max_iters=1000):
dim_y, dim_x = frame.shape
if np.isfinite(frame).all():
return frame
else:
for _ in range(max_iters):
new_frame = copy(frame)
y, x = np.where(np.bitwise_not(np.isfinite(frame)))
# loop over nan-sites
for xi, yi in zip(x,y):
for xi, yi in zip(x, y):

neighbours = []
# collect neighbours of each site
for dx, dy in zip(grid_x.flatten(), grid_y.flatten()):
xn = xi+dx
yn = yi+dy
xn = xi + dx
yn = yi + dy
if (0 <= xn) & (xn < dim_x) & (0 <= yn) & (yn < dim_y):
neighbours.append(frames[i, yn, xn])
neighbours.append(frame[yn, xn])
# average over neihbour values
if np.isfinite(neighbours).any():
if are_phases:
vectors = np.exp(1j*np.array(neighbours))
new_frame[yi,xi] = np.angle(np.nansum(vectors))
vectors = np.exp(1j * np.array(neighbours))
new_frame[yi, xi] = np.angle(np.nansum(vectors))
else:
new_frame[yi,xi] = np.nansum(neighbours)
frames[i] = new_frame
return frames
new_frame[yi, xi] = np.nansum(neighbours)
frame = new_frame
if np.isfinite(frame).all():
break
return frame


def interpolate_empty_sites(frames, are_phases=False, max_iters=1000, n_jobs=1):
if np.isfinite(frames).all():
return frames
else:
grid_y, grid_x = np.meshgrid([-1, 0, 1], [-1, 0, 1], indexing='ij')
print('Frames interpolation')
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove print statement. instead you could set this as a title for tqdm

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, we will add a title to the tqdm progress bar.

frames = Parallel(n_jobs=n_jobs)(
delayed(interpolation_step)
(frames[i],
grid_y=grid_y,
grid_x=grid_x,
are_phases=are_phases,
max_iters=max_iters)
for i in tqdm(range(len(frames)), ascii=True))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

tqdm would be a new dependency of cobrawap and would need to be added to the environment and pyproject definition

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, we will update the environment file and the pyproject accordingly.

frames = np.asarray(frames)
return frames


def smooth_frames(frames, sigma):
Expand Down Expand Up @@ -259,7 +286,10 @@ def is_phase_signal(signal, use_phases):
kernelY=kernel.y,
kernelT=kernelT,
kernelHS=kernelHS,
are_phases=args.use_phases)
are_phases=args.use_phases,
n_jobs=args.n_jobs,
max_padding_iterations=args.max_padding_iterations)


if np.sum(args.gaussian_sigma):
vector_frames = smooth_frames(vector_frames, sigma=args.gaussian_sigma)
Expand Down