python amqp framework

🐰amqp-worker

English | 简体中文

amqp-worker is a multi-threaded RabbitMQ consumer framework for Python, designed to make message consumption more efficient and more stable.

Features

  • Batch consumption: messages are processed in batches, which improves consumption throughput.
  • Automatic reconnection: when the connection to the RabbitMQ service is lost, amqp-worker reconnects automatically so that consumption can continue.
  • Customizable consumption mode: you are free to use multi-threading or coroutines inside the consumption function.
  • Configurable message acknowledgment: both automatic and manual acknowledgment are supported; choose the mode that fits your consumption needs.
  • Configurable exception handling: a global setting determines what happens to a message whose consumption raises an exception: re-enter the queue, re-insert, or consume the message.
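
To make the batch-consumption idea concrete, here is a minimal sketch of size-or-interval batching in plain Python. It is not amqp-worker's implementation: BulkBuffer, tick, and the clock parameter are hypothetical names invented for this illustration, and the two constructor arguments only mirror the bulk_size and bulk_flush_interval options that appear in the usage example.

```python
import time
from typing import Callable, Generic, List, TypeVar

T = TypeVar("T")


class BulkBuffer(Generic[T]):
    """Hypothetical helper: collects items and hands them over as batches."""

    def __init__(
        self,
        bulk_size: int,
        flush_interval: float,
        on_flush: Callable[[List[T]], None],
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self.bulk_size = bulk_size
        self.flush_interval = flush_interval
        self.on_flush = on_flush
        self.clock = clock
        self._items: List[T] = []
        self._last_flush = clock()

    def add(self, item: T) -> None:
        """Buffer one item; flush as soon as the batch is full."""
        self._items.append(item)
        if len(self._items) >= self.bulk_size:
            self.flush()

    def tick(self) -> None:
        """Flush a partial batch once the flush interval has elapsed."""
        if self._items and self.clock() - self._last_flush >= self.flush_interval:
            self.flush()

    def flush(self) -> None:
        """Deliver whatever is buffered and restart the interval timer."""
        batch, self._items = self._items, []
        self._last_flush = self.clock()
        if batch:
            self.on_flush(batch)


# Seven items with bulk_size=3 yield two full batches; the seventh item
# stays buffered until tick() is called after the interval has passed.
batches: List[List[int]] = []
buffer = BulkBuffer(bulk_size=3, flush_interval=2.0, on_flush=batches.append)
for i in range(7):
    buffer.add(i)
print(batches)  # [[0, 1, 2], [3, 4, 5]]
```

The size limit bounds how much work a single handler call receives, while the interval keeps a slow queue from holding messages indefinitely.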

Installation

Install amqp-worker with pip:

pip install amqp-workers

Usage

First, import the App class from the amqpworker package in your Python code:

from amqpworker.app import App

Then instantiate an App object. The App depends on one or more AMQPConnection objects, so create the connection first:

from amqpworker.connections import AMQPConnection
amqp_conn = AMQPConnection(hostname='127.0.0.1', username='guest', password='guest', port=5672)

app = App(connections=[amqp_conn])

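Next, define the consumption function:

from datetime import datetime
from typing import List

from amqpworker.rabbitmq import RabbitMQMessage
from amqpworker.routes import AMQPRouteOptions

@app.amqp.consume(
    ['test'],
    options=AMQPRouteOptions(bulk_size=1024 * 8, bulk_flush_interval=2)
)
def _handler(msgs: List[RabbitMQMessage]):
    print(f"Recv {len(msgs)} {datetime.now().isoformat()}")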

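In the code above, the decorator on the consumption function specifies the queues to consume from (here, test) and the batching options, including the number of messages consumed per batch (bulk_size). Note that the consumption function takes a parameter of type List[RabbitMQMessage]: it receives a whole batch of messages, not a single message.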
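
Because the handler receives a whole batch, it can fan the messages out to worker threads itself, which is one way to use the customizable consumption mode listed under Features. The sketch below is self-contained and does not import amqp-worker: FakeMessage is a hypothetical stand-in for RabbitMQMessage, the existence of a body attribute on real messages is an assumption, and process_one and handle_batch are illustrative names.

```python
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass
from typing import List


@dataclass
class FakeMessage:
    """Hypothetical stand-in for RabbitMQMessage; only `body` is assumed."""
    body: dict


def process_one(msg: FakeMessage) -> int:
    # Placeholder for real per-message work (I/O, parsing, a database write).
    return msg.body["value"] * 2


def handle_batch(msgs: List[FakeMessage], max_workers: int = 4) -> List[int]:
    """Process every message of a batch on a thread pool."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # Executor.map returns results in the same order as the input batch.
        return list(pool.map(process_one, msgs))


batch = [FakeMessage(body={"value": i}) for i in range(5)]
print(handle_batch(batch))  # [0, 2, 4, 6, 8]
```

In a real handler, the body of handle_batch would sit inside the function decorated with @app.amqp.consume. Leaving the with block waits for all workers to finish, so the handler returns only after the whole batch has been processed.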

Finally, call the run method to start consuming:

app.run()

Example code

Below is a complete example that consumes messages from a queue named test:

from datetime import datetime
from typing import List

from amqpworker.app import App
from amqpworker.connections import AMQPConnection
from amqpworker.rabbitmq import RabbitMQMessage
from amqpworker.routes import AMQPRouteOptions

amqp_conn = AMQPConnection(hostname='127.0.0.1', username='guest', password='guest', port=5672)
app = App(connections=[amqp_conn])

@app.amqp.consume(
    ['test'],
    options=AMQPRouteOptions(bulk_size=1024 * 8, bulk_flush_interval=2)
)
def _handler(msgs: List[RabbitMQMessage]):
    print(f"Recv {len(msgs)} {datetime.now().isoformat()}")

app.run()

License

amqp-worker is released under the MIT license. See the LICENSE file for details.