aboutsummaryrefslogtreecommitdiff
path: root/pyee/executor.py
blob: 25df77455018e812b874f1b6b9e92b8600e6e262 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
# -*- coding: utf-8 -*-

from concurrent.futures import Executor, Future, ThreadPoolExecutor
from types import TracebackType
from typing import Any, Callable, Dict, Optional, Tuple, Type

from pyee.base import EventEmitter

__all__ = ["ExecutorEventEmitter"]


class ExecutorEventEmitter(EventEmitter):
    """An event emitter class which runs handlers in a ``concurrent.futures``
    executor.

    By default, this class creates a default ``ThreadPoolExecutor``, but
    a custom executor may also be passed in explicitly to, for instance,
    use a ``ProcessPoolExecutor`` instead.

    This class runs all emitted events on the configured executor. Errors
    captured by the resulting Future are automatically emitted on the
    ``error`` event. This is unlike the EventEmitter, which have no error
    handling.

    The underlying executor may be shut down by calling the ``shutdown``
    method. Alternately you can treat the event emitter as a context manager::

        with ExecutorEventEmitter() as ee:
            # Underlying executor open

            @ee.on('data')
            def handler(data):
                print(data)

            ee.emit('event')

        # Underlying executor closed

    Since the function call is scheduled on an executor, emit is always
    non-blocking.

    No effort is made to ensure thread safety, beyond using an executor.
    """

    def __init__(self, executor: Executor = None):
        super(ExecutorEventEmitter, self).__init__()
        if executor:
            self._executor: Executor = executor
        else:
            self._executor = ThreadPoolExecutor()

    def _emit_run(
        self,
        f: Callable,
        args: Tuple[Any, ...],
        kwargs: Dict[str, Any],
    ):
        future: Future = self._executor.submit(f, *args, **kwargs)

        @future.add_done_callback
        def _callback(f: Future) -> None:
            exc: Optional[BaseException] = f.exception()
            if isinstance(exc, Exception):
                self.emit("error", exc)
            elif exc is not None:
                raise exc

    def shutdown(self, wait: bool = True) -> None:
        """Call ``shutdown`` on the internal executor."""

        self._executor.shutdown(wait=wait)

    def __enter__(self) -> "ExecutorEventEmitter":
        return self

    def __exit__(
        self, type: Type[Exception], value: Exception, traceback: TracebackType
    ) -> Optional[bool]:
        self.shutdown()