from __future__ import print_function
import logging
import empire.server.common.helpers as helpers
from empire.server.common.plugins import Plugin
from empire.server.core.agent_task_service import AgentTaskService
from empire.server.core.db import models
from empire.server.core.hooks import hooks
from empire.server.core.plugin_service import PluginService
# Module-level logger, named after this module so records can be filtered per plugin.
log = logging.getLogger(__name__)

# The purpose of this plugin is to block certain IP addresses from connecting to the server.
# It is to showcase the event-driven nature of the hook system.
#
# NOTE: enforcement is attached to hooks.AFTER_AGENT_CHECKIN_HOOK, so a listed
# address is acted on after the agent has already checked in (an exit task is
# queued for it); nothing here rejects the connection itself.
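class Plugin(Plugin):
    """Denylist plugin: queue an exit task for agents whose address is listed.

    The check runs from the after-check-in hook, so a listed agent is tasked
    to exit once it has checked in; the plugin does not refuse connections.
    """

    def onLoad(self):
        """Populate plugin metadata and options, and seed the active denylist."""
        self.info = {
            'Name': 'denylist',
            "Authors": [
                {
                    "Name": "Vincent Rose",
                    "Handle": "@Vinnybod",
                    "Link": "https://twitter.com/_vinnybod",
                },
            ],
            'Description': """
The purpose of this plugin is to block certain IP addresses from connecting to the server.
It is to showcase the event-driven nature of the hook system.
""",
            'Software': '',
            'Techniques': [],
            'Comments': []
        }
        self.options = {
            'Addresses': {
                'Description': 'List of IP addresses to block. Comma Separated.',
                'Required': False,
                'Value': '127.0.0.1',
            },
        }
        # Seed the enforced list from the declared default. Previously self.ip
        # only existed after execute() had run, so the check-in hook raised
        # AttributeError and the value shown in the options was never applied.
        self.ip = self.options['Addresses']['Value']

    @staticmethod
    def _split_addresses(raw):
        """Split a comma-separated string into trimmed, non-empty entries.

        Order is preserved. Trimming matters because "a, b" would otherwise
        yield " b", which never equals "b" and silently skips the entry.
        """
        return [entry.strip() for entry in raw.split(',') if entry.strip()]

    def execute(self, command):
        """Replace the denylist with command['Addresses'].

        Returns a status string on success, or False after logging and
        reporting the error (for example when 'Addresses' is missing).
        """
        try:
            self.ip = command['Addresses']
            return f'[*] Set IP Addresses to {self.ip}'
        except Exception as e:
            log.error(e)
            self.plugin_service.plugin_socketio_message(self.info["Name"], f"[!] {e}")
            return False

    def register(self, mainMenu):
        """
        Register hooks for the plugin
        """
        self.installPath = mainMenu.installPath
        self.main_menu = mainMenu
        self.plugin_service: PluginService = mainMenu.pluginsv2
        self.agent_task_service: AgentTaskService = mainMenu.agenttasksv2
        hooks.register_hook(hooks.AFTER_AGENT_CHECKIN_HOOK, "denylist", self.run_after_agent_checkin)

    def run_after_agent_checkin(self, Session, agent: models.Agent):
        """
        Check if agent is in the list of addresses to block. If it is, kill the agent.

        Matching is exact string comparison after trimming whitespace; no
        CIDR ranges or alternate textual forms of an address are recognised.
        """
        # getattr keeps the hook a no-op instead of raising if it fires before
        # the list has been initialised.
        configured = getattr(self, 'ip', None)
        if not agent.internal_ip or not configured:
            return

        # NOTE(review): internal_ip looks like a value reported by the agent
        # rather than the source address observed by the listener - confirm
        # before relying on this list as an access control.
        deny_list = set(self._split_addresses(configured))
        for ip in self._split_addresses(agent.internal_ip):
            if ip not in deny_list:
                continue

            # Keep a server-side record of the automated kill alongside the UI message.
            log.warning('denylist: exit task queued for agent %s (blocked ip %s)', agent.session_id, ip)
            self.plugin_service.plugin_socketio_message(self.info['Name'], f'[!] Killing agent {agent.session_id} due to blocked ip: {ip}')
            with Session as db:
                # NOTE(review): the task is attributed to hard-coded user id 1;
                # presumably the default admin account - confirm.
                self.agent_task_service.create_task_exit(db, agent, current_user_id=1)
            # One exit task is enough; stopping here also avoids re-entering
            # the same Session context for a second matching address.
            return

    def shutdown(self):
        """
        Kills additional processes that were spawned
        """
        # If the plugin spawns a process provide a shutdown method for when Empire exits else leave it as pass
        pass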