-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathMTA_emitter.py
87 lines (70 loc) · 2.69 KB
/
MTA_emitter.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
"""
Created by: A. C. Coffin
May 2024
Emitter designed to send a message to a queue on the RabbitMQ server.
Make sure the RabbitMQ server is on and connected.
Run this and then run the listener.
--
This emitter contains a loop that will run through all 50 entries in the MTAHourlyData50R.csv file.
The process can be interrupted using CTRL+C if an escape is needed.
"""
import pika
import sys
import time
from datetime import datetime
import csv
import logging
import random
import pika.exceptions
from util_logger import setup_logger
# Configure logging
# setup_logger comes from the project-local util_logger module; it returns a
# logger plus a second value bound to logname (presumably the log file name --
# confirm against util_logger).
logger, logname = setup_logger(__file__)
# Name of the CSV data file. It is a bare relative name, so it is resolved
# against the current working directory when opened.
input_file_name = "MTAHourlyData50R.csv"
#Defining Functions:
def preprare_message_from_row(row):
    """
    Build the comma-separated message text for a single CSV row.

    Parameters:
        row: sequence of exactly six string fields, in the order
            transit_date, transit_time, station_complex_id, station_complex,
            borough, rideship.

    Returns:
        The six fields joined with "," -- the same text format that
        send_message publishes for each row.

    Raises:
        ValueError: if row does not contain exactly six fields.
    """
    # Unpacking doubles as a shape check: too few or too many fields raises
    # ValueError instead of silently producing a malformed message.
    (
        transit_date,
        transit_time,
        station_complex_id,
        station_complex,
        borough,
        rideship,
    ) = row
    return ",".join(
        (
            transit_date,
            transit_time,
            station_complex_id,
            station_complex,
            borough,
            rideship,
        )
    )
# CSV Read
def stream_row(input_file_name):
    """
    Yield the data rows of a CSV file one at a time, skipping the header line.

    Parameters:
        input_file_name: path of the comma-delimited CSV file to read.

    Yields:
        Each row after the first as a list of strings.

    An empty file yields nothing. The file stays open only while the
    generator is being consumed.
    """
    # newline="" lets the csv module do its own line-ending handling, which it
    # needs in order to parse quoted fields that contain newlines correctly.
    with open(input_file_name, "r", newline="") as input_file:
        reader = csv.reader(input_file, delimiter=",")
        # Discard the header row. The None default matters: a bare next() on an
        # empty file raises StopIteration, which inside a generator is turned
        # into RuntimeError.
        next(reader, None)
        yield from reader
def send_message(ns: str = "localhost"):
    """
    Publish every data row of the CSV file to the durable "MTA_task" queue.

    Each row is sent as comma-separated text. The file is replayed in an
    endless loop, pausing a random 1-8 seconds after each message, until the
    process is interrupted with CTRL+C or a pass over the file yields no rows.

    Parameters:
        ns: host name of the RabbitMQ server to connect to.

    Raises:
        SystemExit: with exit code 1 when the RabbitMQ connection fails.
    """
    # Stays None when connecting fails, so the cleanup below can tell whether
    # there is anything to close.
    connection = None
    try:
        # Creates a blocking connection to the RabbitMQ server
        connection = pika.BlockingConnection(pika.ConnectionParameters(host=ns))
        channel = connection.channel()
        # Declaring the queue
        channel.queue_declare(queue="MTA_task", durable=True)
        # Replays the CSV repeatedly so messages keep flowing until interrupted
        while True:
            rows_sent = 0
            for row in stream_row(input_file_name):
                message = ",".join(row)
                # Publish to MTA_task via the default exchange; delivery_mode=2
                # asks the broker to persist the message.
                channel.basic_publish(
                    exchange="",
                    routing_key="MTA_task",
                    body=message,
                    properties=pika.BasicProperties(
                        content_type="text/plain", delivery_mode=2
                    ),
                )
                logger.info("[x] Sent '%s'", message)
                print(f" [x] sent {message}")
                rows_sent += 1
                # random publishing between 1 and 8 seconds
                time.sleep(random.uniform(1, 8))
            if not rows_sent:
                # Without this guard a header-only or empty file would make
                # the outer loop spin at full speed with nothing to send.
                logger.warning("No data rows found in %s. Stopping.", input_file_name)
                break
    except KeyboardInterrupt:
        logger.info("KeyboardInterrupt. Stopping the program.")
    except pika.exceptions.AMQPConnectionError as e:
        logger.error(f"Error: Connection to RabbitMQ server failed: {e}")
        sys.exit(1)
    finally:
        # Closing the connection
        logger.info("\nclosing connection. Goodby\n")
        # Only close a connection that was actually opened and is still open;
        # closing a missing or already-lost one would raise and mask the
        # original exit path.
        if connection is not None and connection.is_open:
            connection.close()
# Script entry point: start emitting only when this file is executed directly,
# not when it is imported as a module.
if __name__ == "__main__":
    send_message(ns="localhost")