feat: init project amqp-worker
commit
6cf096b833
@ -0,0 +1,160 @@
|
||||
# Byte-compiled / optimized / DLL files
|
||||
__pycache__/
|
||||
*.py[cod]
|
||||
*$py.class
|
||||
|
||||
# C extensions
|
||||
*.so
|
||||
|
||||
# Distribution / packaging
|
||||
.Python
|
||||
build/
|
||||
develop-eggs/
|
||||
dist/
|
||||
downloads/
|
||||
eggs/
|
||||
.eggs/
|
||||
lib/
|
||||
lib64/
|
||||
parts/
|
||||
sdist/
|
||||
var/
|
||||
wheels/
|
||||
share/python-wheels/
|
||||
*.egg-info/
|
||||
.installed.cfg
|
||||
*.egg
|
||||
MANIFEST
|
||||
|
||||
# PyInstaller
|
||||
# Usually these files are written by a python script from a template
|
||||
# before PyInstaller builds the exe, so as to inject date/other infos into it.
|
||||
*.manifest
|
||||
*.spec
|
||||
|
||||
# Installer logs
|
||||
pip-log.txt
|
||||
pip-delete-this-directory.txt
|
||||
|
||||
# Unit test / coverage reports
|
||||
htmlcov/
|
||||
.tox/
|
||||
.nox/
|
||||
.coverage
|
||||
.coverage.*
|
||||
.cache
|
||||
nosetests.xml
|
||||
coverage.xml
|
||||
*.cover
|
||||
*.py,cover
|
||||
.hypothesis/
|
||||
.pytest_cache/
|
||||
cover/
|
||||
|
||||
# Translations
|
||||
*.mo
|
||||
*.pot
|
||||
|
||||
# Django stuff:
|
||||
*.log
|
||||
local_settings.py
|
||||
db.sqlite3
|
||||
db.sqlite3-journal
|
||||
|
||||
# Flask stuff:
|
||||
instance/
|
||||
.webassets-cache
|
||||
|
||||
# Scrapy stuff:
|
||||
.scrapy
|
||||
|
||||
# Sphinx documentation
|
||||
docs/_build/
|
||||
|
||||
# PyBuilder
|
||||
.pybuilder/
|
||||
target/
|
||||
|
||||
# Jupyter Notebook
|
||||
.ipynb_checkpoints
|
||||
|
||||
# IPython
|
||||
profile_default/
|
||||
ipython_config.py
|
||||
|
||||
# pyenv
|
||||
# For a library or package, you might want to ignore these files since the code is
|
||||
# intended to run in multiple environments; otherwise, check them in:
|
||||
# .python-version
|
||||
|
||||
# pipenv
|
||||
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
|
||||
# However, in case of collaboration, if having platform-specific dependencies or dependencies
|
||||
# having no cross-platform support, pipenv may install dependencies that don't work, or not
|
||||
# install all needed dependencies.
|
||||
#Pipfile.lock
|
||||
|
||||
# poetry
|
||||
# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control.
|
||||
# This is especially recommended for binary packages to ensure reproducibility, and is more
|
||||
# commonly ignored for libraries.
|
||||
# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control
|
||||
#poetry.lock
|
||||
|
||||
# pdm
|
||||
# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control.
|
||||
#pdm.lock
|
||||
# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it
|
||||
# in version control.
|
||||
# https://pdm.fming.dev/#use-with-ide
|
||||
.pdm.toml
|
||||
|
||||
# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm
|
||||
__pypackages__/
|
||||
|
||||
# Celery stuff
|
||||
celerybeat-schedule
|
||||
celerybeat.pid
|
||||
|
||||
# SageMath parsed files
|
||||
*.sage.py
|
||||
|
||||
# Environments
|
||||
.env
|
||||
.venv
|
||||
env/
|
||||
venv/
|
||||
ENV/
|
||||
env.bak/
|
||||
venv.bak/
|
||||
|
||||
# Spyder project settings
|
||||
.spyderproject
|
||||
.spyproject
|
||||
|
||||
# Rope project settings
|
||||
.ropeproject
|
||||
|
||||
# mkdocs documentation
|
||||
/site
|
||||
|
||||
# mypy
|
||||
.mypy_cache/
|
||||
.dmypy.json
|
||||
dmypy.json
|
||||
|
||||
# Pyre type checker
|
||||
.pyre/
|
||||
|
||||
# pytype static type analyzer
|
||||
.pytype/
|
||||
|
||||
# Cython debug symbols
|
||||
cython_debug/
|
||||
|
||||
# PyCharm
|
||||
# JetBrains specific template is maintained in a separate JetBrains.gitignore that can
|
||||
# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
|
||||
# and can be added to the global gitignore or merged into this file. For a more nuclear
|
||||
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
|
||||
.idea/
|
@ -0,0 +1,234 @@
|
||||
import asyncio
|
||||
import time
|
||||
from collections.abc import MutableMapping
|
||||
from signal import Signals
|
||||
import signal
|
||||
from typing import Any, Dict, Iterable, Optional, Callable
|
||||
|
||||
from amqpworker.conf import logger
|
||||
from amqpworker.connections import Connection, ConnectionsMapping
|
||||
from amqpworker.exceptions import InvalidConnection, InvalidRoute
|
||||
from amqpworker.options import Options, DefaultValues, RouteTypes
|
||||
from amqpworker.rabbitmq.entrypoints import AMQPRouteEntryPointImpl
|
||||
from amqpworker.routes import RoutesRegistry, Route
|
||||
from amqpworker.scheduled import ScheduledThreadPoolExecutor
|
||||
|
||||
from amqpworker.signals.base import Freezable, Signal
|
||||
from concurrent.futures import ThreadPoolExecutor, Executor
|
||||
|
||||
from amqpworker.signals.handlers.rabbitmq import RabbitMQ
|
||||
from amqpworker.utils import entrypoint
|
||||
|
||||
|
||||
class App(MutableMapping, Freezable):
|
||||
shutdown_os_signals = (Signals.SIGINT, Signals.SIGTERM)
|
||||
handlers = (RabbitMQ(),)
|
||||
|
||||
def __init__(self, connections: Optional[Iterable[Connection]] = None, loop: Optional[Executor] = None):
|
||||
Freezable.__init__(self)
|
||||
self.routes_registry = RoutesRegistry()
|
||||
self.default_route_options: dict = {}
|
||||
self.loop: Optional[Executor] = loop
|
||||
self.scheduler = None
|
||||
self._state: Dict[Any, Any] = self._get_initial_state()
|
||||
self.connections = ConnectionsMapping()
|
||||
if connections:
|
||||
self.connections.add(connections)
|
||||
|
||||
self._on_startup: Signal = Signal(self)
|
||||
self._on_shutdown: Signal = Signal(self)
|
||||
|
||||
for handler in self.handlers:
|
||||
self._on_startup.append(handler.startup)
|
||||
self._on_shutdown.append(handler.shutdown)
|
||||
|
||||
# for signal_ in self.shutdown_os_signals:
|
||||
# signal.signal(signal_, self.shutdown)
|
||||
# self.loop.add_signal_handler(signal, self.shutdown)
|
||||
|
||||
def _check_frozen(self):
|
||||
if self.frozen:
|
||||
raise RuntimeError(
|
||||
"You shouldn't change the state of a started application"
|
||||
)
|
||||
|
||||
def freeze(self):
|
||||
self.connections.freeze()
|
||||
super(App, self).freeze()
|
||||
|
||||
def get_connection(self, name: str) -> Connection:
|
||||
try:
|
||||
return self.connections[name]
|
||||
except KeyError as e:
|
||||
raise InvalidConnection(
|
||||
f"There's no Connection with name `{name}` registered "
|
||||
f"in `App.connections`"
|
||||
) from e
|
||||
|
||||
def _get_initial_state(self) -> Dict[str, Dict]:
|
||||
# fixme: typeignore reason - https://github.com/python/mypy/issues/4537
|
||||
return {route_type: {} for route_type in RouteTypes} # type: ignore
|
||||
|
||||
def __getitem__(self, key):
|
||||
return self._state[key]
|
||||
|
||||
def __setitem__(self, key, value):
|
||||
self._check_frozen()
|
||||
self._state[key] = value
|
||||
|
||||
def __delitem__(self, key):
|
||||
self._check_frozen()
|
||||
del self._state[key]
|
||||
|
||||
def __len__(self):
|
||||
return len(self._state)
|
||||
|
||||
def __iter__(self):
|
||||
return iter(self._state)
|
||||
|
||||
@entrypoint
|
||||
def run(self):
|
||||
logger.debug({"event": "Booting App..."})
|
||||
self.startup()
|
||||
try:
|
||||
while True:
|
||||
time.sleep(10)
|
||||
except KeyboardInterrupt:
|
||||
self.shutdown()
|
||||
|
||||
def startup(self):
|
||||
"""
|
||||
Causes on_startup signal
|
||||
|
||||
Should be called in the event loop along with the request handler.
|
||||
"""
|
||||
if self.loop is None:
|
||||
self.loop = ThreadPoolExecutor(thread_name_prefix='AMQPWORKER-',max_workers=min(16, sum(1 for _ in self.routes_registry.amqp_routes)))
|
||||
self._on_startup.send(self)
|
||||
|
||||
def shutdown(self):
|
||||
"""
|
||||
Schedules an on_startup signal
|
||||
|
||||
Is called automatically when the application receives
|
||||
a SIGINT or SIGTERM
|
||||
"""
|
||||
|
||||
return self._on_shutdown.send(self)
|
||||
|
||||
@property
|
||||
def amqp(self) -> AMQPRouteEntryPointImpl:
|
||||
return AMQPRouteEntryPointImpl(self)
|
||||
|
||||
def route(
|
||||
self,
|
||||
routes: Iterable[str],
|
||||
type: RouteTypes,
|
||||
options: Optional[dict] = None,
|
||||
**kwargs,
|
||||
):
|
||||
if options is None:
|
||||
options = {}
|
||||
if not isinstance(type, RouteTypes):
|
||||
raise TypeError(
|
||||
f"type parameter is not a valid RouteTypes." f" Found: '{type}'"
|
||||
)
|
||||
|
||||
def wrapper(f):
|
||||
handler = f
|
||||
if not callable(handler):
|
||||
raise TypeError(
|
||||
f"Object passed as handler is not callable: {f}"
|
||||
)
|
||||
if asyncio.iscoroutinefunction(handler) or (
|
||||
hasattr(handler, '__call__') and asyncio.iscoroutinefunction(handler.__call__)):
|
||||
raise TypeError(
|
||||
f"handler must be not a coroutine: {handler}"
|
||||
)
|
||||
self.routes_registry[handler] = {
|
||||
"type": type,
|
||||
"routes": routes,
|
||||
"handler": handler,
|
||||
"options": options,
|
||||
"default_options": self.default_route_options,
|
||||
**kwargs,
|
||||
}
|
||||
return f
|
||||
|
||||
return wrapper
|
||||
|
||||
def run_on_startup(self, func: Callable[["App"], None]) -> None:
|
||||
"""
|
||||
Registers a coroutine to be awaited for during app startup
|
||||
"""
|
||||
self._on_startup.append(func)
|
||||
|
||||
def run_on_shutdown(self, func: Callable[["App"], None]) -> None:
|
||||
"""
|
||||
Registers a coroutine to be awaited for during app shutdown
|
||||
"""
|
||||
self._on_shutdown.append(func)
|
||||
|
||||
def run_every(self, seconds: int, options: Optional[Dict] = None):
|
||||
"""
|
||||
Registers a coroutine to be called with a given interval
|
||||
"""
|
||||
if options is None:
|
||||
options = {}
|
||||
|
||||
max_concurrency = options.get(
|
||||
Options.MAX_CONCURRENCY, DefaultValues.RUN_EVERY_MAX_CONCURRENCY
|
||||
)
|
||||
|
||||
def wrapper(task: Callable[..., None]):
|
||||
runner = ScheduledThreadPoolExecutor(max_workers=max_concurrency)
|
||||
runner.schedule_at_fixed_delay(task, initial_delay=seconds, period=seconds)
|
||||
# runner = ScheduledTaskRunner(
|
||||
# seconds=seconds,
|
||||
# task=task,
|
||||
# app=self,
|
||||
# max_concurrency=max_concurrency,
|
||||
# )
|
||||
self._on_startup.append(runner.start)
|
||||
self._on_shutdown.append(runner.stop)
|
||||
if "task_runners" not in self:
|
||||
self["task_runners"] = []
|
||||
self["task_runners"].append(runner)
|
||||
|
||||
return task
|
||||
|
||||
return wrapper
|
||||
|
||||
def get_connection_for_route(self, route_info: Route):
|
||||
route_connection = route_info.connection
|
||||
if route_connection is None:
|
||||
route_connection = route_info.options.get("connection")
|
||||
|
||||
connections = self.connections.with_type(route_info.type)
|
||||
if route_connection is not None:
|
||||
if isinstance(route_connection, Connection):
|
||||
return route_connection
|
||||
elif isinstance(route_connection, str):
|
||||
return self.get_connection(name=route_connection)
|
||||
else:
|
||||
# pragma: nocover
|
||||
raise InvalidRoute(
|
||||
f"Expected `Route.connection` to be either `str` or "
|
||||
f"`Connection`. Got `{type(route_connection)}` instead."
|
||||
)
|
||||
elif len(connections) > 1:
|
||||
raise InvalidRoute(
|
||||
f"Invalid route definition for App. You are trying to "
|
||||
f"define a {route_info.type} into an amqpworker.App "
|
||||
f"with multiple connections without specifying which "
|
||||
f"one to use."
|
||||
)
|
||||
else:
|
||||
try:
|
||||
return connections[0]
|
||||
except IndexError as e:
|
||||
raise InvalidRoute(
|
||||
f"Invalid route definition for App. You are trying to "
|
||||
f"define a {route_info.type} without an "
|
||||
f"Connection registered on App"
|
||||
) from e
|
@ -0,0 +1,36 @@
|
||||
from typing import Generic, List, TypeVar
|
||||
|
||||
T = TypeVar("T")
|
||||
|
||||
|
||||
class Bucket(Generic[T]):
|
||||
def __init__(self, size: int) -> None:
|
||||
self.size = size
|
||||
# fixme: Criar uma interface comum para as *Message
|
||||
# para substituir esse Any
|
||||
self._items: List[T] = []
|
||||
|
||||
def is_full(self) -> bool:
|
||||
return len(self._items) == self.size
|
||||
|
||||
def is_empty(self) -> bool:
|
||||
return len(self._items) == 0
|
||||
|
||||
def put(self, item: T):
|
||||
if self.is_full():
|
||||
error_msg = f"Bucket is at full capacity: {self.size}"
|
||||
raise BucketFullException(error_msg)
|
||||
self._items.append(item)
|
||||
|
||||
def pop_all(self) -> List[T]:
|
||||
_r = self._items
|
||||
self._items = []
|
||||
return _r
|
||||
|
||||
@property
|
||||
def used(self) -> int:
|
||||
return len(self._items)
|
||||
|
||||
|
||||
class BucketFullException(Exception):
|
||||
pass
|
@ -0,0 +1,49 @@
|
||||
import logging
|
||||
from typing import List, Optional
|
||||
|
||||
from loguru import logger as _logger
|
||||
from pydantic import BaseSettings
|
||||
|
||||
from amqpworker.options import DefaultValues
|
||||
|
||||
INFINITY = float("inf")
|
||||
|
||||
|
||||
class Settings(BaseSettings):
|
||||
LOGLEVEL: str = "ERROR"
|
||||
|
||||
AMQP_DEFAULT_VHOST: str = "/"
|
||||
AMQP_DEFAULT_PREFETCH_COUNT: int = 128
|
||||
AMQP_DEFAULT_HEARTBEAT: int = 60
|
||||
|
||||
HTTP_HOST: str = "127.0.0.1"
|
||||
HTTP_PORT: int = 8080
|
||||
|
||||
FLUSH_TIMEOUT: int = DefaultValues.BULK_FLUSH_INTERVAL
|
||||
|
||||
# metrics
|
||||
METRICS_NAMESPACE: str = "amqpworker"
|
||||
METRICS_APPPREFIX: Optional[str]
|
||||
METRICS_ROUTE_PATH: str = "/metrics"
|
||||
METRICS_ROUTE_ENABLED: bool = True
|
||||
METRICS_DEFAULT_HISTOGRAM_BUCKETS_IN_MS: List[float] = [
|
||||
10,
|
||||
50,
|
||||
100,
|
||||
200,
|
||||
500,
|
||||
1000,
|
||||
5000,
|
||||
INFINITY,
|
||||
]
|
||||
|
||||
class Config:
|
||||
allow_mutation = False
|
||||
env_prefix = "AMQPWORKER_"
|
||||
|
||||
|
||||
settings = Settings()
|
||||
|
||||
loglevel = getattr(logging, settings.LOGLEVEL, logging.INFO)
|
||||
_logger.level( settings.LOGLEVEL)
|
||||
logger = _logger
|
@ -0,0 +1,178 @@
|
||||
import abc
|
||||
import collections
|
||||
from collections.abc import KeysView, ValuesView
|
||||
from typing import (
|
||||
Any,
|
||||
Counter,
|
||||
Dict,
|
||||
ItemsView,
|
||||
Iterable,
|
||||
List,
|
||||
Mapping,
|
||||
Optional,
|
||||
Type,
|
||||
Union,
|
||||
)
|
||||
|
||||
from pydantic import BaseModel, validator
|
||||
|
||||
from amqpworker.conf import settings
|
||||
from amqpworker.easyqueue.queue import JsonQueue
|
||||
from amqpworker.exceptions import InvalidConnection
|
||||
from amqpworker.options import RouteTypes
|
||||
from amqpworker.signals.base import Freezable
|
||||
|
||||
|
||||
class Connection(BaseModel, abc.ABC):
|
||||
"""
|
||||
Common ancestral for all Connection classes that auto generates a
|
||||
connection name and is responsible for keeping track of new connections on
|
||||
the ConnectionsMapping
|
||||
"""
|
||||
|
||||
route_type: RouteTypes
|
||||
name: Optional[str] = None
|
||||
|
||||
|
||||
class ConnectionsMapping(Mapping[str, Connection], Freezable):
|
||||
"""
|
||||
A mapping (Connection.name->Connection) of all available connections that
|
||||
also keeps a counter for each connection type
|
||||
"""
|
||||
|
||||
def __getitem__(self, k: str) -> Connection:
|
||||
return self._data[k]
|
||||
|
||||
def __len__(self) -> int:
|
||||
return len(self._data)
|
||||
|
||||
def __iter__(self):
|
||||
return iter(self._data)
|
||||
|
||||
def __init__(self) -> None:
|
||||
Freezable.__init__(self)
|
||||
self._data: Dict[str, Connection] = {}
|
||||
self.counter: Counter[Type[Connection]] = collections.Counter()
|
||||
|
||||
def __contains__(self, item):
|
||||
if isinstance(item, Connection):
|
||||
return item in self.values()
|
||||
return super(ConnectionsMapping, self).__contains__(item)
|
||||
|
||||
def __setitem__(self, key: str, value: Connection) -> None:
|
||||
if self.frozen:
|
||||
raise RuntimeError(
|
||||
"You shouldn't change the state of ConnectionsMapping "
|
||||
"after App startup"
|
||||
)
|
||||
|
||||
if key is None:
|
||||
key = id(value)
|
||||
|
||||
if key in self:
|
||||
raise InvalidConnection(
|
||||
f"Invalid connection: `{value}`. "
|
||||
f"The name `{key}` already exists in {self.__class__.__name__}"
|
||||
)
|
||||
self._data[key] = value
|
||||
self.counter[value.__class__] += 1
|
||||
|
||||
def __delitem__(self, key: str) -> None:
|
||||
if self.frozen:
|
||||
raise RuntimeError(
|
||||
"You shouldn't change the state of ConnectionsMapping "
|
||||
"after App startup"
|
||||
)
|
||||
del self._data[key]
|
||||
|
||||
def add(self, connections: Iterable[Connection]) -> None:
|
||||
for conn in connections:
|
||||
self[conn.name] = conn # type: ignore
|
||||
|
||||
def with_type(self, route_type: RouteTypes) -> List["Connection"]:
|
||||
# todo: manter uma segunda estrutura de dados ou aceitar O(n) sempre que chamado?
|
||||
return [conn for conn in self.values() if conn.route_type == route_type]
|
||||
|
||||
|
||||
_TYPE_COUNTER: Counter[Type[Connection]] = collections.Counter()
|
||||
|
||||
Message = Union[List, Dict]
|
||||
|
||||
|
||||
class AMQPConnection(Connection):
|
||||
hostname: str
|
||||
port: int
|
||||
username: str
|
||||
password: str
|
||||
route_type = RouteTypes.AMQP_RABBITMQ
|
||||
prefetch: int = settings.AMQP_DEFAULT_PREFETCH_COUNT
|
||||
heartbeat: int = settings.AMQP_DEFAULT_HEARTBEAT
|
||||
name: Optional[str] = None
|
||||
connections: Dict[str, JsonQueue] = {}
|
||||
|
||||
class Config:
|
||||
arbitrary_types_allowed = True
|
||||
|
||||
@validator("connections", pre=True, always=True, check_fields=False)
|
||||
def set_connections(cls, v):
|
||||
return v or {}
|
||||
|
||||
def __len__(self) -> int:
|
||||
return len(self.connections)
|
||||
|
||||
def __iter__(self):
|
||||
return iter(self.connections)
|
||||
|
||||
def __getitem__(self, key: str) -> JsonQueue:
|
||||
"""
|
||||
Gets a JsonQueue instance for a given virtual host
|
||||
|
||||
:param key: The virtual host of the connection
|
||||
:return: An instance of the connection
|
||||
"""
|
||||
try:
|
||||
return self.connections[key]
|
||||
except KeyError:
|
||||
conn: JsonQueue = JsonQueue(
|
||||
host=self.hostname,
|
||||
port=self.port,
|
||||
username=self.username,
|
||||
password=self.password,
|
||||
virtual_host=key,
|
||||
)
|
||||
self.connections[key] = conn
|
||||
return conn
|
||||
|
||||
def keys(self):
|
||||
return KeysView(self)
|
||||
|
||||
def items(self):
|
||||
return ItemsView(self)
|
||||
|
||||
def values(self):
|
||||
return ValuesView(self)
|
||||
|
||||
def register(self, queue: JsonQueue) -> None:
|
||||
self.connections[queue.virtual_host] = queue
|
||||
|
||||
def put(
|
||||
self,
|
||||
routing_key: str,
|
||||
data: Any = None,
|
||||
serialized_data: Union[str, bytes, None] = None,
|
||||
exchange: str = "",
|
||||
vhost: str = settings.AMQP_DEFAULT_VHOST,
|
||||
properties: Optional[dict] = None,
|
||||
mandatory: bool = False,
|
||||
immediate: bool = False,
|
||||
):
|
||||
conn = self[vhost]
|
||||
return conn.put(
|
||||
routing_key=routing_key,
|
||||
data=data,
|
||||
serialized_data=serialized_data,
|
||||
exchange=exchange,
|
||||
properties=properties,
|
||||
mandatory=mandatory,
|
||||
immediate=immediate,
|
||||
)
|
@ -0,0 +1,253 @@
|
||||
import time
|
||||
import traceback
|
||||
from concurrent.futures import ThreadPoolExecutor
|
||||
from concurrent.futures.thread import _shutdown
|
||||
from typing import Dict, Type, Union
|
||||
|
||||
from amqpstorm import AMQPError
|
||||
from loguru import logger
|
||||
|
||||
from amqpworker import conf
|
||||
from amqpworker.easyqueue.message import AMQPMessage
|
||||
from amqpworker.easyqueue.queue import JsonQueue, QueueConsumerDelegate
|
||||
from amqpworker.options import Events, Options
|
||||
from amqpworker.routes import AMQPRoute
|
||||
from .bucket import Bucket
|
||||
from .rabbitmq import RabbitMQMessage
|
||||
from .scheduled import ScheduledThreadPoolExecutor
|
||||
from .utils import call
|
||||
|
||||
|
||||
class Consumer(QueueConsumerDelegate):
|
||||
def __init__(
|
||||
self,
|
||||
route_info: Union[Dict, AMQPRoute],
|
||||
host: str,
|
||||
port: int,
|
||||
username: str,
|
||||
password: str,
|
||||
prefetch_count: int = 128,
|
||||
bucket_class: Type[Bucket] = Bucket[RabbitMQMessage],
|
||||
) -> None:
|
||||
self.route = route_info
|
||||
self._handler = route_info["handler"]
|
||||
self._queue_name = route_info["routes"]
|
||||
self._route_options = route_info["options"]
|
||||
self.host = host
|
||||
self.vhost = route_info.get("vhost", "/")
|
||||
self.bucket = bucket_class(
|
||||
size=min(self._route_options["bulk_size"], prefetch_count)
|
||||
)
|
||||
self.running = False
|
||||
self.queue: JsonQueue = JsonQueue(
|
||||
host,
|
||||
username,
|
||||
password,
|
||||
port,
|
||||
virtual_host=self.vhost,
|
||||
delegate=self,
|
||||
prefetch_count=prefetch_count,
|
||||
logger=conf.logger,
|
||||
connection_fail_callback=self._route_options.get(
|
||||
Options.CONNECTION_FAIL_CALLBACK, None
|
||||
),
|
||||
)
|
||||
|
||||
self.pool = ThreadPoolExecutor(thread_name_prefix=f'submit-{"-".join(self._queue_name)}-',
|
||||
max_workers=self._route_options.get('max_workers', 8))
|
||||
self.multiple_queue_pool = ThreadPoolExecutor(thread_name_prefix='consumer-queue-',
|
||||
max_workers=len(self._queue_name))
|
||||
self.clock = ScheduledThreadPoolExecutor(max_workers=1, name='flush-rabbit-')
|
||||
self.clock_task = None
|
||||
|
||||
@property
|
||||
def queue_name(self) -> str:
|
||||
return self._queue_name
|
||||
|
||||
def on_before_start_consumption(
|
||||
self, queue_name: str, queue: "JsonQueue"
|
||||
):
|
||||
"""
|
||||
Coroutine called before queue consumption starts. May be overwritten to
|
||||
implement further custom initialization.
|
||||
|
||||
:param queue_name: Queue name that will be consumed
|
||||
:type queue_name: str
|
||||
:param queue: AsynQueue instanced
|
||||
:type queue: JsonQueue
|
||||
"""
|
||||
pass
|
||||
|
||||
def on_consumption_start(self, consumer_tag: str, queue: "JsonQueue"):
|
||||
"""
|
||||
Coroutine called once consumption started.
|
||||
"""
|
||||
pass
|
||||
|
||||
def on_queue_message(self, msg: AMQPMessage):
|
||||
"""
|
||||
Callback called every time that a new, valid and deserialized message
|
||||
is ready to be handled.
|
||||
"""
|
||||
if not self.bucket.is_full():
|
||||
message = RabbitMQMessage(
|
||||
delivery_tag=msg.delivery_tag,
|
||||
amqp_message=msg,
|
||||
on_success=self._route_options[Events.ON_SUCCESS],
|
||||
on_exception=self._route_options[Events.ON_EXCEPTION],
|
||||
)
|
||||
self.bucket.put(message)
|
||||
|
||||
if self.bucket.is_full():
|
||||
return self._flush_bucket_if_needed()
|
||||
|
||||
def _flush_clocked(self, *args, **kwargs):
|
||||
try:
|
||||
self._flush_bucket_if_needed()
|
||||
except Exception as e:
|
||||
conf.logger.error(
|
||||
{
|
||||
"type": "flush-bucket-failed",
|
||||
"dest": self.host,
|
||||
"retry": True,
|
||||
"exc_traceback": traceback.format_exc(),
|
||||
}
|
||||
)
|
||||
|
||||
def _flush_bucket_if_needed(self):
|
||||
try:
|
||||
if not self.bucket.is_empty():
|
||||
all_messages = self.bucket.pop_all()
|
||||
conf.logger.debug(
|
||||
{
|
||||
"event": "bucket-flush",
|
||||
"bucket-size": len(all_messages),
|
||||
"handler": self._handler.__name__,
|
||||
}
|
||||
)
|
||||
if self.running:
|
||||
rv = self._handler(all_messages)
|
||||
for fu in self.pool.map(call, [m.process_success for m in all_messages]):
|
||||
pass
|
||||
return rv
|
||||
else:
|
||||
for m in all_messages:
|
||||
m.process_exception()
|
||||
except AMQPError:
|
||||
raise
|
||||
except Exception as e:
|
||||
traceback.print_exc()
|
||||
if self.running:
|
||||
for fu in self.pool.map(call, [m.process_exception for m in all_messages]):
|
||||
pass
|
||||
else:
|
||||
for m in all_messages:
|
||||
m.process_exception()
|
||||
raise e
|
||||
|
||||
def on_queue_error(self, body, delivery_tag, error, queue):
|
||||
"""
|
||||
Callback called every time that an error occurred during the validation
|
||||
or deserialization stage.
|
||||
|
||||
:param body: unparsed, raw message content
|
||||
:type body: Any
|
||||
:param delivery_tag: delivery_tag of the consumed message
|
||||
:type delivery_tag: int
|
||||
:param error: THe error that caused the callback to be called
|
||||
:type error: MessageError
|
||||
:type queue: JsonQueue
|
||||
"""
|
||||
conf.logger.error(
|
||||
{
|
||||
"parse-error": True,
|
||||
"exception": "Error: not a JSON",
|
||||
"original_msg": body,
|
||||
}
|
||||
)
|
||||
try:
|
||||
queue.ack(delivery_tag=delivery_tag)
|
||||
except AMQPError as e:
|
||||
self._log_exception(e)
|
||||
|
||||
def on_message_handle_error(self, handler_error: Exception, **kwargs):
|
||||
"""
|
||||
Callback called when an uncaught exception was raised during message
|
||||
handling stage.
|
||||
|
||||
:param handler_error: The exception that triggered
|
||||
:param kwargs: arguments used to call the coroutine that handled
|
||||
the message
|
||||
"""
|
||||
self._log_exception(handler_error)
|
||||
|
||||
def on_connection_error(self, exception: Exception):
|
||||
"""
|
||||
Called when the connection fails
|
||||
"""
|
||||
self._log_exception(exception)
|
||||
|
||||
def _log_exception(self, exception):
|
||||
current_exception = {
|
||||
"exc_message": str(exception),
|
||||
"exc_traceback": traceback.format_exc(),
|
||||
}
|
||||
conf.logger.error(current_exception)
|
||||
|
||||
def consume_all_queues(self, queue: JsonQueue):
|
||||
|
||||
# for r in self.multiple_queue_pool.map(queue.consume, self._queue_name, repeat(self)):
|
||||
# pass
|
||||
for queue_name in self._queue_name:
|
||||
# Por enquanto não estamos guardando a consumer_tag retornada
|
||||
# se precisar, podemos passar a guardar.
|
||||
conf.logger.debug(
|
||||
{"queue": queue_name, "event": "start-consume"}
|
||||
)
|
||||
queue.consume(queue_name=queue_name, pool=self.multiple_queue_pool, delegate=self)
|
||||
|
||||
def keep_runnig(self):
|
||||
return self.running
|
||||
|
||||
def stop(self):
|
||||
logger.debug('stop consumer')
|
||||
self.running = False
|
||||
self.queue.connection.close()
|
||||
self.pool.shutdown(False)
|
||||
self.multiple_queue_pool.shutdown(False)
|
||||
self.clock.stop(None)
|
||||
|
||||
def start(self):
|
||||
self.running = True
|
||||
if not self.clock_task:
|
||||
seconds = self._route_options.get(
|
||||
Options.BULK_FLUSH_INTERVAL, conf.settings.FLUSH_TIMEOUT
|
||||
)
|
||||
self.clock.schedule_at_fixed_rate(self._flush_clocked, seconds, seconds)
|
||||
self.clock.start(None)
|
||||
self.clock_task = self._flush_clocked
|
||||
while self.keep_runnig():
|
||||
if not self.queue.connection.has_channel_ready():
|
||||
try:
|
||||
self.consume_all_queues(self.queue)
|
||||
except RuntimeError as e:
|
||||
traceback.print_exc()
|
||||
if self.multiple_queue_pool._shutdown or _shutdown:
|
||||
conf.logger.info('app shutdown')
|
||||
# if 'interpreter shutdown' in str(e):
|
||||
# return
|
||||
else:
|
||||
raise
|
||||
except KeyboardInterrupt:
|
||||
self.stop()
|
||||
except Exception as e:
|
||||
|
||||
conf.logger.error(
|
||||
{
|
||||
"type": "connection-failed",
|
||||
"dest": self.host,
|
||||
"retry": True,
|
||||
"exc_traceback": traceback.format_exc(),
|
||||
}
|
||||
)
|
||||
time.sleep(1)
|
@ -0,0 +1,29 @@
|
||||
def wraps(original_handler):
|
||||
"""
|
||||
Esse decorator faz com que a assinatura da função original
|
||||
"suba" até o último decorator, que deverá ser sempre um registrador do
|
||||
próprio amqpworker. ex:
|
||||
@app.http.get(...)
|
||||
@deco1
|
||||
@deco2
|
||||
async def handler(...)
|
||||
pass
|
||||
|
||||
Nesse caso, os decorators `@deco1` e `@deco2` devem, *necessariamente*
|
||||
fazer uso desse `@wraps()`
|
||||
"""
|
||||
|
||||
def _wrap(deco):
|
||||
deco.amqpworker_original_annotations = getattr(
|
||||
original_handler,
|
||||
"amqpworker_original_annotations",
|
||||
original_handler.__annotations__,
|
||||
)
|
||||
deco.amqpworker_original_qualname = getattr(
|
||||
original_handler,
|
||||
"amqpworker_original_qualname",
|
||||
original_handler.__qualname__,
|
||||
)
|
||||
return deco
|
||||
|
||||
return _wrap
|
@ -0,0 +1,75 @@
|
||||
import threading
|
||||
from typing import Callable, Optional, Union
|
||||
from amqpstorm.base import Stateful
|
||||
from amqpstorm import Connection, Channel, AMQPError
|
||||
|
||||
OnErrorCallback = Union[
|
||||
None, Callable[[Exception], None]
|
||||
]
|
||||
|
||||
|
||||
class AMQPConnection:
|
||||
def __init__(
|
||||
self,
|
||||
host: str,
|
||||
username: str,
|
||||
password: str,
|
||||
port: int = 5672,
|
||||
heartbeat: int = 60,
|
||||
virtual_host: str = "/",
|
||||
on_error: OnErrorCallback = None,
|
||||
) -> None:
|
||||
self.host = host
|
||||
self.port = port
|
||||
self.username = username
|
||||
self.password = password
|
||||
self.virtual_host = virtual_host
|
||||
self.heartbeat = heartbeat
|
||||
self._on_error = on_error
|
||||
self._connection_lock = threading.Lock()
|
||||
self.channel: Optional[Channel] = None
|
||||
self._protocol: Optional[Connection] = None
|
||||
|
||||
@property
|
||||
def connection_parameters(self):
|
||||
return {
|
||||
"hostname": self.host,
|
||||
|
||||
"port": self.port,
|
||||
"username": self.username,
|
||||
"password": self.password,
|
||||
"virtual_host": self.virtual_host,
|
||||
# "on_error": self._on_error,
|
||||
"heartbeat": self.heartbeat,
|
||||
}
|
||||
|
||||
@property
|
||||
def is_connected(self) -> bool:
|
||||
return self._protocol and self._protocol.current_state == Stateful.OPEN
|
||||
|
||||
def has_channel_ready(self):
|
||||
return self.channel and self.channel.is_open
|
||||
|
||||
def close(self) -> None:
|
||||
if not self.is_connected:
|
||||
return None
|
||||
self._protocol.close()
|
||||
|
||||
def _connect(self) -> None:
|
||||
with self._connection_lock:
|
||||
if self.is_connected and self.has_channel_ready():
|
||||
return
|
||||
|
||||
try:
|
||||
if self._protocol:
|
||||
self.channel = self._protocol.channel()
|
||||
return
|
||||
except AMQPError as e:
|
||||
# Se não conseguirmos pegar um channel novo
|
||||
# a conexão atual deve mesmo ser renovada e isso
|
||||
# será feito logo abaixo.
|
||||
pass
|
||||
|
||||
self._protocol = Connection(**self.connection_parameters)
|
||||
|
||||
self.channel = self._protocol.channel()
|
@ -0,0 +1,16 @@
|
||||
class EmptyQueueException(Exception):
|
||||
"""No message to get"""
|
||||
|
||||
|
||||
class MessageError(ValueError):
|
||||
"""Base for all message exceptions"""
|
||||
|
||||
|
||||
class UndecodableMessageException(MessageError):
|
||||
"""Can't decode as JSON"""
|
||||
|
||||
|
||||
class InvalidMessageSizeException(MessageError):
|
||||
def __init__(self, message=None):
|
||||
"""Message size if bigger than it should be"""
|
||||
self.message = message
|
@ -0,0 +1,86 @@
|
||||
from typing import Callable, Generic, Optional, TypeVar
|
||||
|
||||
from amqpstorm import Channel
|
||||
from amqpworker.easyqueue.connection import AMQPConnection
|
||||
from amqpworker.easyqueue.exceptions import UndecodableMessageException
|
||||
|
||||
T = TypeVar("T")
|
||||
|
||||
|
||||
class AMQPMessage(Generic[T]):
|
||||
__slots__ = (
|
||||
"connection",
|
||||
"channel",
|
||||
"queue_name",
|
||||
"serialized_data",
|
||||
"delivery_tag",
|
||||
"_envelope",
|
||||
"_properties",
|
||||
"_deserialization_method",
|
||||
"_deserialized_data",
|
||||
"_queue",
|
||||
)
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
connection: AMQPConnection,
|
||||
channel: Channel,
|
||||
queue_name: str,
|
||||
serialized_data: bytes,
|
||||
delivery_tag: int,
|
||||
properties: dict,
|
||||
deserialization_method: Callable[[bytes], T],
|
||||
queue,
|
||||
) -> None:
|
||||
self.queue_name = queue_name
|
||||
self.serialized_data = serialized_data
|
||||
self.delivery_tag = delivery_tag
|
||||
self.connection = connection
|
||||
self.channel = channel
|
||||
self._properties = properties
|
||||
self._deserialization_method = deserialization_method
|
||||
|
||||
self._deserialized_data: Optional[T] = None
|
||||
self._queue = queue
|
||||
|
||||
@property
|
||||
def deserialized_data(self) -> T:
|
||||
if self._deserialized_data:
|
||||
return self._deserialized_data
|
||||
try:
|
||||
self._deserialized_data = self._deserialization_method(
|
||||
self.serialized_data
|
||||
)
|
||||
except ValueError as e:
|
||||
raise UndecodableMessageException(
|
||||
"msg couldn't be decoded as JSON"
|
||||
) from e
|
||||
return self._deserialized_data
|
||||
|
||||
def __eq__(self, other):
|
||||
if not isinstance(other, self.__class__):
|
||||
return False
|
||||
for attr in self.__slots__:
|
||||
if attr.startswith("__"):
|
||||
continue
|
||||
if getattr(self, attr) != getattr(other, attr):
|
||||
return False
|
||||
|
||||
return True
|
||||
|
||||
def ack(self):
|
||||
return self.channel.basic.ack(self.delivery_tag)
|
||||
|
||||
def reject(self, requeue=False):
|
||||
return self.channel.basic.reject(
|
||||
delivery_tag=self.delivery_tag, requeue=requeue
|
||||
)
|
||||
|
||||
def requeue(self):
|
||||
self.channel.basic.ack(self.delivery_tag)
|
||||
|
||||
self.channel.basic.publish(body=self.serialized_data,
|
||||
routing_key=self.queue_name,
|
||||
properties=self._properties,
|
||||
mandatory=False,
|
||||
immediate=False)
|
@ -0,0 +1,386 @@
|
||||
import abc
|
||||
import json
|
||||
import logging
|
||||
import time
|
||||
import traceback
|
||||
from concurrent.futures import ThreadPoolExecutor
|
||||
from enum import Enum, auto
|
||||
from functools import wraps
|
||||
from typing import (
|
||||
Any,
|
||||
Callable,
|
||||
Dict,
|
||||
Generic,
|
||||
Optional,
|
||||
Type,
|
||||
TypeVar,
|
||||
Union,
|
||||
)
|
||||
|
||||
from amqpstorm import Message
|
||||
from loguru import logger
|
||||
|
||||
from amqpworker.easyqueue.connection import AMQPConnection
|
||||
from amqpworker.easyqueue.exceptions import UndecodableMessageException
|
||||
from amqpworker.easyqueue.message import AMQPMessage
|
||||
|
||||
|
||||
class DeliveryModes:
|
||||
NON_PERSISTENT = 1
|
||||
PERSISTENT = 2
|
||||
|
||||
|
||||
class ConnType(Enum):
|
||||
CONSUME = auto()
|
||||
WRITE = auto()
|
||||
|
||||
|
||||
class BaseQueue(metaclass=abc.ABCMeta):
|
||||
def __init__(
|
||||
self,
|
||||
host: str,
|
||||
username: str,
|
||||
password: str,
|
||||
port: int = 5672,
|
||||
virtual_host: str = "/",
|
||||
heartbeat: int = 60,
|
||||
) -> None:
|
||||
self.host = host
|
||||
self.port = port
|
||||
self.username = username
|
||||
self.password = password
|
||||
self.virtual_host = virtual_host
|
||||
self.heartbeat = heartbeat
|
||||
|
||||
@abc.abstractmethod
|
||||
def serialize(self, body: Any, **kwargs) -> str:
|
||||
raise NotImplementedError
|
||||
|
||||
@abc.abstractmethod
|
||||
def deserialize(self, body: bytes) -> Any:
|
||||
raise NotImplementedError
|
||||
|
||||
def _parse_message(self, content) -> Dict[str, Any]:
|
||||
"""
|
||||
Gets the raw message body as an input, handles deserialization and
|
||||
outputs
|
||||
:param content: The raw message body
|
||||
"""
|
||||
try:
|
||||
return self.deserialize(content)
|
||||
except TypeError:
|
||||
return self.deserialize(content.decode())
|
||||
except json.decoder.JSONDecodeError as e:
|
||||
raise UndecodableMessageException(
|
||||
'"{content}" can\'t be decoded as JSON'.format(content=content)
|
||||
)
|
||||
|
||||
|
||||
class BaseJsonQueue(BaseQueue):
|
||||
content_type = "application/json"
|
||||
|
||||
def serialize(self, body: Any, **kwargs) -> str:
|
||||
return json.dumps(body, **kwargs)
|
||||
|
||||
def deserialize(self, body: bytes) -> Any:
|
||||
return json.loads(body.decode())
|
||||
|
||||
|
||||
def _ensure_conn_is_ready(conn_type: ConnType):
|
||||
def _ensure_connected(coro: Callable[..., Any]):
|
||||
@wraps(coro)
|
||||
def wrapper(self: "JsonQueue", *args, **kwargs):
|
||||
conn = self.conn_types[conn_type]
|
||||
retries = 0
|
||||
while self.is_running and not conn.has_channel_ready():
|
||||
try:
|
||||
conn._connect()
|
||||
break
|
||||
except Exception as e:
|
||||
time.sleep(self.seconds_between_conn_retry)
|
||||
retries += 1
|
||||
if self.connection_fail_callback:
|
||||
self.connection_fail_callback(e, retries)
|
||||
if self.logger:
|
||||
self.logger.error(
|
||||
{
|
||||
"event": "reconnect-failure",
|
||||
"retry_count": retries,
|
||||
"exc_traceback": traceback.format_tb(
|
||||
e.__traceback__
|
||||
),
|
||||
}
|
||||
)
|
||||
return coro(self, *args, **kwargs)
|
||||
|
||||
return wrapper
|
||||
|
||||
return _ensure_connected
|
||||
|
||||
|
||||
T = TypeVar("T")
|
||||
|
||||
|
||||
class _ConsumptionHandler:
|
||||
def __init__(
|
||||
self,
|
||||
delegate: "QueueConsumerDelegate",
|
||||
queue: "JsonQueue",
|
||||
queue_name: str,
|
||||
) -> None:
|
||||
self.delegate = delegate
|
||||
self.queue = queue
|
||||
self.queue_name = queue_name
|
||||
self.consumer_tag: Optional[str] = None
|
||||
|
||||
def _handle_callback(self, callback, **kwargs):
|
||||
"""
|
||||
Chains the callback coroutine into a try/except and calls
|
||||
`on_message_handle_error` in case of failure, avoiding unhandled
|
||||
exceptions.
|
||||
|
||||
:param callback:
|
||||
:param kwargs:
|
||||
:return:
|
||||
"""
|
||||
try:
|
||||
return callback(**kwargs)
|
||||
except Exception as e:
|
||||
return self.delegate.on_message_handle_error(
|
||||
handler_error=e, **kwargs
|
||||
)
|
||||
|
||||
def handle_message(
|
||||
self,
|
||||
message: Message
|
||||
):
|
||||
|
||||
msg = AMQPMessage(
|
||||
connection=self.queue.connection,
|
||||
channel=message.channel,
|
||||
queue=self.queue,
|
||||
properties=message.properties,
|
||||
delivery_tag=message.delivery_tag,
|
||||
deserialization_method=self.queue.deserialize,
|
||||
queue_name=self.queue_name,
|
||||
serialized_data=message.body,
|
||||
)
|
||||
|
||||
callback = self._handle_callback(
|
||||
self.delegate.on_queue_message, msg=msg # type: ignore
|
||||
)
|
||||
return callback
|
||||
|
||||
|
||||
class JsonQueue(BaseQueue, Generic[T]):
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
host: str,
|
||||
username: str,
|
||||
password: str,
|
||||
port: int = 5672,
|
||||
delegate_class: Optional[Type["QueueConsumerDelegate"]] = None,
|
||||
delegate: Optional["QueueConsumerDelegate"] = None,
|
||||
virtual_host: str = "/",
|
||||
heartbeat: int = 60,
|
||||
prefetch_count: int = 100,
|
||||
seconds_between_conn_retry: int = 1,
|
||||
logger: Optional[logging.Logger] = None,
|
||||
connection_fail_callback: Optional[
|
||||
Callable[[Exception, int], None]
|
||||
] = None,
|
||||
) -> None:
|
||||
super().__init__(host, username, password, port, virtual_host, heartbeat)
|
||||
|
||||
if delegate is not None and delegate_class is not None:
|
||||
raise ValueError("Cant provide both delegate and delegate_class")
|
||||
|
||||
if delegate_class is not None:
|
||||
self.delegate = delegate_class()
|
||||
else:
|
||||
self.delegate = delegate # type: ignore
|
||||
|
||||
self.prefetch_count = prefetch_count
|
||||
|
||||
on_error = self.delegate.on_connection_error if self.delegate else None
|
||||
|
||||
self.connection = AMQPConnection(
|
||||
host=host,
|
||||
port=port,
|
||||
username=username,
|
||||
password=password,
|
||||
virtual_host=virtual_host,
|
||||
heartbeat=heartbeat,
|
||||
on_error=on_error,
|
||||
)
|
||||
|
||||
self._write_connection = AMQPConnection(
|
||||
host=host,
|
||||
port=port,
|
||||
username=username,
|
||||
password=password,
|
||||
virtual_host=virtual_host,
|
||||
heartbeat=heartbeat,
|
||||
on_error=on_error,
|
||||
)
|
||||
|
||||
self.conn_types = {
|
||||
ConnType.CONSUME: self.connection,
|
||||
ConnType.WRITE: self._write_connection,
|
||||
}
|
||||
|
||||
self.seconds_between_conn_retry = seconds_between_conn_retry
|
||||
self.is_running = True
|
||||
self.logger = logger
|
||||
self.connection_fail_callback = connection_fail_callback
|
||||
|
||||
def serialize(self, body: T, **kwargs) -> str:
|
||||
return json.dumps(body, **kwargs)
|
||||
|
||||
def deserialize(self, body: bytes) -> T:
|
||||
return json.loads(body.decode())
|
||||
|
||||
def conn_for(self, type: ConnType) -> AMQPConnection:
|
||||
return self.conn_types[type]
|
||||
|
||||
@_ensure_conn_is_ready(ConnType.WRITE)
|
||||
def put(
|
||||
self,
|
||||
routing_key: str,
|
||||
data: Any = None,
|
||||
serialized_data: Union[str, bytes] = "",
|
||||
exchange: str = "",
|
||||
properties: Optional[dict] = None,
|
||||
mandatory: bool = False,
|
||||
immediate: bool = False,
|
||||
):
|
||||
"""
|
||||
:param data: A serializable data that should be serialized before
|
||||
publishing
|
||||
:param serialized_data: A payload to be published as is
|
||||
:param exchange: The exchange to publish the message
|
||||
:param routing_key: The routing key to publish the message
|
||||
"""
|
||||
if data and serialized_data:
|
||||
raise ValueError("Only one of data or json should be specified")
|
||||
|
||||
if data:
|
||||
if isinstance(data, (str, bytes)):
|
||||
serialized_data = data
|
||||
else:
|
||||
serialized_data = self.serialize(data, ensure_ascii=False)
|
||||
properties['Content-Type'] = 'application/json'
|
||||
|
||||
if not isinstance(serialized_data, bytes):
|
||||
serialized_data = serialized_data.encode()
|
||||
|
||||
return self._write_connection.channel.basic.publish(
|
||||
body=serialized_data,
|
||||
exchange=exchange,
|
||||
routing_key=routing_key,
|
||||
properties=properties,
|
||||
mandatory=mandatory,
|
||||
immediate=immediate,
|
||||
)
|
||||
|
||||
@_ensure_conn_is_ready(ConnType.CONSUME)
|
||||
def consume(
|
||||
self,
|
||||
queue_name: str,
|
||||
pool: ThreadPoolExecutor,
|
||||
delegate: "QueueConsumerDelegate",
|
||||
consumer_name: str = "",
|
||||
|
||||
) -> str:
|
||||
"""
|
||||
Connects the client if needed and starts queue consumption, sending
|
||||
`on_before_start_consumption` and `on_consumption_start` notifications
|
||||
to the delegate object
|
||||
|
||||
:param queue_name: queue name to consume from
|
||||
:param consumer_name: An optional name to be used as a consumer
|
||||
identifier. If one isn't provided, a random one is generated by the
|
||||
broker
|
||||
:return: The consumer tag. Useful for cancelling/stopping consumption
|
||||
"""
|
||||
# todo: Implement a consumer tag generator
|
||||
handler = _ConsumptionHandler(
|
||||
delegate=delegate, queue=self, queue_name=queue_name
|
||||
)
|
||||
|
||||
delegate.on_before_start_consumption(
|
||||
queue_name=queue_name, queue=self
|
||||
)
|
||||
self.connection.channel.basic.qos(
|
||||
prefetch_count=self.prefetch_count,
|
||||
prefetch_size=0,
|
||||
global_=False,
|
||||
)
|
||||
consumer_tag = self.connection.channel.basic.consume(
|
||||
callback=handler.handle_message,
|
||||
consumer_tag=consumer_name,
|
||||
queue=queue_name,
|
||||
)
|
||||
delegate.on_consumption_start(
|
||||
consumer_tag=consumer_tag, queue=self
|
||||
)
|
||||
|
||||
def start():
|
||||
try:
|
||||
self.connection.channel.start_consuming()
|
||||
except:
|
||||
traceback.print_exc()
|
||||
|
||||
pool.submit(start)
|
||||
handler.consumer_tag = consumer_tag
|
||||
return consumer_tag
|
||||
|
||||
|
||||
class QueueConsumerDelegate(metaclass=abc.ABCMeta):
|
||||
def on_before_start_consumption(
|
||||
self, queue_name: str, queue: JsonQueue
|
||||
):
|
||||
"""
|
||||
Coroutine called before queue consumption starts. May be overwritten to
|
||||
implement further custom initialization.
|
||||
|
||||
:param queue_name: Queue name that will be consumed
|
||||
:type queue_name: str
|
||||
:param queue: AsynQueue instanced
|
||||
:type queue: JsonQueue
|
||||
"""
|
||||
pass
|
||||
|
||||
def on_consumption_start(self, consumer_tag: str, queue: JsonQueue):
|
||||
"""
|
||||
Coroutine called once consumption started.
|
||||
"""
|
||||
|
||||
@abc.abstractmethod
|
||||
def on_queue_message(self, msg: AMQPMessage[Any]):
|
||||
"""
|
||||
Callback called every time that a new, valid and deserialized message
|
||||
is ready to be handled.
|
||||
|
||||
:param msg: the consumed message
|
||||
"""
|
||||
raise NotImplementedError
|
||||
|
||||
def on_message_handle_error(self, handler_error: Exception, **kwargs):
|
||||
"""
|
||||
Callback called when an uncaught exception was raised during message
|
||||
handling stage.
|
||||
|
||||
:param handler_error: The exception that triggered
|
||||
:param kwargs: arguments used to call the coroutine that handled
|
||||
the message
|
||||
:return:
|
||||
"""
|
||||
pass
|
||||
|
||||
def on_connection_error(self, exception: Exception):
|
||||
"""
|
||||
Called when the connection fails
|
||||
"""
|
||||
pass
|
@ -0,0 +1,22 @@
|
||||
from abc import ABC
|
||||
from asyncio import iscoroutinefunction
|
||||
from typing import Generic, TypeVar
|
||||
|
||||
import amqpworker
|
||||
from amqpworker.routes import RouteHandler
|
||||
|
||||
T = TypeVar("T")
|
||||
|
||||
|
||||
def _extract_sync_callable(handler) -> RouteHandler:
|
||||
cb = handler
|
||||
if not callable(cb):
|
||||
raise TypeError(f"Object passed as handler is not callable: {cb}")
|
||||
if iscoroutinefunction(cb) or (hasattr(cb, '__call__') and iscoroutinefunction(cb)):
|
||||
raise TypeError(f"handler must be sync: {cb}")
|
||||
return cb
|
||||
|
||||
|
||||
class EntrypointInterface(ABC, Generic[T]):
|
||||
def __init__(self, app: "amqpworker.App") -> None:
|
||||
self.app = app
|
@ -0,0 +1,10 @@
|
||||
class InvalidRoute(ValueError):
|
||||
"""
|
||||
Defines an invalid route definition condition.
|
||||
"""
|
||||
|
||||
|
||||
class InvalidConnection(ValueError):
|
||||
"""
|
||||
Defines an invalid connection definition condition.
|
||||
"""
|
@ -0,0 +1,42 @@
|
||||
from enum import Enum, auto
|
||||
from typing import List
|
||||
|
||||
|
||||
class AutoNameEnum(str, Enum):
|
||||
def _generate_next_value_( # type: ignore
|
||||
name: str, start: int, count: int, last_values: List[str]
|
||||
) -> str:
|
||||
return name.lower()
|
||||
|
||||
|
||||
class Options(AutoNameEnum):
|
||||
BULK_SIZE = auto()
|
||||
BULK_FLUSH_INTERVAL = auto()
|
||||
MAX_CONCURRENCY = auto()
|
||||
CONNECTION_FAIL_CALLBACK = auto()
|
||||
|
||||
|
||||
class Actions(AutoNameEnum):
|
||||
ACK = auto()
|
||||
REJECT = auto()
|
||||
REQUEUE = auto()
|
||||
REQUEUE_TAIL = auto()
|
||||
|
||||
|
||||
class Events(AutoNameEnum):
|
||||
ON_SUCCESS = auto()
|
||||
ON_EXCEPTION = auto()
|
||||
|
||||
|
||||
class DefaultValues:
|
||||
MAX_SUBMIT_WORKER_SIZE = 8
|
||||
BULK_SIZE = 1
|
||||
BULK_FLUSH_INTERVAL = 60
|
||||
ON_SUCCESS = Actions.ACK
|
||||
ON_EXCEPTION = Actions.REQUEUE_TAIL
|
||||
RUN_EVERY_MAX_CONCURRENCY = 1
|
||||
|
||||
|
||||
class RouteTypes(AutoNameEnum):
|
||||
AMQP_RABBITMQ = auto()
|
||||
HTTP = auto()
|
@ -0,0 +1,3 @@
|
||||
from amqpworker.routes import AMQPRouteOptions
|
||||
|
||||
from .message import RabbitMQMessage # noqa: F401
|
@ -0,0 +1,42 @@
|
||||
from typing import List, Optional
|
||||
|
||||
from amqpworker import conf
|
||||
from amqpworker.connections import AMQPConnection
|
||||
from amqpworker.entrypoints import EntrypointInterface, _extract_sync_callable
|
||||
from amqpworker.routes import AMQPRoute, AMQPRouteOptions, RoutesRegistry
|
||||
|
||||
|
||||
def _register_amqp_handler(
|
||||
registry: RoutesRegistry,
|
||||
routes: List[str],
|
||||
vhost: str,
|
||||
connection: Optional[AMQPConnection],
|
||||
options: Optional[AMQPRouteOptions],
|
||||
):
|
||||
def _wrap(f):
|
||||
cb = _extract_sync_callable(f)
|
||||
route = AMQPRoute(
|
||||
handler=cb,
|
||||
routes=routes,
|
||||
vhost=vhost,
|
||||
connection=connection,
|
||||
options=options,
|
||||
)
|
||||
registry.add_amqp_route(route)
|
||||
|
||||
return f
|
||||
|
||||
return _wrap
|
||||
|
||||
|
||||
class AMQPRouteEntryPointImpl(EntrypointInterface):
|
||||
def consume(
|
||||
self,
|
||||
routes: List[str],
|
||||
vhost: str = conf.settings.AMQP_DEFAULT_VHOST,
|
||||
connection: Optional[AMQPConnection] = None,
|
||||
options: Optional[AMQPRouteOptions] = AMQPRouteOptions(),
|
||||
):
|
||||
return _register_amqp_handler(
|
||||
self.app.routes_registry, routes, vhost, connection, options
|
||||
)
|
@ -0,0 +1,58 @@
|
||||
from amqpworker.easyqueue.message import AMQPMessage
|
||||
from amqpworker.options import Actions
|
||||
|
||||
|
||||
class RabbitMQMessage:
|
||||
def __init__(
|
||||
self,
|
||||
delivery_tag: int,
|
||||
amqp_message: AMQPMessage,
|
||||
on_success: Actions = Actions.ACK,
|
||||
on_exception: Actions = Actions.REQUEUE,
|
||||
) -> None:
|
||||
self._delivery_tag = delivery_tag
|
||||
self._on_success_action = on_success
|
||||
self._on_exception_action = on_exception
|
||||
self._final_action = None
|
||||
self._amqp_message = amqp_message
|
||||
|
||||
@property
|
||||
def body(self):
|
||||
return self._amqp_message.deserialized_data
|
||||
|
||||
@property
|
||||
def serialized_data(self):
|
||||
return self._amqp_message.serialized_data
|
||||
|
||||
def reject(self, requeue=True):
|
||||
"""
|
||||
Marca essa mensagem para ser rejeitada. O parametro ``requeue`` indica se a mensagem será recolocada na fila original (``requeue=True``) ou será descartada (``requeue=False``).
|
||||
"""
|
||||
self._final_action = Actions.REQUEUE if requeue else Actions.REJECT
|
||||
|
||||
def requeue(self):
|
||||
self._final_action = Actions.REQUEUE_TAIL
|
||||
|
||||
def accept(self):
|
||||
"""
|
||||
Marca essa mensagem para ser confirmada (``ACK``) ao fim da execução do handler.
|
||||
"""
|
||||
self._final_action = Actions.ACK
|
||||
|
||||
def _process_action(self, action: Actions):
|
||||
if action == Actions.REJECT:
|
||||
self._amqp_message.reject(requeue=False)
|
||||
elif action == Actions.REQUEUE:
|
||||
self._amqp_message.reject(requeue=True)
|
||||
elif action == Actions.REQUEUE_TAIL:
|
||||
self._amqp_message.requeue()
|
||||
elif action == Actions.ACK:
|
||||
self._amqp_message.ack()
|
||||
|
||||
def process_success(self):
|
||||
action = self._final_action or self._on_success_action
|
||||
return self._process_action(action)
|
||||
|
||||
def process_exception(self):
|
||||
action = self._final_action or self._on_exception_action
|
||||
return self._process_action(action)
|
@ -0,0 +1,135 @@
|
||||
import abc
|
||||
from collections import UserDict
|
||||
from typing import (
|
||||
Any,
|
||||
Callable,
|
||||
Coroutine,
|
||||
Dict,
|
||||
Iterable,
|
||||
List,
|
||||
Optional,
|
||||
Type,
|
||||
Union,
|
||||
)
|
||||
|
||||
from cached_property import cached_property
|
||||
from pydantic import BaseModel, Extra, root_validator, validator
|
||||
|
||||
from amqpworker import conf
|
||||
from amqpworker.connections import AMQPConnection, Connection
|
||||
from amqpworker.options import Actions, DefaultValues, RouteTypes
|
||||
|
||||
RouteHandler = Callable[..., Coroutine]
|
||||
|
||||
|
||||
class Model(BaseModel, abc.ABC):
|
||||
"""
|
||||
An abstract pydantic BaseModel that also behaves like a Mapping
|
||||
"""
|
||||
|
||||
def __getitem__(self, item):
|
||||
try:
|
||||
return getattr(self, item)
|
||||
except AttributeError as e:
|
||||
raise KeyError from e
|
||||
|
||||
def __setitem__(self, key, value):
|
||||
try:
|
||||
return self.__setattr__(key, value)
|
||||
except (AttributeError, ValueError) as e:
|
||||
raise KeyError from e
|
||||
|
||||
def __eq__(self, other):
|
||||
if isinstance(other, dict):
|
||||
return self.dict() == other
|
||||
return super(Model, self).__eq__(other)
|
||||
|
||||
def __len__(self):
|
||||
return len(self.__fields__)
|
||||
|
||||
def keys(self):
|
||||
return self.__fields__.keys()
|
||||
|
||||
def get(self, key, default=None):
|
||||
try:
|
||||
return self[key]
|
||||
except KeyError:
|
||||
return default
|
||||
|
||||
|
||||
class _RouteOptions(Model):
|
||||
pass
|
||||
|
||||
|
||||
class Route(Model, abc.ABC):
|
||||
"""
|
||||
An abstract Model that acts like a route factory
|
||||
"""
|
||||
|
||||
type: RouteTypes
|
||||
handler: Any
|
||||
routes: List[str]
|
||||
connection: Optional[Connection]
|
||||
options: _RouteOptions = _RouteOptions()
|
||||
|
||||
@staticmethod
|
||||
def factory(data: Dict) -> "Route":
|
||||
try:
|
||||
type_ = data.pop("type")
|
||||
except KeyError as e:
|
||||
raise ValueError("Routes must have a type") from e
|
||||
|
||||
if type_ == RouteTypes.HTTP:
|
||||
raise ValueError(f"'{type_}' is an invalid RouteType.")
|
||||
if type_ == RouteTypes.AMQP_RABBITMQ:
|
||||
return AMQPRoute(**data)
|
||||
raise ValueError(f"'{type_}' is an invalid RouteType.")
|
||||
|
||||
|
||||
class AMQPRouteOptions(_RouteOptions):
|
||||
bulk_size: int = DefaultValues.BULK_SIZE
|
||||
max_workers: int = DefaultValues.MAX_SUBMIT_WORKER_SIZE
|
||||
bulk_flush_interval: int = DefaultValues.BULK_FLUSH_INTERVAL
|
||||
on_success: Actions = DefaultValues.ON_SUCCESS
|
||||
on_exception: Actions = DefaultValues.ON_EXCEPTION
|
||||
connection_fail_callback: Optional[
|
||||
Callable[[Exception, int], Coroutine]
|
||||
] = None
|
||||
connection: Optional[Union[AMQPConnection, str]]
|
||||
|
||||
class Config:
|
||||
arbitrary_types_allowed = False
|
||||
extra = Extra.forbid
|
||||
|
||||
|
||||
class AMQPRoute(Route):
|
||||
type: RouteTypes = RouteTypes.AMQP_RABBITMQ
|
||||
vhost: str = conf.settings.AMQP_DEFAULT_VHOST
|
||||
connection: Optional[AMQPConnection]
|
||||
options: AMQPRouteOptions
|
||||
|
||||
|
||||
class RoutesRegistry(UserDict):
|
||||
def _get_routes_for_type(self, route_type: Type) -> Iterable:
|
||||
return tuple((r for r in self.values() if isinstance(r, route_type)))
|
||||
|
||||
@cached_property
|
||||
def amqp_routes(self) -> Iterable[AMQPRoute]:
|
||||
return self._get_routes_for_type(AMQPRoute)
|
||||
|
||||
def __setitem__(self, key: RouteHandler, value: Union[Dict, Route]):
|
||||
if not isinstance(value, Route):
|
||||
route = Route.factory({"handler": key, **value})
|
||||
else:
|
||||
route = value
|
||||
|
||||
super(RoutesRegistry, self).__setitem__(key, route)
|
||||
|
||||
def add_route(self, route: Route) -> None:
|
||||
self[route.handler] = route
|
||||
|
||||
def add_amqp_route(self, route: AMQPRoute) -> None:
|
||||
self[route.handler] = route
|
||||
|
||||
def route_for(self, handler: RouteHandler) -> Route:
|
||||
return self[handler]
|
@ -0,0 +1,162 @@
|
||||
"""The MIT License (MIT)
|
||||
|
||||
Copyright (c) 2021 Yogaraj.S
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
of this software and associated documentation files (the "Software"), to deal
|
||||
in the Software without restriction, including without limitation the rights
|
||||
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||
copies of the Software, and to permit persons to whom the Software is
|
||||
furnished to do so, subject to the following conditions:
|
||||
|
||||
The above copyright notice and this permission notice shall be included in all
|
||||
copies or substantial portions of the Software.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
||||
SOFTWARE."""
|
||||
|
||||
import threading
|
||||
import traceback
|
||||
from concurrent.futures import ThreadPoolExecutor
|
||||
from typing import Optional, Callable
|
||||
|
||||
from delayedqueue import DelayedQueue
|
||||
from loguru import logger
|
||||
|
||||
"""Wrapper for the task submitted to ScheduledThreadPoolExecutor class"""
|
||||
|
||||
import time
|
||||
from typing import Callable
|
||||
|
||||
|
||||
class ScheduledTask:
|
||||
def __init__(self, runnable: Callable, initial_delay: int, period: int, *args, time_func=time.time, **kwargs):
|
||||
super().__init__()
|
||||
self.runnable = runnable
|
||||
self.initial_delay = initial_delay
|
||||
self.period = period
|
||||
self.__time_func = time_func
|
||||
self.args = args
|
||||
self.kwargs = kwargs
|
||||
self.__is_initial = True
|
||||
self.task_time = int(self.__time_func() * 1000) + (initial_delay * 1000)
|
||||
|
||||
@property
|
||||
def is_initial_run(self) -> bool:
|
||||
return self.__is_initial
|
||||
|
||||
@property
|
||||
def at_fixed_delay(self) -> bool:
|
||||
return self.kwargs.get('is_fixed_delay', False)
|
||||
|
||||
@property
|
||||
def at_fixed_rate(self) -> bool:
|
||||
return self.kwargs.get('is_fixed_rate', False)
|
||||
|
||||
@property
|
||||
def executor_ctx(self):
|
||||
return self.kwargs['executor_ctx'] # pragma: no cover
|
||||
|
||||
@property
|
||||
def exception_callback(self):
|
||||
return self.kwargs.get('on_exception_callback')
|
||||
|
||||
@property
|
||||
def time_func(self):
|
||||
return self.__time_func
|
||||
|
||||
def __get_next_run(self) -> int:
|
||||
if not (self.at_fixed_rate or self.at_fixed_delay):
|
||||
raise TypeError("`get_next_run` invoked in a non-repeatable task")
|
||||
return int(self.__time_func() * 1000) + self.period * 1000
|
||||
|
||||
def set_next_run(self, time_taken: float = 0) -> None:
|
||||
self.__is_initial = False
|
||||
self.task_time = self.__get_next_run() - time_taken
|
||||
|
||||
def __lt__(self, other) -> bool:
|
||||
if not isinstance(other, ScheduledTask):
|
||||
raise TypeError(f"{other} is not of type ScheduledTask")
|
||||
return self.task_time < other.task_time
|
||||
|
||||
def __repr__(self) -> str:
|
||||
return f"""(Task: {self.runnable.__name__}, Initial Delay: {self.initial_delay} second(s), Periodic: {self.period} second(s), Next run: {time.ctime(self.task_time / 1000)})"""
|
||||
|
||||
def run(self, *args, **kwargs):
|
||||
st_time = time.time_ns()
|
||||
try:
|
||||
self.runnable(*self.args, **self.kwargs)
|
||||
except Exception as e:
|
||||
if self.exception_callback:
|
||||
self.exception_callback(e, *self.args, **self.kwargs)
|
||||
else:
|
||||
traceback.print_exc()
|
||||
finally:
|
||||
end_time = time.time_ns()
|
||||
time_taken = (end_time - st_time) / 1000000 # in milliseconds
|
||||
if self.at_fixed_rate:
|
||||
self.set_next_run(time_taken)
|
||||
next_delay = (self.period * 1000 - time_taken) / 1000
|
||||
if next_delay < 0 or self.task_time <= (self.__time_func() * 1000):
|
||||
self.executor_ctx._put(self, 0)
|
||||
else:
|
||||
self.executor_ctx._put(self, next_delay)
|
||||
elif self.at_fixed_delay:
|
||||
self.set_next_run()
|
||||
self.executor_ctx._put(self, self.period)
|
||||
|
||||
|
||||
class ScheduledThreadPoolExecutor(ThreadPoolExecutor):
|
||||
def __init__(self, max_workers=10, name=''):
|
||||
super().__init__(max_workers=max_workers, thread_name_prefix=name)
|
||||
self._max_workers = max_workers
|
||||
self.queue = DelayedQueue()
|
||||
self.shutdown = False
|
||||
|
||||
def schedule_at_fixed_rate(self, fn: Callable, initial_delay: int, period: int, *args, **kwargs) -> bool:
|
||||
if self.shutdown:
|
||||
raise RuntimeError(f"cannot schedule new task after shutdown!")
|
||||
task = ScheduledTask(fn, initial_delay, period, *args, is_fixed_rate=True, executor_ctx=self, **kwargs)
|
||||
return self._put(task, initial_delay)
|
||||
|
||||
def schedule_at_fixed_delay(self, fn: Callable, initial_delay: int, period: int, *args, **kwargs) -> bool:
|
||||
if self.shutdown:
|
||||
raise RuntimeError(f"cannot schedule new task after shutdown!")
|
||||
task = ScheduledTask(fn, initial_delay, period, *args, is_fixed_delay=True, executor_ctx=self, **kwargs)
|
||||
return self._put(task, initial_delay)
|
||||
|
||||
def schedule(self, fn, initial_delay, *args, **kwargs) -> bool:
|
||||
task = ScheduledTask(fn, initial_delay, 0, *args, executor_ctx=self, **kwargs)
|
||||
return self._put(task, initial_delay)
|
||||
|
||||
def _put(self, task: ScheduledTask, delay: int) -> bool:
|
||||
# Don't use this explicitly. Use schedule/schedule_at_fixed_delay/schedule_at_fixed_rate. Additionally, to be
|
||||
# called by ScheduledTask only!
|
||||
if not isinstance(task, ScheduledTask):
|
||||
raise TypeError(f"Task `{task!r}` must be of type ScheduledTask")
|
||||
if delay < 0:
|
||||
raise ValueError(f"Delay `{delay}` must be a non-negative number")
|
||||
# logger.debug(f" enqueuing {task} with delay of {delay}")
|
||||
return self.queue.put(task, delay)
|
||||
|
||||
def __run(self):
|
||||
while not self.shutdown:
|
||||
try:
|
||||
task: ScheduledTask = self.queue.get()
|
||||
super().submit(task.run, *task.args, **task.kwargs)
|
||||
except Exception as e:
|
||||
print(e)
|
||||
|
||||
def stop(self, app, wait_for_completion: Optional[bool] = False):
|
||||
self.shutdown = True
|
||||
super().shutdown(wait_for_completion)
|
||||
|
||||
def start(self, app):
|
||||
t = threading.Thread(target=self.__run)
|
||||
t.setDaemon(True)
|
||||
t.start()
|
@ -0,0 +1,43 @@
|
||||
import abc
|
||||
import threading
|
||||
|
||||
from collections import UserList
|
||||
|
||||
|
||||
class Freezable(metaclass=abc.ABCMeta):
|
||||
def __init__(self):
|
||||
self._frozen = False
|
||||
|
||||
@property
|
||||
def frozen(self) -> bool:
|
||||
return self._frozen
|
||||
|
||||
def freeze(self):
|
||||
self._frozen = True
|
||||
|
||||
|
||||
class Signal(UserList, threading.Event):
|
||||
def __init__(self, owner: Freezable):
|
||||
UserList.__init__(self)
|
||||
threading.Event.__init__(self)
|
||||
self._owner = owner
|
||||
self.frozen = False
|
||||
|
||||
def __repr__(self):
|
||||
return "<Signal owner={}, frozen={}, {!r}>".format(
|
||||
self._owner, self.frozen, list(self)
|
||||
)
|
||||
|
||||
def send(self, *args, **kwargs):
|
||||
"""
|
||||
Sends data to all registered receivers.
|
||||
"""
|
||||
if self.frozen:
|
||||
raise RuntimeError("Cannot send on frozen signal.")
|
||||
|
||||
for receiver in self:
|
||||
receiver(*args, **kwargs)
|
||||
|
||||
self.frozen = True
|
||||
self._owner.freeze()
|
||||
self.set()
|
@ -0,0 +1,12 @@
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
if TYPE_CHECKING: # pragma: no cover
|
||||
from amqpworker import App # noqa: F401
|
||||
|
||||
|
||||
class SignalHandler:
|
||||
def startup(self, app: "App"):
|
||||
pass # pragma: no cover
|
||||
|
||||
def shutdown(self, app: "App"):
|
||||
pass # pragma: no cover
|
@ -0,0 +1,47 @@
|
||||
import logging
|
||||
from asyncio import Task
|
||||
from concurrent.futures import Future
|
||||
from typing import TYPE_CHECKING, List
|
||||
|
||||
from loguru import logger
|
||||
|
||||
from amqpworker.connections import AMQPConnection
|
||||
from amqpworker.consumer import Consumer
|
||||
from amqpworker.options import RouteTypes
|
||||
from amqpworker.signals.handlers.base import SignalHandler
|
||||
|
||||
if TYPE_CHECKING: # pragma: no cover
|
||||
from amqpworker.app import App # noqa: F401
|
||||
|
||||
|
||||
class RabbitMQ(SignalHandler):
|
||||
def shutdown(self, app: "App"):
|
||||
logger.debug('shutdown rabbit consumers')
|
||||
if RouteTypes.AMQP_RABBITMQ in app:
|
||||
for consumer in app[RouteTypes.AMQP_RABBITMQ]["consumers"]:
|
||||
logger.debug(f'stop {consumer.host}')
|
||||
consumer.stop()
|
||||
logger.debug(f'stopped {consumer.host}')
|
||||
|
||||
def startup(self, app: "App") -> List[Future]:
|
||||
tasks = []
|
||||
|
||||
app[RouteTypes.AMQP_RABBITMQ]["consumers"] = []
|
||||
for route_info in app.routes_registry.amqp_routes:
|
||||
conn: AMQPConnection = app.get_connection_for_route(route_info)
|
||||
|
||||
consumer = Consumer(
|
||||
route_info=route_info,
|
||||
host=conn.hostname,
|
||||
port=conn.port,
|
||||
username=conn.username,
|
||||
password=conn.password,
|
||||
prefetch_count=conn.prefetch,
|
||||
)
|
||||
app[RouteTypes.AMQP_RABBITMQ]["consumers"].append(consumer)
|
||||
conn.register(consumer.queue)
|
||||
task = app.loop.submit(consumer.start)
|
||||
|
||||
tasks.append(task)
|
||||
|
||||
return tasks
|
@ -0,0 +1,54 @@
|
||||
from typing import Any, Dict, Optional, Tuple, Type
|
||||
|
||||
from amqpworker.typing import get_args, get_origin
|
||||
|
||||
|
||||
class RegistryItem:
|
||||
def __init__(
|
||||
self, type: Type, value: Any, type_args: Optional[Tuple] = None
|
||||
) -> None:
|
||||
self.type = type
|
||||
self.value = value
|
||||
self.type_args = type_args
|
||||
|
||||
|
||||
class TypesRegistry:
|
||||
def __init__(self):
|
||||
self._data: Dict[Tuple, RegistryItem] = {}
|
||||
self._by_name: Dict[str, RegistryItem] = {}
|
||||
|
||||
def set(
|
||||
self,
|
||||
obj: Any,
|
||||
type_definition: Optional[Type] = None,
|
||||
param_name: Optional[str] = None,
|
||||
) -> None:
|
||||
has_type_args = get_args(type_definition)
|
||||
origin = get_origin(obj) or obj.__class__
|
||||
|
||||
registry_item = RegistryItem(
|
||||
type=origin, value=obj, type_args=has_type_args
|
||||
)
|
||||
|
||||
self._data[(origin, has_type_args)] = registry_item
|
||||
|
||||
if param_name:
|
||||
self._by_name[param_name] = registry_item
|
||||
|
||||
def get(
|
||||
self, _type: Type, param_name: Optional[str] = None, type_args=None
|
||||
) -> Optional[Any]:
|
||||
origin = get_origin(_type) or _type
|
||||
_type_args = type_args or get_args(_type)
|
||||
if param_name:
|
||||
try:
|
||||
item = self._by_name[param_name]
|
||||
if (item.type, item.type_args) == (origin, _type_args):
|
||||
return item.value
|
||||
except KeyError:
|
||||
return None
|
||||
|
||||
try:
|
||||
return self._data[(origin, get_args(_type))].value
|
||||
except KeyError:
|
||||
return None
|
@ -0,0 +1,66 @@
|
||||
from typing import Optional, Tuple, Type, get_type_hints
|
||||
|
||||
|
||||
def get_args(_type: Optional[Type]) -> Optional[Tuple]:
|
||||
if _type and hasattr(_type, "__args__"):
|
||||
return _type.__args__
|
||||
|
||||
return None
|
||||
|
||||
|
||||
def get_origin(_type: Optional[Type]) -> Optional[Type]:
|
||||
if _type and hasattr(_type, "__origin__"):
|
||||
return _type.__origin__
|
||||
|
||||
return None
|
||||
|
||||
|
||||
def get_handler_original_typehints(handler):
|
||||
"""
|
||||
Retorna a assinatura do handler amqpworker que está sendo decorado.
|
||||
O retorno dessa chamada é equivalente a:
|
||||
typing.get_type_hints(original_handler)
|
||||
Onde `original_handler` é o handler amqpworker original.
|
||||
|
||||
Ideal para ser usado na pilha de decorators de um handler, ex:
|
||||
|
||||
.. code-block:: python
|
||||
|
||||
@deco1
|
||||
@deco2
|
||||
@deco3
|
||||
async def handler(...):
|
||||
pass
|
||||
|
||||
Nesse caso, se qualquer um dos 3 decorators precisar saber a assinatura
|
||||
original, deve usar essa função passando a função recebida do decorator anterior.
|
||||
|
||||
"""
|
||||
|
||||
def _dummy():
|
||||
pass
|
||||
|
||||
_dummy.__annotations__ = getattr(
|
||||
handler, "amqpworker_original_annotations", handler.__annotations__
|
||||
)
|
||||
|
||||
return get_type_hints(_dummy)
|
||||
|
||||
|
||||
def is_base_type(_type, base_type):
|
||||
"""
|
||||
Retorna True para argumentos de um tipo base `base_type`.
|
||||
Ex:
|
||||
(a: MyGeneric[int]) -> True
|
||||
(b: MyGeneric) -> True
|
||||
"""
|
||||
if get_origin(_type) is base_type:
|
||||
return True
|
||||
|
||||
return issubclass(_type, base_type)
|
||||
|
||||
|
||||
def get_handler_original_qualname(handler):
|
||||
return getattr(
|
||||
handler, "amqpworker_original_qualname", handler.__qualname__
|
||||
)
|
@ -0,0 +1,13 @@
|
||||
import functools
|
||||
|
||||
|
||||
def call(fn, *args, **kwargs):
|
||||
fn(*args, **kwargs)
|
||||
|
||||
|
||||
def entrypoint(f):
|
||||
@functools.wraps(f)
|
||||
def _(*args, **kwargs):
|
||||
return f(*args, **kwargs)
|
||||
|
||||
return _
|
@ -0,0 +1,19 @@
|
||||
Copyright (c) 2017 B2W Digital
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
of this software and associated documentation files (the "Software"), to deal
|
||||
in the Software without restriction, including without limitation the rights
|
||||
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||
copies of the Software, and to permit persons to whom the Software is
|
||||
furnished to do so, subject to the following conditions:
|
||||
|
||||
The above copyright notice and this permission notice shall be included in all
|
||||
copies or substantial portions of the Software.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
||||
SOFTWARE.
|
@ -0,0 +1,222 @@
|
||||
# This file is automatically @generated by Poetry 1.4.2 and should not be changed by hand.
|
||||
|
||||
[[package]]
|
||||
name = "amqpstorm"
|
||||
version = "2.10.6"
|
||||
description = "Thread-safe Python RabbitMQ Client & Management library."
|
||||
category = "main"
|
||||
optional = false
|
||||
python-versions = ">=2.7"
|
||||
files = [
|
||||
{file = "AMQPStorm-2.10.6.tar.gz", hash = "sha256:52685be58fee7783e0d2739bca4c1e8630fab22ccacb510fa84fa3cdd383e230"},
|
||||
]
|
||||
|
||||
[package.dependencies]
|
||||
pamqp = ">=2.0.0,<3.0"
|
||||
|
||||
[package.extras]
|
||||
management = ["requests (>2)"]
|
||||
pool = ["amqpstorm-pool"]
|
||||
|
||||
[package.source]
|
||||
type = "legacy"
|
||||
url = "https://mirror.baidu.com/pypi/simple"
|
||||
reference = "douban"
|
||||
|
||||
[[package]]
|
||||
name = "cached-property"
|
||||
version = "1.5.2"
|
||||
description = "A decorator for caching properties in classes."
|
||||
category = "main"
|
||||
optional = false
|
||||
python-versions = "*"
|
||||
files = [
|
||||
{file = "cached-property-1.5.2.tar.gz", hash = "sha256:9fa5755838eecbb2d234c3aa390bd80fbd3ac6b6869109bfc1b499f7bd89a130"},
|
||||
{file = "cached_property-1.5.2-py2.py3-none-any.whl", hash = "sha256:df4f613cf7ad9a588cc381aaf4a512d26265ecebd5eb9e1ba12f1319eb85a6a0"},
|
||||
]
|
||||
|
||||
[package.source]
|
||||
type = "legacy"
|
||||
url = "https://mirror.baidu.com/pypi/simple"
|
||||
reference = "douban"
|
||||
|
||||
[[package]]
|
||||
name = "colorama"
|
||||
version = "0.4.6"
|
||||
description = "Cross-platform colored terminal text."
|
||||
category = "main"
|
||||
optional = false
|
||||
python-versions = "!=3.0.*,!=3.1.*,!=3.2.*,!=3.3.*,!=3.4.*,!=3.5.*,!=3.6.*,>=2.7"
|
||||
files = [
|
||||
{file = "colorama-0.4.6-py2.py3-none-any.whl", hash = "sha256:4f1d9991f5acc0ca119f9d443620b77f9d6b33703e51011c16baf57afb285fc6"},
|
||||
{file = "colorama-0.4.6.tar.gz", hash = "sha256:08695f5cb7ed6e0531a20572697297273c47b8cae5a63ffc6d6ed5c201be6e44"},
|
||||
]
|
||||
|
||||
[package.source]
|
||||
type = "legacy"
|
||||
url = "https://mirror.baidu.com/pypi/simple"
|
||||
reference = "douban"
|
||||
|
||||
[[package]]
|
||||
name = "delayedqueue"
|
||||
version = "1.0.0"
|
||||
description = "Thread safe delay queue implementation"
|
||||
category = "main"
|
||||
optional = false
|
||||
python-versions = "*"
|
||||
files = [
|
||||
{file = "delayedqueue-1.0.0-py3-none-any.whl", hash = "sha256:2617f1d14e8b7210d08540f0581a7548d7a2dcd6afc9578c635a6b5cce4d03f0"},
|
||||
{file = "delayedqueue-1.0.0.tar.gz", hash = "sha256:34085363e5ae54584b2d5977f270fa5d9321ce9a857f3acab8608d711278b68c"},
|
||||
]
|
||||
|
||||
[package.extras]
|
||||
testing = ["pytest", "pytest-cov", "setuptools"]
|
||||
|
||||
[package.source]
|
||||
type = "legacy"
|
||||
url = "https://mirror.baidu.com/pypi/simple"
|
||||
reference = "douban"
|
||||
|
||||
[[package]]
|
||||
name = "loguru"
|
||||
version = "0.7.0"
|
||||
description = "Python logging made (stupidly) simple"
|
||||
category = "main"
|
||||
optional = false
|
||||
python-versions = ">=3.5"
|
||||
files = [
|
||||
{file = "loguru-0.7.0-py3-none-any.whl", hash = "sha256:b93aa30099fa6860d4727f1b81f8718e965bb96253fa190fab2077aaad6d15d3"},
|
||||
{file = "loguru-0.7.0.tar.gz", hash = "sha256:1612053ced6ae84d7959dd7d5e431a0532642237ec21f7fd83ac73fe539e03e1"},
|
||||
]
|
||||
|
||||
[package.dependencies]
|
||||
colorama = {version = ">=0.3.4", markers = "sys_platform == \"win32\""}
|
||||
win32-setctime = {version = ">=1.0.0", markers = "sys_platform == \"win32\""}
|
||||
|
||||
[package.extras]
|
||||
dev = ["Sphinx (==5.3.0)", "colorama (==0.4.5)", "colorama (==0.4.6)", "freezegun (==1.1.0)", "freezegun (==1.2.2)", "mypy (==v0.910)", "mypy (==v0.971)", "mypy (==v0.990)", "pre-commit (==3.2.1)", "pytest (==6.1.2)", "pytest (==7.2.1)", "pytest-cov (==2.12.1)", "pytest-cov (==4.0.0)", "pytest-mypy-plugins (==1.10.1)", "pytest-mypy-plugins (==1.9.3)", "sphinx-autobuild (==2021.3.14)", "sphinx-rtd-theme (==1.2.0)", "tox (==3.27.1)", "tox (==4.4.6)"]
|
||||
|
||||
[package.source]
|
||||
type = "legacy"
|
||||
url = "https://mirror.baidu.com/pypi/simple"
|
||||
reference = "douban"
|
||||
|
||||
[[package]]
|
||||
name = "pamqp"
|
||||
version = "2.3.0"
|
||||
description = "RabbitMQ Focused AMQP low-level library"
|
||||
category = "main"
|
||||
optional = false
|
||||
python-versions = "*"
|
||||
files = [
|
||||
{file = "pamqp-2.3.0-py2.py3-none-any.whl", hash = "sha256:2f81b5c186f668a67f165193925b6bfd83db4363a6222f599517f29ecee60b02"},
|
||||
{file = "pamqp-2.3.0.tar.gz", hash = "sha256:5cd0f5a85e89f20d5f8e19285a1507788031cfca4a9ea6f067e3cf18f5e294e8"},
|
||||
]
|
||||
|
||||
[package.extras]
|
||||
codegen = ["lxml"]
|
||||
|
||||
[package.source]
|
||||
type = "legacy"
|
||||
url = "https://mirror.baidu.com/pypi/simple"
|
||||
reference = "douban"
|
||||
|
||||
[[package]]
|
||||
name = "pydantic"
|
||||
version = "1.10.7"
|
||||
description = "Data validation and settings management using python type hints"
|
||||
category = "main"
|
||||
optional = false
|
||||
python-versions = ">=3.7"
|
||||
files = [
|
||||
{file = "pydantic-1.10.7-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:e79e999e539872e903767c417c897e729e015872040e56b96e67968c3b918b2d"},
|
||||
{file = "pydantic-1.10.7-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:01aea3a42c13f2602b7ecbbea484a98169fb568ebd9e247593ea05f01b884b2e"},
|
||||
{file = "pydantic-1.10.7-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:516f1ed9bc2406a0467dd777afc636c7091d71f214d5e413d64fef45174cfc7a"},
|
||||
{file = "pydantic-1.10.7-cp310-cp310-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:ae150a63564929c675d7f2303008d88426a0add46efd76c3fc797cd71cb1b46f"},
|
||||
{file = "pydantic-1.10.7-cp310-cp310-musllinux_1_1_i686.whl", hash = "sha256:ecbbc51391248116c0a055899e6c3e7ffbb11fb5e2a4cd6f2d0b93272118a209"},
|
||||
{file = "pydantic-1.10.7-cp310-cp310-musllinux_1_1_x86_64.whl", hash = "sha256:f4a2b50e2b03d5776e7f21af73e2070e1b5c0d0df255a827e7c632962f8315af"},
|
||||
{file = "pydantic-1.10.7-cp310-cp310-win_amd64.whl", hash = "sha256:a7cd2251439988b413cb0a985c4ed82b6c6aac382dbaff53ae03c4b23a70e80a"},
|
||||
{file = "pydantic-1.10.7-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:68792151e174a4aa9e9fc1b4e653e65a354a2fa0fed169f7b3d09902ad2cb6f1"},
|
||||
{file = "pydantic-1.10.7-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:dfe2507b8ef209da71b6fb5f4e597b50c5a34b78d7e857c4f8f3115effaef5fe"},
|
||||
{file = "pydantic-1.10.7-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:10a86d8c8db68086f1e30a530f7d5f83eb0685e632e411dbbcf2d5c0150e8dcd"},
|
||||
{file = "pydantic-1.10.7-cp311-cp311-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:d75ae19d2a3dbb146b6f324031c24f8a3f52ff5d6a9f22f0683694b3afcb16fb"},
|
||||
{file = "pydantic-1.10.7-cp311-cp311-musllinux_1_1_i686.whl", hash = "sha256:464855a7ff7f2cc2cf537ecc421291b9132aa9c79aef44e917ad711b4a93163b"},
|
||||
{file = "pydantic-1.10.7-cp311-cp311-musllinux_1_1_x86_64.whl", hash = "sha256:193924c563fae6ddcb71d3f06fa153866423ac1b793a47936656e806b64e24ca"},
|
||||
{file = "pydantic-1.10.7-cp311-cp311-win_amd64.whl", hash = "sha256:b4a849d10f211389502059c33332e91327bc154acc1845f375a99eca3afa802d"},
|
||||
{file = "pydantic-1.10.7-cp37-cp37m-macosx_10_9_x86_64.whl", hash = "sha256:cc1dde4e50a5fc1336ee0581c1612215bc64ed6d28d2c7c6f25d2fe3e7c3e918"},
|
||||
{file = "pydantic-1.10.7-cp37-cp37m-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:e0cfe895a504c060e5d36b287ee696e2fdad02d89e0d895f83037245218a87fe"},
|
||||
{file = "pydantic-1.10.7-cp37-cp37m-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:670bb4683ad1e48b0ecb06f0cfe2178dcf74ff27921cdf1606e527d2617a81ee"},
|
||||
{file = "pydantic-1.10.7-cp37-cp37m-musllinux_1_1_i686.whl", hash = "sha256:950ce33857841f9a337ce07ddf46bc84e1c4946d2a3bba18f8280297157a3fd1"},
|
||||
{file = "pydantic-1.10.7-cp37-cp37m-musllinux_1_1_x86_64.whl", hash = "sha256:c15582f9055fbc1bfe50266a19771bbbef33dd28c45e78afbe1996fd70966c2a"},
|
||||
{file = "pydantic-1.10.7-cp37-cp37m-win_amd64.whl", hash = "sha256:82dffb306dd20bd5268fd6379bc4bfe75242a9c2b79fec58e1041fbbdb1f7914"},
|
||||
{file = "pydantic-1.10.7-cp38-cp38-macosx_10_9_x86_64.whl", hash = "sha256:8c7f51861d73e8b9ddcb9916ae7ac39fb52761d9ea0df41128e81e2ba42886cd"},
|
||||
{file = "pydantic-1.10.7-cp38-cp38-macosx_11_0_arm64.whl", hash = "sha256:6434b49c0b03a51021ade5c4daa7d70c98f7a79e95b551201fff682fc1661245"},
|
||||
{file = "pydantic-1.10.7-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:64d34ab766fa056df49013bb6e79921a0265204c071984e75a09cbceacbbdd5d"},
|
||||
{file = "pydantic-1.10.7-cp38-cp38-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:701daea9ffe9d26f97b52f1d157e0d4121644f0fcf80b443248434958fd03dc3"},
|
||||
{file = "pydantic-1.10.7-cp38-cp38-musllinux_1_1_i686.whl", hash = "sha256:cf135c46099ff3f919d2150a948ce94b9ce545598ef2c6c7bf55dca98a304b52"},
|
||||
{file = "pydantic-1.10.7-cp38-cp38-musllinux_1_1_x86_64.whl", hash = "sha256:b0f85904f73161817b80781cc150f8b906d521fa11e3cdabae19a581c3606209"},
|
||||
{file = "pydantic-1.10.7-cp38-cp38-win_amd64.whl", hash = "sha256:9f6f0fd68d73257ad6685419478c5aece46432f4bdd8d32c7345f1986496171e"},
|
||||
{file = "pydantic-1.10.7-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:c230c0d8a322276d6e7b88c3f7ce885f9ed16e0910354510e0bae84d54991143"},
|
||||
{file = "pydantic-1.10.7-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:976cae77ba6a49d80f461fd8bba183ff7ba79f44aa5cfa82f1346b5626542f8e"},
|
||||
{file = "pydantic-1.10.7-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:7d45fc99d64af9aaf7e308054a0067fdcd87ffe974f2442312372dfa66e1001d"},
|
||||
{file = "pydantic-1.10.7-cp39-cp39-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:d2a5ebb48958754d386195fe9e9c5106f11275867051bf017a8059410e9abf1f"},
|
||||
{file = "pydantic-1.10.7-cp39-cp39-musllinux_1_1_i686.whl", hash = "sha256:abfb7d4a7cd5cc4e1d1887c43503a7c5dd608eadf8bc615413fc498d3e4645cd"},
|
||||
{file = "pydantic-1.10.7-cp39-cp39-musllinux_1_1_x86_64.whl", hash = "sha256:80b1fab4deb08a8292d15e43a6edccdffa5377a36a4597bb545b93e79c5ff0a5"},
|
||||
{file = "pydantic-1.10.7-cp39-cp39-win_amd64.whl", hash = "sha256:d71e69699498b020ea198468e2480a2f1e7433e32a3a99760058c6520e2bea7e"},
|
||||
{file = "pydantic-1.10.7-py3-none-any.whl", hash = "sha256:0cd181f1d0b1d00e2b705f1bf1ac7799a2d938cce3376b8007df62b29be3c2c6"},
|
||||
{file = "pydantic-1.10.7.tar.gz", hash = "sha256:cfc83c0678b6ba51b0532bea66860617c4cd4251ecf76e9846fa5a9f3454e97e"},
|
||||
]
|
||||
|
||||
[package.dependencies]
|
||||
typing-extensions = ">=4.2.0"
|
||||
|
||||
[package.extras]
|
||||
dotenv = ["python-dotenv (>=0.10.4)"]
|
||||
email = ["email-validator (>=1.0.3)"]
|
||||
|
||||
[package.source]
|
||||
type = "legacy"
|
||||
url = "https://mirror.baidu.com/pypi/simple"
|
||||
reference = "douban"
|
||||
|
||||
[[package]]
|
||||
name = "typing-extensions"
|
||||
version = "4.5.0"
|
||||
description = "Backported and Experimental Type Hints for Python 3.7+"
|
||||
category = "main"
|
||||
optional = false
|
||||
python-versions = ">=3.7"
|
||||
files = [
|
||||
{file = "typing_extensions-4.5.0-py3-none-any.whl", hash = "sha256:fb33085c39dd998ac16d1431ebc293a8b3eedd00fd4a32de0ff79002c19511b4"},
|
||||
{file = "typing_extensions-4.5.0.tar.gz", hash = "sha256:5cb5f4a79139d699607b3ef622a1dedafa84e115ab0024e0d9c044a9479ca7cb"},
|
||||
]
|
||||
|
||||
[package.source]
|
||||
type = "legacy"
|
||||
url = "https://mirror.baidu.com/pypi/simple"
|
||||
reference = "douban"
|
||||
|
||||
[[package]]
|
||||
name = "win32-setctime"
|
||||
version = "1.1.0"
|
||||
description = "A small Python utility to set file creation time on Windows"
|
||||
category = "main"
|
||||
optional = false
|
||||
python-versions = ">=3.5"
|
||||
files = [
|
||||
{file = "win32_setctime-1.1.0-py3-none-any.whl", hash = "sha256:231db239e959c2fe7eb1d7dc129f11172354f98361c4fa2d6d2d7e278baa8aad"},
|
||||
{file = "win32_setctime-1.1.0.tar.gz", hash = "sha256:15cf5750465118d6929ae4de4eb46e8edae9a5634350c01ba582df868e932cb2"},
|
||||
]
|
||||
|
||||
[package.extras]
|
||||
dev = ["black (>=19.3b0)", "pytest (>=4.6.2)"]
|
||||
|
||||
[package.source]
|
||||
type = "legacy"
|
||||
url = "https://mirror.baidu.com/pypi/simple"
|
||||
reference = "douban"
|
||||
|
||||
[metadata]
|
||||
lock-version = "2.0"
|
||||
python-versions = "^3.9"
|
||||
content-hash = "1ad42e5ed1903b95a7dcb25cd65c32d195b6937e5b31d95d4b2b19d2f0b4fc31"
|
@ -0,0 +1,26 @@
|
||||
[tool.poetry]
|
||||
name = "amqp-worker"
|
||||
version = "0.1.0"
|
||||
description = ""
|
||||
authors = ["JimZhang <zzl22100048@gmail.com>"]
|
||||
readme = "README.md"
|
||||
packages = [{include = "amqp_worker"}]
|
||||
|
||||
[tool.poetry.dependencies]
|
||||
python = "^3.9"
|
||||
amqpstorm = {version = "^2.10.6", source = "douban"}
|
||||
pydantic = {version = "1.10.7", source = "douban"}
|
||||
loguru = {version = "^0.7.0", source = "douban"}
|
||||
cached-property = {version = "^1.5.2", source = "douban"}
|
||||
delayedqueue = {version = "^1.0.0", source = "douban"}
|
||||
|
||||
|
||||
[build-system]
|
||||
requires = ["poetry-core"]
|
||||
build-backend = "poetry.core.masonry.api"
|
||||
|
||||
|
||||
|
||||
[[tool.poetry.source]]
|
||||
name = "douban"
|
||||
url = "https://mirror.baidu.com/pypi/simple"
|
@ -0,0 +1,6 @@
|
||||
from loguru import logger
|
||||
|
||||
logger.error({'er':1,
|
||||
'aa':None
|
||||
|
||||
})
|
@ -0,0 +1,29 @@
|
||||
from datetime import datetime
|
||||
from typing import List
|
||||
|
||||
from amqpworker.app import App
|
||||
from amqpworker.connections import AMQPConnection
|
||||
from amqpworker.rabbitmq import RabbitMQMessage
|
||||
from amqpworker.routes import AMQPRouteOptions
|
||||
|
||||
amqp_conn = AMQPConnection(hostname='127.0.0.1', username='guest', password='guest', port=5672)
|
||||
|
||||
app = App(connections=[amqp_conn])
|
||||
|
||||
|
||||
@app.amqp.consume(
|
||||
['test'],
|
||||
options=AMQPRouteOptions(bulk_size=1024 * 8, bulk_flush_interval=2)
|
||||
)
|
||||
def _handler(msgs: List[RabbitMQMessage]):
|
||||
print(f"Recv {len(msgs)} {datetime.now().isoformat()}")
|
||||
|
||||
from loguru import logger
|
||||
|
||||
|
||||
@app.run_every(1)
|
||||
def produce(*args, **kwargs):
|
||||
# logger.error("tick produce")
|
||||
amqp_conn.put(data={'msg': 'ok'}, routing_key='test')
|
||||
|
||||
app.run()
|
Loading…
Reference in New Issue