You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
244 lines
8.4 KiB
Python
244 lines
8.4 KiB
Python
import asyncio
|
|
import time
|
|
from collections.abc import MutableMapping
|
|
from signal import Signals
|
|
from typing import Any, Dict, Iterable, Optional, Callable, Awaitable
|
|
|
|
from amqpworker.conf import logger
|
|
from amqpworker.connections import Connection, ConnectionsMapping
|
|
from amqpworker.exceptions import InvalidConnection, InvalidRoute
|
|
from amqpworker.options import Options, DefaultValues, RouteTypes
|
|
from amqpworker.rabbitmq.entrypoints import AMQPRouteEntryPointImpl
|
|
from amqpworker.routes import RoutesRegistry, Route
|
|
from amqpworker.scheduled import ScheduledThreadPoolExecutor
|
|
|
|
from amqpworker.signals.base import Freezable, Signal
|
|
from concurrent.futures import ThreadPoolExecutor, Executor
|
|
|
|
from amqpworker.signals.handlers.rabbitmq import RabbitMQ
|
|
from amqpworker.utils import entrypoint
|
|
|
|
|
|
def async_run(coro: Awaitable):
    """Run ``coro`` to completion on this thread's event loop and return its result.

    The thread's current event loop is reused when one exists and is still
    open. When there is no current loop, or the current loop has already been
    closed, a new loop is created and installed as the current loop first.

    :param coro: the awaitable to drive to completion.
    :return: whatever ``coro`` returns.
    :raises RuntimeError: propagated from ``run_until_complete`` (for example
        when the current loop is already running).
    """
    try:
        loop = asyncio.get_event_loop()
    except RuntimeError:
        # No event loop is set for this thread.
        loop = None

    # A closed loop cannot run anything; replace it instead of failing with
    # "Event loop is closed".
    if loop is None or loop.is_closed():
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)

    return loop.run_until_complete(coro)
