# Source code for criticalityMaps.criticality.mp_queue_tools

# -*- coding: utf-8 -*-
"""
Created on Tue Jun  4 07:59:47 2019

@author: PHassett
"""
import multiprocessing as mp
import time


def _execute(function, arguments):
    result = function(*arguments)
    print('At {:24}, {:10} completed process: {:4}'.format(time.ctime(),
          mp.current_process().name, str(mp.current_process().pid)))
    return result


def _worker(input_queue, output_queue):
    """
    Process tasks from ``input_queue`` until the ``'STOP'`` sentinel is read.

    Parameters
    ----------
    input_queue : queue-like
        source of work items; each item is unpacked as ``(func, args)``.
        Reading an item equal to ``'STOP'`` ends the loop.
    output_queue : queue-like
        destination for the value returned by each executed task
    """
    while True:
        item = input_queue.get()
        if item == 'STOP':
            # Sentinel received: no more work for this worker.
            break
        func, args = item
        output_queue.put(_execute(func, args))


def runner(tasks, num_processors):
    """
    Run the tasks specified across multiple processors and return the
    results in a list.

    Parameters
    ----------
    tasks : list
        task list of the form [(func, (arg1, arg2, ..., argN))]. Any
        iterable of such pairs is accepted; it is copied into a list.
    num_processors : int or None
        the number of processors to use. If None, roughly two thirds of
        the machine's processors are used (but never fewer than one).

    Returns
    -------
    results : list
        list of func return objects, one per task. Results are collected
        as tasks finish, so their order is completion order and is not
        guaranteed to match the order of ``tasks``.

    Raises
    ------
    ValueError
        if ``num_processors`` is not an int, is smaller than 1, or is
        larger than ``multiprocessing.cpu_count()``.
    RuntimeError
        if a worker process exits before every task has produced a
        result (for example because a task raised an exception).
    """
    # Imported locally so the module-level import block stays untouched.
    from queue import Empty

    available = mp.cpu_count()

    # Handle undefined num_processors. The max() guards single-core
    # machines, where int(1 * 0.666) == 0 would start no workers at all.
    if num_processors is None:
        num_processors = max(1, int(available * 0.666))
    # Do some error checking for the number of processors. bool is a
    # subclass of int but has never been accepted here, so keep rejecting it.
    elif (isinstance(num_processors, bool)
          or not isinstance(num_processors, int)):
        raise ValueError('num_processors must be of type int')
    elif num_processors < 1:
        raise ValueError('num_processors must be at least 1.')
    elif num_processors > available:
        message = (
            'num_processors must not exceed the total number of '
            'processors on the machine ({}). Run:\n'
            '    import multiprocessing as mp\n'
            '    mp.cpu_count()\n'
            'to determine the number of processors.'
        ).format(available)
        raise ValueError(message)

    # Materialise the tasks so they can be counted (allows any iterable).
    tasks = list(tasks)
    if not tasks:
        return []

    # Create task and return queues.
    task_queue = mp.Queue()
    done_queue = mp.Queue()

    # Submit tasks to the task queue.
    for task in tasks:
        task_queue.put(task)

    # Start worker processes; more workers than tasks would only sit idle.
    workers = [
        mp.Process(target=_worker, args=(task_queue, done_queue))
        for _ in range(min(num_processors, len(tasks)))
    ]
    for worker in workers:
        worker.start()

    # Collect one result per task.
    results = []
    try:
        while len(results) < len(tasks):
            try:
                results.append(done_queue.get(timeout=1))
            except Empty:
                # 'STOP' has not been sent yet, so a worker that is no
                # longer running died abnormally and its task will never
                # produce a result. Fail instead of blocking forever.
                if not all(worker.is_alive() for worker in workers):
                    raise RuntimeError(
                        'A worker process exited before all tasks were '
                        'completed; see the worker output for details.')
    except BaseException:
        # Abort: stop the remaining workers and do not wait at interpreter
        # exit for unconsumed tasks to be flushed to the queue's pipe.
        for worker in workers:
            if worker.is_alive():
                worker.terminate()
        task_queue.cancel_join_thread()
        raise
    else:
        # Stop all child processes (one sentinel per worker).
        for _ in workers:
            task_queue.put('STOP')
    finally:
        # Reap the children so no worker processes are leaked. Every
        # result has been read (or the workers were terminated), so
        # joining cannot block on unflushed queue data.
        for worker in workers:
            worker.join()

    # Return the results.
    return results