Module gplately.parallel
Tools to execute routines efficiently by parallelising them over several threads. This uses multiple processing units.
Expand source code
"""Tools to execute routines efficiently by parallelising
them over several threads. This uses multiple processing units.
"""
from multiprocessing import Pool, Process, Queue, cpu_count
class Parallel(object):
"""A class that uses multiple processors with `multiprocessing`
to execute routines in parallel over several threads.
Parameters
-----------
nprocs : int, default=1
The number of separate executions of a process. By default,
a single thread is run.
"""
def __init__(self, nprocs=1):
self.nprocs = nprocs
def parallelise_routine(self, function, *args, **kwargs):
"""Execute a routine over multiple threads on different
processors, ultimately reducing computation time.
`parallelise_routine` permits one item through the process
queue when an executed item is extracted with get().
Parameters
----------
self.nprocs : int, default=1
The number of separate executions of a process. By
default, a single thread is run.
function : method from an instance of an object
The process to be executed in parallel. Should be
supplied as module.class.method (if belonging to a class)
or module.method.
*args : tuple
Contains all necessary input parameters for the ‘function’.
**kwargs : dict
Keyword arguments for the ‘function’.
"""
if self.nprocs == 1:
# single thread
result = function(*args, **kwargs)
return result
elif self.nprocs > 1:
# more than one processor - game on
results = [[] for i in range(n)]
processes = []
q_in = Queue(1)
q_out = Queue()
for i in range(self.nprocs):
pass_args = [function]
pass_args.extend(args)
p = Process(target=self._func_queue, args=tuple(pass_args), kwargs=kwargs)
processes.append(p)
for p in processes:
p.daemon = True
p.start()
# put items in the queue
sent = [q_in.put((i,)) for i in range(n)]
[q_in.put((None,)) for _ in range(nprocs)]
# get the results
results = []
for i in range(len(sent)):
index, result = q_out.get()
results[index] = result
# wait until each processor has finished
[p.join() for p in processes]
return results
def _func_queue(self, function, q_in, q_out, *args, **kwargs):
while True:
pos, input_args = q_in.get()
if pos is None:
break
res = function(*input_args, **kwargs)
q_out.put((pos, res))
return
Classes
class Parallel (nprocs=1)
-
A class that uses multiple processors with
multiprocessing
to execute routines in parallel over several threads.Parameters
nprocs
:int
, default=1
- The number of separate executions of a process. By default, a single thread is run.
Expand source code
class Parallel(object): """A class that uses multiple processors with `multiprocessing` to execute routines in parallel over several threads. Parameters ----------- nprocs : int, default=1 The number of separate executions of a process. By default, a single thread is run. """ def __init__(self, nprocs=1): self.nprocs = nprocs def parallelise_routine(self, function, *args, **kwargs): """Execute a routine over multiple threads on different processors, ultimately reducing computation time. `parallelise_routine` permits one item through the process queue when an executed item is extracted with get(). Parameters ---------- self.nprocs : int, default=1 The number of separate executions of a process. By default, a single thread is run. function : method from an instance of an object The process to be executed in parallel. Should be supplied as module.class.method (if belonging to a class) or module.method. *args : tuple Contains all necessary input parameters for the ‘function’. **kwargs : dict Keyword arguments for the ‘function’. """ if self.nprocs == 1: # single thread result = function(*args, **kwargs) return result elif self.nprocs > 1: # more than one processor - game on results = [[] for i in range(n)] processes = [] q_in = Queue(1) q_out = Queue() for i in range(self.nprocs): pass_args = [function] pass_args.extend(args) p = Process(target=self._func_queue, args=tuple(pass_args), kwargs=kwargs) processes.append(p) for p in processes: p.daemon = True p.start() # put items in the queue sent = [q_in.put((i,)) for i in range(n)] [q_in.put((None,)) for _ in range(nprocs)] # get the results results = [] for i in range(len(sent)): index, result = q_out.get() results[index] = result # wait until each processor has finished [p.join() for p in processes] return results def _func_queue(self, function, q_in, q_out, *args, **kwargs): while True: pos, input_args = q_in.get() if pos is None: break res = function(*input_args, **kwargs) q_out.put((pos, res)) return
Methods
def parallelise_routine(self, function, *args, **kwargs)
-
Execute a routine over multiple threads on different processors, ultimately reducing computation time.
parallelise_routine
permits one item through the process queue when an executed item is extracted with get().Parameters
self.nprocs : int, default=1 The number of separate executions of a process. By default, a single thread is run.
function
:method from an instance
ofan object
- The process to be executed in parallel. Should be supplied as module.class.method (if belonging to a class) or module.method.
*args
:tuple
- Contains all necessary input parameters for the ‘function’.
**kwargs
:dict
- Keyword arguments for the ‘function’.
Expand source code
def parallelise_routine(self, function, *args, **kwargs): """Execute a routine over multiple threads on different processors, ultimately reducing computation time. `parallelise_routine` permits one item through the process queue when an executed item is extracted with get(). Parameters ---------- self.nprocs : int, default=1 The number of separate executions of a process. By default, a single thread is run. function : method from an instance of an object The process to be executed in parallel. Should be supplied as module.class.method (if belonging to a class) or module.method. *args : tuple Contains all necessary input parameters for the ‘function’. **kwargs : dict Keyword arguments for the ‘function’. """ if self.nprocs == 1: # single thread result = function(*args, **kwargs) return result elif self.nprocs > 1: # more than one processor - game on results = [[] for i in range(n)] processes = [] q_in = Queue(1) q_out = Queue() for i in range(self.nprocs): pass_args = [function] pass_args.extend(args) p = Process(target=self._func_queue, args=tuple(pass_args), kwargs=kwargs) processes.append(p) for p in processes: p.daemon = True p.start() # put items in the queue sent = [q_in.put((i,)) for i in range(n)] [q_in.put((None,)) for _ in range(nprocs)] # get the results results = [] for i in range(len(sent)): index, result = q_out.get() results[index] = result # wait until each processor has finished [p.join() for p in processes] return results