From 6cf096b833c50d675f8f1dff38c91496b922a152 Mon Sep 17 00:00:00 2001 From: JimZhang Date: Mon, 8 May 2023 15:39:00 +0800 Subject: [PATCH] feat: init project amqp-worker --- .gitignore | 160 ++++++++++ README.md | 93 ++++++ amqpworker/__init__.py | 0 amqpworker/app.py | 234 ++++++++++++++ amqpworker/bucket.py | 36 +++ amqpworker/conf.py | 49 +++ amqpworker/connections.py | 178 +++++++++++ amqpworker/consumer.py | 253 ++++++++++++++++ amqpworker/decorators.py | 29 ++ amqpworker/easyqueue/__init__.py | 0 amqpworker/easyqueue/connection.py | 75 +++++ amqpworker/easyqueue/exceptions.py | 16 + amqpworker/easyqueue/message.py | 86 ++++++ amqpworker/easyqueue/queue.py | 386 ++++++++++++++++++++++++ amqpworker/entrypoints.py | 22 ++ amqpworker/exceptions.py | 10 + amqpworker/options.py | 42 +++ amqpworker/rabbitmq/__init__.py | 3 + amqpworker/rabbitmq/entrypoints.py | 42 +++ amqpworker/rabbitmq/message.py | 58 ++++ amqpworker/routes.py | 135 +++++++++ amqpworker/scheduled/__init__.py | 162 ++++++++++ amqpworker/signals/__init__.py | 0 amqpworker/signals/base.py | 43 +++ amqpworker/signals/handlers/__init__.py | 0 amqpworker/signals/handlers/base.py | 12 + amqpworker/signals/handlers/rabbitmq.py | 47 +++ amqpworker/types/__init__.py | 0 amqpworker/types/registry.py | 54 ++++ amqpworker/typing/__init__.py | 66 ++++ amqpworker/utils.py | 13 + async-worker_license | 19 ++ poetry.lock | 222 ++++++++++++++ pyproject.toml | 26 ++ tests/__init__.py | 0 tests/logger_t.py | 6 + tests/rabbitmq.py | 29 ++ 37 files changed, 2606 insertions(+) create mode 100644 .gitignore create mode 100644 README.md create mode 100644 amqpworker/__init__.py create mode 100644 amqpworker/app.py create mode 100644 amqpworker/bucket.py create mode 100644 amqpworker/conf.py create mode 100644 amqpworker/connections.py create mode 100644 amqpworker/consumer.py create mode 100644 amqpworker/decorators.py create mode 100644 amqpworker/easyqueue/__init__.py create mode 100644 amqpworker/easyqueue/connection.py create mode 100644 amqpworker/easyqueue/exceptions.py create mode 100644 amqpworker/easyqueue/message.py create mode 100644 amqpworker/easyqueue/queue.py create mode 100644 amqpworker/entrypoints.py create mode 100644 amqpworker/exceptions.py create mode 100644 amqpworker/options.py create mode 100644 amqpworker/rabbitmq/__init__.py create mode 100644 amqpworker/rabbitmq/entrypoints.py create mode 100644 amqpworker/rabbitmq/message.py create mode 100644 amqpworker/routes.py create mode 100644 amqpworker/scheduled/__init__.py create mode 100644 amqpworker/signals/__init__.py create mode 100644 amqpworker/signals/base.py create mode 100644 amqpworker/signals/handlers/__init__.py create mode 100644 amqpworker/signals/handlers/base.py create mode 100644 amqpworker/signals/handlers/rabbitmq.py create mode 100644 amqpworker/types/__init__.py create mode 100644 amqpworker/types/registry.py create mode 100644 amqpworker/typing/__init__.py create mode 100644 amqpworker/utils.py create mode 100644 async-worker_license create mode 100644 poetry.lock create mode 100644 pyproject.toml create mode 100644 tests/__init__.py create mode 100644 tests/logger_t.py create mode 100644 tests/rabbitmq.py diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..8b843de --- /dev/null +++ b/.gitignore @@ -0,0 +1,160 @@ +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] +*$py.class + +# C extensions +*.so + +# Distribution / packaging +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +share/python-wheels/ +*.egg-info/ +.installed.cfg +*.egg +MANIFEST + +# PyInstaller +# Usually these files are written by a python script from a template +# before PyInstaller builds the exe, so as to inject date/other infos into it. +*.manifest +*.spec + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt + +# Unit test / coverage reports +htmlcov/ +.tox/ +.nox/ +.coverage +.coverage.* +.cache +nosetests.xml +coverage.xml +*.cover +*.py,cover +.hypothesis/ +.pytest_cache/ +cover/ + +# Translations +*.mo +*.pot + +# Django stuff: +*.log +local_settings.py +db.sqlite3 +db.sqlite3-journal + +# Flask stuff: +instance/ +.webassets-cache + +# Scrapy stuff: +.scrapy + +# Sphinx documentation +docs/_build/ + +# PyBuilder +.pybuilder/ +target/ + +# Jupyter Notebook +.ipynb_checkpoints + +# IPython +profile_default/ +ipython_config.py + +# pyenv +# For a library or package, you might want to ignore these files since the code is +# intended to run in multiple environments; otherwise, check them in: +# .python-version + +# pipenv +# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control. +# However, in case of collaboration, if having platform-specific dependencies or dependencies +# having no cross-platform support, pipenv may install dependencies that don't work, or not +# install all needed dependencies. +#Pipfile.lock + +# poetry +# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control. +# This is especially recommended for binary packages to ensure reproducibility, and is more +# commonly ignored for libraries. +# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control +#poetry.lock + +# pdm +# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control. +#pdm.lock +# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it +# in version control. +# https://pdm.fming.dev/#use-with-ide +.pdm.toml + +# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm +__pypackages__/ + +# Celery stuff +celerybeat-schedule +celerybeat.pid + +# SageMath parsed files +*.sage.py + +# Environments +.env +.venv +env/ +venv/ +ENV/ +env.bak/ +venv.bak/ + +# Spyder project settings +.spyderproject +.spyproject + +# Rope project settings +.ropeproject + +# mkdocs documentation +/site + +# mypy +.mypy_cache/ +.dmypy.json +dmypy.json + +# Pyre type checker +.pyre/ + +# pytype static type analyzer +.pytype/ + +# Cython debug symbols +cython_debug/ + +# PyCharm +# JetBrains specific template is maintained in a separate JetBrains.gitignore that can +# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore +# and can be added to the global gitignore or merged into this file. For a more nuclear +# option (not recommended) you can uncomment the following to ignore the entire idea folder. +.idea/ \ No newline at end of file diff --git a/README.md b/README.md new file mode 100644 index 0000000..4233d76 --- /dev/null +++ b/README.md @@ -0,0 +1,93 @@ +# 🐰amqp-worker + +amqp-worker 是一个基于 Python 的多线程 RabbitMQ 消费框架。它可以让你在消费消息时更加高效和稳定。 + +## 功能特点 + +- 批量消费:按批处理消息,提高消费效率。 +- 自动重连:当 RabbitMQ 服务断开连接时,amqp-worker 会自动重连,保证消费不中断。 +- 自定义消费模式:消费函数中自由决定使用多线程和协程。 +- 可配置的消息确认方式:支持自动确认和手动确认两种确认方式,根据你的消费需求进行配置。 +- 可配置的异常处理:支持全局配置消息异常的消费模式,重入队列、重新插入、消费消息。 + +## 安装方式 + +你可以使用 pip 工具来安装 amqp-worker: + +``` +pip install amqp-worker +``` + +## 使用方法 + +首先,你需要在你的 Python 代码中引入 amqp_worker 模块: + +```python +from amqpworker.app import App +``` + +然后,你需要实例化一个 App 对象,而App对象依赖AMQPConnection对象: + +```python +from amqpworker.connections import AMQPConnection +amqp_conn = AMQPConnection(hostname='127.0.0.1', username='guest', password='guest', port=5672) + +app = App(connections=[amqp_conn]) +``` + + + +接下来,你需要定义消费函数: + +```python +@app.amqp.consume( + ['test'], + options=AMQPRouteOptions(bulk_size=1024 * 8, bulk_flush_interval=2) +) +def _handler(msgs: List[RabbitMQMessage]): + print(f"Recv {len(msgs)} {datetime.now().isoformat()}") +``` + + +上面的代码中我们给消费函数一个装饰器,给出了消费的队列,每批消费的数量,值得注意的是,消费函数的参数类型为`List[RabbitMQMessage]` + +最后,只需要调用 `run` 方法即可开始消费: + +```python +app.run() +``` + +## 示例代码 + +下面是一个简单的示例代码,它会消费名为 `test` 的队列中的消息: + +```python +from datetime import datetime +from typing import List + +from amqpworker.app import App +from amqpworker.connections import AMQPConnection +from amqpworker.rabbitmq import RabbitMQMessage +from amqpworker.routes import AMQPRouteOptions + +amqp_conn = AMQPConnection(hostname='127.0.0.1', username='guest', password='guest', port=5672) +app = App(connections=[amqp_conn]) + +@app.amqp.consume( + ['test'], + options=AMQPRouteOptions(bulk_size=1024 * 8, bulk_flush_interval=2) +) +def _handler(msgs: List[RabbitMQMessage]): + print(f"Recv {len(msgs)} {datetime.now().isoformat()}") + +app.run() + +``` + +## 贡献者 + +- @JimZhang + +## 许可证 + +amqp-worker 使用 MIT 许可证。详情请参阅 LICENSE 文件。 \ No newline at end of file diff --git a/amqpworker/__init__.py b/amqpworker/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/amqpworker/app.py b/amqpworker/app.py new file mode 100644 index 0000000..23a4026 --- /dev/null +++ b/amqpworker/app.py @@ -0,0 +1,234 @@ +import asyncio +import time +from collections.abc import MutableMapping +from signal import Signals +import signal +from typing import Any, Dict, Iterable, Optional, Callable + +from amqpworker.conf import logger +from amqpworker.connections import Connection, ConnectionsMapping +from amqpworker.exceptions import InvalidConnection, InvalidRoute +from amqpworker.options import Options, DefaultValues, RouteTypes +from amqpworker.rabbitmq.entrypoints import AMQPRouteEntryPointImpl +from amqpworker.routes import RoutesRegistry, Route +from amqpworker.scheduled import ScheduledThreadPoolExecutor + +from amqpworker.signals.base import Freezable, Signal +from concurrent.futures import ThreadPoolExecutor, Executor + +from amqpworker.signals.handlers.rabbitmq import RabbitMQ +from amqpworker.utils import entrypoint + + +class App(MutableMapping, Freezable): + shutdown_os_signals = (Signals.SIGINT, Signals.SIGTERM) + handlers = (RabbitMQ(),) + + def __init__(self, connections: Optional[Iterable[Connection]] = None, loop: Optional[Executor] = None): + Freezable.__init__(self) + self.routes_registry = RoutesRegistry() + self.default_route_options: dict = {} + self.loop: Optional[Executor] = loop + self.scheduler = None + self._state: Dict[Any, Any] = self._get_initial_state() + self.connections = ConnectionsMapping() + if connections: + self.connections.add(connections) + + self._on_startup: Signal = Signal(self) + self._on_shutdown: Signal = Signal(self) + + for handler in self.handlers: + self._on_startup.append(handler.startup) + self._on_shutdown.append(handler.shutdown) + + # for signal_ in self.shutdown_os_signals: + # signal.signal(signal_, self.shutdown) + # self.loop.add_signal_handler(signal, self.shutdown) + + def _check_frozen(self): + if self.frozen: + raise RuntimeError( + "You shouldn't change the state of a started application" + ) + + def freeze(self): + self.connections.freeze() + super(App, self).freeze() + + def get_connection(self, name: str) -> Connection: + try: + return self.connections[name] + except KeyError as e: + raise InvalidConnection( + f"There's no Connection with name `{name}` registered " + f"in `App.connections`" + ) from e + + def _get_initial_state(self) -> Dict[str, Dict]: + # fixme: typeignore reason - https://github.com/python/mypy/issues/4537 + return {route_type: {} for route_type in RouteTypes} # type: ignore + + def __getitem__(self, key): + return self._state[key] + + def __setitem__(self, key, value): + self._check_frozen() + self._state[key] = value + + def __delitem__(self, key): + self._check_frozen() + del self._state[key] + + def __len__(self): + return len(self._state) + + def __iter__(self): + return iter(self._state) + + @entrypoint + def run(self): + logger.debug({"event": "Booting App..."}) + self.startup() + try: + while True: + time.sleep(10) + except KeyboardInterrupt: + self.shutdown() + + def startup(self): + """ + Causes on_startup signal + + Should be called in the event loop along with the request handler. + """ + if self.loop is None: + self.loop = ThreadPoolExecutor(thread_name_prefix='AMQPWORKER-',max_workers=min(16, sum(1 for _ in self.routes_registry.amqp_routes))) + self._on_startup.send(self) + + def shutdown(self): + """ + Schedules an on_startup signal + + Is called automatically when the application receives + a SIGINT or SIGTERM + """ + + return self._on_shutdown.send(self) + + @property + def amqp(self) -> AMQPRouteEntryPointImpl: + return AMQPRouteEntryPointImpl(self) + + def route( + self, + routes: Iterable[str], + type: RouteTypes, + options: Optional[dict] = None, + **kwargs, + ): + if options is None: + options = {} + if not isinstance(type, RouteTypes): + raise TypeError( + f"type parameter is not a valid RouteTypes." f" Found: '{type}'" + ) + + def wrapper(f): + handler = f + if not callable(handler): + raise TypeError( + f"Object passed as handler is not callable: {f}" + ) + if asyncio.iscoroutinefunction(handler) or ( + hasattr(handler, '__call__') and asyncio.iscoroutinefunction(handler.__call__)): + raise TypeError( + f"handler must be not a coroutine: {handler}" + ) + self.routes_registry[handler] = { + "type": type, + "routes": routes, + "handler": handler, + "options": options, + "default_options": self.default_route_options, + **kwargs, + } + return f + + return wrapper + + def run_on_startup(self, func: Callable[["App"], None]) -> None: + """ + Registers a coroutine to be awaited for during app startup + """ + self._on_startup.append(func) + + def run_on_shutdown(self, func: Callable[["App"], None]) -> None: + """ + Registers a coroutine to be awaited for during app shutdown + """ + self._on_shutdown.append(func) + + def run_every(self, seconds: int, options: Optional[Dict] = None): + """ + Registers a coroutine to be called with a given interval + """ + if options is None: + options = {} + + max_concurrency = options.get( + Options.MAX_CONCURRENCY, DefaultValues.RUN_EVERY_MAX_CONCURRENCY + ) + + def wrapper(task: Callable[..., None]): + runner = ScheduledThreadPoolExecutor(max_workers=max_concurrency) + runner.schedule_at_fixed_delay(task, initial_delay=seconds, period=seconds) + # runner = ScheduledTaskRunner( + # seconds=seconds, + # task=task, + # app=self, + # max_concurrency=max_concurrency, + # ) + self._on_startup.append(runner.start) + self._on_shutdown.append(runner.stop) + if "task_runners" not in self: + self["task_runners"] = [] + self["task_runners"].append(runner) + + return task + + return wrapper + + def get_connection_for_route(self, route_info: Route): + route_connection = route_info.connection + if route_connection is None: + route_connection = route_info.options.get("connection") + + connections = self.connections.with_type(route_info.type) + if route_connection is not None: + if isinstance(route_connection, Connection): + return route_connection + elif isinstance(route_connection, str): + return self.get_connection(name=route_connection) + else: + # pragma: nocover + raise InvalidRoute( + f"Expected `Route.connection` to be either `str` or " + f"`Connection`. Got `{type(route_connection)}` instead." + ) + elif len(connections) > 1: + raise InvalidRoute( + f"Invalid route definition for App. You are trying to " + f"define a {route_info.type} into an amqpworker.App " + f"with multiple connections without specifying which " + f"one to use." + ) + else: + try: + return connections[0] + except IndexError as e: + raise InvalidRoute( + f"Invalid route definition for App. You are trying to " + f"define a {route_info.type} without an " + f"Connection registered on App" + ) from e diff --git a/amqpworker/bucket.py b/amqpworker/bucket.py new file mode 100644 index 0000000..fb9f230 --- /dev/null +++ b/amqpworker/bucket.py @@ -0,0 +1,36 @@ +from typing import Generic, List, TypeVar + +T = TypeVar("T") + + +class Bucket(Generic[T]): + def __init__(self, size: int) -> None: + self.size = size + # fixme: Criar uma interface comum para as *Message + # para substituir esse Any + self._items: List[T] = [] + + def is_full(self) -> bool: + return len(self._items) == self.size + + def is_empty(self) -> bool: + return len(self._items) == 0 + + def put(self, item: T): + if self.is_full(): + error_msg = f"Bucket is at full capacity: {self.size}" + raise BucketFullException(error_msg) + self._items.append(item) + + def pop_all(self) -> List[T]: + _r = self._items + self._items = [] + return _r + + @property + def used(self) -> int: + return len(self._items) + + +class BucketFullException(Exception): + pass diff --git a/amqpworker/conf.py b/amqpworker/conf.py new file mode 100644 index 0000000..96e3263 --- /dev/null +++ b/amqpworker/conf.py @@ -0,0 +1,49 @@ +import logging +from typing import List, Optional + +from loguru import logger as _logger +from pydantic import BaseSettings + +from amqpworker.options import DefaultValues + +INFINITY = float("inf") + + +class Settings(BaseSettings): + LOGLEVEL: str = "ERROR" + + AMQP_DEFAULT_VHOST: str = "/" + AMQP_DEFAULT_PREFETCH_COUNT: int = 128 + AMQP_DEFAULT_HEARTBEAT: int = 60 + + HTTP_HOST: str = "127.0.0.1" + HTTP_PORT: int = 8080 + + FLUSH_TIMEOUT: int = DefaultValues.BULK_FLUSH_INTERVAL + + # metrics + METRICS_NAMESPACE: str = "amqpworker" + METRICS_APPPREFIX: Optional[str] + METRICS_ROUTE_PATH: str = "/metrics" + METRICS_ROUTE_ENABLED: bool = True + METRICS_DEFAULT_HISTOGRAM_BUCKETS_IN_MS: List[float] = [ + 10, + 50, + 100, + 200, + 500, + 1000, + 5000, + INFINITY, + ] + + class Config: + allow_mutation = False + env_prefix = "AMQPWORKER_" + + +settings = Settings() + +loglevel = getattr(logging, settings.LOGLEVEL, logging.INFO) +_logger.level( settings.LOGLEVEL) +logger = _logger diff --git a/amqpworker/connections.py b/amqpworker/connections.py new file mode 100644 index 0000000..e929e9b --- /dev/null +++ b/amqpworker/connections.py @@ -0,0 +1,178 @@ +import abc +import collections +from collections.abc import KeysView, ValuesView +from typing import ( + Any, + Counter, + Dict, + ItemsView, + Iterable, + List, + Mapping, + Optional, + Type, + Union, +) + +from pydantic import BaseModel, validator + +from amqpworker.conf import settings +from amqpworker.easyqueue.queue import JsonQueue +from amqpworker.exceptions import InvalidConnection +from amqpworker.options import RouteTypes +from amqpworker.signals.base import Freezable + + +class Connection(BaseModel, abc.ABC): + """ + Common ancestral for all Connection classes that auto generates a + connection name and is responsible for keeping track of new connections on + the ConnectionsMapping + """ + + route_type: RouteTypes + name: Optional[str] = None + + +class ConnectionsMapping(Mapping[str, Connection], Freezable): + """ + A mapping (Connection.name->Connection) of all available connections that + also keeps a counter for each connection type + """ + + def __getitem__(self, k: str) -> Connection: + return self._data[k] + + def __len__(self) -> int: + return len(self._data) + + def __iter__(self): + return iter(self._data) + + def __init__(self) -> None: + Freezable.__init__(self) + self._data: Dict[str, Connection] = {} + self.counter: Counter[Type[Connection]] = collections.Counter() + + def __contains__(self, item): + if isinstance(item, Connection): + return item in self.values() + return super(ConnectionsMapping, self).__contains__(item) + + def __setitem__(self, key: str, value: Connection) -> None: + if self.frozen: + raise RuntimeError( + "You shouldn't change the state of ConnectionsMapping " + "after App startup" + ) + + if key is None: + key = id(value) + + if key in self: + raise InvalidConnection( + f"Invalid connection: `{value}`. " + f"The name `{key}` already exists in {self.__class__.__name__}" + ) + self._data[key] = value + self.counter[value.__class__] += 1 + + def __delitem__(self, key: str) -> None: + if self.frozen: + raise RuntimeError( + "You shouldn't change the state of ConnectionsMapping " + "after App startup" + ) + del self._data[key] + + def add(self, connections: Iterable[Connection]) -> None: + for conn in connections: + self[conn.name] = conn # type: ignore + + def with_type(self, route_type: RouteTypes) -> List["Connection"]: + # todo: manter uma segunda estrutura de dados ou aceitar O(n) sempre que chamado? + return [conn for conn in self.values() if conn.route_type == route_type] + + +_TYPE_COUNTER: Counter[Type[Connection]] = collections.Counter() + +Message = Union[List, Dict] + + +class AMQPConnection(Connection): + hostname: str + port: int + username: str + password: str + route_type = RouteTypes.AMQP_RABBITMQ + prefetch: int = settings.AMQP_DEFAULT_PREFETCH_COUNT + heartbeat: int = settings.AMQP_DEFAULT_HEARTBEAT + name: Optional[str] = None + connections: Dict[str, JsonQueue] = {} + + class Config: + arbitrary_types_allowed = True + + @validator("connections", pre=True, always=True, check_fields=False) + def set_connections(cls, v): + return v or {} + + def __len__(self) -> int: + return len(self.connections) + + def __iter__(self): + return iter(self.connections) + + def __getitem__(self, key: str) -> JsonQueue: + """ + Gets a JsonQueue instance for a given virtual host + + :param key: The virtual host of the connection + :return: An instance of the connection + """ + try: + return self.connections[key] + except KeyError: + conn: JsonQueue = JsonQueue( + host=self.hostname, + port=self.port, + username=self.username, + password=self.password, + virtual_host=key, + ) + self.connections[key] = conn + return conn + + def keys(self): + return KeysView(self) + + def items(self): + return ItemsView(self) + + def values(self): + return ValuesView(self) + + def register(self, queue: JsonQueue) -> None: + self.connections[queue.virtual_host] = queue + + def put( + self, + routing_key: str, + data: Any = None, + serialized_data: Union[str, bytes, None] = None, + exchange: str = "", + vhost: str = settings.AMQP_DEFAULT_VHOST, + properties: Optional[dict] = None, + mandatory: bool = False, + immediate: bool = False, + ): + conn = self[vhost] + return conn.put( + routing_key=routing_key, + data=data, + serialized_data=serialized_data, + exchange=exchange, + properties=properties, + mandatory=mandatory, + immediate=immediate, + ) diff --git a/amqpworker/consumer.py b/amqpworker/consumer.py new file mode 100644 index 0000000..b01a957 --- /dev/null +++ b/amqpworker/consumer.py @@ -0,0 +1,253 @@ +import time +import traceback +from concurrent.futures import ThreadPoolExecutor +from concurrent.futures.thread import _shutdown +from typing import Dict, Type, Union + +from amqpstorm import AMQPError +from loguru import logger + +from amqpworker import conf +from amqpworker.easyqueue.message import AMQPMessage +from amqpworker.easyqueue.queue import JsonQueue, QueueConsumerDelegate +from amqpworker.options import Events, Options +from amqpworker.routes import AMQPRoute +from .bucket import Bucket +from .rabbitmq import RabbitMQMessage +from .scheduled import ScheduledThreadPoolExecutor +from .utils import call + + +class Consumer(QueueConsumerDelegate): + def __init__( + self, + route_info: Union[Dict, AMQPRoute], + host: str, + port: int, + username: str, + password: str, + prefetch_count: int = 128, + bucket_class: Type[Bucket] = Bucket[RabbitMQMessage], + ) -> None: + self.route = route_info + self._handler = route_info["handler"] + self._queue_name = route_info["routes"] + self._route_options = route_info["options"] + self.host = host + self.vhost = route_info.get("vhost", "/") + self.bucket = bucket_class( + size=min(self._route_options["bulk_size"], prefetch_count) + ) + self.running = False + self.queue: JsonQueue = JsonQueue( + host, + username, + password, + port, + virtual_host=self.vhost, + delegate=self, + prefetch_count=prefetch_count, + logger=conf.logger, + connection_fail_callback=self._route_options.get( + Options.CONNECTION_FAIL_CALLBACK, None + ), + ) + + self.pool = ThreadPoolExecutor(thread_name_prefix=f'submit-{"-".join(self._queue_name)}-', + max_workers=self._route_options.get('max_workers', 8)) + self.multiple_queue_pool = ThreadPoolExecutor(thread_name_prefix='consumer-queue-', + max_workers=len(self._queue_name)) + self.clock = ScheduledThreadPoolExecutor(max_workers=1, name='flush-rabbit-') + self.clock_task = None + + @property + def queue_name(self) -> str: + return self._queue_name + + def on_before_start_consumption( + self, queue_name: str, queue: "JsonQueue" + ): + """ + Coroutine called before queue consumption starts. May be overwritten to + implement further custom initialization. + + :param queue_name: Queue name that will be consumed + :type queue_name: str + :param queue: AsynQueue instanced + :type queue: JsonQueue + """ + pass + + def on_consumption_start(self, consumer_tag: str, queue: "JsonQueue"): + """ + Coroutine called once consumption started. + """ + pass + + def on_queue_message(self, msg: AMQPMessage): + """ + Callback called every time that a new, valid and deserialized message + is ready to be handled. + """ + if not self.bucket.is_full(): + message = RabbitMQMessage( + delivery_tag=msg.delivery_tag, + amqp_message=msg, + on_success=self._route_options[Events.ON_SUCCESS], + on_exception=self._route_options[Events.ON_EXCEPTION], + ) + self.bucket.put(message) + + if self.bucket.is_full(): + return self._flush_bucket_if_needed() + + def _flush_clocked(self, *args, **kwargs): + try: + self._flush_bucket_if_needed() + except Exception as e: + conf.logger.error( + { + "type": "flush-bucket-failed", + "dest": self.host, + "retry": True, + "exc_traceback": traceback.format_exc(), + } + ) + + def _flush_bucket_if_needed(self): + try: + if not self.bucket.is_empty(): + all_messages = self.bucket.pop_all() + conf.logger.debug( + { + "event": "bucket-flush", + "bucket-size": len(all_messages), + "handler": self._handler.__name__, + } + ) + if self.running: + rv = self._handler(all_messages) + for fu in self.pool.map(call, [m.process_success for m in all_messages]): + pass + return rv + else: + for m in all_messages: + m.process_exception() + except AMQPError: + raise + except Exception as e: + traceback.print_exc() + if self.running: + for fu in self.pool.map(call, [m.process_exception for m in all_messages]): + pass + else: + for m in all_messages: + m.process_exception() + raise e + + def on_queue_error(self, body, delivery_tag, error, queue): + """ + Callback called every time that an error occurred during the validation + or deserialization stage. + + :param body: unparsed, raw message content + :type body: Any + :param delivery_tag: delivery_tag of the consumed message + :type delivery_tag: int + :param error: THe error that caused the callback to be called + :type error: MessageError + :type queue: JsonQueue + """ + conf.logger.error( + { + "parse-error": True, + "exception": "Error: not a JSON", + "original_msg": body, + } + ) + try: + queue.ack(delivery_tag=delivery_tag) + except AMQPError as e: + self._log_exception(e) + + def on_message_handle_error(self, handler_error: Exception, **kwargs): + """ + Callback called when an uncaught exception was raised during message + handling stage. + + :param handler_error: The exception that triggered + :param kwargs: arguments used to call the coroutine that handled + the message + """ + self._log_exception(handler_error) + + def on_connection_error(self, exception: Exception): + """ + Called when the connection fails + """ + self._log_exception(exception) + + def _log_exception(self, exception): + current_exception = { + "exc_message": str(exception), + "exc_traceback": traceback.format_exc(), + } + conf.logger.error(current_exception) + + def consume_all_queues(self, queue: JsonQueue): + + # for r in self.multiple_queue_pool.map(queue.consume, self._queue_name, repeat(self)): + # pass + for queue_name in self._queue_name: + # Por enquanto não estamos guardando a consumer_tag retornada + # se precisar, podemos passar a guardar. + conf.logger.debug( + {"queue": queue_name, "event": "start-consume"} + ) + queue.consume(queue_name=queue_name, pool=self.multiple_queue_pool, delegate=self) + + def keep_runnig(self): + return self.running + + def stop(self): + logger.debug('stop consumer') + self.running = False + self.queue.connection.close() + self.pool.shutdown(False) + self.multiple_queue_pool.shutdown(False) + self.clock.stop(None) + + def start(self): + self.running = True + if not self.clock_task: + seconds = self._route_options.get( + Options.BULK_FLUSH_INTERVAL, conf.settings.FLUSH_TIMEOUT + ) + self.clock.schedule_at_fixed_rate(self._flush_clocked, seconds, seconds) + self.clock.start(None) + self.clock_task = self._flush_clocked + while self.keep_runnig(): + if not self.queue.connection.has_channel_ready(): + try: + self.consume_all_queues(self.queue) + except RuntimeError as e: + traceback.print_exc() + if self.multiple_queue_pool._shutdown or _shutdown: + conf.logger.info('app shutdown') + # if 'interpreter shutdown' in str(e): + # return + else: + raise + except KeyboardInterrupt: + self.stop() + except Exception as e: + + conf.logger.error( + { + "type": "connection-failed", + "dest": self.host, + "retry": True, + "exc_traceback": traceback.format_exc(), + } + ) + time.sleep(1) diff --git a/amqpworker/decorators.py b/amqpworker/decorators.py new file mode 100644 index 0000000..1e523e6 --- /dev/null +++ b/amqpworker/decorators.py @@ -0,0 +1,29 @@ +def wraps(original_handler): + """ + Esse decorator faz com que a assinatura da função original + "suba" até o último decorator, que deverá ser sempre um registrador do + próprio amqpworker. ex: + @app.http.get(...) + @deco1 + @deco2 + async def handler(...) + pass + + Nesse caso, os decorators `@deco1` e `@deco2` devem, *necessariamente* + fazer uso desse `@wraps()` + """ + + def _wrap(deco): + deco.amqpworker_original_annotations = getattr( + original_handler, + "amqpworker_original_annotations", + original_handler.__annotations__, + ) + deco.amqpworker_original_qualname = getattr( + original_handler, + "amqpworker_original_qualname", + original_handler.__qualname__, + ) + return deco + + return _wrap diff --git a/amqpworker/easyqueue/__init__.py b/amqpworker/easyqueue/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/amqpworker/easyqueue/connection.py b/amqpworker/easyqueue/connection.py new file mode 100644 index 0000000..0df8ba3 --- /dev/null +++ b/amqpworker/easyqueue/connection.py @@ -0,0 +1,75 @@ +import threading +from typing import Callable, Optional, Union +from amqpstorm.base import Stateful +from amqpstorm import Connection, Channel, AMQPError + +OnErrorCallback = Union[ + None, Callable[[Exception], None] +] + + +class AMQPConnection: + def __init__( + self, + host: str, + username: str, + password: str, + port: int = 5672, + heartbeat: int = 60, + virtual_host: str = "/", + on_error: OnErrorCallback = None, + ) -> None: + self.host = host + self.port = port + self.username = username + self.password = password + self.virtual_host = virtual_host + self.heartbeat = heartbeat + self._on_error = on_error + self._connection_lock = threading.Lock() + self.channel: Optional[Channel] = None + self._protocol: Optional[Connection] = None + + @property + def connection_parameters(self): + return { + "hostname": self.host, + + "port": self.port, + "username": self.username, + "password": self.password, + "virtual_host": self.virtual_host, + # "on_error": self._on_error, + "heartbeat": self.heartbeat, + } + + @property + def is_connected(self) -> bool: + return self._protocol and self._protocol.current_state == Stateful.OPEN + + def has_channel_ready(self): + return self.channel and self.channel.is_open + + def close(self) -> None: + if not self.is_connected: + return None + self._protocol.close() + + def _connect(self) -> None: + with self._connection_lock: + if self.is_connected and self.has_channel_ready(): + return + + try: + if self._protocol: + self.channel = self._protocol.channel() + return + except AMQPError as e: + # Se não conseguirmos pegar um channel novo + # a conexão atual deve mesmo ser renovada e isso + # será feito logo abaixo. + pass + + self._protocol = Connection(**self.connection_parameters) + + self.channel = self._protocol.channel() diff --git a/amqpworker/easyqueue/exceptions.py b/amqpworker/easyqueue/exceptions.py new file mode 100644 index 0000000..8bb998b --- /dev/null +++ b/amqpworker/easyqueue/exceptions.py @@ -0,0 +1,16 @@ +class EmptyQueueException(Exception): + """No message to get""" + + +class MessageError(ValueError): + """Base for all message exceptions""" + + +class UndecodableMessageException(MessageError): + """Can't decode as JSON""" + + +class InvalidMessageSizeException(MessageError): + def __init__(self, message=None): + """Message size if bigger than it should be""" + self.message = message diff --git a/amqpworker/easyqueue/message.py b/amqpworker/easyqueue/message.py new file mode 100644 index 0000000..806017e --- /dev/null +++ b/amqpworker/easyqueue/message.py @@ -0,0 +1,86 @@ +from typing import Callable, Generic, Optional, TypeVar + +from amqpstorm import Channel +from amqpworker.easyqueue.connection import AMQPConnection +from amqpworker.easyqueue.exceptions import UndecodableMessageException + +T = TypeVar("T") + + +class AMQPMessage(Generic[T]): + __slots__ = ( + "connection", + "channel", + "queue_name", + "serialized_data", + "delivery_tag", + "_envelope", + "_properties", + "_deserialization_method", + "_deserialized_data", + "_queue", + ) + + def __init__( + self, + connection: AMQPConnection, + channel: Channel, + queue_name: str, + serialized_data: bytes, + delivery_tag: int, + properties: dict, + deserialization_method: Callable[[bytes], T], + queue, + ) -> None: + self.queue_name = queue_name + self.serialized_data = serialized_data + self.delivery_tag = delivery_tag + self.connection = connection + self.channel = channel + self._properties = properties + self._deserialization_method = deserialization_method + + self._deserialized_data: Optional[T] = None + self._queue = queue + + @property + def deserialized_data(self) -> T: + if self._deserialized_data: + return self._deserialized_data + try: + self._deserialized_data = self._deserialization_method( + self.serialized_data + ) + except ValueError as e: + raise UndecodableMessageException( + "msg couldn't be decoded as JSON" + ) from e + return self._deserialized_data + + def __eq__(self, other): + if not isinstance(other, self.__class__): + return False + for attr in self.__slots__: + if attr.startswith("__"): + continue + if getattr(self, attr) != getattr(other, attr): + return False + + return True + + def ack(self): + return self.channel.basic.ack(self.delivery_tag) + + def reject(self, requeue=False): + return self.channel.basic.reject( + delivery_tag=self.delivery_tag, requeue=requeue + ) + + def requeue(self): + self.channel.basic.ack(self.delivery_tag) + + self.channel.basic.publish(body=self.serialized_data, + routing_key=self.queue_name, + properties=self._properties, + mandatory=False, + immediate=False) diff --git a/amqpworker/easyqueue/queue.py b/amqpworker/easyqueue/queue.py new file mode 100644 index 0000000..0dcc7b7 --- /dev/null +++ b/amqpworker/easyqueue/queue.py @@ -0,0 +1,386 @@ +import abc +import json +import logging +import time +import traceback +from concurrent.futures import ThreadPoolExecutor +from enum import Enum, auto +from functools import wraps +from typing import ( + Any, + Callable, + Dict, + Generic, + Optional, + Type, + TypeVar, + Union, +) + +from amqpstorm import Message +from loguru import logger + +from amqpworker.easyqueue.connection import AMQPConnection +from amqpworker.easyqueue.exceptions import UndecodableMessageException +from amqpworker.easyqueue.message import AMQPMessage + + +class DeliveryModes: + NON_PERSISTENT = 1 + PERSISTENT = 2 + + +class ConnType(Enum): + CONSUME = auto() + WRITE = auto() + + +class BaseQueue(metaclass=abc.ABCMeta): + def __init__( + self, + host: str, + username: str, + password: str, + port: int = 5672, + virtual_host: str = "/", + heartbeat: int = 60, + ) -> None: + self.host = host + self.port = port + self.username = username + self.password = password + self.virtual_host = virtual_host + self.heartbeat = heartbeat + + @abc.abstractmethod + def serialize(self, body: Any, **kwargs) -> str: + raise NotImplementedError + + @abc.abstractmethod + def deserialize(self, body: bytes) -> Any: + raise NotImplementedError + + def _parse_message(self, content) -> Dict[str, Any]: + """ + Gets the raw message body as an input, handles deserialization and + outputs + :param content: The raw message body + """ + try: + return self.deserialize(content) + except TypeError: + return self.deserialize(content.decode()) + except json.decoder.JSONDecodeError as e: + raise UndecodableMessageException( + '"{content}" can\'t be decoded as JSON'.format(content=content) + ) + + +class BaseJsonQueue(BaseQueue): + content_type = "application/json" + + def serialize(self, body: Any, **kwargs) -> str: + return json.dumps(body, **kwargs) + + def deserialize(self, body: bytes) -> Any: + return json.loads(body.decode()) + + +def _ensure_conn_is_ready(conn_type: ConnType): + def _ensure_connected(coro: Callable[..., Any]): + @wraps(coro) + def wrapper(self: "JsonQueue", *args, **kwargs): + conn = self.conn_types[conn_type] + retries = 0 + while self.is_running and not conn.has_channel_ready(): + try: + conn._connect() + break + except Exception as e: + time.sleep(self.seconds_between_conn_retry) + retries += 1 + if self.connection_fail_callback: + self.connection_fail_callback(e, retries) + if self.logger: + self.logger.error( + { + "event": "reconnect-failure", + "retry_count": retries, + "exc_traceback": traceback.format_tb( + e.__traceback__ + ), + } + ) + return coro(self, *args, **kwargs) + + return wrapper + + return _ensure_connected + + +T = TypeVar("T") + + +class _ConsumptionHandler: + def __init__( + self, + delegate: "QueueConsumerDelegate", + queue: "JsonQueue", + queue_name: str, + ) -> None: + self.delegate = delegate + self.queue = queue + self.queue_name = queue_name + self.consumer_tag: Optional[str] = None + + def _handle_callback(self, callback, **kwargs): + """ + Chains the callback coroutine into a try/except and calls + `on_message_handle_error` in case of failure, avoiding unhandled + exceptions. + + :param callback: + :param kwargs: + :return: + """ + try: + return callback(**kwargs) + except Exception as e: + return self.delegate.on_message_handle_error( + handler_error=e, **kwargs + ) + + def handle_message( + self, + message: Message + ): + + msg = AMQPMessage( + connection=self.queue.connection, + channel=message.channel, + queue=self.queue, + properties=message.properties, + delivery_tag=message.delivery_tag, + deserialization_method=self.queue.deserialize, + queue_name=self.queue_name, + serialized_data=message.body, + ) + + callback = self._handle_callback( + self.delegate.on_queue_message, msg=msg # type: ignore + ) + return callback + + +class JsonQueue(BaseQueue, Generic[T]): + + def __init__( + self, + host: str, + username: str, + password: str, + port: int = 5672, + delegate_class: Optional[Type["QueueConsumerDelegate"]] = None, + delegate: Optional["QueueConsumerDelegate"] = None, + virtual_host: str = "/", + heartbeat: int = 60, + prefetch_count: int = 100, + seconds_between_conn_retry: int = 1, + logger: Optional[logging.Logger] = None, + connection_fail_callback: Optional[ + Callable[[Exception, int], None] + ] = None, + ) -> None: + super().__init__(host, username, password, port, virtual_host, heartbeat) + + if delegate is not None and delegate_class is not None: + raise ValueError("Cant provide both delegate and delegate_class") + + if delegate_class is not None: + self.delegate = delegate_class() + else: + self.delegate = delegate # type: ignore + + self.prefetch_count = prefetch_count + + on_error = self.delegate.on_connection_error if self.delegate else None + + self.connection = AMQPConnection( + host=host, + port=port, + username=username, + password=password, + virtual_host=virtual_host, + heartbeat=heartbeat, + on_error=on_error, + ) + + self._write_connection = AMQPConnection( + host=host, + port=port, + username=username, + password=password, + virtual_host=virtual_host, + heartbeat=heartbeat, + on_error=on_error, + ) + + self.conn_types = { + ConnType.CONSUME: self.connection, + ConnType.WRITE: self._write_connection, + } + + self.seconds_between_conn_retry = seconds_between_conn_retry + self.is_running = True + self.logger = logger + self.connection_fail_callback = connection_fail_callback + + def serialize(self, body: T, **kwargs) -> str: + return json.dumps(body, **kwargs) + + def deserialize(self, body: bytes) -> T: + return json.loads(body.decode()) + + def conn_for(self, type: ConnType) -> AMQPConnection: + return self.conn_types[type] + + @_ensure_conn_is_ready(ConnType.WRITE) + def put( + self, + routing_key: str, + data: Any = None, + serialized_data: Union[str, bytes] = "", + exchange: str = "", + properties: Optional[dict] = None, + mandatory: bool = False, + immediate: bool = False, + ): + """ + :param data: A serializable data that should be serialized before + publishing + :param serialized_data: A payload to be published as is + :param exchange: The exchange to publish the message + :param routing_key: The routing key to publish the message + """ + if data and serialized_data: + raise ValueError("Only one of data or json should be specified") + + if data: + if isinstance(data, (str, bytes)): + serialized_data = data + else: + serialized_data = self.serialize(data, ensure_ascii=False) + properties['Content-Type'] = 'application/json' + + if not isinstance(serialized_data, bytes): + serialized_data = serialized_data.encode() + + return self._write_connection.channel.basic.publish( + body=serialized_data, + exchange=exchange, + routing_key=routing_key, + properties=properties, + mandatory=mandatory, + immediate=immediate, + ) + + @_ensure_conn_is_ready(ConnType.CONSUME) + def consume( + self, + queue_name: str, + pool: ThreadPoolExecutor, + delegate: "QueueConsumerDelegate", + consumer_name: str = "", + + ) -> str: + """ + Connects the client if needed and starts queue consumption, sending + `on_before_start_consumption` and `on_consumption_start` notifications + to the delegate object + + :param queue_name: queue name to consume from + :param consumer_name: An optional name to be used as a consumer + identifier. If one isn't provided, a random one is generated by the + broker + :return: The consumer tag. Useful for cancelling/stopping consumption + """ + # todo: Implement a consumer tag generator + handler = _ConsumptionHandler( + delegate=delegate, queue=self, queue_name=queue_name + ) + + delegate.on_before_start_consumption( + queue_name=queue_name, queue=self + ) + self.connection.channel.basic.qos( + prefetch_count=self.prefetch_count, + prefetch_size=0, + global_=False, + ) + consumer_tag = self.connection.channel.basic.consume( + callback=handler.handle_message, + consumer_tag=consumer_name, + queue=queue_name, + ) + delegate.on_consumption_start( + consumer_tag=consumer_tag, queue=self + ) + + def start(): + try: + self.connection.channel.start_consuming() + except: + traceback.print_exc() + + pool.submit(start) + handler.consumer_tag = consumer_tag + return consumer_tag + + +class QueueConsumerDelegate(metaclass=abc.ABCMeta): + def on_before_start_consumption( + self, queue_name: str, queue: JsonQueue + ): + """ + Coroutine called before queue consumption starts. May be overwritten to + implement further custom initialization. + + :param queue_name: Queue name that will be consumed + :type queue_name: str + :param queue: AsynQueue instanced + :type queue: JsonQueue + """ + pass + + def on_consumption_start(self, consumer_tag: str, queue: JsonQueue): + """ + Coroutine called once consumption started. + """ + + @abc.abstractmethod + def on_queue_message(self, msg: AMQPMessage[Any]): + """ + Callback called every time that a new, valid and deserialized message + is ready to be handled. + + :param msg: the consumed message + """ + raise NotImplementedError + + def on_message_handle_error(self, handler_error: Exception, **kwargs): + """ + Callback called when an uncaught exception was raised during message + handling stage. + + :param handler_error: The exception that triggered + :param kwargs: arguments used to call the coroutine that handled + the message + :return: + """ + pass + + def on_connection_error(self, exception: Exception): + """ + Called when the connection fails + """ + pass diff --git a/amqpworker/entrypoints.py b/amqpworker/entrypoints.py new file mode 100644 index 0000000..b90913c --- /dev/null +++ b/amqpworker/entrypoints.py @@ -0,0 +1,22 @@ +from abc import ABC +from asyncio import iscoroutinefunction +from typing import Generic, TypeVar + +import amqpworker +from amqpworker.routes import RouteHandler + +T = TypeVar("T") + + +def _extract_sync_callable(handler) -> RouteHandler: + cb = handler + if not callable(cb): + raise TypeError(f"Object passed as handler is not callable: {cb}") + if iscoroutinefunction(cb) or (hasattr(cb, '__call__') and iscoroutinefunction(cb)): + raise TypeError(f"handler must be sync: {cb}") + return cb + + +class EntrypointInterface(ABC, Generic[T]): + def __init__(self, app: "amqpworker.App") -> None: + self.app = app diff --git a/amqpworker/exceptions.py b/amqpworker/exceptions.py new file mode 100644 index 0000000..3606b5c --- /dev/null +++ b/amqpworker/exceptions.py @@ -0,0 +1,10 @@ +class InvalidRoute(ValueError): + """ + Defines an invalid route definition condition. + """ + + +class InvalidConnection(ValueError): + """ + Defines an invalid connection definition condition. + """ diff --git a/amqpworker/options.py b/amqpworker/options.py new file mode 100644 index 0000000..581d1da --- /dev/null +++ b/amqpworker/options.py @@ -0,0 +1,42 @@ +from enum import Enum, auto +from typing import List + + +class AutoNameEnum(str, Enum): + def _generate_next_value_( # type: ignore + name: str, start: int, count: int, last_values: List[str] + ) -> str: + return name.lower() + + +class Options(AutoNameEnum): + BULK_SIZE = auto() + BULK_FLUSH_INTERVAL = auto() + MAX_CONCURRENCY = auto() + CONNECTION_FAIL_CALLBACK = auto() + + +class Actions(AutoNameEnum): + ACK = auto() + REJECT = auto() + REQUEUE = auto() + REQUEUE_TAIL = auto() + + +class Events(AutoNameEnum): + ON_SUCCESS = auto() + ON_EXCEPTION = auto() + + +class DefaultValues: + MAX_SUBMIT_WORKER_SIZE = 8 + BULK_SIZE = 1 + BULK_FLUSH_INTERVAL = 60 + ON_SUCCESS = Actions.ACK + ON_EXCEPTION = Actions.REQUEUE_TAIL + RUN_EVERY_MAX_CONCURRENCY = 1 + + +class RouteTypes(AutoNameEnum): + AMQP_RABBITMQ = auto() + HTTP = auto() diff --git a/amqpworker/rabbitmq/__init__.py b/amqpworker/rabbitmq/__init__.py new file mode 100644 index 0000000..f4ad9d8 --- /dev/null +++ b/amqpworker/rabbitmq/__init__.py @@ -0,0 +1,3 @@ +from amqpworker.routes import AMQPRouteOptions + +from .message import RabbitMQMessage # noqa: F401 diff --git a/amqpworker/rabbitmq/entrypoints.py b/amqpworker/rabbitmq/entrypoints.py new file mode 100644 index 0000000..6c01853 --- /dev/null +++ b/amqpworker/rabbitmq/entrypoints.py @@ -0,0 +1,42 @@ +from typing import List, Optional + +from amqpworker import conf +from amqpworker.connections import AMQPConnection +from amqpworker.entrypoints import EntrypointInterface, _extract_sync_callable +from amqpworker.routes import AMQPRoute, AMQPRouteOptions, RoutesRegistry + + +def _register_amqp_handler( + registry: RoutesRegistry, + routes: List[str], + vhost: str, + connection: Optional[AMQPConnection], + options: Optional[AMQPRouteOptions], +): + def _wrap(f): + cb = _extract_sync_callable(f) + route = AMQPRoute( + handler=cb, + routes=routes, + vhost=vhost, + connection=connection, + options=options, + ) + registry.add_amqp_route(route) + + return f + + return _wrap + + +class AMQPRouteEntryPointImpl(EntrypointInterface): + def consume( + self, + routes: List[str], + vhost: str = conf.settings.AMQP_DEFAULT_VHOST, + connection: Optional[AMQPConnection] = None, + options: Optional[AMQPRouteOptions] = AMQPRouteOptions(), + ): + return _register_amqp_handler( + self.app.routes_registry, routes, vhost, connection, options + ) diff --git a/amqpworker/rabbitmq/message.py b/amqpworker/rabbitmq/message.py new file mode 100644 index 0000000..1f3b2bb --- /dev/null +++ b/amqpworker/rabbitmq/message.py @@ -0,0 +1,58 @@ +from amqpworker.easyqueue.message import AMQPMessage +from amqpworker.options import Actions + + +class RabbitMQMessage: + def __init__( + self, + delivery_tag: int, + amqp_message: AMQPMessage, + on_success: Actions = Actions.ACK, + on_exception: Actions = Actions.REQUEUE, + ) -> None: + self._delivery_tag = delivery_tag + self._on_success_action = on_success + self._on_exception_action = on_exception + self._final_action = None + self._amqp_message = amqp_message + + @property + def body(self): + return self._amqp_message.deserialized_data + + @property + def serialized_data(self): + return self._amqp_message.serialized_data + + def reject(self, requeue=True): + """ + Marca essa mensagem para ser rejeitada. O parametro ``requeue`` indica se a mensagem será recolocada na fila original (``requeue=True``) ou será descartada (``requeue=False``). + """ + self._final_action = Actions.REQUEUE if requeue else Actions.REJECT + + def requeue(self): + self._final_action = Actions.REQUEUE_TAIL + + def accept(self): + """ + Marca essa mensagem para ser confirmada (``ACK``) ao fim da execução do handler. + """ + self._final_action = Actions.ACK + + def _process_action(self, action: Actions): + if action == Actions.REJECT: + self._amqp_message.reject(requeue=False) + elif action == Actions.REQUEUE: + self._amqp_message.reject(requeue=True) + elif action == Actions.REQUEUE_TAIL: + self._amqp_message.requeue() + elif action == Actions.ACK: + self._amqp_message.ack() + + def process_success(self): + action = self._final_action or self._on_success_action + return self._process_action(action) + + def process_exception(self): + action = self._final_action or self._on_exception_action + return self._process_action(action) diff --git a/amqpworker/routes.py b/amqpworker/routes.py new file mode 100644 index 0000000..147d913 --- /dev/null +++ b/amqpworker/routes.py @@ -0,0 +1,135 @@ +import abc +from collections import UserDict +from typing import ( + Any, + Callable, + Coroutine, + Dict, + Iterable, + List, + Optional, + Type, + Union, +) + +from cached_property import cached_property +from pydantic import BaseModel, Extra, root_validator, validator + +from amqpworker import conf +from amqpworker.connections import AMQPConnection, Connection +from amqpworker.options import Actions, DefaultValues, RouteTypes + +RouteHandler = Callable[..., Coroutine] + + +class Model(BaseModel, abc.ABC): + """ + An abstract pydantic BaseModel that also behaves like a Mapping + """ + + def __getitem__(self, item): + try: + return getattr(self, item) + except AttributeError as e: + raise KeyError from e + + def __setitem__(self, key, value): + try: + return self.__setattr__(key, value) + except (AttributeError, ValueError) as e: + raise KeyError from e + + def __eq__(self, other): + if isinstance(other, dict): + return self.dict() == other + return super(Model, self).__eq__(other) + + def __len__(self): + return len(self.__fields__) + + def keys(self): + return self.__fields__.keys() + + def get(self, key, default=None): + try: + return self[key] + except KeyError: + return default + + +class _RouteOptions(Model): + pass + + +class Route(Model, abc.ABC): + """ + An abstract Model that acts like a route factory + """ + + type: RouteTypes + handler: Any + routes: List[str] + connection: Optional[Connection] + options: _RouteOptions = _RouteOptions() + + @staticmethod + def factory(data: Dict) -> "Route": + try: + type_ = data.pop("type") + except KeyError as e: + raise ValueError("Routes must have a type") from e + + if type_ == RouteTypes.HTTP: + raise ValueError(f"'{type_}' is an invalid RouteType.") + if type_ == RouteTypes.AMQP_RABBITMQ: + return AMQPRoute(**data) + raise ValueError(f"'{type_}' is an invalid RouteType.") + + +class AMQPRouteOptions(_RouteOptions): + bulk_size: int = DefaultValues.BULK_SIZE + max_workers: int = DefaultValues.MAX_SUBMIT_WORKER_SIZE + bulk_flush_interval: int = DefaultValues.BULK_FLUSH_INTERVAL + on_success: Actions = DefaultValues.ON_SUCCESS + on_exception: Actions = DefaultValues.ON_EXCEPTION + connection_fail_callback: Optional[ + Callable[[Exception, int], Coroutine] + ] = None + connection: Optional[Union[AMQPConnection, str]] + + class Config: + arbitrary_types_allowed = False + extra = Extra.forbid + + +class AMQPRoute(Route): + type: RouteTypes = RouteTypes.AMQP_RABBITMQ + vhost: str = conf.settings.AMQP_DEFAULT_VHOST + connection: Optional[AMQPConnection] + options: AMQPRouteOptions + + +class RoutesRegistry(UserDict): + def _get_routes_for_type(self, route_type: Type) -> Iterable: + return tuple((r for r in self.values() if isinstance(r, route_type))) + + @cached_property + def amqp_routes(self) -> Iterable[AMQPRoute]: + return self._get_routes_for_type(AMQPRoute) + + def __setitem__(self, key: RouteHandler, value: Union[Dict, Route]): + if not isinstance(value, Route): + route = Route.factory({"handler": key, **value}) + else: + route = value + + super(RoutesRegistry, self).__setitem__(key, route) + + def add_route(self, route: Route) -> None: + self[route.handler] = route + + def add_amqp_route(self, route: AMQPRoute) -> None: + self[route.handler] = route + + def route_for(self, handler: RouteHandler) -> Route: + return self[handler] diff --git a/amqpworker/scheduled/__init__.py b/amqpworker/scheduled/__init__.py new file mode 100644 index 0000000..fc56999 --- /dev/null +++ b/amqpworker/scheduled/__init__.py @@ -0,0 +1,162 @@ +"""The MIT License (MIT) + +Copyright (c) 2021 Yogaraj.S + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE.""" + +import threading +import traceback +from concurrent.futures import ThreadPoolExecutor +from typing import Optional, Callable + +from delayedqueue import DelayedQueue +from loguru import logger + +"""Wrapper for the task submitted to ScheduledThreadPoolExecutor class""" + +import time +from typing import Callable + + +class ScheduledTask: + def __init__(self, runnable: Callable, initial_delay: int, period: int, *args, time_func=time.time, **kwargs): + super().__init__() + self.runnable = runnable + self.initial_delay = initial_delay + self.period = period + self.__time_func = time_func + self.args = args + self.kwargs = kwargs + self.__is_initial = True + self.task_time = int(self.__time_func() * 1000) + (initial_delay * 1000) + + @property + def is_initial_run(self) -> bool: + return self.__is_initial + + @property + def at_fixed_delay(self) -> bool: + return self.kwargs.get('is_fixed_delay', False) + + @property + def at_fixed_rate(self) -> bool: + return self.kwargs.get('is_fixed_rate', False) + + @property + def executor_ctx(self): + return self.kwargs['executor_ctx'] # pragma: no cover + + @property + def exception_callback(self): + return self.kwargs.get('on_exception_callback') + + @property + def time_func(self): + return self.__time_func + + def __get_next_run(self) -> int: + if not (self.at_fixed_rate or self.at_fixed_delay): + raise TypeError("`get_next_run` invoked in a non-repeatable task") + return int(self.__time_func() * 1000) + self.period * 1000 + + def set_next_run(self, time_taken: float = 0) -> None: + self.__is_initial = False + self.task_time = self.__get_next_run() - time_taken + + def __lt__(self, other) -> bool: + if not isinstance(other, ScheduledTask): + raise TypeError(f"{other} is not of type ScheduledTask") + return self.task_time < other.task_time + + def __repr__(self) -> str: + return f"""(Task: {self.runnable.__name__}, Initial Delay: {self.initial_delay} second(s), Periodic: {self.period} second(s), Next run: {time.ctime(self.task_time / 1000)})""" + + def run(self, *args, **kwargs): + st_time = time.time_ns() + try: + self.runnable(*self.args, **self.kwargs) + except Exception as e: + if self.exception_callback: + self.exception_callback(e, *self.args, **self.kwargs) + else: + traceback.print_exc() + finally: + end_time = time.time_ns() + time_taken = (end_time - st_time) / 1000000 # in milliseconds + if self.at_fixed_rate: + self.set_next_run(time_taken) + next_delay = (self.period * 1000 - time_taken) / 1000 + if next_delay < 0 or self.task_time <= (self.__time_func() * 1000): + self.executor_ctx._put(self, 0) + else: + self.executor_ctx._put(self, next_delay) + elif self.at_fixed_delay: + self.set_next_run() + self.executor_ctx._put(self, self.period) + + +class ScheduledThreadPoolExecutor(ThreadPoolExecutor): + def __init__(self, max_workers=10, name=''): + super().__init__(max_workers=max_workers, thread_name_prefix=name) + self._max_workers = max_workers + self.queue = DelayedQueue() + self.shutdown = False + + def schedule_at_fixed_rate(self, fn: Callable, initial_delay: int, period: int, *args, **kwargs) -> bool: + if self.shutdown: + raise RuntimeError(f"cannot schedule new task after shutdown!") + task = ScheduledTask(fn, initial_delay, period, *args, is_fixed_rate=True, executor_ctx=self, **kwargs) + return self._put(task, initial_delay) + + def schedule_at_fixed_delay(self, fn: Callable, initial_delay: int, period: int, *args, **kwargs) -> bool: + if self.shutdown: + raise RuntimeError(f"cannot schedule new task after shutdown!") + task = ScheduledTask(fn, initial_delay, period, *args, is_fixed_delay=True, executor_ctx=self, **kwargs) + return self._put(task, initial_delay) + + def schedule(self, fn, initial_delay, *args, **kwargs) -> bool: + task = ScheduledTask(fn, initial_delay, 0, *args, executor_ctx=self, **kwargs) + return self._put(task, initial_delay) + + def _put(self, task: ScheduledTask, delay: int) -> bool: + # Don't use this explicitly. Use schedule/schedule_at_fixed_delay/schedule_at_fixed_rate. Additionally, to be + # called by ScheduledTask only! + if not isinstance(task, ScheduledTask): + raise TypeError(f"Task `{task!r}` must be of type ScheduledTask") + if delay < 0: + raise ValueError(f"Delay `{delay}` must be a non-negative number") + # logger.debug(f" enqueuing {task} with delay of {delay}") + return self.queue.put(task, delay) + + def __run(self): + while not self.shutdown: + try: + task: ScheduledTask = self.queue.get() + super().submit(task.run, *task.args, **task.kwargs) + except Exception as e: + print(e) + + def stop(self, app, wait_for_completion: Optional[bool] = False): + self.shutdown = True + super().shutdown(wait_for_completion) + + def start(self, app): + t = threading.Thread(target=self.__run) + t.setDaemon(True) + t.start() diff --git a/amqpworker/signals/__init__.py b/amqpworker/signals/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/amqpworker/signals/base.py b/amqpworker/signals/base.py new file mode 100644 index 0000000..fd77f74 --- /dev/null +++ b/amqpworker/signals/base.py @@ -0,0 +1,43 @@ +import abc +import threading + +from collections import UserList + + +class Freezable(metaclass=abc.ABCMeta): + def __init__(self): + self._frozen = False + + @property + def frozen(self) -> bool: + return self._frozen + + def freeze(self): + self._frozen = True + + +class Signal(UserList, threading.Event): + def __init__(self, owner: Freezable): + UserList.__init__(self) + threading.Event.__init__(self) + self._owner = owner + self.frozen = False + + def __repr__(self): + return "".format( + self._owner, self.frozen, list(self) + ) + + def send(self, *args, **kwargs): + """ + Sends data to all registered receivers. + """ + if self.frozen: + raise RuntimeError("Cannot send on frozen signal.") + + for receiver in self: + receiver(*args, **kwargs) + + self.frozen = True + self._owner.freeze() + self.set() diff --git a/amqpworker/signals/handlers/__init__.py b/amqpworker/signals/handlers/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/amqpworker/signals/handlers/base.py b/amqpworker/signals/handlers/base.py new file mode 100644 index 0000000..f38898d --- /dev/null +++ b/amqpworker/signals/handlers/base.py @@ -0,0 +1,12 @@ +from typing import TYPE_CHECKING + +if TYPE_CHECKING: # pragma: no cover + from amqpworker import App # noqa: F401 + + +class SignalHandler: + def startup(self, app: "App"): + pass # pragma: no cover + + def shutdown(self, app: "App"): + pass # pragma: no cover \ No newline at end of file diff --git a/amqpworker/signals/handlers/rabbitmq.py b/amqpworker/signals/handlers/rabbitmq.py new file mode 100644 index 0000000..820e809 --- /dev/null +++ b/amqpworker/signals/handlers/rabbitmq.py @@ -0,0 +1,47 @@ +import logging +from asyncio import Task +from concurrent.futures import Future +from typing import TYPE_CHECKING, List + +from loguru import logger + +from amqpworker.connections import AMQPConnection +from amqpworker.consumer import Consumer +from amqpworker.options import RouteTypes +from amqpworker.signals.handlers.base import SignalHandler + +if TYPE_CHECKING: # pragma: no cover + from amqpworker.app import App # noqa: F401 + + +class RabbitMQ(SignalHandler): + def shutdown(self, app: "App"): + logger.debug('shutdown rabbit consumers') + if RouteTypes.AMQP_RABBITMQ in app: + for consumer in app[RouteTypes.AMQP_RABBITMQ]["consumers"]: + logger.debug(f'stop {consumer.host}') + consumer.stop() + logger.debug(f'stopped {consumer.host}') + + def startup(self, app: "App") -> List[Future]: + tasks = [] + + app[RouteTypes.AMQP_RABBITMQ]["consumers"] = [] + for route_info in app.routes_registry.amqp_routes: + conn: AMQPConnection = app.get_connection_for_route(route_info) + + consumer = Consumer( + route_info=route_info, + host=conn.hostname, + port=conn.port, + username=conn.username, + password=conn.password, + prefetch_count=conn.prefetch, + ) + app[RouteTypes.AMQP_RABBITMQ]["consumers"].append(consumer) + conn.register(consumer.queue) + task = app.loop.submit(consumer.start) + + tasks.append(task) + + return tasks diff --git a/amqpworker/types/__init__.py b/amqpworker/types/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/amqpworker/types/registry.py b/amqpworker/types/registry.py new file mode 100644 index 0000000..64d84c4 --- /dev/null +++ b/amqpworker/types/registry.py @@ -0,0 +1,54 @@ +from typing import Any, Dict, Optional, Tuple, Type + +from amqpworker.typing import get_args, get_origin + + +class RegistryItem: + def __init__( + self, type: Type, value: Any, type_args: Optional[Tuple] = None + ) -> None: + self.type = type + self.value = value + self.type_args = type_args + + +class TypesRegistry: + def __init__(self): + self._data: Dict[Tuple, RegistryItem] = {} + self._by_name: Dict[str, RegistryItem] = {} + + def set( + self, + obj: Any, + type_definition: Optional[Type] = None, + param_name: Optional[str] = None, + ) -> None: + has_type_args = get_args(type_definition) + origin = get_origin(obj) or obj.__class__ + + registry_item = RegistryItem( + type=origin, value=obj, type_args=has_type_args + ) + + self._data[(origin, has_type_args)] = registry_item + + if param_name: + self._by_name[param_name] = registry_item + + def get( + self, _type: Type, param_name: Optional[str] = None, type_args=None + ) -> Optional[Any]: + origin = get_origin(_type) or _type + _type_args = type_args or get_args(_type) + if param_name: + try: + item = self._by_name[param_name] + if (item.type, item.type_args) == (origin, _type_args): + return item.value + except KeyError: + return None + + try: + return self._data[(origin, get_args(_type))].value + except KeyError: + return None diff --git a/amqpworker/typing/__init__.py b/amqpworker/typing/__init__.py new file mode 100644 index 0000000..4e2e52e --- /dev/null +++ b/amqpworker/typing/__init__.py @@ -0,0 +1,66 @@ +from typing import Optional, Tuple, Type, get_type_hints + + +def get_args(_type: Optional[Type]) -> Optional[Tuple]: + if _type and hasattr(_type, "__args__"): + return _type.__args__ + + return None + + +def get_origin(_type: Optional[Type]) -> Optional[Type]: + if _type and hasattr(_type, "__origin__"): + return _type.__origin__ + + return None + + +def get_handler_original_typehints(handler): + """ + Retorna a assinatura do handler amqpworker que está sendo decorado. + O retorno dessa chamada é equivalente a: + typing.get_type_hints(original_handler) + Onde `original_handler` é o handler amqpworker original. + + Ideal para ser usado na pilha de decorators de um handler, ex: + + .. code-block:: python + + @deco1 + @deco2 + @deco3 + async def handler(...): + pass + + Nesse caso, se qualquer um dos 3 decorators precisar saber a assinatura + original, deve usar essa função passando a função recebida do decorator anterior. + + """ + + def _dummy(): + pass + + _dummy.__annotations__ = getattr( + handler, "amqpworker_original_annotations", handler.__annotations__ + ) + + return get_type_hints(_dummy) + + +def is_base_type(_type, base_type): + """ + Retorna True para argumentos de um tipo base `base_type`. + Ex: + (a: MyGeneric[int]) -> True + (b: MyGeneric) -> True + """ + if get_origin(_type) is base_type: + return True + + return issubclass(_type, base_type) + + +def get_handler_original_qualname(handler): + return getattr( + handler, "amqpworker_original_qualname", handler.__qualname__ + ) diff --git a/amqpworker/utils.py b/amqpworker/utils.py new file mode 100644 index 0000000..b18cad1 --- /dev/null +++ b/amqpworker/utils.py @@ -0,0 +1,13 @@ +import functools + + +def call(fn, *args, **kwargs): + fn(*args, **kwargs) + + +def entrypoint(f): + @functools.wraps(f) + def _(*args, **kwargs): + return f(*args, **kwargs) + + return _ diff --git a/async-worker_license b/async-worker_license new file mode 100644 index 0000000..f0e30e4 --- /dev/null +++ b/async-worker_license @@ -0,0 +1,19 @@ +Copyright (c) 2017 B2W Digital + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. \ No newline at end of file diff --git a/poetry.lock b/poetry.lock new file mode 100644 index 0000000..5349d6b --- /dev/null +++ b/poetry.lock @@ -0,0 +1,222 @@ +# This file is automatically @generated by Poetry 1.4.2 and should not be changed by hand. + +[[package]] +name = "amqpstorm" +version = "2.10.6" +description = "Thread-safe Python RabbitMQ Client & Management library." +category = "main" +optional = false +python-versions = ">=2.7" +files = [ + {file = "AMQPStorm-2.10.6.tar.gz", hash = "sha256:52685be58fee7783e0d2739bca4c1e8630fab22ccacb510fa84fa3cdd383e230"}, +] + +[package.dependencies] +pamqp = ">=2.0.0,<3.0" + +[package.extras] +management = ["requests (>2)"] +pool = ["amqpstorm-pool"] + +[package.source] +type = "legacy" +url = "https://mirror.baidu.com/pypi/simple" +reference = "douban" + +[[package]] +name = "cached-property" +version = "1.5.2" +description = "A decorator for caching properties in classes." +category = "main" +optional = false +python-versions = "*" +files = [ + {file = "cached-property-1.5.2.tar.gz", hash = "sha256:9fa5755838eecbb2d234c3aa390bd80fbd3ac6b6869109bfc1b499f7bd89a130"}, + {file = "cached_property-1.5.2-py2.py3-none-any.whl", hash = "sha256:df4f613cf7ad9a588cc381aaf4a512d26265ecebd5eb9e1ba12f1319eb85a6a0"}, +] + +[package.source] +type = "legacy" +url = "https://mirror.baidu.com/pypi/simple" +reference = "douban" + +[[package]] +name = "colorama" +version = "0.4.6" +description = "Cross-platform colored terminal text." +category = "main" +optional = false +python-versions = "!=3.0.*,!=3.1.*,!=3.2.*,!=3.3.*,!=3.4.*,!=3.5.*,!=3.6.*,>=2.7" +files = [ + {file = "colorama-0.4.6-py2.py3-none-any.whl", hash = "sha256:4f1d9991f5acc0ca119f9d443620b77f9d6b33703e51011c16baf57afb285fc6"}, + {file = "colorama-0.4.6.tar.gz", hash = "sha256:08695f5cb7ed6e0531a20572697297273c47b8cae5a63ffc6d6ed5c201be6e44"}, +] + +[package.source] +type = "legacy" +url = "https://mirror.baidu.com/pypi/simple" +reference = "douban" + +[[package]] +name = "delayedqueue" +version = "1.0.0" +description = "Thread safe delay queue implementation" +category = "main" +optional = false +python-versions = "*" +files = [ + {file = "delayedqueue-1.0.0-py3-none-any.whl", hash = "sha256:2617f1d14e8b7210d08540f0581a7548d7a2dcd6afc9578c635a6b5cce4d03f0"}, + {file = "delayedqueue-1.0.0.tar.gz", hash = "sha256:34085363e5ae54584b2d5977f270fa5d9321ce9a857f3acab8608d711278b68c"}, +] + +[package.extras] +testing = ["pytest", "pytest-cov", "setuptools"] + +[package.source] +type = "legacy" +url = "https://mirror.baidu.com/pypi/simple" +reference = "douban" + +[[package]] +name = "loguru" +version = "0.7.0" +description = "Python logging made (stupidly) simple" +category = "main" +optional = false +python-versions = ">=3.5" +files = [ + {file = "loguru-0.7.0-py3-none-any.whl", hash = "sha256:b93aa30099fa6860d4727f1b81f8718e965bb96253fa190fab2077aaad6d15d3"}, + {file = "loguru-0.7.0.tar.gz", hash = "sha256:1612053ced6ae84d7959dd7d5e431a0532642237ec21f7fd83ac73fe539e03e1"}, +] + +[package.dependencies] +colorama = {version = ">=0.3.4", markers = "sys_platform == \"win32\""} +win32-setctime = {version = ">=1.0.0", markers = "sys_platform == \"win32\""} + +[package.extras] +dev = ["Sphinx (==5.3.0)", "colorama (==0.4.5)", "colorama (==0.4.6)", "freezegun (==1.1.0)", "freezegun (==1.2.2)", "mypy (==v0.910)", "mypy (==v0.971)", "mypy (==v0.990)", "pre-commit (==3.2.1)", "pytest (==6.1.2)", "pytest (==7.2.1)", "pytest-cov (==2.12.1)", "pytest-cov (==4.0.0)", "pytest-mypy-plugins (==1.10.1)", "pytest-mypy-plugins (==1.9.3)", "sphinx-autobuild (==2021.3.14)", "sphinx-rtd-theme (==1.2.0)", "tox (==3.27.1)", "tox (==4.4.6)"] + +[package.source] +type = "legacy" +url = "https://mirror.baidu.com/pypi/simple" +reference = "douban" + +[[package]] +name = "pamqp" +version = "2.3.0" +description = "RabbitMQ Focused AMQP low-level library" +category = "main" +optional = false +python-versions = "*" +files = [ + {file = "pamqp-2.3.0-py2.py3-none-any.whl", hash = "sha256:2f81b5c186f668a67f165193925b6bfd83db4363a6222f599517f29ecee60b02"}, + {file = "pamqp-2.3.0.tar.gz", hash = "sha256:5cd0f5a85e89f20d5f8e19285a1507788031cfca4a9ea6f067e3cf18f5e294e8"}, +] + +[package.extras] +codegen = ["lxml"] + +[package.source] +type = "legacy" +url = "https://mirror.baidu.com/pypi/simple" +reference = "douban" + +[[package]] +name = "pydantic" +version = "1.10.7" +description = "Data validation and settings management using python type hints" +category = "main" +optional = false +python-versions = ">=3.7" +files = [ + {file = "pydantic-1.10.7-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:e79e999e539872e903767c417c897e729e015872040e56b96e67968c3b918b2d"}, + {file = "pydantic-1.10.7-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:01aea3a42c13f2602b7ecbbea484a98169fb568ebd9e247593ea05f01b884b2e"}, + {file = "pydantic-1.10.7-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:516f1ed9bc2406a0467dd777afc636c7091d71f214d5e413d64fef45174cfc7a"}, + {file = "pydantic-1.10.7-cp310-cp310-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:ae150a63564929c675d7f2303008d88426a0add46efd76c3fc797cd71cb1b46f"}, + {file = "pydantic-1.10.7-cp310-cp310-musllinux_1_1_i686.whl", hash = "sha256:ecbbc51391248116c0a055899e6c3e7ffbb11fb5e2a4cd6f2d0b93272118a209"}, + {file = "pydantic-1.10.7-cp310-cp310-musllinux_1_1_x86_64.whl", hash = "sha256:f4a2b50e2b03d5776e7f21af73e2070e1b5c0d0df255a827e7c632962f8315af"}, + {file = "pydantic-1.10.7-cp310-cp310-win_amd64.whl", hash = "sha256:a7cd2251439988b413cb0a985c4ed82b6c6aac382dbaff53ae03c4b23a70e80a"}, + {file = "pydantic-1.10.7-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:68792151e174a4aa9e9fc1b4e653e65a354a2fa0fed169f7b3d09902ad2cb6f1"}, + {file = "pydantic-1.10.7-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:dfe2507b8ef209da71b6fb5f4e597b50c5a34b78d7e857c4f8f3115effaef5fe"}, + {file = "pydantic-1.10.7-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:10a86d8c8db68086f1e30a530f7d5f83eb0685e632e411dbbcf2d5c0150e8dcd"}, + {file = "pydantic-1.10.7-cp311-cp311-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:d75ae19d2a3dbb146b6f324031c24f8a3f52ff5d6a9f22f0683694b3afcb16fb"}, + {file = "pydantic-1.10.7-cp311-cp311-musllinux_1_1_i686.whl", hash = "sha256:464855a7ff7f2cc2cf537ecc421291b9132aa9c79aef44e917ad711b4a93163b"}, + {file = "pydantic-1.10.7-cp311-cp311-musllinux_1_1_x86_64.whl", hash = "sha256:193924c563fae6ddcb71d3f06fa153866423ac1b793a47936656e806b64e24ca"}, + {file = "pydantic-1.10.7-cp311-cp311-win_amd64.whl", hash = "sha256:b4a849d10f211389502059c33332e91327bc154acc1845f375a99eca3afa802d"}, + {file = "pydantic-1.10.7-cp37-cp37m-macosx_10_9_x86_64.whl", hash = "sha256:cc1dde4e50a5fc1336ee0581c1612215bc64ed6d28d2c7c6f25d2fe3e7c3e918"}, + {file = "pydantic-1.10.7-cp37-cp37m-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:e0cfe895a504c060e5d36b287ee696e2fdad02d89e0d895f83037245218a87fe"}, + {file = "pydantic-1.10.7-cp37-cp37m-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:670bb4683ad1e48b0ecb06f0cfe2178dcf74ff27921cdf1606e527d2617a81ee"}, + {file = "pydantic-1.10.7-cp37-cp37m-musllinux_1_1_i686.whl", hash = "sha256:950ce33857841f9a337ce07ddf46bc84e1c4946d2a3bba18f8280297157a3fd1"}, + {file = "pydantic-1.10.7-cp37-cp37m-musllinux_1_1_x86_64.whl", hash = "sha256:c15582f9055fbc1bfe50266a19771bbbef33dd28c45e78afbe1996fd70966c2a"}, + {file = "pydantic-1.10.7-cp37-cp37m-win_amd64.whl", hash = "sha256:82dffb306dd20bd5268fd6379bc4bfe75242a9c2b79fec58e1041fbbdb1f7914"}, + {file = "pydantic-1.10.7-cp38-cp38-macosx_10_9_x86_64.whl", hash = "sha256:8c7f51861d73e8b9ddcb9916ae7ac39fb52761d9ea0df41128e81e2ba42886cd"}, + {file = "pydantic-1.10.7-cp38-cp38-macosx_11_0_arm64.whl", hash = "sha256:6434b49c0b03a51021ade5c4daa7d70c98f7a79e95b551201fff682fc1661245"}, + {file = "pydantic-1.10.7-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:64d34ab766fa056df49013bb6e79921a0265204c071984e75a09cbceacbbdd5d"}, + {file = "pydantic-1.10.7-cp38-cp38-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:701daea9ffe9d26f97b52f1d157e0d4121644f0fcf80b443248434958fd03dc3"}, + {file = "pydantic-1.10.7-cp38-cp38-musllinux_1_1_i686.whl", hash = "sha256:cf135c46099ff3f919d2150a948ce94b9ce545598ef2c6c7bf55dca98a304b52"}, + {file = "pydantic-1.10.7-cp38-cp38-musllinux_1_1_x86_64.whl", hash = "sha256:b0f85904f73161817b80781cc150f8b906d521fa11e3cdabae19a581c3606209"}, + {file = "pydantic-1.10.7-cp38-cp38-win_amd64.whl", hash = "sha256:9f6f0fd68d73257ad6685419478c5aece46432f4bdd8d32c7345f1986496171e"}, + {file = "pydantic-1.10.7-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:c230c0d8a322276d6e7b88c3f7ce885f9ed16e0910354510e0bae84d54991143"}, + {file = "pydantic-1.10.7-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:976cae77ba6a49d80f461fd8bba183ff7ba79f44aa5cfa82f1346b5626542f8e"}, + {file = "pydantic-1.10.7-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:7d45fc99d64af9aaf7e308054a0067fdcd87ffe974f2442312372dfa66e1001d"}, + {file = "pydantic-1.10.7-cp39-cp39-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:d2a5ebb48958754d386195fe9e9c5106f11275867051bf017a8059410e9abf1f"}, + {file = "pydantic-1.10.7-cp39-cp39-musllinux_1_1_i686.whl", hash = "sha256:abfb7d4a7cd5cc4e1d1887c43503a7c5dd608eadf8bc615413fc498d3e4645cd"}, + {file = "pydantic-1.10.7-cp39-cp39-musllinux_1_1_x86_64.whl", hash = "sha256:80b1fab4deb08a8292d15e43a6edccdffa5377a36a4597bb545b93e79c5ff0a5"}, + {file = "pydantic-1.10.7-cp39-cp39-win_amd64.whl", hash = "sha256:d71e69699498b020ea198468e2480a2f1e7433e32a3a99760058c6520e2bea7e"}, + {file = "pydantic-1.10.7-py3-none-any.whl", hash = "sha256:0cd181f1d0b1d00e2b705f1bf1ac7799a2d938cce3376b8007df62b29be3c2c6"}, + {file = "pydantic-1.10.7.tar.gz", hash = "sha256:cfc83c0678b6ba51b0532bea66860617c4cd4251ecf76e9846fa5a9f3454e97e"}, +] + +[package.dependencies] +typing-extensions = ">=4.2.0" + +[package.extras] +dotenv = ["python-dotenv (>=0.10.4)"] +email = ["email-validator (>=1.0.3)"] + +[package.source] +type = "legacy" +url = "https://mirror.baidu.com/pypi/simple" +reference = "douban" + +[[package]] +name = "typing-extensions" +version = "4.5.0" +description = "Backported and Experimental Type Hints for Python 3.7+" +category = "main" +optional = false +python-versions = ">=3.7" +files = [ + {file = "typing_extensions-4.5.0-py3-none-any.whl", hash = "sha256:fb33085c39dd998ac16d1431ebc293a8b3eedd00fd4a32de0ff79002c19511b4"}, + {file = "typing_extensions-4.5.0.tar.gz", hash = "sha256:5cb5f4a79139d699607b3ef622a1dedafa84e115ab0024e0d9c044a9479ca7cb"}, +] + +[package.source] +type = "legacy" +url = "https://mirror.baidu.com/pypi/simple" +reference = "douban" + +[[package]] +name = "win32-setctime" +version = "1.1.0" +description = "A small Python utility to set file creation time on Windows" +category = "main" +optional = false +python-versions = ">=3.5" +files = [ + {file = "win32_setctime-1.1.0-py3-none-any.whl", hash = "sha256:231db239e959c2fe7eb1d7dc129f11172354f98361c4fa2d6d2d7e278baa8aad"}, + {file = "win32_setctime-1.1.0.tar.gz", hash = "sha256:15cf5750465118d6929ae4de4eb46e8edae9a5634350c01ba582df868e932cb2"}, +] + +[package.extras] +dev = ["black (>=19.3b0)", "pytest (>=4.6.2)"] + +[package.source] +type = "legacy" +url = "https://mirror.baidu.com/pypi/simple" +reference = "douban" + +[metadata] +lock-version = "2.0" +python-versions = "^3.9" +content-hash = "1ad42e5ed1903b95a7dcb25cd65c32d195b6937e5b31d95d4b2b19d2f0b4fc31" diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..40da0ef --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,26 @@ +[tool.poetry] +name = "amqp-worker" +version = "0.1.0" +description = "" +authors = ["JimZhang "] +readme = "README.md" +packages = [{include = "amqp_worker"}] + +[tool.poetry.dependencies] +python = "^3.9" +amqpstorm = {version = "^2.10.6", source = "douban"} +pydantic = {version = "1.10.7", source = "douban"} +loguru = {version = "^0.7.0", source = "douban"} +cached-property = {version = "^1.5.2", source = "douban"} +delayedqueue = {version = "^1.0.0", source = "douban"} + + +[build-system] +requires = ["poetry-core"] +build-backend = "poetry.core.masonry.api" + + + +[[tool.poetry.source]] +name = "douban" +url = "https://mirror.baidu.com/pypi/simple" diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/logger_t.py b/tests/logger_t.py new file mode 100644 index 0000000..be16f0e --- /dev/null +++ b/tests/logger_t.py @@ -0,0 +1,6 @@ +from loguru import logger + +logger.error({'er':1, + 'aa':None + + }) \ No newline at end of file diff --git a/tests/rabbitmq.py b/tests/rabbitmq.py new file mode 100644 index 0000000..9d42b9b --- /dev/null +++ b/tests/rabbitmq.py @@ -0,0 +1,29 @@ +from datetime import datetime +from typing import List + +from amqpworker.app import App +from amqpworker.connections import AMQPConnection +from amqpworker.rabbitmq import RabbitMQMessage +from amqpworker.routes import AMQPRouteOptions + +amqp_conn = AMQPConnection(hostname='127.0.0.1', username='guest', password='guest', port=5672) + +app = App(connections=[amqp_conn]) + + +@app.amqp.consume( + ['test'], + options=AMQPRouteOptions(bulk_size=1024 * 8, bulk_flush_interval=2) +) +def _handler(msgs: List[RabbitMQMessage]): + print(f"Recv {len(msgs)} {datetime.now().isoformat()}") + +from loguru import logger + + +@app.run_every(1) +def produce(*args, **kwargs): + # logger.error("tick produce") + amqp_conn.put(data={'msg': 'ok'}, routing_key='test') + +app.run()