-
Notifications
You must be signed in to change notification settings - Fork 34
/
main.py
137 lines (115 loc) · 6.08 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
import asyncio, websockets, sys, click, http.client, time
from prompt_toolkit import print_formatted_text, HTML
from websockets.exceptions import ConnectionClosedError
from utils.helper_display import HelperDisplay
USERS = {}
sepr = chr(969696)
helper_display = HelperDisplay()
def obtntime():
timestmp = time.localtime()
timehour = str(timestmp.tm_hour)
timemint = str(timestmp.tm_min)
timesecs = str(timestmp.tm_sec)
if int(timehour) < 10: timehour = "0" + timehour
if int(timemint) < 10: timemint = "0" + timemint
if int(timesecs) < 10: timesecs = "0" + timesecs
return timehour + ":" + timemint + ":" + timesecs
def ipaddress(v):
url = "api64.ipify.org"
if v == 4:
url = "api.ipify.org"
elif v == 6:
url = "api6.ipify.org"
try:
connection = http.client.HTTPSConnection(url)
connection.request("GET", "/")
response = connection.getresponse()
return response.read().decode("UTF-8")
except:
return "Error getting IP address."
def getallus(chatroom):
userlist = []
for indx in USERS:
if USERS[indx]!="" and chatroom == USERS[indx][1]:
userlist.append(USERS[indx][0])
return userlist
async def notify_mesej(message):
if USERS:
for user in USERS:
await user.send(message)
def chk_username_presence(mesg_json):
new_name = mesg_json.split(sepr)[1]
chatroom_id = mesg_json.split(sepr)[2]
if new_name in getallus(chatroom_id):
return True
else:
return False
async def send_chatroommembers_list(websoc):
chatroom_id = USERS[websoc][1]
users_list = "SNCTRYZERO" + sepr + "USERSLIST" + sepr + str(getallus(chatroom_id)) + sepr + chatroom_id
await websoc.send(users_list)
async def chatroom(websocket, path):
if not websocket in USERS:
USERS[websocket] = ""
try:
async for mesgjson in websocket:
if sepr in mesgjson and websocket in USERS:
if (mesgjson.split(sepr)[0] == "CHKUSR") & (len(mesgjson.split(sepr)) == 3) :
result = str(chk_username_presence(mesgjson))
await websocket.send(result)
if(result == "True"):
await websocket.close()
USERS.pop(websocket)
elif USERS[websocket] == "":
USERS[websocket] = [mesgjson.split(sepr)[0], mesgjson.split(sepr)[1]]
print_formatted_text(HTML("[" + obtntime() + "] " + "<b>USERJOINED</b> > <green>" + mesgjson.split(sepr)[0] + "@" + mesgjson.split(sepr)[1] + "</green>"))
await notify_mesej("SNCTRYZERO" + sepr + "USERJOINED" + sepr + mesgjson.split(sepr)[0] + sepr + mesgjson.split(sepr)[1] + sepr + str(getallus(mesgjson.split(sepr)[1])))
else:
if str(mesgjson) == "/list":
await send_chatroommembers_list(websocket)
else:
print_formatted_text(HTML("[" + obtntime() + "] " + "<b>SNCTRYZERO</b> > " + helper_display.wrap_text(str(mesgjson))))
await notify_mesej(mesgjson)
except ConnectionClosedError as EXPT:
print_formatted_text(HTML("[" + obtntime() + "] " + "<b>USEREXITED</b> > <red>" + USERS[websocket][0] + "@" + USERS[websocket][1] + "</red>"))
userlist = getallus(USERS[websocket][1])
userlist.remove(USERS[websocket][0])
leftmesg = "SNCTRYZERO" + sepr + "USEREXITED" + sepr + USERS[websocket][0] + sepr + USERS[websocket][1] + sepr + str(userlist)
USERS.pop(websocket)
await notify_mesej(leftmesg)
def servenow(netpdata="127.0.0.1", chatport="9696"):
try:
start_server = websockets.serve(chatroom, netpdata, int(chatport))
asyncio.get_event_loop().run_until_complete(start_server)
print_formatted_text(HTML("[" + obtntime() + "] " + "<b>SNCTRYZERO</b> > <green>SNCTRYZERO server was started up on 'ws://" + str(netpdata) + ":" + str(chatport) + "/'</green>"))
asyncio.get_event_loop().run_forever()
except KeyboardInterrupt:
print("")
print_formatted_text(HTML("[" + obtntime() + "] " + "<b>SNCTRYZERO</b> > <red><b>SNCTRYZERO server was shut down</b></red>"))
sys.exit()
@click.command()
@click.option("-c", "--chatport", "chatport", help="Set the port value for the server [0-65536]", required=True)
@click.option("-6", "--ipprotv6", "netprotc", flag_value="ipprotv6", help="Start the server on an IPv6 address", required=True)
@click.option("-4", "--ipprotv4", "netprotc", flag_value="ipprotv4", help="Start the server on an IPv4 address", required=True)
@click.version_option(version="30102020", prog_name="SNCTRYZERO Server")
def mainfunc(chatport, netprotc):
try:
click.clear()
print_formatted_text("\n")
print_formatted_text(HTML("[" + obtntime() + "] " + "<b>SNCTRYZERO</b> > <green><b>Starting SNCTRYZERO server v30102020...</b></green>" + "\n" + \
"[" + obtntime() + "] " + "<b>SNCTRYZERO</b> > Know more about the project at https://github.com/t0xic0der/sanctuary-zero/wiki" + "\n" + \
"[" + obtntime() + "] " + "<b>SNCTRYZERO</b> > Find folks we're thankful to at https://github.com/t0xic0der/sanctuary-zero/graphs/contributors"))
netpdata = ""
if netprotc == "ipprotv6":
print_formatted_text(HTML("[" + obtntime() + "] " + "<b>SNCTRYZERO</b> > <green>IP version : 6</green>"))
netpdata = "::"
print_formatted_text(HTML("[" + obtntime() + "] " + "<b>SNCTRYZERO</b> > <green>IP address : " + ipaddress(6) + "</green>"))
elif netprotc == "ipprotv4":
print_formatted_text(HTML("[" + obtntime() + "] " + "<b>SNCTRYZERO</b> > <green>IP version : 4</green>"))
netpdata = "0.0.0.0"
print_formatted_text(HTML("[" + obtntime() + "] " + "<b>SNCTRYZERO</b> > <green>IP address : " + ipaddress(4) + "</green>"))
servenow(netpdata, chatport)
except OSError:
print_formatted_text(HTML("[" + obtntime() + "] " + "<b>SNCTRYZERO</b> > <red><b>The server could not be started up</b></red>"))
if __name__ == "__main__":
mainfunc()