|
|
@ -1,7 +1,7 @@
|
|
|
|
import time
|
|
|
|
import time
|
|
|
|
import traceback
|
|
|
|
import traceback
|
|
|
|
from concurrent.futures import ThreadPoolExecutor
|
|
|
|
from concurrent.futures import ThreadPoolExecutor
|
|
|
|
from concurrent.futures.thread import _shutdown
|
|
|
|
# from concurrent.futures.thread import _shutdown
|
|
|
|
from typing import Dict, Type, Union
|
|
|
|
from typing import Dict, Type, Union
|
|
|
|
|
|
|
|
|
|
|
|
from amqpstorm import AMQPError
|
|
|
|
from amqpstorm import AMQPError
|
|
|
@ -115,9 +115,10 @@ class Consumer(QueueConsumerDelegate):
|
|
|
|
)
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
def _flush_bucket_if_needed(self):
|
|
|
|
def _flush_bucket_if_needed(self):
|
|
|
|
try:
|
|
|
|
|
|
|
|
if not self.bucket.is_empty():
|
|
|
|
if not self.bucket.is_empty():
|
|
|
|
all_messages = self.bucket.pop_all()
|
|
|
|
all_messages = self.bucket.pop_all()
|
|
|
|
|
|
|
|
try:
|
|
|
|
conf.logger.debug(
|
|
|
|
conf.logger.debug(
|
|
|
|
{
|
|
|
|
{
|
|
|
|
"event": "bucket-flush",
|
|
|
|
"event": "bucket-flush",
|
|
|
@ -127,7 +128,7 @@ class Consumer(QueueConsumerDelegate):
|
|
|
|
)
|
|
|
|
)
|
|
|
|
if self.running:
|
|
|
|
if self.running:
|
|
|
|
rv = self._handler(all_messages)
|
|
|
|
rv = self._handler(all_messages)
|
|
|
|
for fu in self.pool.map(call, [m.process_success for m in all_messages]):
|
|
|
|
for _ in self.pool.map(call, [m.process_success for m in all_messages]):
|
|
|
|
pass
|
|
|
|
pass
|
|
|
|
return rv
|
|
|
|
return rv
|
|
|
|
else:
|
|
|
|
else:
|
|
|
@ -138,37 +139,37 @@ class Consumer(QueueConsumerDelegate):
|
|
|
|
except Exception as e:
|
|
|
|
except Exception as e:
|
|
|
|
traceback.print_exc()
|
|
|
|
traceback.print_exc()
|
|
|
|
if self.running:
|
|
|
|
if self.running:
|
|
|
|
for fu in self.pool.map(call, [m.process_exception for m in all_messages]):
|
|
|
|
for _ in self.pool.map(call, [m.process_exception for m in all_messages]):
|
|
|
|
pass
|
|
|
|
pass
|
|
|
|
else:
|
|
|
|
else:
|
|
|
|
for m in all_messages:
|
|
|
|
for m in all_messages:
|
|
|
|
m.process_exception()
|
|
|
|
m.process_exception()
|
|
|
|
raise e
|
|
|
|
raise e
|
|
|
|
|
|
|
|
|
|
|
|
def on_queue_error(self, body, delivery_tag, error, queue):
|
|
|
|
# def on_queue_error(self, body, delivery_tag, error, queue):
|
|
|
|
"""
|
|
|
|
# """
|
|
|
|
Callback called every time that an error occurred during the validation
|
|
|
|
# Callback called every time that an error occurred during the validation
|
|
|
|
or deserialization stage.
|
|
|
|
# or deserialization stage.
|
|
|
|
|
|
|
|
#
|
|
|
|
:param body: unparsed, raw message content
|
|
|
|
# :param body: unparsed, raw message content
|
|
|
|
:type body: Any
|
|
|
|
# :type body: Any
|
|
|
|
:param delivery_tag: delivery_tag of the consumed message
|
|
|
|
# :param delivery_tag: delivery_tag of the consumed message
|
|
|
|
:type delivery_tag: int
|
|
|
|
# :type delivery_tag: int
|
|
|
|
:param error: THe error that caused the callback to be called
|
|
|
|
# :param error: THe error that caused the callback to be called
|
|
|
|
:type error: MessageError
|
|
|
|
# :type error: MessageError
|
|
|
|
:type queue: JsonQueue
|
|
|
|
# :type queue: JsonQueue
|
|
|
|
"""
|
|
|
|
# """
|
|
|
|
conf.logger.error(
|
|
|
|
# conf.logger.error(
|
|
|
|
{
|
|
|
|
# {
|
|
|
|
"parse-error": True,
|
|
|
|
# "parse-error": True,
|
|
|
|
"exception": "Error: not a JSON",
|
|
|
|
# "exception": "Error: not a JSON",
|
|
|
|
"original_msg": body,
|
|
|
|
# "original_msg": body,
|
|
|
|
}
|
|
|
|
# }
|
|
|
|
)
|
|
|
|
# )
|
|
|
|
try:
|
|
|
|
# try:
|
|
|
|
queue.ack(delivery_tag=delivery_tag)
|
|
|
|
# queue.ack(delivery_tag=delivery_tag)
|
|
|
|
except AMQPError as e:
|
|
|
|
# except AMQPError as e:
|
|
|
|
self._log_exception(e)
|
|
|
|
# self._log_exception(e)
|
|
|
|
|
|
|
|
|
|
|
|
def on_message_handle_error(self, handler_error: Exception, **kwargs):
|
|
|
|
def on_message_handle_error(self, handler_error: Exception, **kwargs):
|
|
|
|
"""
|
|
|
|
"""
|
|
|
@ -232,7 +233,7 @@ class Consumer(QueueConsumerDelegate):
|
|
|
|
self.consume_all_queues(self.queue)
|
|
|
|
self.consume_all_queues(self.queue)
|
|
|
|
except RuntimeError as e:
|
|
|
|
except RuntimeError as e:
|
|
|
|
traceback.print_exc()
|
|
|
|
traceback.print_exc()
|
|
|
|
if self.multiple_queue_pool._shutdown or _shutdown:
|
|
|
|
if self.multiple_queue_pool._shutdown: # or _shutdown:
|
|
|
|
conf.logger.info('app shutdown')
|
|
|
|
conf.logger.info('app shutdown')
|
|
|
|
# if 'interpreter shutdown' in str(e):
|
|
|
|
# if 'interpreter shutdown' in str(e):
|
|
|
|
# return
|
|
|
|
# return
|
|
|
|