create contextmanagers to wrap multiprocess terminate/close and join

Registered by Marco Morais

The multiprocess logic in ScenarioRunner subclasses is currently repeated in all run methods. An example from ContinuousScenarioRunner is pasted below.

    def _run_scenario_continuously_for_times(self, cls, method, context, args,
                                             times, concurrent, timeout):
        test_args = [(i, cls, method, context["admin"],
                      random.choice(context["users"]), args)
                     for i in range(times)]

        pool = multiprocessing.Pool(concurrent)
        iter_result = pool.imap(base._run_scenario_once, test_args)

        results = []

        for i in range(len(test_args)):
            try:
                result = iter_result.next(timeout)
            except multiprocessing.TimeoutError as e:
                result = {"time": timeout, "idle_time": 0,
                          "error": utils.format_exc(e)}
            results.append(result)

        pool.close()
        pool.join()

        return results

This repeated logic can be more easily reused across ScenarioRunner subclasses by wrapping multiprocess pool management in context manager. The wrapper is shown below.

class ParallelImap(object):
    """Executes multiple invocations of a function in parallel."""

    def __init__(self, fn, *args, **kwargs):
        self._fn = fn
        self._args = args
        self._concurrent = kwargs.get('concurrent',
                                      multiprocessing.cpu_count())
        self._timeout = kwargs.get('timeout', None)
        self._pool = None
        self._iter = None

    def __enter__(self):
        self._pool = multiprocessing.Pool(self._concurrent)
        self._iter = self._pool.imap(self._fn, *self._args)
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        if exc_type is not None:
            # When an exception is thrown stop accepting new jobs
            # and abort pending jobs without waiting.
            self._pool.terminate()
            return True
        # Stop accepting new jobs and wait for pending jobs to finish.
        self._pool.close()
        self._pool.join()

    def __iter__(self):
        return self._iter

    def next(self):
        return self._iter.next(self._timeout)

Blueprint information

Status:
Complete
Approver:
Boris Pavlovic
Priority:
Not
Drafter:
Marco Morais
Direction:
Approved
Assignee:
Marco Morais
Definition:
Obsolete
Series goal:
None
Implementation:
Unknown
Milestone target:
None
Completed by
Boris Pavlovic

Related branches

Sprints

Whiteboard

(?)

Work Items

This blueprint contains Public information 
Everyone can see this information.

Subscribers

No subscribers.