forked from zjuluolun/BEVPlace2
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathbev_place_image_publisher.py
134 lines (99 loc) · 3.76 KB
/
bev_place_image_publisher.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
#!/usr/bin/env python3
# Force the pure-Python protobuf implementation; this must be set before any protobuf-generated module is imported.
import os
os.environ['PROTOCOL_BUFFERS_PYTHON_IMPLEMENTATION'] = "python"
import sys
import signal
import argparse
import zmq
import datetime
import time
import sched
from termcolor import cprint
from PIL import Image
sys.path.append('/home/ubuntu/Anantak/Admin/messaging');
import sensor_messages_pb2
# Create the publisher for image receiver.
# We use the map-editor error reporter parameters
# name: "MapEditorErrorsReporter"
# endpoint: "tcp://127.0.0.1:7776" (built from ZMQ_MapEditorErrorsReporter_PORT below)
# subject: ""
# message ImageMessage {
# required int32 camera_num = 101;
# required int32 height = 102;
# required int32 width = 103;
# required int32 depth = 104;
# required bytes image_data = 200;
# repeated float posn_data = 300;
# optional int64 timestamp_us = 400;
# }
class ImagePublisher:
    """Publish images as serialized ``SensorMsg`` protobufs on a ZMQ PUB socket.

    The ZMQ context and socket are created, and the socket is bound, when
    this class body is executed. They are stored as class attributes and
    shared by every call to :meth:`transmit_image`.
    """

    # Static variables of the class
    # ZMQ socket objects
    ZMQ_MapEditorErrorsReporter_PORT = 7776
    zmq_context = zmq.Context()
    zmq_MapEditorErrorsReporter_socket = zmq_context.socket(zmq.PUB)
    zmq_MapEditorErrorsReporter_url = f"tcp://127.0.0.1:{ZMQ_MapEditorErrorsReporter_PORT}"
    zmq_MapEditorErrorsReporter_socket.bind(zmq_MapEditorErrorsReporter_url)
    # Not read or written anywhere in this class.
    owner_names = {}
    owner_msgs = {}
    owner_summing_msgs = {}

    # Instance init
    def __init__(self):
        pass

    @staticmethod
    def transmit_image(image_path):
        """Load the image at ``image_path`` and publish it as a ``BEVImage`` message.

        The raw pixel bytes are wrapped in a ``SensorMsg`` whose header and
        image timestamps are all set to the current time in microseconds.
        Width, height and depth are taken from the image that was actually
        loaded, so the metadata always describes the payload.

        Args:
            image_path: Path of the image file to open with PIL.

        Errors are reported on stdout and are not re-raised.
        """
        try:
            with Image.open(image_path) as img:
                # img.show()
                img_bytes = img.tobytes()
                img_width, img_height = img.size
                # NOTE(review): depth is taken to be the channel count (1 for
                # a single-band image) -- confirm against the receiver.
                img_depth = len(img.getbands())

            now_ts_us = int(datetime.datetime.now().timestamp()*1000000)

            sensor_msg = sensor_messages_pb2.SensorMsg()
            sensor_msg.header.type = "BEVImage"
            sensor_msg.header.timestamp = now_ts_us
            # "recieve" is the field's spelling in the protobuf schema.
            sensor_msg.header.recieve_timestamp = now_ts_us
            sensor_msg.header.send_timestamp = now_ts_us

            image_msg = sensor_msg.image_msg
            image_msg.camera_num = -1
            image_msg.height = img_height
            image_msg.width = img_width
            image_msg.depth = img_depth
            image_msg.image_data = img_bytes
            image_msg.timestamp_us = now_ts_us

            image_msg_bytes_str = sensor_msg.SerializeToString()
            ImagePublisher.zmq_MapEditorErrorsReporter_socket.send(image_msg_bytes_str)
            print(f"published {image_path}")
        except FileNotFoundError:
            print("Image file not found:", image_path)
        except Exception as e:
            # Best-effort boundary: report any other failure without raising.
            print("Error converting image to bytes:", e)
# SigInt handler for exiting
def signal_handler(sig, frame):
    """Handle SIGINT by printing a notice and exiting with status 0.

    Args:
        sig: Signal number delivered to the handler (unused).
        frame: Stack frame active when the signal arrived (unused).

    Raises:
        SystemExit: Always, with exit code 0.
    """
    print('SIGINT received. Exiting...')
    sys.exit(0)
# Global scheduler object
# Seconds between two consecutive publish attempts.
msg_pub_interval_sec = 2.0
# Wall-clock reference that repeatedly_run compares against the interval.
last_msg_read_time = time.time()
# Scheduler shared by the periodic callback and the script entry point.
time_scheduler = sched.scheduler(timefunc=time.time, delayfunc=time.sleep)
# Repeated running using a timer
def repeatedly_run(_scheduler, image_path):
    """Publish ``image_path`` when the interval has elapsed, then reschedule.

    The image is published when at least ``msg_pub_interval_sec`` seconds
    have passed since ``last_msg_read_time``. That global is then advanced
    to the current time. The function always re-enters itself on the
    scheduler, so it keeps running every ``msg_pub_interval_sec`` seconds.

    Args:
        _scheduler: The ``sched.scheduler`` to re-enter on. The module-level
            ``time_scheduler`` is used if this is ``None``.
        image_path: Path of the image file handed to
            ``ImagePublisher.transmit_image``.
    """
    global last_msg_read_time
    currtime = time.time()
    if last_msg_read_time + msg_pub_interval_sec <= currtime:
        ImagePublisher.transmit_image(image_path)
        # Record the publish time in the global that the check above reads;
        # the original stored it in an unused local.
        last_msg_read_time = currtime
    # reenter
    scheduler = _scheduler if _scheduler is not None else time_scheduler
    scheduler.enter(msg_pub_interval_sec, 1, repeatedly_run, (_scheduler, image_path))
if __name__ == "__main__":
    # Script entry point: publish the image given by --image repeatedly, every
    # msg_pub_interval_sec seconds, until SIGINT is received.
    parser = argparse.ArgumentParser()
    parser.add_argument("--image", help="path of the image file to publish", type=str, default='')
    args = parser.parse_args()

    signal.signal(signal.SIGINT, signal_handler)

    if not args.image:
        print("ERROR: Need an image to publish. Provide with --image=image/file/path")
        sys.exit(1)

    # Queue the first run; repeatedly_run re-enters itself after each call, so
    # run() keeps executing until the process exits.
    time_scheduler.enter(msg_pub_interval_sec, 1, repeatedly_run, (time_scheduler, args.image))
    time_scheduler.run()
    sys.exit(0)