-
Notifications
You must be signed in to change notification settings - Fork 0
/
wss.py
87 lines (71 loc) · 2.54 KB
/
wss.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
# bin/python
import websockets, websocket, asyncio
import _thread
import time
import rel
import json
import datetime
import re
import logging
logger = logging.getLogger('websockets')
logger.setLevel(logging.INFO)
logger.addHandler(logging.StreamHandler())
CONNECTIONS = set
CNX_AUTH = None
def on_message(ws, messages):
messages = json.loads(messages)
for msg in messages:
if "sym" in msg.keys() and "AAPL" in msg["sym"]:
date = datetime.datetime.fromtimestamp(msg['s'] / 1000).strftime('%Y-%m-%d %H:%M:%S')
matches = re.match('O:(?P<ticker>[A-Z]{3,4})(?P<date>[0-9]+)(?P<type>[C|P])(?P<dollars>[0-9]{5})(?P<cents>[0-9]{3})', msg["sym"])
if not matches: print(msg['sym'])
msg = {
"ticker": matches.group('ticker'),
"date": matches.group('date'),
"type": matches.group('type'),
"price": matches.group('dollars') + '.' + matches.group('cents'),
"volume" : msg['v']
}
print(msg)
def on_error(ws, error):
print(error)
def on_close(ws, close_status_code, close_msg):
print("### closed ###")
async def on_open(ws: websocket):
print("Opened connection ")
print(ws)
CONNECTIONS.add(ws)
try:
await websocket.wait_closed()
finally:
CONNECTIONS.remove(websocket)
ws.send(json.dumps({"action":"auth","params":"DzfIoW0e36xu0R_B48NxMO4t856DF00V"}))
ws.send(json.dumps({"action":"subscribe", "params":"A.*"}))
def listen():
websocket.enableTrace(True)
ws = websocket.WebSocketApp("wss://delayed.polygon.io/options",
on_open=on_open,
on_message=on_message,
on_error=on_error,
on_close=on_close)
ws.run_forever(dispatcher=rel) # Set dispatcher to automatic reconnection
rel.signal(2, rel.abort) # Keyboard Interrupt
rel.dispatch()
async def do_auth(ws):
await ws.send(json.dumps({"action":"auth","params":"DzfIoW0e36xu0R_B48NxMO4t856DF00V"}))
async def async_listen():
async with websockets.connect('wss://delayed.polygon.io/options') as ws:
await ws.send(json.dumps({"action":"auth","params":"DzfIoW0e36xu0R_B48NxMO4t856DF00V"}))
await ws.send(json.dumps({"action":"subscribe", "params":"A.*"}))
while True:
try:
messages = await ws.recv()
on_message(ws, messages)
except websockets.exceptions.ConnectionClosed:
print('ConnectionClosed')
is_alive = False
break
def main():
listen()
if __name__ == "__main__":
asyncio.run(async_listen())