diff --git a/amqpworker/consumer.py b/amqpworker/consumer.py index b01a957..d135a2b 100644 --- a/amqpworker/consumer.py +++ b/amqpworker/consumer.py @@ -1,7 +1,7 @@ import time import traceback from concurrent.futures import ThreadPoolExecutor -from concurrent.futures.thread import _shutdown +# from concurrent.futures.thread import _shutdown from typing import Dict, Type, Union from amqpstorm import AMQPError @@ -115,9 +115,10 @@ class Consumer(QueueConsumerDelegate): ) def _flush_bucket_if_needed(self): - try: - if not self.bucket.is_empty(): - all_messages = self.bucket.pop_all() + + if not self.bucket.is_empty(): + all_messages = self.bucket.pop_all() + try: conf.logger.debug( { "event": "bucket-flush", @@ -127,48 +128,48 @@ class Consumer(QueueConsumerDelegate): ) if self.running: rv = self._handler(all_messages) - for fu in self.pool.map(call, [m.process_success for m in all_messages]): + for _ in self.pool.map(call, [m.process_success for m in all_messages]): pass return rv else: for m in all_messages: m.process_exception() - except AMQPError: - raise - except Exception as e: - traceback.print_exc() - if self.running: - for fu in self.pool.map(call, [m.process_exception for m in all_messages]): - pass - else: - for m in all_messages: - m.process_exception() - raise e - - def on_queue_error(self, body, delivery_tag, error, queue): - """ - Callback called every time that an error occurred during the validation - or deserialization stage. - - :param body: unparsed, raw message content - :type body: Any - :param delivery_tag: delivery_tag of the consumed message - :type delivery_tag: int - :param error: THe error that caused the callback to be called - :type error: MessageError - :type queue: JsonQueue - """ - conf.logger.error( - { - "parse-error": True, - "exception": "Error: not a JSON", - "original_msg": body, - } - ) - try: - queue.ack(delivery_tag=delivery_tag) - except AMQPError as e: - self._log_exception(e) + except AMQPError: + raise + except Exception as e: + traceback.print_exc() + if self.running: + for _ in self.pool.map(call, [m.process_exception for m in all_messages]): + pass + else: + for m in all_messages: + m.process_exception() + raise e + + # def on_queue_error(self, body, delivery_tag, error, queue): + # """ + # Callback called every time that an error occurred during the validation + # or deserialization stage. + # + # :param body: unparsed, raw message content + # :type body: Any + # :param delivery_tag: delivery_tag of the consumed message + # :type delivery_tag: int + # :param error: THe error that caused the callback to be called + # :type error: MessageError + # :type queue: JsonQueue + # """ + # conf.logger.error( + # { + # "parse-error": True, + # "exception": "Error: not a JSON", + # "original_msg": body, + # } + # ) + # try: + # queue.ack(delivery_tag=delivery_tag) + # except AMQPError as e: + # self._log_exception(e) def on_message_handle_error(self, handler_error: Exception, **kwargs): """ @@ -232,7 +233,7 @@ class Consumer(QueueConsumerDelegate): self.consume_all_queues(self.queue) except RuntimeError as e: traceback.print_exc() - if self.multiple_queue_pool._shutdown or _shutdown: + if self.multiple_queue_pool._shutdown: # or _shutdown: conf.logger.info('app shutdown') # if 'interpreter shutdown' in str(e): # return