Skip to content

Commit

Permalink
Fixes darklock whitelist
Browse files Browse the repository at this point in the history
  • Loading branch information
w4ffl35 committed May 9, 2024
1 parent af4f6da commit 5f29cbe
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 17 deletions.
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

setup(
name="darklock",
version="0.1.3",
version="0.1.4",
author="Capsize LLC",
description="",
long_description=open("README.md", "r", encoding="utf-8").read(),
Expand Down
2 changes: 2 additions & 0 deletions src/darklock/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ def activate(
whitelisted_filenames: list = None,
whitelisted_imports: list = None,
blacklisted_filenames: list = None,
whitelisted_directories: list = None,
):
# network.activate(
# allowed_port=4222
Expand All @@ -19,6 +20,7 @@ def activate(
whitelisted_filenames,
whitelisted_imports,
blacklisted_filenames,
whitelisted_directories,
)

def deactivate():
Expand Down
65 changes: 49 additions & 16 deletions src/darklock/restrict_os_access.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import os
import sys
import traceback
import logging


class RestrictOSAccess(metaclass=Singleton):
Expand All @@ -19,52 +20,76 @@ def __init__(self):
self.whitelisted_filenames = []
self.whitelisted_imports = []
self.blacklisted_filenames = []
self.whitelisted_directories = []

self.logger = logging.getLogger(__name__)
self.logger.setLevel(logging.DEBUG)
self.logger.addHandler(logging.StreamHandler())

def is_directory_whitelisted(self, directory: str) -> bool:
print("IS DIRECTORY WHITELISTED")
print(directory)
print(self.whitelisted_directories)
return directory in self.whitelisted_directories

def restrict_os_write(self, *args, **kwargs):
return self.original_os_write(*args, **kwargs)

def restricted_open(self, *args, **kwargs):
if ('open', args[0]) in self.whitelisted_operations or self.check_stack_trace():
if (
self.is_directory_whitelisted(args[0]) or
'open' in self.whitelisted_operations or self.check_stack_trace()
):
return self.original_open(*args, **kwargs)
raise PermissionError("File system operations are not allowed")
self.logger.error("File system operations are not allowed")

def restricted_os_makedirs(self, *args, **kwargs):
if ('makedirs', args[0]) in self.whitelisted_operations or self.check_stack_trace():
if (
self.is_directory_whitelisted(args[0]) or
'makedirs' in self.whitelisted_operations or self.check_stack_trace()
):
return self.original_makedirs(*args, **kwargs)
#raise PermissionError("File system operations are not allowed")

def restricted_open(self, *args, **kwargs):
if ('open', args[0]) in self.whitelisted_operations or self.check_stack_trace():
return self.original_open(*args, **kwargs)
raise PermissionError("File system operations are not allowed")
self.logger.error(f"File system operations are not allowed. Attempted to create directory: {args}")

def restricted_exec(self, *args, **kwargs):
raise PermissionError("System calls are not allowed")
if (
self.is_directory_whitelisted(args[0]) or
'exec' in self.whitelisted_operations or self.check_stack_trace()
):
return os.exec(*args, **kwargs)
self.logger.error("System calls are not allowed")

def restricted_subprocess(self, *args, **kwargs):
raise PermissionError("Subprocess invocations are not allowed")
if (
self.is_directory_whitelisted(args[0]) or
'subprocess' in self.whitelisted_operations or self.check_stack_trace()
):
return os.system(*args, **kwargs)
self.logger.error("Subprocess invocations are not allowed")

def restricted_import(self, name, *args, **kwargs):
if 'import' in self.whitelisted_operations or self.check_stack_trace():
return self.original_import(name, *args, **kwargs)
if any(re.search(pattern, name) for pattern in self.whitelisted_imports):
return self.original_import(name, *args, **kwargs)
print(f"Failed to import: {name}")
raise PermissionError("Importing modules is not allowed")
self.logger.error("Importing modules is not allowed")

def log_imports(self, name, *args, **kwargs):
#self.logging_importer = LoggingImporter()
sys.meta_path.insert(0, self.logging_importer)

def restricted_module(self, *args, **kwargs):
raise PermissionError("Module operations are not allowed")
self.logger.error("Module operations are not allowed")

def restricted_os(self, *args, **kwargs):
raise PermissionError("OS operations are not allowed")
self.logger.error("OS operations are not allowed")

def restricted_sys(self, *args, **kwargs):
raise PermissionError("System operations are not allowed")
self.logger.error("System operations are not allowed")

def restricted_socket(self, *args, **kwargs):
raise PermissionError("Socket operations are not allowed")
self.logger.error("Socket operations are not allowed")

def check_stack_trace(self) -> bool:
stack_trace = traceback.extract_stack()
Expand All @@ -90,6 +115,7 @@ def activate(
whitelisted_filenames: list = None,
whitelisted_imports: list = None,
blacklisted_filenames: list = None,
whitelisted_directories: list = None,
):
"""
Install restrictions on OS access.
Expand All @@ -100,6 +126,13 @@ def activate(
self.whitelisted_imports = whitelisted_imports or []
self.blacklisted_filenames = blacklisted_filenames or []

whitelisted_directories = whitelisted_directories or []
parsed_directories = []
for directory in whitelisted_directories:
parsed_directories.append(os.path.expanduser(directory))
self.whitelisted_directories = parsed_directories


self.original_open = builtins.open
self.original_import = builtins.__import__
self.original_os_write = os.write
Expand Down

0 comments on commit 5f29cbe

Please sign in to comment.