-
Notifications
You must be signed in to change notification settings - Fork 0
/
log_example.py
55 lines (43 loc) · 1.43 KB
/
log_example.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
from contextlib import contextmanager
import psycopg2
import psycopg2.extensions
import select
message_log_connection = psycopg2.connect("dbname=postgres user=samba")
# prod code
def main():
read_messages_forever(
log_name="orderdata",
connection=message_log_connection,
per_message_callback=print_message)
def print_message(received_message):
print("Received: " + received_message)
# library code
def read_messages_forever(log_name, connection, per_message_callback):
@contextmanager
def transaction(conn,):
cursor = conn.cursor()
try:
yield cursor
conn.commit()
except:
conn.rollback()
raise
finally:
cursor.close()
with transaction(connection) as cursor:
cursor.execute(f"LISTEN log_{log_name};")
while True:
# read as long as there are DB entries in the queue
with transaction(connection) as cursor:
cursor.execute(f"SELECT read_log_entry('{log_name}')")
entry = cursor.fetchone()
if entry:
per_message_callback(entry[0])
continue
# block until event is received or timeout happens (100 years)
select.select([connection],[],[], 3153600000) == ([],[],[])
connection.poll()
# "eat" all notifications
connection.notifies[:] = []
if __name__ == "__main__":
main()