-
Notifications
You must be signed in to change notification settings - Fork 1
/
discovery_server.py
69 lines (33 loc) · 1.38 KB
/
discovery_server.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
from util import get_private_ip
import socket
from json import loads
from multiprocessing import Process
class Server():
def __init__(self) -> None:
"""
a simple server responding to udb ping on port 21988
and sending ping requests to discover others copypasta instances on the local network
"""
self.port = 21987
self.sock = socket.socket(socket.AF_INET,socket.SOCK_DGRAM)
def response_loop(self,PORT:int,sock:socket.socket):
msg = ("[PONG FLAG]{ip_addr:\""+get_private_ip() +"\"}").encode("utf-8")
PORT = 21988
sock.bind(('',21988))
while True:
if sock.recv(1024) == b"ping":
sock.sendto(msg, ("255.255.255.255", PORT))
def start(self):
Process(target=self.response_loop,args=(self.port,self.sock)).start()
def discover_instances(self) -> set:
"""
returns a set of tuples (ip_addr,hostname)
that corresponds to all copypasta instances running on the local network
"""
ret = set()
self.sock.sendto(b"ping", ("255.255.255.255", self.port))
resp = self.sock.recv(1024).decode("utf-8")
if resp.startswith("[PONG FLAG]"):
resp = loads(resp.replace("[PONG_FLAG]",""))
ret.add((resp["ip_addr"],socket.gethostbyaddr(resp["ip_addr"])))
return ret