1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
|
# -*- coding: utf-8 -*-
from concurrent.futures import Executor, Future, ThreadPoolExecutor
from types import TracebackType
from typing import Any, Callable, Dict, Optional, Tuple, Type
from pyee.base import EventEmitter
__all__ = ["ExecutorEventEmitter"]
class ExecutorEventEmitter(EventEmitter):
"""An event emitter class which runs handlers in a ``concurrent.futures``
executor.
By default, this class creates a default ``ThreadPoolExecutor``, but
a custom executor may also be passed in explicitly to, for instance,
use a ``ProcessPoolExecutor`` instead.
This class runs all emitted events on the configured executor. Errors
captured by the resulting Future are automatically emitted on the
``error`` event. This is unlike the EventEmitter, which have no error
handling.
The underlying executor may be shut down by calling the ``shutdown``
method. Alternately you can treat the event emitter as a context manager::
with ExecutorEventEmitter() as ee:
# Underlying executor open
@ee.on('data')
def handler(data):
print(data)
ee.emit('event')
# Underlying executor closed
Since the function call is scheduled on an executor, emit is always
non-blocking.
No effort is made to ensure thread safety, beyond using an executor.
"""
def __init__(self, executor: Executor = None):
super(ExecutorEventEmitter, self).__init__()
if executor:
self._executor: Executor = executor
else:
self._executor = ThreadPoolExecutor()
def _emit_run(
self,
f: Callable,
args: Tuple[Any, ...],
kwargs: Dict[str, Any],
):
future: Future = self._executor.submit(f, *args, **kwargs)
@future.add_done_callback
def _callback(f: Future) -> None:
exc: Optional[BaseException] = f.exception()
if isinstance(exc, Exception):
self.emit("error", exc)
elif exc is not None:
raise exc
def shutdown(self, wait: bool = True) -> None:
"""Call ``shutdown`` on the internal executor."""
self._executor.shutdown(wait=wait)
def __enter__(self) -> "ExecutorEventEmitter":
return self
def __exit__(
self, type: Type[Exception], value: Exception, traceback: TracebackType
) -> Optional[bool]:
self.shutdown()
|