diff --git a/amqpworker/easyqueue/message.py b/amqpworker/easyqueue/message.py index 806017e..ce71b0b 100644 --- a/amqpworker/easyqueue/message.py +++ b/amqpworker/easyqueue/message.py @@ -14,7 +14,6 @@ class AMQPMessage(Generic[T]): "queue_name", "serialized_data", "delivery_tag", - "_envelope", "_properties", "_deserialization_method", "_deserialized_data", diff --git a/amqpworker/easyqueue/queue.py b/amqpworker/easyqueue/queue.py index 0dcc7b7..de0c88e 100644 --- a/amqpworker/easyqueue/queue.py +++ b/amqpworker/easyqueue/queue.py @@ -18,8 +18,6 @@ from typing import ( ) from amqpstorm import Message -from loguru import logger - from amqpworker.easyqueue.connection import AMQPConnection from amqpworker.easyqueue.exceptions import UndecodableMessageException from amqpworker.easyqueue.message import AMQPMessage @@ -262,6 +260,7 @@ class JsonQueue(BaseQueue, Generic[T]): :param exchange: The exchange to publish the message :param routing_key: The routing key to publish the message """ + if data and serialized_data: raise ValueError("Only one of data or json should be specified") @@ -270,7 +269,10 @@ class JsonQueue(BaseQueue, Generic[T]): serialized_data = data else: serialized_data = self.serialize(data, ensure_ascii=False) - properties['Content-Type'] = 'application/json' + if properties is None: + properties = {'Content-Type': 'application/json'} + elif not properties.get('Content-Type'): + properties['Content-Type'] = 'application/json' if not isinstance(serialized_data, bytes): serialized_data = serialized_data.encode() diff --git a/poetry.lock b/poetry.lock index 5349d6b..95b10d6 100644 --- a/poetry.lock +++ b/poetry.lock @@ -77,6 +77,43 @@ type = "legacy" url = "https://mirror.baidu.com/pypi/simple" reference = "douban" +[[package]] +name = "exceptiongroup" +version = "1.1.1" +description = "Backport of PEP 654 (exception groups)" +category = "dev" +optional = false +python-versions = ">=3.7" +files = [ + {file = "exceptiongroup-1.1.1-py3-none-any.whl", hash = "sha256:232c37c63e4f682982c8b6459f33a8981039e5fb8756b2074364e5055c498c9e"}, + {file = "exceptiongroup-1.1.1.tar.gz", hash = "sha256:d484c3090ba2889ae2928419117447a14daf3c1231d5e30d0aae34f354f01785"}, +] + +[package.extras] +test = ["pytest (>=6)"] + +[package.source] +type = "legacy" +url = "https://mirror.baidu.com/pypi/simple" +reference = "douban" + +[[package]] +name = "iniconfig" +version = "2.0.0" +description = "brain-dead simple config-ini parsing" +category = "dev" +optional = false +python-versions = ">=3.7" +files = [ + {file = "iniconfig-2.0.0-py3-none-any.whl", hash = "sha256:b6a85871a79d2e3b22d2d1b94ac2824226a63c6b741c88f7ae975f18b6778374"}, + {file = "iniconfig-2.0.0.tar.gz", hash = "sha256:2d91e135bf72d31a410b17c16da610a82cb55f6b0477d1a902134b24a455b8b3"}, +] + +[package.source] +type = "legacy" +url = "https://mirror.baidu.com/pypi/simple" +reference = "douban" + [[package]] name = "loguru" version = "0.7.0" @@ -101,6 +138,23 @@ type = "legacy" url = "https://mirror.baidu.com/pypi/simple" reference = "douban" +[[package]] +name = "packaging" +version = "23.1" +description = "Core utilities for Python packages" +category = "dev" +optional = false +python-versions = ">=3.7" +files = [ + {file = "packaging-23.1-py3-none-any.whl", hash = "sha256:994793af429502c4ea2ebf6bf664629d07c1a9fe974af92966e4b8d2df7edc61"}, + {file = "packaging-23.1.tar.gz", hash = "sha256:a392980d2b6cffa644431898be54b0045151319d1e7ec34f0cfed48767dd334f"}, +] + +[package.source] +type = "legacy" +url = "https://mirror.baidu.com/pypi/simple" +reference = "douban" + [[package]] name = "pamqp" version = "2.3.0" @@ -121,6 +175,27 @@ type = "legacy" url = "https://mirror.baidu.com/pypi/simple" reference = "douban" +[[package]] +name = "pluggy" +version = "1.0.0" +description = "plugin and hook calling mechanisms for python" +category = "dev" +optional = false +python-versions = ">=3.6" +files = [ + {file = "pluggy-1.0.0-py2.py3-none-any.whl", hash = "sha256:74134bbf457f031a36d68416e1509f34bd5ccc019f0bcc952c7b909d06b37bd3"}, + {file = "pluggy-1.0.0.tar.gz", hash = "sha256:4224373bacce55f955a878bf9cfa763c1e360858e330072059e10bad68531159"}, +] + +[package.extras] +dev = ["pre-commit", "tox"] +testing = ["pytest", "pytest-benchmark"] + +[package.source] +type = "legacy" +url = "https://mirror.baidu.com/pypi/simple" +reference = "douban" + [[package]] name = "pydantic" version = "1.10.7" @@ -179,6 +254,74 @@ type = "legacy" url = "https://mirror.baidu.com/pypi/simple" reference = "douban" +[[package]] +name = "pytest" +version = "7.3.1" +description = "pytest: simple powerful testing with Python" +category = "dev" +optional = false +python-versions = ">=3.7" +files = [ + {file = "pytest-7.3.1-py3-none-any.whl", hash = "sha256:3799fa815351fea3a5e96ac7e503a96fa51cc9942c3753cda7651b93c1cfa362"}, + {file = "pytest-7.3.1.tar.gz", hash = "sha256:434afafd78b1d78ed0addf160ad2b77a30d35d4bdf8af234fe621919d9ed15e3"}, +] + +[package.dependencies] +colorama = {version = "*", markers = "sys_platform == \"win32\""} +exceptiongroup = {version = ">=1.0.0rc8", markers = "python_version < \"3.11\""} +iniconfig = "*" +packaging = "*" +pluggy = ">=0.12,<2.0" +tomli = {version = ">=1.0.0", markers = "python_version < \"3.11\""} + +[package.extras] +testing = ["argcomplete", "attrs (>=19.2.0)", "hypothesis (>=3.56)", "mock", "nose", "pygments (>=2.7.2)", "requests", "xmlschema"] + +[package.source] +type = "legacy" +url = "https://mirror.baidu.com/pypi/simple" +reference = "douban" + +[[package]] +name = "pytest-mock" +version = "3.10.0" +description = "Thin-wrapper around the mock package for easier use with pytest" +category = "dev" +optional = false +python-versions = ">=3.7" +files = [ + {file = "pytest-mock-3.10.0.tar.gz", hash = "sha256:fbbdb085ef7c252a326fd8cdcac0aa3b1333d8811f131bdcc701002e1be7ed4f"}, + {file = "pytest_mock-3.10.0-py3-none-any.whl", hash = "sha256:f4c973eeae0282963eb293eb173ce91b091a79c1334455acfac9ddee8a1c784b"}, +] + +[package.dependencies] +pytest = ">=5.0" + +[package.extras] +dev = ["pre-commit", "pytest-asyncio", "tox"] + +[package.source] +type = "legacy" +url = "https://mirror.baidu.com/pypi/simple" +reference = "douban" + +[[package]] +name = "tomli" +version = "2.0.1" +description = "A lil' TOML parser" +category = "dev" +optional = false +python-versions = ">=3.7" +files = [ + {file = "tomli-2.0.1-py3-none-any.whl", hash = "sha256:939de3e7a6161af0c887ef91b7d41a53e7c5a1ca976325f429cb46ea9bc30ecc"}, + {file = "tomli-2.0.1.tar.gz", hash = "sha256:de526c12914f0c550d15924c62d72abc48d6fe7364aa87328337a31007fe8a4f"}, +] + +[package.source] +type = "legacy" +url = "https://mirror.baidu.com/pypi/simple" +reference = "douban" + [[package]] name = "typing-extensions" version = "4.5.0" @@ -219,4 +362,4 @@ reference = "douban" [metadata] lock-version = "2.0" python-versions = "^3.9" -content-hash = "1ad42e5ed1903b95a7dcb25cd65c32d195b6937e5b31d95d4b2b19d2f0b4fc31" +content-hash = "b8900190a2700cb316b4e37ff76589b5049edcffec156bd910a877caa5c4d38b" diff --git a/pyproject.toml b/pyproject.toml index 870cafb..33c20d5 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -18,6 +18,10 @@ cached-property = {version = "^1.5.2", source = "douban"} delayedqueue = {version = "^1.0.0", source = "douban"} +[tool.poetry.group.dev.dependencies] +pytest = "^7.3.1" +pytest-mock = "^3.10.0" + [build-system] requires = ["poetry-core"] build-backend = "poetry.core.masonry.api" @@ -27,3 +31,10 @@ build-backend = "poetry.core.masonry.api" [[tool.poetry.source]] name = "douban" url = "https://mirror.baidu.com/pypi/simple" + +[tool.pytest.ini_options] +minversion = "6.0" +addopts = "-ra -q" +testpaths = [ + "tests", +] \ No newline at end of file diff --git a/tests/easyqueue/__init__.py b/tests/easyqueue/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/easyqueue/base.py b/tests/easyqueue/base.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/easyqueue/test_base_json_queue.py b/tests/easyqueue/test_base_json_queue.py new file mode 100644 index 0000000..82d6d5c --- /dev/null +++ b/tests/easyqueue/test_base_json_queue.py @@ -0,0 +1,26 @@ +import pytest + +from amqpworker.easyqueue.queue import BaseJsonQueue + + +@pytest.fixture +def base_json_queue(mocker): + return BaseJsonQueue(mocker.ANY, mocker.ANY, mocker.ANY) + + +def test_serialize(base_json_queue): + body = {"teste": "aãç"} + result = base_json_queue.serialize(body) + assert result == '{"teste": "a\\u00e3\\u00e7"}' + + +def test_serialize_with_ensure_ascii_false(base_json_queue): + body = {"teste": "aãç"} + result = base_json_queue.serialize(body, ensure_ascii=False) + assert '{"teste": "aãç"}' == result + + +def test_deserialize(base_json_queue): + body = '{"teste": "aãç"}'.encode("utf-8") + result = base_json_queue.deserialize(body) + assert {"teste": "aãç"} == result diff --git a/tests/easyqueue/test_connection.py b/tests/easyqueue/test_connection.py new file mode 100644 index 0000000..89be202 --- /dev/null +++ b/tests/easyqueue/test_connection.py @@ -0,0 +1,57 @@ +import amqpstorm +import pytest +from amqpworker.easyqueue.connection import AMQPConnection +from tests.easyqueue.test_queue import SubscriptableMock + + +@pytest.fixture +def connection(mocker): + mocked_connection = mocker.Mock( + return_value=mocker.Mock(channel=SubscriptableMock(return_value=mocker.Mock(basic=mocker.Mock( + publish=mocker.Mock(), qos=mocker.Mock(), consume=mocker.Mock(return_value='consumer_666') + ))))) + mocker.patch.object(amqpstorm.Connection, '__new__', mocked_connection) + return mocked_connection, AMQPConnection(**dict( + host="money.que.é.good", + username="nós", + password="não", + virtual_host="have", + heartbeat=5, + )) + + +from amqpstorm import Connection + + +def test_connection_lock_ensures_amqp_connect_is_only_called_once( + mocker, connection +): + Mock = mocker.Mock + protocol = Mock(channel=Mock(is_open=True)) + connect = mocker.patch.object(Connection, "__new__", + return_value=protocol + ) + [connection[1]._connect() for _ in range(100)] + assert connect.call_count == 1 + + +def test_connects_with_correct_args(mocker, connection): + connection[1]._connect() + conn_params = dict( + host="money.que.é.good", + username="nós", + password="não", + virtual_host="have", + heartbeat=5, + ) + assert connection[0].call_args_list == [ + mocker.call( + amqpstorm.Connection, + hostname=conn_params["host"], + port=5672, + username=conn_params["username"], + password=conn_params["password"], + virtual_host=conn_params["virtual_host"], + heartbeat=conn_params["heartbeat"], + ) + ] diff --git a/tests/easyqueue/test_message.py b/tests/easyqueue/test_message.py new file mode 100644 index 0000000..1a3c86a --- /dev/null +++ b/tests/easyqueue/test_message.py @@ -0,0 +1,169 @@ +import pytest +from amqpworker.easyqueue.exceptions import UndecodableMessageException +from amqpworker.easyqueue.message import AMQPMessage + + +def test_lazy_deserialization_raises_an_error_if_deserialization_fails(mocker): + Mock = mocker.Mock + data = b"Xablau" + deserializer = Mock(side_effect=ValueError) + msg = AMQPMessage( + connection=Mock(), + channel=Mock(), + queue_name=Mock(), + serialized_data=data, + delivery_tag=Mock(), + properties=Mock(), + deserialization_method=deserializer, + queue=Mock(), + ) + + with pytest.raises(UndecodableMessageException): + _ = msg.deserialized_data + + deserializer.assert_called_once_with(data) + + +def test_successful_deserialization(mocker): + Mock = mocker.Mock + data = b'["Xablau"]' + deserializer = Mock(return_value=["Xablau"]) + msg = AMQPMessage( + connection=Mock(), + channel=Mock(), + queue_name=Mock(), + serialized_data=data, + delivery_tag=Mock(), + properties=Mock(), + deserialization_method=deserializer, + queue=Mock(), + ) + assert msg.deserialized_data == ["Xablau"] + + +def test_deserialization_is_only_called_once(mocker): + Mock = mocker.Mock + data = b'["Xablau"]' + deserializer = Mock(return_value=["Xablau"]) + + msg = AMQPMessage( + queue=Mock(), + connection=Mock(), + channel=Mock(), + queue_name=Mock(), + serialized_data=data, + delivery_tag=Mock(), + properties=Mock(), + deserialization_method=deserializer, + ) + + _ = [msg.deserialized_data for _ in range(10)] + + deserializer.assert_called_once_with(data) + + +def test_equal_messages(mocker): + Mock = mocker.Mock + msg1 = AMQPMessage( + connection=Mock(), + channel=Mock(), + queue_name=Mock(), + serialized_data=Mock(), + delivery_tag=Mock(), + properties=Mock(), + deserialization_method=Mock(), + queue=Mock(), + ) + msg2 = AMQPMessage( + connection=msg1.connection, + channel=msg1.channel, + queue_name=msg1.queue_name, + serialized_data=msg1.serialized_data, + delivery_tag=msg1.delivery_tag, + properties=msg1._properties, + deserialization_method=msg1._deserialization_method, + queue=msg1._queue, + ) + assert msg1 == msg2 + + +def test_not_equal_messages(mocker): + Mock = mocker.Mock + msg1 = AMQPMessage( + connection=Mock(), + channel=Mock(), + queue_name=Mock(), + serialized_data=Mock(), + delivery_tag=Mock(), + properties=Mock(), + deserialization_method=Mock(), + queue=Mock(), + ) + + msg2 = AMQPMessage( + connection=msg1.connection, + channel=Mock(), + queue_name=msg1.queue_name, + serialized_data=msg1.serialized_data, + delivery_tag=msg1.delivery_tag, + properties=msg1._properties, + deserialization_method=msg1._deserialization_method, + queue=Mock(), + ) + assert msg1 != msg2 + + +def test_it_acks_messages(mocker): + Mock = mocker.Mock + msg = AMQPMessage( + connection=Mock(), + channel=Mock(basic=Mock(ack=Mock())), + queue_name=Mock(), + serialized_data=Mock(), + delivery_tag=Mock(), + properties=Mock(), + deserialization_method=Mock(), + queue=Mock(), + ) + msg.ack() + + msg.channel.basic.ack.assert_called_once_with(msg.delivery_tag) + + +def test_it_rejects_messages_without_requeue(mocker): + Mock = mocker.Mock + msg = AMQPMessage( + connection=Mock(), + channel=Mock(return_value=Mock(basic=Mock(reject=Mock()))), + queue_name=Mock(), + serialized_data=Mock(), + delivery_tag=Mock(), + properties=Mock(), + deserialization_method=Mock(), + queue=Mock(), + ) + + msg.reject() + + msg.channel.basic.reject.assert_called_once_with( + delivery_tag=msg.delivery_tag, requeue=False + ) + + +def test_it_rejects_messages_with_requeue(mocker): + Mock = mocker.Mock + msg = AMQPMessage( + connection=Mock(), + channel=Mock(return_value=Mock(basic=Mock(reject=Mock()))), + queue_name=Mock(), + serialized_data=Mock(), + delivery_tag=Mock(), + properties=Mock(), + deserialization_method=Mock(), + queue=Mock(), + ) + + msg.reject(requeue=True) + msg.channel.basic.reject.assert_called_once_with( + delivery_tag=msg.delivery_tag, requeue=True + ) diff --git a/tests/easyqueue/test_queue.py b/tests/easyqueue/test_queue.py new file mode 100644 index 0000000..148538a --- /dev/null +++ b/tests/easyqueue/test_queue.py @@ -0,0 +1,493 @@ +import json +import logging + +import amqpstorm +import pytest +from unittest.mock import Mock as Mk +from amqpworker.easyqueue.message import AMQPMessage +from amqpworker.easyqueue.queue import ( + ConnType, + JsonQueue, + QueueConsumerDelegate, + _ConsumptionHandler, + _ensure_conn_is_ready, +) + + +def test_it_raises_an_error_if_its_initialized_with_both_delegate_and_delegate_class(mocker): + with pytest.raises(ValueError) as e: + JsonQueue( + host="127.0.0.1", + username="guest", + password="guest", + delegate=mocker.Mock(), + delegate_class=mocker.Mock(), + ) + + +def test_its_possible_to_initialize_without_a_delegate(): + queue = JsonQueue( + host="127.0.0.1", + username="guest", + password="guest", + ) + + assert isinstance(queue, JsonQueue) + + +def test_it_initializes_a_delegate_if_delegate_class_is_provided(mocker): + Mock = mocker.Mock + delegate_class = Mock() + JsonQueue(Mock(), Mock(), Mock(), delegate_class=delegate_class) + delegate_class.assert_called_once_with() + + +class SettableMock(Mk): + def __setitem__(self, key, value): + pass + + +class SubscriptableMock(Mk): + def __getitem__(self, item): + if item == "consumer_tag": + return 'consumer_666' + raise NotImplementedError + + # basic = Mk( + # + # publish=Mk(), + # qos=Mk(), + # consume=Mk( + # return_value='consumer_666' + # ), + # + # ) + + +@pytest.fixture +def connected_queue(mocker): + mocked_connection = mocker.Mock( + return_value=mocker.Mock(channel=SubscriptableMock(return_value=mocker.Mock(basic=mocker.Mock( + publish=mocker.Mock(), qos=mocker.Mock(), consume=mocker.Mock(return_value='consumer_666') + ))))) + mocker.patch.object(amqpstorm.Connection, '__new__', mocked_connection) + return JsonQueue(host="127.0.0.1", + username="guest", + password="guest", delegate=mocker.Mock()) + + +@pytest.fixture +def write_conn(connected_queue): + return connected_queue.conn_for(ConnType.WRITE) + + +def test_it_dont_call_consumer_handler_methods(mocker, connected_queue): + assert not connected_queue.delegate.on_queue_message.called + + +def test_it_puts_messages_into_queue_as_json_if_message_is_a_json_serializeable(mocker, connected_queue, write_conn): + Mock = mocker.Mock + message = { + "artist": "Great White", + "song": "Once Bitten Twice Shy", + "album": "Twice Shy", + } + + exchange = Mock() + routing_key = Mock() + properties = SettableMock() + mandatory = Mock() + immediate = Mock() + connected_queue.put( + data=message, + exchange=exchange, + routing_key=routing_key, + properties=properties, + mandatory=mandatory, + immediate=immediate, + ) + expected = mocker.call(body=json.dumps(message).encode(), + exchange=exchange, + routing_key=routing_key, + properties=properties, + mandatory=mandatory, + immediate=immediate, ) + assert [expected] == write_conn.channel.basic.publish.call_args_list + + +def test_it_raises_an_error_if_both_data_and_json_are_passed_to_put_message( + mocker, connected_queue, write_conn +): + Mock = mocker.Mock + message = { + "artist": "Great White", + "song": "Once Bitten Twice Shy", + "album": "Twice Shy", + } + exchange = Mock() + routing_key = Mock() + properties = SettableMock() + mandatory = Mock() + immediate = Mock() + with pytest.raises(ValueError): + connected_queue.put( + serialized_data=json.dumps(message), + data=message, + exchange=exchange, + routing_key=routing_key, + properties=properties, + mandatory=mandatory, + immediate=immediate, + ) + expected = mocker.call( + body=json.dumps(message).encode(), + routing_key=routing_key, + exchange_name=exchange, + properties=properties, + mandatory=mandatory, + immediate=immediate, + ) + write_conn.channel.basic.publish.assert_not_called() + + +def test_it_encodes_payload_into_bytes_if_payload_is_str(mocker, connected_queue, write_conn): + Mock = mocker.Mock + payload = json.dumps({"dog": "Xablau"}) + exchange = Mock() + routing_key = Mock() + properties = SettableMock() + mandatory = Mock() + immediate = Mock() + connected_queue.put( + serialized_data=payload, + exchange=exchange, + routing_key=routing_key, + properties=properties, + mandatory=mandatory, + immediate=immediate, + ) + + write_conn.channel.basic.publish.assert_called_once_with( + body=payload.encode(), + routing_key=routing_key, + exchange=exchange, + properties=properties, + mandatory=mandatory, + immediate=immediate, + ) + + +def test_it_doesnt_encodes_payload_into_bytes_if_payload_is_already_bytes( + mocker, connected_queue, write_conn +): + Mock = mocker.Mock + payload = json.dumps({"dog": "Xablau"}).encode() + exchange = Mock() + routing_key = Mock() + properties = SettableMock() + mandatory = Mock() + immediate = Mock() + connected_queue.put( + serialized_data=payload, + exchange=exchange, + routing_key=routing_key, + properties=properties, + mandatory=mandatory, + immediate=immediate, + ) + + write_conn.channel.basic.publish.assert_called_once_with( + body=payload, + routing_key=routing_key, + exchange=exchange, + properties=properties, + mandatory=mandatory, + immediate=immediate, + ) + + +def test_connect_gets_if_put_is_called_before_connect(mocker, connected_queue, write_conn): + message = { + "artist": "Great White", + "song": "Once Bitten Twice Shy", + "album": "Twice Shy", + } + Mock = mocker.Mock + connect = mocker.patch.object(write_conn, "_connect") + mocker.patch.object( + write_conn, + "channel", + Mock(is_open=False, return_value={'basic': Mock( + publish=Mock() + )}), + ) + connected_queue.put(data=message, routing_key="Xablau") + connect.assert_called_once() + + +def test_it_raises_and_error_if_put_message_isnt_json_serializeable( + mocker, connected_queue, write_conn +): + Mock = mocker.Mock + message = Mock() + exchange = Mock() + routing_key = Mock() + with pytest.raises(TypeError): + connected_queue.put(message, exchange=exchange, routing_key=routing_key) + write_conn.channel.basic.publish.assert_not_called() + + +@pytest.fixture +def consume_conn(connected_queue): + return connected_queue.conn_for(ConnType.CONSUME) + + +class ConsumeException(Exception): + pass + + +def test_it_calls_on_before_start_consumption_before_queue_consume( + mocker, connected_queue, consume_conn +): + Mock = mocker.Mock + connected_queue.connection._connect() + mocker.patch.object(connected_queue.connection.channel.basic, 'consume', side_effect=ConsumeException()) + delegate = mocker.Mock(on_before_start_consumption=mocker.Mock()) + queue_name = mocker.Mock() + with pytest.raises(ConsumeException): + connected_queue.consume(queue_name, Mock(), delegate) + delegate.on_before_start_consumption.assert_called_once_with( + queue_name=queue_name, queue=connected_queue + ) + + +def test_connect_gets_called_if_consume_is_called_before_connect( + mocker, connected_queue +): + Mock = mocker.Mock + channel = Mock( + is_open=False, + return_value={ + 'basic': Mock(qoc=Mock(), consume=Mock()) + } + ) + connect = mocker.patch.object( + connected_queue.connection, "_connect" + ) + mocker.patch.object(connected_queue.connection, "channel", channel) + queue_name = Mock() + connected_queue.consume( + queue_name, Mock(), delegate=Mock(spec=QueueConsumerDelegate) + ) + connect.assert_called_once() + + +def test_calling_consume_starts_message_consumption(mocker, connected_queue): + Mock = mocker.Mock + connected_queue.connection._connect() + connected_queue.consume(queue_name=Mock(), pool=Mock(), delegate=Mock(spec=QueueConsumerDelegate)) + assert connected_queue.connection.channel.basic.consume.call_count == 1 + + +def test_calling_consume_binds_handler_method(mocker, connected_queue): + Mock = mocker.Mock + connected_queue.connection._connect() + channel = connected_queue.connection.channel + queue_name = Mock() + consumer_name = Mock() + expected_prefetch_count = 666 + connected_queue.prefetch_count = expected_prefetch_count + Handler = mocker.patch( + "amqpworker.easyqueue.queue._ConsumptionHandler", + return_value=Mock(spec=_ConsumptionHandler), + ) + delegate = Mock(spec=QueueConsumerDelegate) + pool = Mock() + connected_queue.consume( + queue_name=queue_name, + pool=pool, + consumer_name=consumer_name, + delegate=delegate, + ) + expected = mocker.call( + callback=mocker.ANY, queue=queue_name, consumer_tag=consumer_name + ) + assert connected_queue.connection.channel.basic.consume.call_args_list == [expected] + _, kwargs = channel.basic.consume.call_args_list[0] + callback = kwargs["callback"] + message = Mock() + callback( + message=message + ) + Handler.assert_called_once_with( + delegate=delegate, queue=connected_queue, queue_name=queue_name + ) + Handler.return_value.handle_message.assert_called_once_with( + message=message + ) + + +def test_calling_consume_sets_a_prefetch_qos(mocker, connected_queue): + Mock = mocker.Mock + connected_queue.connection._connect() + expected_prefetch_count = 666 + connected_queue.prefetch_count = expected_prefetch_count + connected_queue.consume( + queue_name=Mock(), pool=Mock(), delegate=Mock(spec=QueueConsumerDelegate) + ) + expected = mocker.call( + global_=mocker.ANY, + prefetch_count=expected_prefetch_count, + prefetch_size=0, + ) + assert connected_queue.connection.channel.basic.qos.call_args_list == [expected] + + +def test_calling_consume_starts_a_connection(mocker, connected_queue): + Mock = mocker.Mock + mocked_connection = mocker.Mock(return_value=mocker.Mock()) + _connect = mocker.patch.object(amqpstorm.Connection, '__new__', mocked_connection) + consumer = Mock(spec=QueueConsumerDelegate) + assert not _connect.called + connected_queue.consume( + queue_name="test_queue", pool=Mock(), delegate=consumer + ) + assert _connect.called + + +def test_calling_consume_notifies_delegate(mocker, connected_queue): + Mock = mocker.Mock + expected_prefetch_count = 666 + connected_queue.prefetch_count = expected_prefetch_count + delegate = Mock(spec=QueueConsumerDelegate) + connected_queue.consume( + queue_name="test_queue", pool=Mock(), delegate=delegate + ) + + delegate.on_before_start_consumption.assert_called_once_with( + queue_name="test_queue", queue=connected_queue + ) + delegate.on_consumption_start.assert_called_once_with( + consumer_tag="consumer_666", queue=connected_queue + ) + + +@pytest.fixture +def handler_method(mocker, connected_queue): + properties = SettableMock(name="Properties") + delegate = mocker.Mock(spec=QueueConsumerDelegate) + consumer_tag = connected_queue.consume( + queue_name="test_queue", + pool=mocker.Mock(), + delegate=delegate, + consumer_name='fixture', + ) + + handler = _ConsumptionHandler( + delegate=delegate, + queue=connected_queue, + queue_name="test_queue", + ) + return properties, delegate, handler, mocker.Mock(name="method", consumer_tag=consumer_tag, delivery_tag='1') + + +def test_it_calls_on_queue_message_with_the_message_body_wrapped_as_a_AMQPMessage_instance(mocker, connected_queue, + handler_method): + content = { + "artist": "Caetano Veloso", + "song": "Não enche", + "album": "Livro", + } + body = json.dumps(content).encode("utf-8") + properties, delegate, handler, method = handler_method + _handle_callback = mocker.patch.object(handler, "_handle_callback", mocker.Mock()) + message = mocker.Mock(channel=connected_queue.connection.channel, body=body, method=method, properties=properties, + delivery_tag='1') + handler.handle_message( + message=message + ) + amqp_message = AMQPMessage( + connection=connected_queue.connection, + channel=connected_queue.connection.channel, + queue=connected_queue, + properties=properties, + delivery_tag=method.delivery_tag, + deserialization_method=connected_queue.deserialize, + queue_name="test_queue", + serialized_data=body, + ) + _handle_callback.assert_called_once_with( + handler.delegate.on_queue_message, + msg=amqp_message, + ) + + +def test_it_calls_on_message_handle_error_if_message_handler_raises_an_error( + mocker, connected_queue, handler_method +): + content = { + "artist": "Caetano Veloso", + "song": "Não enche", + "album": "Livro", + } + properties, delegate, handler, method = handler_method + error = handler.delegate.on_queue_message.side_effect = KeyError() + kwargs = dict( + callback=handler.delegate.on_queue_message, + channel=connected_queue.connection.channel, + body=json.dumps(content), + properties=properties, + ) + handler._handle_callback(**kwargs) + del kwargs["callback"] + handler.delegate.on_message_handle_error.assert_called_once_with( + handler_error=error, **kwargs + ) + + +@pytest.fixture +def ensure_queue(mocker): + Mock = mocker.Mock + return JsonQueue( + "127.0.0.1", + "guest", + "guest", + seconds_between_conn_retry=666, + logger=Mock(spec=logging.Logger), + connection_fail_callback=Mock(), + ) + + +def test_it_waits_before_trying_to_reconnect_if_connect_fails(mocker, ensure_queue): + Mock = mocker.Mock + coro = Mock() + sleep = mocker.patch("amqpworker.easyqueue.queue.time.sleep") + mocker.patch.object(ensure_queue.connection, '_connect', Mock(side_effect=[ConnectionError, True])) + wrapped = _ensure_conn_is_ready(ConnType.CONSUME)(coro) + wrapped(ensure_queue, 1, dog="Xablau") + sleep.assert_called_once_with(666) + ensure_queue.connection._connect.assert_has_calls([mocker.call(), mocker.call()]) + coro.assert_called_once_with(ensure_queue, 1, dog="Xablau") + + +def test_it_logs_connection_retries_if_a_logger_istance_is_available(mocker, ensure_queue): + Mock = mocker.Mock + coro = Mock() + mocker.patch("amqpworker.easyqueue.queue.time.sleep") + mocker.patch.object(ensure_queue.connection, '_connect', Mock(side_effect=[ConnectionError, True])) + wrapped = _ensure_conn_is_ready(ConnType.CONSUME)(coro) + wrapped(ensure_queue, 1, dog="Xablau") + ensure_queue.logger.error.assert_called_once() + + +def test_it_calls_connection_fail_callback_if_connect_fails(mocker, ensure_queue): + error = ConnectionError() + Mock = mocker.Mock + coro = Mock() + mocker.patch("amqpworker.easyqueue.queue.time.sleep") + mocker.patch.object(ensure_queue.connection, '_connect', Mock(side_effect=[error, True])) + wrapped = _ensure_conn_is_ready(ConnType.CONSUME)(coro) + wrapped(ensure_queue, 1, dog="Xablau") + ensure_queue.connection_fail_callback.assert_called_once_with( + error, 1 + ) diff --git a/tests/rabbitmq/__init__.py b/tests/rabbitmq/__init__.py new file mode 100644 index 0000000..e69de29