Documentation for Parallelize

andrzeng edited this page Sep 5, 2022 · 15 revisions

In our line of work, for loops often take longer to run than they need to because they do not take advantage of every processor core. For example, consider the following loop:

processed_data = []
for subject in census:
   subject_data = some_function(subject)
   processed_data.append(subject_data)

Snippets like this pop up frequently in neurophysiological work, e.g. when running masking operations. If our subject count runs into the thousands, we would have to twiddle our thumbs and make small talk for a while. That is because a plain Python loop runs on a single core; Python does not automatically recruit additional cores for you. When we have more than 100 cores at our disposal, we are not making the most of our resources! This is where the Parallelize utility comes into play. By default, Parallelize enlists every available core while reducing code clutter. With Parallelize, the above loop reduces to:

processed_data = Parallelize(census, some_function)

The time savings can be significant. Empirically, we have reduced the runtime of a loop from 7 minutes to less than 20 seconds. Let's dive into the details.
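Before reading the implementation, it may help to see the idea without any multiprocessing. The sketch below is not the utility itself: it runs serially and only mirrors the three steps involved (split the indices into contiguous chunks, apply the function chunk by chunk, then merge the results back in index order and reshape). The names `split_apply_merge`, `num_chunks`, and `mean_of` are hypothetical, and `np.array_split` is used here simply as one convenient way to divide the indices.

```python
import numpy as np

def split_apply_merge(data, f, num_chunks):
    """Serial stand-in for the split/apply/merge pattern (no processes involved)."""
    # Step 1: divide the indices 0..len(data)-1 into contiguous, nearly equal chunks.
    chunks = np.array_split(np.arange(len(data)), num_chunks)

    # Step 2: each "worker" fills a dict mapping index -> f(data[index]).
    partial_results = []
    for chunk in chunks:
        partial_results.append({int(i): f(data[int(i)]) for i in chunk})

    # Step 3: merge the per-chunk dicts and read the values back in index order.
    merged = {}
    for partial in partial_results:
        merged.update(partial)
    ordered = [merged[i] for i in range(len(data))]

    # One row per input element; scalar outputs become a single column.
    return np.array(ordered).reshape(len(data), -1)

def mean_of(values):
    # Hypothetical per-subject function.
    return sum(values) / len(values)

census = [[1, 2, 3], [4, 5, 6], [7, 8, 9], [10, 11, 12], [13, 14, 15]]
result = split_apply_merge(census, mean_of, num_chunks=2)
print(result.shape)  # (5, 1)
```

Note the final reshape, which the real implementation also performs: a function that returns a scalar yields an array of shape `(n, 1)` rather than the flat list the original loop produced. Calling `.ravel()` on the result gives a 1-D array if that is what is wanted.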

Below is the full implementation of the Parallelize utility:

import multiprocessing
from multiprocessing import Pool
import numpy as np

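def __processor_procedure(processor_index, ranges, f):
    # Runs inside a worker process: applies f to this worker's share of the data.
    # The data is read from the module-level global that Parallelize sets
    # before the pool is created, so it is not passed in as an argument.
    global __48162789682__
    data = __48162789682__

    # Map each index to its result so the caller can restore the original order.
    __processed = {}
    for index in ranges[processor_index]:
        __processed[index] = f(data[index])

    return __processed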



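def Parallelize(data, f, num_cores=None):
    # Applies f to every element of data using several worker processes.
    # Returns a 2-D NumPy array with one row per element of data.
    global __48162789682__

    num_subjects = len(data)
    if num_subjects == 0:
        # Nothing to process; avoid starting a pool for empty input.
        return np.empty((0, 0))

    if num_cores is None:
        num_cores = multiprocessing.cpu_count()

    # Use at least one worker, and never more than there are subjects or cores.
    num_cores = max(1, min(num_cores, num_subjects, multiprocessing.cpu_count()))

    # Split the indices into num_cores contiguous, nearly equal chunks.
    # (The earlier version divided by num_cores - 1, which fails when only
    # one core is used.)
    chunks = np.array_split(np.arange(num_subjects), num_cores)
    range_dict = {i: chunk for i, chunk in enumerate(chunks)}

    assert num_cores == len(range_dict)

    pass_data = [(core_index, range_dict, f) for core_index in range(num_cores)]

    # Workers read the data through this global instead of receiving a pickled
    # copy. This relies on the "fork" start method, where child processes
    # inherit the parent's memory.
    __48162789682__ = data
    try:
        with Pool(num_cores) as p:
            _returns = p.starmap(__processor_procedure, pass_data)
    finally:
        del __48162789682__

    # Merge the per-worker dicts and read the results back in index order.
    final_dict = {}
    for d in _returns:
        final_dict.update(d)

    return np.array([final_dict[i] for i in range(num_subjects)]).reshape(num_subjects, -1)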