Merge pull request flatironinstitute#614 from pgunn/dev_yapf
Some careful yapfing of some quiet files.
epnev authored Aug 7, 2019
2 parents 5294abb + b613b13 commit c3970c6
Showing 4 changed files with 365 additions and 304 deletions.
109 changes: 61 additions & 48 deletions caiman/cluster.py
@@ -1,6 +1,5 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-

""" functions related to the creation and management of the cluster
We put arrays on disk as raw bytes, extending along the first dimension.
@@ -40,7 +39,12 @@

logger = logging.getLogger(__name__)

-def extract_patch_coordinates(dims:Tuple, rf:Union[List,Tuple], stride:Union[List[int],Tuple], border_pix:int=0, indices=[slice(None)]*2) -> Tuple[List, List]:

+def extract_patch_coordinates(dims: Tuple,
+                              rf: Union[List, Tuple],
+                              stride: Union[List[int], Tuple],
+                              border_pix: int = 0,
+                              indices=[slice(None)] * 2) -> Tuple[List, List]:
"""
Partition the FOV into patches
and return the indices in 2D and 1D (flattened, order='F') formats
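(For orientation, a minimal usage sketch of this function; the dims/rf/stride values are illustrative and not part of this commit. Note that stride is roughly the patch overlap: patch centers step by 2*rf - stride and each patch spans 2*rf + 1 pixels per side.)

from caiman.cluster import extract_patch_coordinates

# 64x64 FOV, 21x21 patches (2*rf + 1), centers every 10 px (2*rf - stride)
idx_flat, idx_2d = extract_patch_coordinates((64, 64), rf=(10, 10), stride=(10, 10))
print(len(idx_flat))    # number of patches, each given as flat order='F' indices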
@@ -58,25 +62,24 @@ def extract_patch_coordinates(dims:Tuple, rf:Union[List,Tuple], stride:Union[Lis

sl_start = [0 if sl.start is None else sl.start for sl in indices]
sl_stop = [dim if sl.stop is None else sl.stop for (sl, dim) in zip(indices, dims)]
-sl_step = [1 for sl in indices] # not used
+sl_step = [1 for sl in indices]    # not used
dims_large = dims
dims = np.minimum(np.array(dims) - border_pix, sl_stop) - np.maximum(border_pix, sl_start)

coords_flat = []
shapes = []
-iters = [list(range(rf[i], dims[i] - rf[i], 2 * rf[i] - stride[i])) + [dims[i] - rf[i]]
-         for i in range(len(dims))]
+iters = [list(range(rf[i], dims[i] - rf[i], 2 * rf[i] - stride[i])) + [dims[i] - rf[i]] for i in range(len(dims))]

coords = np.empty(list(map(len, iters)) + [len(dims)], dtype=np.object)
for count_0, xx in enumerate(iters[0]):
coords_x = np.arange(xx - rf[0], xx + rf[0] + 1)
coords_x = coords_x[(coords_x >= 0) & (coords_x < dims[0])]
-coords_x += border_pix*0 + np.maximum(sl_start[0], border_pix)
+coords_x += border_pix * 0 + np.maximum(sl_start[0], border_pix)

for count_1, yy in enumerate(iters[1]):
coords_y = np.arange(yy - rf[1], yy + rf[1] + 1)
coords_y = coords_y[(coords_y >= 0) & (coords_y < dims[1])]
-coords_y += border_pix*0 + np.maximum(sl_start[1], border_pix)
+coords_y += border_pix * 0 + np.maximum(sl_start[1], border_pix)

if len(dims) == 2:
idxs = np.meshgrid(coords_x, coords_y)
@@ -86,7 +89,7 @@ def extract_patch_coordinates(dims:Tuple, rf:Union[List,Tuple], stride:Union[Lis

coords_ = np.ravel_multi_index(idxs, dims_large, order='F')
coords_flat.append(coords_.flatten())
-else: # 3D data
+else:    # 3D data

if border_pix > 0:
raise Exception(
@@ -109,7 +112,8 @@ def extract_patch_coordinates(dims:Tuple, rf:Union[List,Tuple], stride:Union[Lis


#%%
-def apply_to_patch(mmap_file, shape:Tuple[Any,Any,Any], dview, rf, stride, function, *args, **kwargs) -> Tuple[List,Any,Tuple]:
+def apply_to_patch(mmap_file, shape: Tuple[Any, Any, Any], dview, rf, stride, function, *args,
+                   **kwargs) -> Tuple[List, Any, Tuple]:
"""
apply a function to patches, in parallel or serially
@@ -152,11 +156,9 @@ def apply_to_patch(mmap_file, shape:Tuple[Any,Any,Any], dview, rf, stride, funct
stride1 = stride
stride2 = stride

-idx_flat, idx_2d = extract_patch_coordinates(
-    (d1, d2), rf=(rf1, rf2), stride=(stride1, stride2))
+idx_flat, idx_2d = extract_patch_coordinates((d1, d2), rf=(rf1, rf2), stride=(stride1, stride2))

-shape_grid = tuple(np.ceil(
-    (d1 * 1. / (rf1 * 2 - stride1), d2 * 1. / (rf2 * 2 - stride2))).astype(np.int))
+shape_grid = tuple(np.ceil((d1 * 1. / (rf1 * 2 - stride1), d2 * 1. / (rf2 * 2 - stride2))).astype(np.int))
if d1 <= rf1 * 2:
shape_grid = (1, shape_grid[1])
if d2 <= rf2 * 2:
@@ -167,8 +169,7 @@
args_in = []

for id_f, id_2d in zip(idx_flat[:], idx_2d[:]):
-args_in.append((mmap_file.filename, id_f,
-                id_2d, function, args, kwargs))
+args_in.append((mmap_file.filename, id_f, id_2d, function, args, kwargs))

logger.debug("Flat index is of length " + str(len(idx_flat)))
if dview is not None:
@@ -184,19 +185,20 @@

file_res = list(map(function_place_holder, args_in))
return file_res, idx_flat, shape_grid
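(A rough calling sketch; the memmap path and the per-patch function are hypothetical, and the shape argument is assumed to be (T, d1, d2) to match the reshape in function_place_holder below.)

import caiman as cm

Yr, dims, T = cm.load_memmap('Yr_d1_64_d2_64_d3_1_order_C_frames_1000_.mmap')    # illustrative path

def patch_mean(Y):    # each patch arrives as (T, d1_patch, d2_patch)
    return Y.mean(axis=0)

# dview=None runs the patches serially; pass an ipyparallel direct view to parallelize
file_res, idx_flat, shape_grid = apply_to_patch(Yr, (T,) + dims, None, 16, 8, patch_mean)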


#%%


-def function_place_holder(args_in:Tuple) -> np.ndarray:
+def function_place_holder(args_in: Tuple) -> np.ndarray:
#todo: todocument

file_name, idx_, shapes, function, args, kwargs = args_in
Yr, _, _ = load_memmap(file_name)
Yr = Yr[idx_, :]
Yr.filename = file_name
_, T = Yr.shape
-Y = np.reshape(Yr, (shapes[1], shapes[0], T),
-               order='F').transpose([2, 0, 1])
+Y = np.reshape(Yr, (shapes[1], shapes[0], T), order='F').transpose([2, 0, 1])
[T, d1, d2] = Y.shape

res_fun = function(Y, *args, **kwargs)
@@ -208,10 +210,11 @@ def function_place_holder(args_in:Tuple) -> np.ndarray:

return res_fun


#%%


-def start_server(slurm_script:str=None, ipcluster:str="ipcluster", ncpus:int=None) -> None:
+def start_server(slurm_script: str = None, ipcluster: str = "ipcluster", ncpus: int = None) -> None:
"""
programmatically start the ipyparallel server
@@ -220,7 +223,7 @@ def start_server(slurm_script:str=None, ipcluster:str="ipcluster", ncpus:int=Non
number of processors
ipcluster : str
-ipcluster binary file name; requires 4 path separators on Windows. ipcluster="C:\\\\Anaconda2\\\\Scripts\\\\ipcluster.exe"
+ipcluster binary file name; requires 4 path separators on Windows. ipcluster="C:\\\\Anaconda3\\\\Scripts\\\\ipcluster.exe"
Default: "ipcluster"
"""
logger.info("Starting cluster...")
@@ -230,21 +233,21 @@ def start_server(slurm_script:str=None, ipcluster:str="ipcluster", ncpus:int=Non
if slurm_script is None:

if ipcluster == "ipcluster":
-subprocess.Popen(
-    "ipcluster start -n {0}".format(ncpus), shell=True, close_fds=(os.name != 'nt'))
+subprocess.Popen("ipcluster start -n {0}".format(ncpus), shell=True, close_fds=(os.name != 'nt'))
else:
-subprocess.Popen(shlex.split(
-    "{0} start -n {1}".format(ipcluster, ncpus)), shell=True, close_fds=(os.name != 'nt'))
+subprocess.Popen(shlex.split("{0} start -n {1}".format(ipcluster, ncpus)),
+                 shell=True,
+                 close_fds=(os.name != 'nt'))
time.sleep(1.5)
# Check that all processes have started
client = ipyparallel.Client()
time.sleep(1.5)
while len(client) < ncpus:
sys.stdout.write(".") # Give some visual feedback of things starting
sys.stdout.flush() # (de-buffered)
sys.stdout.write(".") # Give some visual feedback of things starting
sys.stdout.flush() # (de-buffered)
time.sleep(0.5)
logger.debug('Making sure everything is up and running')
-client.direct_view().execute('__a=1', block=True) # when done on all, we're set to go
+client.direct_view().execute('__a=1', block=True)    # when done on all, we're set to go
else:
shell_source(slurm_script)
pdir, profile = os.environ['IPPPDIR'], os.environ['IPPPROFILE']
Expand All @@ -256,18 +259,18 @@ def start_server(slurm_script:str=None, ipcluster:str="ipcluster", ncpus:int=Non
c.close()
sys.stdout.write("start_server: done\n")

-def shell_source(script:str) -> None:

+def shell_source(script: str) -> None:
""" Run a source-style bash script, copy resulting env vars to current process. """
# XXX This function is weird and maybe not a good idea. People might easily expect
# it to handle conditionals. Maybe just make them provide a key-value file instead.
# Introduce an echo to mark the end of the output.
pipe = subprocess.Popen(". %s; env; echo 'FINISHED_CLUSTER'" %
script, stdout=subprocess.PIPE, shell=True)
pipe = subprocess.Popen(". %s; env; echo 'FINISHED_CLUSTER'" % script, stdout=subprocess.PIPE, shell=True)

env = dict()
while True:
line = pipe.stdout.readline().decode('utf-8').rstrip()
-if 'FINISHED_CLUSTER' in line: # find the keyword set above to determine the end of the output stream
+if 'FINISHED_CLUSTER' in line:    # find the keyword set above to determine the end of the output stream
break
logger.debug("shell_source parsing line[" + str(line) + "]")
lsp = str(line).split("=", 1)
@@ -277,7 +280,8 @@ def shell_source(script:str) -> None:
os.environ.update(env)
pipe.stdout.close()
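(In this module shell_source exists to pull the environment exported by a SLURM start script into the current process; a sketch with an illustrative path:)

shell_source('SLURM/slurmStart.sh')    # illustrative path
pdir, profile = os.environ['IPPPDIR'], os.environ['IPPPROFILE']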

-def stop_server(ipcluster:str='ipcluster', pdir:str=None, profile:str=None, dview=None) -> None:

+def stop_server(ipcluster: str = 'ipcluster', pdir: str = None, profile: str = None, dview=None) -> None:
"""
programmatically stops the ipyparallel server
@@ -315,7 +319,7 @@ def stop_server(ipcluster:str='ipcluster', pdir:str=None, profile:str=None, dvie
try:
shutil.rmtree('./log/')
except:
-logger.info('creating log folder') # FIXME Not what this means
+logger.info('creating log folder')    # FIXME Not what this means

files = glob.glob('*.log')
os.mkdir('./log')
@@ -325,11 +329,15 @@

else:
if ipcluster == "ipcluster":
-proc = subprocess.Popen(
-    "ipcluster stop", shell=True, stderr=subprocess.PIPE, close_fds=(os.name != 'nt'))
+proc = subprocess.Popen("ipcluster stop",
+                        shell=True,
+                        stderr=subprocess.PIPE,
+                        close_fds=(os.name != 'nt'))
else:
proc = subprocess.Popen(shlex.split(ipcluster + " stop"),
-                        shell=True, stderr=subprocess.PIPE, close_fds=(os.name != 'nt'))
+                        shell=True,
+                        stderr=subprocess.PIPE,
+                        close_fds=(os.name != 'nt'))

line_out = proc.stderr.readline()
if b'CRITICAL' in line_out:
@@ -348,10 +356,15 @@
proc.stderr.close()

logger.info("stop_cluster(): done")


#%%


-def setup_cluster(backend:str='multiprocessing', n_processes:int=None, single_thread:bool=False, ignore_preexisting:bool=False) -> Tuple[Any, Any, Optional[int]]:
+def setup_cluster(backend: str = 'multiprocessing',
+                  n_processes: int = None,
+                  single_thread: bool = False,
+                  ignore_preexisting: bool = False) -> Tuple[Any, Any, Optional[int]]:
"""Setup and/or restart a parallel cluster.
Args:
backend: str
@@ -386,7 +399,7 @@ def setup_cluster(backend:str='multiprocessing', n_processes:int=None, single_th
stop_server()
except:
logger.debug('Nothing to stop')
-slurm_script = '/mnt/home/agiovann/SOFTWARE/CaImAn/SLURM/slurmStart.sh'
+slurm_script = '/mnt/home/agiovann/SOFTWARE/CaImAn/SLURM/slurmStart.sh'    # FIXME: Make this a documented environment variable
logger.info([str(n_processes), slurm_script])
start_server(slurm_script=slurm_script, ncpus=n_processes)
pdir, profile = os.environ['IPPPDIR'], os.environ['IPPPROFILE']
@@ -397,7 +410,7 @@
stop_server()
start_server(ncpus=n_processes)
c = Client()
-logger.info('Started ipyparallel cluster: Using ' + str(len(c)) + ' processes')
+logger.info(f'Started ipyparallel cluster: Using {len(c)} processes')
dview = c[:len(c)]

elif (backend == 'multiprocessing') or (backend == 'local'):
@@ -411,16 +424,16 @@
'A cluster is already running. Terminate with dview.terminate() if you want to restart.')
if (platform.system() == 'Darwin') and (sys.version_info > (3, 0)):
try:
-if 'kernel' in get_ipython().trait_names(): # type: ignore
-    # If you're on OSX and you're running under Jupyter or Spyder,
-    # which already run the code in a forkserver-friendly way, this
-    # can eliminate some setup and make this a reasonable approach.
-    # Otherwise, setting VECLIB_MAXIMUM_THREADS=1 or using a different
-    # blas/lapack is the way to avoid the issues.
-    # See https://github.com/flatironinstitute/CaImAn/issues/206 for more
-    # info on why we're doing this (for now).
+if 'kernel' in get_ipython().trait_names():    # type: ignore
+    # If you're on OSX and you're running under Jupyter or Spyder,
+    # which already run the code in a forkserver-friendly way, this
+    # can eliminate some setup and make this a reasonable approach.
+    # Otherwise, setting VECLIB_MAXIMUM_THREADS=1 or using a different
+    # blas/lapack is the way to avoid the issues.
+    # See https://github.com/flatironinstitute/CaImAn/issues/206 for more
+    # info on why we're doing this (for now).
    multiprocessing.set_start_method('forkserver', force=True)
-except: # If we're not running under ipython, don't do anything.
+except:    # If we're not running under ipython, don't do anything.
pass
c = None