|
|
|
|
|
|
class App(MutableMapping, Freezable):
    """Application object: holds connections, registered routes and state.

    The mapping interface (``app[key]``, ``len(app)``, iteration, ...) is
    backed by ``self._state``, which starts with one empty dict per
    ``RouteTypes`` member. Writes and deletes through the mapping interface
    raise ``RuntimeError`` once the app is frozen.
    """

    shutdown_os_signals = (Signals.SIGINT, Signals.SIGTERM)
    # Every handler's ``startup``/``shutdown`` is appended to the app's
    # on_startup/on_shutdown signals in ``__init__``.
    handlers = (RabbitMQ(),)

    def __init__(
        self,
        connections: Optional[Iterable[Connection]] = None,
        loop: Optional[Executor] = None,
    ):
        """
        :param connections: optional connections to register on
            ``self.connections``.
        :param loop: optional executor; when omitted, ``startup()`` creates a
            ``ThreadPoolExecutor``.
        """
        Freezable.__init__(self)
        self.routes_registry = RoutesRegistry()
        self.default_route_options: dict = {}
        self.loop: Optional[Executor] = loop
        self.scheduler = None
        self._state: Dict[Any, Any] = self._get_initial_state()
        self.connections = ConnectionsMapping()
        if connections:
            self.connections.add(connections)

        self._on_startup: Signal = Signal(self)
        self._on_shutdown: Signal = Signal(self)

        for handler in self.handlers:
            self._on_startup.append(handler.startup)
            self._on_shutdown.append(handler.shutdown)

        # NOTE: OS signal registration is currently disabled, so
        # ``shutdown_os_signals`` is not wired to ``shutdown``.
        # for signal_ in self.shutdown_os_signals:
        #     signal.signal(signal_, self.shutdown)
        #     self.loop.add_signal_handler(signal, self.shutdown)

    def _check_frozen(self):
        """Raise ``RuntimeError`` if the app is frozen."""
        if self.frozen:
            raise RuntimeError(
                "You shouldn't change the state of a started application"
            )

    def freeze(self):
        """Freeze the registered connections, then the app itself."""
        self.connections.freeze()
        super().freeze()

    def get_connection(self, name: str) -> Connection:
        """Return the connection registered under ``name``.

        :raises InvalidConnection: if ``self.connections`` has no such name.
        """
        try:
            return self.connections[name]
        except KeyError as e:
            raise InvalidConnection(
                f"There's no Connection with name `{name}` registered "
                f"in `App.connections`"
            ) from e

    def _get_initial_state(self) -> Dict[str, Dict]:
        """Build the initial state: one empty dict per ``RouteTypes`` member."""
        # fixme: typeignore reason - https://github.com/python/mypy/issues/4537
        return {route_type: {} for route_type in RouteTypes}  # type: ignore

    def __getitem__(self, key):
        return self._state[key]

    def __setitem__(self, key, value):
        self._check_frozen()
        self._state[key] = value

    def __delitem__(self, key):
        self._check_frozen()
        del self._state[key]

    def __len__(self):
        return len(self._state)

    def __iter__(self):
        return iter(self._state)

    @entrypoint
    def run(self):
        """Start the app and block until interrupted.

        Calls ``startup()``, then sleeps in a loop. ``shutdown()`` is called
        when a ``KeyboardInterrupt`` is caught.
        """
        logger.debug({"event": "Booting App..."})
        self.startup()
        try:
            while True:
                time.sleep(10)
        except KeyboardInterrupt:
            self.shutdown()

    def startup(self):
        """Send the on_startup signal.

        When no executor was supplied to ``__init__``, a
        ``ThreadPoolExecutor`` is created first, sized to the number of
        registered AMQP routes and capped at 16 workers.
        """
        if self.loop is None:
            amqp_route_count = sum(1 for _ in self.routes_registry.amqp_routes)
            # ThreadPoolExecutor raises ValueError for max_workers <= 0, so an
            # app without AMQP routes still needs at least one worker.
            self.loop = ThreadPoolExecutor(
                thread_name_prefix='AMQPWORKER-',
                max_workers=max(1, min(16, amqp_route_count)),
            )
        self._on_startup.send(self)

    def shutdown(self):
        """Send the on_shutdown signal and return the result of ``send``.

        ``run()`` calls this when it catches a ``KeyboardInterrupt``.
        """
        return self._on_shutdown.send(self)

    @property
    def amqp(self) -> AMQPRouteEntryPointImpl:
        """A new ``AMQPRouteEntryPointImpl`` bound to this app."""
        return AMQPRouteEntryPointImpl(self)

    def route(
        self,
        routes: Iterable[str],
        type: RouteTypes,
        options: Optional[dict] = None,
        **kwargs,
    ):
        """Return a decorator that registers a handler in ``routes_registry``.

        :param routes: the routes the handler is registered for.
        :param type: the route type; must be a ``RouteTypes`` member.
        :param options: per-route options; defaults to an empty dict.
        :param kwargs: extra entries stored alongside the route definition.
        :raises TypeError: if ``type`` is not a ``RouteTypes``; the returned
            decorator raises ``TypeError`` when the decorated object is not
            callable or is a coroutine function.
        """
        if options is None:
            options = {}
        if not isinstance(type, RouteTypes):
            raise TypeError(
                f"type parameter is not a valid RouteTypes." f" Found: '{type}'"
            )

        def wrapper(f):
            handler = f
            if not callable(handler):
                raise TypeError(
                    f"Object passed as handler is not callable: {f}"
                )
            # Handlers must be plain callables: reject coroutine functions and
            # objects whose ``__call__`` is a coroutine function.
            if asyncio.iscoroutinefunction(handler) or (
                hasattr(handler, '__call__')
                and asyncio.iscoroutinefunction(handler.__call__)
            ):
                raise TypeError(
                    f"handler must be not a coroutine: {handler}"
                )
            self.routes_registry[handler] = {
                "type": type,
                "routes": routes,
                "handler": handler,
                "options": options,
                "default_options": self.default_route_options,
                **kwargs,
            }
            return f

        return wrapper

    def run_on_startup(self, func: Callable[["App"], None]) -> None:
        """Append ``func`` to the on_startup signal."""
        self._on_startup.append(func)

    def run_on_shutdown(self, func: Callable[["App"], None]) -> None:
        """Append ``func`` to the on_shutdown signal."""
        self._on_shutdown.append(func)

    def run_every(self, seconds: int, options: Optional[Dict] = None):
        """Return a decorator that schedules a task on a fixed delay.

        The decorated callable is handed to a ``ScheduledThreadPoolExecutor``
        via ``schedule_at_fixed_delay`` with both ``initial_delay`` and
        ``period`` set to ``seconds``. The runner's ``start``/``stop`` are
        appended to the on_startup/on_shutdown signals, and the runner is
        stored in ``self["task_runners"]``.

        :param seconds: initial delay and period passed to the scheduler.
        :param options: may contain ``Options.MAX_CONCURRENCY``; defaults to
            ``DefaultValues.RUN_EVERY_MAX_CONCURRENCY``.
        """
        if options is None:
            options = {}

        max_concurrency = options.get(
            Options.MAX_CONCURRENCY, DefaultValues.RUN_EVERY_MAX_CONCURRENCY
        )

        def wrapper(task: Callable[..., None]):
            runner = ScheduledThreadPoolExecutor(max_workers=max_concurrency)
            runner.schedule_at_fixed_delay(task, initial_delay=seconds, period=seconds)
            # runner = ScheduledTaskRunner(
            #     seconds=seconds,
            #     task=task,
            #     app=self,
            #     max_concurrency=max_concurrency,
            # )
            self._on_startup.append(runner.start)
            self._on_shutdown.append(runner.stop)
            if "task_runners" not in self:
                self["task_runners"] = []
            self["task_runners"].append(runner)

            return task

        return wrapper

    def get_connection_for_route(self, route_info: Route):
        """Resolve which connection a route should use.

        Resolution order:

        1. ``route_info.connection``, falling back to
           ``route_info.options["connection"]``: a ``Connection`` is returned
           as is, a ``str`` is looked up with ``get_connection``.
        2. Otherwise, the only connection registered for ``route_info.type``.

        :raises InvalidRoute: if the configured connection is neither ``str``
            nor ``Connection``, or if no connection is configured and the app
            has zero or more than one connection of the route's type.
        :raises InvalidConnection: if a connection name is not registered.
        """
        route_connection = route_info.connection
        if route_connection is None:
            route_connection = route_info.options.get("connection")

        connections = self.connections.with_type(route_info.type)
        if route_connection is not None:
            if isinstance(route_connection, Connection):
                return route_connection
            elif isinstance(route_connection, str):
                return self.get_connection(name=route_connection)
            else:
                # pragma: nocover
                raise InvalidRoute(
                    f"Expected `Route.connection` to be either `str` or "
                    f"`Connection`. Got `{type(route_connection)}` instead."
                )
        elif len(connections) > 1:
            raise InvalidRoute(
                f"Invalid route definition for App. You are trying to "
                f"define a {route_info.type} into an amqpworker.App "
                f"with multiple connections without specifying which "
                f"one to use."
            )
        else:
            try:
                return connections[0]
            except IndexError as e:
                raise InvalidRoute(
                    f"Invalid route definition for App. You are trying to "
                    f"define a {route_info.type} without an "
                    f"Connection registered on App"
                ) from e
|