-
Notifications
You must be signed in to change notification settings - Fork 1
/
consumer.py
100 lines (88 loc) · 3.02 KB
/
consumer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
import argparse
import logging
import pika
import pprint
import time
from sys import getsizeof
# Fixed-width log layout: level, timestamp, logger name, function name,
# line number, then the message itself.
LOG_FORMAT = " ".join(
    (
        "%(levelname) -10s",
        "%(asctime)s",
        "%(name) -30s",
        "%(funcName) -35s",
        "%(lineno) -5d:",
        "%(message)s",
    )
)

# Configure the root logger for INFO and above, then create this module's logger.
logging.basicConfig(format=LOG_FORMAT, level=logging.INFO)
LOGGER = logging.getLogger(__name__)
# Command-line options locating the RabbitMQ broker.
#
# argparse's automatic -h/--help is switched off because -h is used as the
# short form of --host. A long-only --help is registered explicitly so the
# usage text stays reachable.
parser = argparse.ArgumentParser(
    description="Consume messages from RabbitMQ in batches and publish the results.",
    add_help=False,
)
parser.add_argument(
    "--help",
    action="help",
    default=argparse.SUPPRESS,
    help="show this help message and exit",
)
parser.add_argument(
    "-h",
    "--host",
    dest="host",
    default="localhost",
    help="RabbitMQ host name",
)
parser.add_argument(
    "-p",
    "--port",
    dest="port",
    default=5672,
    type=int,
    help="RabbitMQ port",
)
parser.add_argument(
    "-v",
    "--vhost",
    dest="vhost",
    default="/",
    help="RabbitMQ vhost",
)

# Parsed once at module level; exposes args.host, args.port and args.vhost.
args = parser.parse_args()
class Rabbitmq:
    """Batching RabbitMQ consumer that republishes each batch to a result queue.

    Messages are read from the durable ``q3_test`` queue and buffered in
    ``self.consolidated``. When the next message would bring the buffered
    payload to ``BATCH_LIMIT_BYTES`` or more, the buffer is handed to
    :meth:`scan_data`, which publishes its string form to ``result_queue``.

    Constructing an instance connects, declares both queues and then blocks
    inside :meth:`consume_` for as long as the consumer runs.
    """

    # Buffered payload size (bytes) at which the current batch is processed.
    BATCH_LIMIT_BYTES = 4000

    def __init__(self, host, port, vhost):
        """Connect to the broker, declare the queues and start consuming.

        Args:
            host: RabbitMQ host name.
            port: RabbitMQ port.
            vhost: RabbitMQ virtual host.
        """
        self.consolidated = []

        # NOTE(review): hard-coded guest/guest credentials — presumably only
        # intended for a local test broker; confirm before wider use.
        credentials = pika.PlainCredentials(username="guest", password="guest")
        parameters = pika.ConnectionParameters(
            host=host,
            port=port,
            virtual_host=vhost,
            credentials=credentials,
            heartbeat=5,
        )
        self.connection = pika.BlockingConnection(parameters=parameters)

        # Channel and queue the batches are read from.
        self.consume_ch = self.connection.channel()
        q3_test_q = self.consume_ch.queue_declare("q3_test", durable=True)
        LOGGER.info("declared queue: %s", pprint.pformat(q3_test_q))
        self.consume_qname = q3_test_q.method.queue

        # Separate channel and queue the processed batches are published to.
        self.publish_ch = self.connection.channel()
        result_queue_q = self.publish_ch.queue_declare(queue="result_queue")
        LOGGER.info("declared queue: %s", pprint.pformat(result_queue_q))
        self.publish_qname = result_queue_q.method.queue

        # Blocks for as long as the consumer runs.
        self.consume_()

    def consume_(self):
        """Consume messages indefinitely, processing them in size-bounded batches.

        Every message is acknowledged once it has been buffered. When a
        message arrives that would push the buffered payload to
        ``BATCH_LIMIT_BYTES`` or beyond, the current buffer is processed
        first and that message then starts the next batch, so no message is
        skipped. A single message at or above the limit forms a batch of its
        own. A batch that never reaches the limit stays buffered.

        The loop is iterative: one consumer generator is kept for the whole
        run instead of cancelling and re-entering this method per batch.
        """
        LOGGER.info("START consumer")
        self.consume_ch.basic_qos(prefetch_count=1)
        totalsize = 0
        for method_frame, _props, body in self.consume_ch.consume(
            queue=self.consume_qname
        ):
            # Payload length in bytes (not the Python object size).
            msgsize = len(body)
            LOGGER.info("consumed message, size %d", msgsize)

            if self.consolidated and totalsize + msgsize >= self.BATCH_LIMIT_BYTES:
                # The triggering message is still unacknowledged here, so with
                # prefetch_count=1 the broker should hold back further
                # deliveries until the batch has been processed.
                self.scan_data(self.consolidated)
                self.consolidated.clear()
                totalsize = 0

            self.consolidated.append(body)
            totalsize += msgsize
            self.consume_ch.basic_ack(method_frame.delivery_tag)

    def scan_data(self, data):
        """Simulate processing a batch, then publish it to the result queue.

        Args:
            data: List of message bodies making up the batch; its ``str()``
                form is what gets published.
        """
        # Stand-in for real processing that may run up to 30 minutes.
        # NOTE: You WILL exceed the channel timeout here if your task takes
        # longer than 30 minutes.
        LOGGER.info("sleeping for 10 seconds to simulate work...")
        # connection.sleep keeps servicing connection I/O while waiting, so
        # heartbeats (configured at 5 s) are not starved the way a plain
        # time.sleep would starve them.
        self.connection.sleep(10)

        # Publish via the default exchange, routed straight to the result queue.
        self.publish_ch.basic_publish(
            exchange="", routing_key=self.publish_qname, body=str(data)
        )
        LOGGER.info("published response")
# Script entry: constructing the consumer connects to the broker and starts
# consuming from within the constructor.
ob = Rabbitmq(host=args.host, port=args.port, vhost=args.vhost)