diff --git a/smbclientng/core/CommandCompleter.py b/smbclientng/core/CommandCompleter.py index 3ffba05..241bfde 100644 --- a/smbclientng/core/CommandCompleter.py +++ b/smbclientng/core/CommandCompleter.py @@ -4,10 +4,17 @@ # Author : Podalirius (@podalirius_) # Date created : 20 may 2024 - +from __future__ import annotations import ntpath import os import shlex +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from typing import Optional + from smbclientng.core.SMBSession import SMBSession + from smbclientng.core.Config import Config + from smbclientng.core.Logger import Logger class CommandCompleter(object): @@ -334,8 +341,12 @@ class CommandCompleter(object): "autocomplete": ["share"] }, } + + smbSession: SMBSession + config: Config + logger: Logger - def __init__(self, smbSession, config, logger): + def __init__(self, smbSession: SMBSession, config: Config, logger: Logger): # Objects self.smbSession = smbSession self.config = config @@ -344,7 +355,7 @@ def __init__(self, smbSession, config, logger): self.commands["help"]["subcommands"] = ["format"] + list(self.commands.keys()) self.commands["help"]["subcommands"].remove("help") - def complete(self, text, state): + def complete(self, text: str, state: int) -> str: """ Function to handle command completion in the LDAP console. @@ -497,7 +508,7 @@ def complete(self, text, state): except IndexError: return None - def print_help(self, command=None): + def print_help(self, command: Optional[str] = None): """ Prints help information for a specific command or all commands if no command is specified. diff --git a/smbclientng/core/Config.py b/smbclientng/core/Config.py index b4a0004..36ed036 100644 --- a/smbclientng/core/Config.py +++ b/smbclientng/core/Config.py @@ -23,10 +23,12 @@ class Config(object): no_colors: Property to get or set the colored output preference. """ - not_interactive = False + not_interactive: bool = False startup_script = None + _debug: bool + _no_colors: bool - def __init__(self, debug=False, no_colors=None): + def __init__(self, debug: bool = False, no_colors=None): self._debug = debug if no_colors is not None: diff --git a/smbclientng/core/Credentials.py b/smbclientng/core/Credentials.py index 7a7f1e9..859a85e 100644 --- a/smbclientng/core/Credentials.py +++ b/smbclientng/core/Credentials.py @@ -4,11 +4,14 @@ # Author : Podalirius (@podalirius_) # Date created : 22 June 2024 - +from __future__ import annotations from smbclientng.core.utils import parse_lm_nt_hashes import re import binascii - +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from typing import Optional class Credentials(object): """ @@ -16,37 +19,35 @@ class Credentials(object): """ # Identity - domain = None - username = None - password = None + domain: Optional[str] + username: Optional[str] + password: Optional[str] # Hashes - nt_hex = "" - nt_raw = "" - lm_hex = "" - lm_raw = "" + nt_hex: str + nt_raw: bytes + lm_hex: str + lm_raw: bytes # Kerberos - use_kerberos = False - aesKey = None - kdcHost = None + use_kerberos: bool = False + aesKey: Optional[str] + kdcHost: Optional[str] - def __init__(self, domain, username, password, hashes=None, use_kerberos=False, aesKey=None, kdcHost=None): + def __init__(self, domain: str, username: str, password: str, hashes: Optional[str] = None, use_kerberos: bool = False, aesKey: Optional[str] = None, kdcHost: Optional[str] = None): super(Credentials, self).__init__() # Identity self.domain = domain self.username = username self.password = password + # Hashes - self.nt_hex = "" - self.nt_raw = "" - self.lm_hex = "" - self.lm_raw = "" self.set_hashes(hashes=hashes) + # Kerberos self.use_kerberos = use_kerberos self.kdcHost = kdcHost self.aesKey = aesKey - def set_hashes(self, hashes): + def set_hashes(self, hashes: Optional[str]): """ Sets the LM and NT hashes for the credentials. @@ -60,9 +61,9 @@ def set_hashes(self, hashes): """ self.nt_hex = "" - self.nt_raw = "" + self.nt_raw = b"" self.lm_hex = "" - self.lm_raw = "" + self.lm_raw = b"" lmhash, nthash = None, None if hashes is not None: diff --git a/smbclientng/core/InteractiveShell.py b/smbclientng/core/InteractiveShell.py index 3958601..4108d85 100644 --- a/smbclientng/core/InteractiveShell.py +++ b/smbclientng/core/InteractiveShell.py @@ -4,10 +4,12 @@ # Author : Podalirius (@podalirius_) # Date created : 23 may 2024 - +from __future__ import annotations import charset_normalizer import datetime -import impacket +from impacket.smb3structs import * +from impacket.smbconnection import SessionError as SMBConnectionSessionError +from impacket.smb3 import SessionError as SMB3SessionError from importlib import import_module import ntpath import os @@ -21,7 +23,12 @@ from rich.syntax import Syntax from smbclientng.core.CommandCompleter import CommandCompleter from smbclientng.core.utils import b_filesize, unix_permissions, windows_ls_entry, local_tree, resolve_local_files, resolve_remote_files, smb_entry_iterator +from typing import TYPE_CHECKING +if TYPE_CHECKING: + from smbclientng.core.SessionsManager import SessionsManager + from smbclientng.core.Config import Config + from smbclientng.core.Logger import Logger ## Decorators @@ -38,7 +45,11 @@ def wrapper(*args, **kwargs): def active_smb_connection_needed(func): def wrapper(*args, **kwargs): self, arguments,command = args[0], args[1], args[2] - # + + if self.sessionsManager.current_session is None: + self.logger.error("SMB Session is disconnected.") + return None + self.sessionsManager.current_session.ping_smb_session() if self.sessionsManager.current_session.connected: return func(*args, **kwargs) @@ -80,8 +91,12 @@ class InteractiveShell(object): running = True modules = {} + sessionsManager: SessionsManager + config: Config + logger: Logger + commandCompleterObject: CommandCompleter - def __init__(self, sessionsManager, config, logger): + def __init__(self, sessionsManager: SessionsManager, config: Config, logger: Logger): # Objects self.sessionsManager = sessionsManager self.config = config @@ -138,7 +153,7 @@ def run(self): traceback.print_exc() self.logger.error(str(err)) - def process_line(self, commandLine): + def process_line(self, commandLine: str): # Split and parse the commandLine tokens = shlex.split(commandLine) if len(tokens) == 0: @@ -303,17 +318,18 @@ def process_line(self, commandLine): # Commands ================================================================ - def command_debug(self, arguments, command): + def command_debug(self, arguments: list[str], command: str): try: self.logger.print("[debug] command = '%s'" % command) self.logger.print("[debug] arguments = %s" % arguments) except Exception as e: traceback.print_exc() + @command_arguments_required @active_smb_connection_needed @smb_share_is_set - def command_bat(self, arguments, command): + def command_bat(self, arguments: list[str], command: str): # Command arguments required : Yes # Active SMB connection needed : Yes # SMB share needed : Yes @@ -343,26 +359,26 @@ def command_bat(self, arguments, command): Console().print(syntax) else: self.logger.error("[!] Could not detect charset of '%s'." % path_to_file) - except impacket.smbconnection.SessionError as e: + except SMBConnectionSessionError as e: self.logger.error("[!] SMB Error: %s" % e) @command_arguments_required @active_smb_connection_needed @smb_share_is_set - def command_cd(self, arguments, command): + def command_cd(self, arguments: list[str], command: str): # Command arguments required : Yes # Active SMB connection needed : Yes # SMB share needed : Yes try: self.sessionsManager.current_session.set_cwd(path=arguments[0]) - except impacket.smbconnection.SessionError as e: + except SMBConnectionSessionError as e: self.logger.error("[!] SMB Error: %s" % e) @command_arguments_required @active_smb_connection_needed @smb_share_is_set - def command_cat(self, arguments, command): + def command_cat(self, arguments: list[str], command: str): # Command arguments required : Yes # Active SMB connection needed : Yes # SMB share needed : Yes @@ -384,10 +400,10 @@ def command_cat(self, arguments, command): self.logger.print(filecontent) else: self.logger.error("[!] Could not detect charset of '%s'." % path_to_file) - except impacket.smbconnection.SessionError as e: + except SMBConnectionSessionError as e: self.logger.error("[!] SMB Error: %s" % e) - def command_close(self, arguments, command): + def command_close(self, arguments: list[str], command: str): # Command arguments required : No # Active SMB connection needed : No # SMB share needed : No @@ -396,7 +412,7 @@ def command_close(self, arguments, command): if self.sessionsManager.current_session.connected: self.sessionsManager.current_session.close_smb_session() - def command_find(self, arguments, command): + def command_find(self, arguments: list[str], command: str): module_name = "find" if module_name in self.modules.keys(): @@ -409,7 +425,7 @@ def command_find(self, arguments, command): @command_arguments_required @active_smb_connection_needed @smb_share_is_set - def command_get(self, arguments, command): + def command_get(self, arguments: list[str], command: str): # Command arguments required : Yes # Active SMB connection needed : Yes # SMB share needed : Yes @@ -441,12 +457,12 @@ def command_get(self, arguments, command): keepRemotePath=keep_remote_path, is_recursive=is_recursive ) - except impacket.smbconnection.SessionError as e: + except SMBConnectionSessionError as e: if self.config.debug: traceback.print_exc() self.logger.error("[!] SMB Error: %s" % e) - def command_help(self, arguments, command): + def command_help(self, arguments: list[str], command: str): # Command arguments required : No # Active SMB connection needed : No # SMB share needed : No @@ -457,7 +473,7 @@ def command_help(self, arguments, command): self.commandCompleterObject.print_help(command=None) @active_smb_connection_needed - def command_info(self, arguments, command): + def command_info(self, arguments: list[str], command: str): # Command arguments required : No # Active SMB connection needed : Yes # SMB share needed : No @@ -479,11 +495,11 @@ def command_info(self, arguments, command): share=print_share_info, server=print_server_info ) - except impacket.smbconnection.SessionError as e: + except SMBConnectionSessionError as e: self.logger.error("SMB Error: %s" % e) @command_arguments_required - def command_lbat(self, arguments, command): + def command_lbat(self, arguments: list[str], command: str): # Command arguments required : Yes # Active SMB connection needed : No # SMB share needed : No @@ -517,11 +533,11 @@ def command_lbat(self, arguments, command): self.logger.error("[!] Could not detect charset of '%s'." % path_to_file) else: self.logger.error("[!] Local file '%s' does not exist." % path_to_file) - except impacket.smbconnection.SessionError as e: + except SMBConnectionSessionError as e: self.logger.error("[!] SMB Error: %s" % e) @command_arguments_required - def command_lcat(self, arguments, command): + def command_lcat(self, arguments: list[str], command: str): # Command arguments required : Yes # Active SMB connection needed : No # SMB share needed : No @@ -547,11 +563,11 @@ def command_lcat(self, arguments, command): self.logger.error("[!] Could not detect charset of '%s'." % path_to_file) else: self.logger.error("[!] Local file '%s' does not exist." % path_to_file) - except impacket.smbconnection.SessionError as e: + except SMBConnectionSessionError as e: self.logger.error("[!] SMB Error: %s" % e) @command_arguments_required - def command_lcd(self, arguments, command): + def command_lcd(self, arguments: list[str], command: str): # Command arguments required : Yes # Active SMB connection needed : No # SMB share needed : No @@ -567,7 +583,7 @@ def command_lcd(self, arguments, command): self.logger.error("Directory '%s' does not exists." % path) @command_arguments_required - def command_lcp(self, arguments, command): + def command_lcp(self, arguments: list[str], command: str): # Command arguments required : Yes # Active SMB connection needed : No # SMB share needed : No @@ -585,7 +601,7 @@ def command_lcp(self, arguments, command): else: self.commandCompleterObject.print_help(command=command) - def command_lls(self, arguments, command): + def command_lls(self, arguments: list[str], command: str): # Command arguments required : No # Active SMB connection needed : No # SMB share needed : No @@ -631,7 +647,7 @@ def command_lls(self, arguments, command): self.logger.print() @command_arguments_required - def command_lmkdir(self, arguments, command): + def command_lmkdir(self,arguments: list[str], command: str): # Command arguments required : Yes # Active SMB connection needed : No # SMB share needed : No @@ -648,7 +664,7 @@ def command_lmkdir(self, arguments, command): if not os.path.exists(tmp_path): os.mkdir(path=tmp_path) - def command_lpwd(self, arguments, command): + def command_lpwd(self, arguments: list[str], command: str): # Command arguments required : No # Active SMB connection needed : No # SMB share needed : No @@ -656,7 +672,7 @@ def command_lpwd(self, arguments, command): self.logger.print(os.getcwd()) @command_arguments_required - def command_lrename(self, arguments, command): + def command_lrename(self, arguments: list[str], command: str): # Command arguments required : Yes # Active SMB connection needed : No # SMB share needed : No @@ -667,7 +683,7 @@ def command_lrename(self, arguments, command): self.commandCompleterObject.print_help(command=command) @command_arguments_required - def command_lrm(self, arguments, command): + def command_lrm(self, arguments: list[str], command: str): # Command arguments required : Yes # Active SMB connection needed : No # SMB share needed : No @@ -686,7 +702,7 @@ def command_lrm(self, arguments, command): self.logger.error("Path '%s' does not exist." % path) @command_arguments_required - def command_lrmdir(self, arguments, command): + def command_lrmdir(self, arguments: list[str], command: str): # Command arguments required : Yes # Active SMB connection needed : No # SMB share needed : No @@ -707,7 +723,7 @@ def command_lrmdir(self, arguments, command): else: self.logger.error("Path '%s' does not exist." % path) - def command_ltree(self, arguments, command): + def command_ltree(self, arguments: list[str], command: str): # Command arguments required : No # Active SMB connection needed : No # SMB share needed : No @@ -724,7 +740,7 @@ def command_ltree(self, arguments, command): @active_smb_connection_needed @smb_share_is_set - def command_ls(self, arguments, command): + def command_ls(self, arguments: list[str], command: str): # Command arguments required : No # Active SMB connection needed : Yes # SMB share needed : Yes @@ -757,7 +773,7 @@ def command_ls(self, arguments, command): @command_arguments_required @active_smb_connection_needed @smb_share_is_set - def command_mkdir(self, arguments, command): + def command_mkdir(self, arguments: list[str], command: str): # Command arguments required : Yes # Active SMB connection needed : Yes # SMB share needed : Yes @@ -765,7 +781,7 @@ def command_mkdir(self, arguments, command): self.sessionsManager.current_session.mkdir(path=arguments[0]) @command_arguments_required - def command_module(self, arguments, command): + def command_module(self, arguments: list[str], command: str): module_name = arguments[0] if module_name in self.modules.keys(): @@ -778,7 +794,7 @@ def command_module(self, arguments, command): @command_arguments_required @active_smb_connection_needed @smb_share_is_set - def command_mount(self, arguments, command): + def command_mount(self, arguments: list[str], command: str): # Command arguments required : Yes # Active SMB connection needed : Yes # SMB share needed : Yes @@ -794,7 +810,7 @@ def command_mount(self, arguments, command): try: self.sessionsManager.current_session.mount(local_mount_point, remote_path) - except (impacket.smbconnection.SessionError, impacket.smb3.SessionError) as e: + except (SMBConnectionSessionError, SMB3SessionError) as e: self.sessionsManager.current_session.umount(local_mount_point) else: self.commandCompleterObject.print_help(command=command) @@ -802,7 +818,7 @@ def command_mount(self, arguments, command): @command_arguments_required @active_smb_connection_needed @smb_share_is_set - def command_put(self, arguments, command): + def command_put(self, arguments: list[str], command: str): # Command arguments required : Yes # Active SMB connection needed : Yes # SMB share needed : Yes @@ -829,10 +845,10 @@ def command_put(self, arguments, command): else: # Put this single file self.sessionsManager.current_session.put_file(localpath=localpath) - except impacket.smbconnection.SessionError as e: + except SMBConnectionSessionError as e: self.logger.error("[!] SMB Error: %s" % e) - def command_reconnect(self, arguments, command): + def command_reconnect(self, arguments: list[str], command: str): # Command arguments required : No # Active SMB connection needed : No # SMB share needed : No @@ -844,7 +860,7 @@ def command_reconnect(self, arguments, command): else: self.sessionsManager.current_session.init_smb_session() - def command_reset(self, arguments, command): + def command_reset(self, arguments: list[str], command: str): # Command arguments required : No # Active SMB connection needed : No # SMB share needed : No @@ -856,7 +872,7 @@ def command_reset(self, arguments, command): @command_arguments_required @active_smb_connection_needed @smb_share_is_set - def command_rm(self, arguments, command): + def command_rm(self, arguments: list[str], command: str): # Command arguments required : Yes # Active SMB connection needed : Yes # SMB share needed : Yes @@ -888,7 +904,7 @@ def command_rm(self, arguments, command): @command_arguments_required @active_smb_connection_needed @smb_share_is_set - def command_rmdir(self, arguments, command): + def command_rmdir(self, arguments: list[str], command: str): # Command arguments required : Yes # Active SMB connection needed : Yes # SMB share needed : Yes @@ -907,7 +923,7 @@ def command_rmdir(self, arguments, command): @active_smb_connection_needed @smb_share_is_set - def command_sizeof(self, arguments, command): + def command_sizeof(self, arguments: list[str], command: str): # Command arguments required : Yes # Active SMB connection needed : Yes # SMB share needed : Yes @@ -952,6 +968,7 @@ def command_sizeof(self, arguments, command): else: path_display = f"\x1b[1;96m{path}\x1b[0m" + size_str = "" for entry, fullpath, depth, is_last_entry in generator: if not entry.is_directory(): path_size += entry.get_filesize() @@ -966,12 +983,10 @@ def command_sizeof(self, arguments, command): print(f"{size_str}\t{path_display}") total_size += path_size - except impacket.smbconnection.SessionError as e: + except SMBConnectionSessionError as e: self.logger.error(f"Failed to access '{path}': {e}") except (BrokenPipeError, KeyboardInterrupt): self.logger.error("Interrupted.") - self.close_smb_session() - self.init_smb_session() return except Exception as e: self.logger.error(f"Error while processing '{path}': {e}") @@ -982,7 +997,7 @@ def command_sizeof(self, arguments, command): self.logger.print(f"Total size: {b_filesize(total_size)}") @active_smb_connection_needed - def command_shares(self, arguments, command): + def command_shares(self, arguments: list[str], command: str): # Command arguments required : No # Active SMB connection needed : Yes # SMB share needed : No @@ -1055,7 +1070,7 @@ def command_shares(self, arguments, command): @active_smb_connection_needed @smb_share_is_set - def command_tree(self, arguments, command): + def command_tree(self, arguments: list[str], command: str): # Command arguments required : No # Active SMB connection needed : Yes # SMB share needed : Yes @@ -1068,7 +1083,7 @@ def command_tree(self, arguments, command): @command_arguments_required @active_smb_connection_needed @smb_share_is_set - def command_umount(self, arguments, command): + def command_umount(self, arguments: list[str], command: str): # Command arguments required : Yes # Active SMB connection needed : Yes # SMB share needed : Yes @@ -1081,7 +1096,7 @@ def command_umount(self, arguments, command): @command_arguments_required @active_smb_connection_needed - def command_use(self, arguments, command): + def command_use(self, arguments: list[str], command: str): # Command arguments required : Yes # Active SMB connection needed : Yes # SMB share needed : No diff --git a/smbclientng/core/LocalFileIO.py b/smbclientng/core/LocalFileIO.py index 0500168..e42b71f 100644 --- a/smbclientng/core/LocalFileIO.py +++ b/smbclientng/core/LocalFileIO.py @@ -4,11 +4,15 @@ # Author : Podalirius (@podalirius_) # Date created : 23 may 2024 - +from __future__ import annotations import os import ntpath from rich.progress import BarColumn, DownloadColumn, Progress, TextColumn, TimeRemainingColumn, TransferSpeedColumn +from typing import TYPE_CHECKING +if TYPE_CHECKING: + from smbclientng.core.Logger import Logger + from typing import Optional class LocalFileIO(object): """ @@ -27,7 +31,7 @@ class LocalFileIO(object): read(self, size): Reads data from the file up to the specified size and updates the progress bar if expected size is provided. """ - def __init__(self, mode, path=None, expected_size=None, keepRemotePath=False, logger=None): + def __init__(self, mode: str, path: str, logger: Logger, expected_size: Optional[int] = None, keepRemotePath: bool = False): super(LocalFileIO, self).__init__() self.logger = logger self.mode = mode @@ -94,7 +98,7 @@ def __init__(self, mode, path=None, expected_size=None, keepRemotePath=False, lo visible=True ) - def write(self, data): + def write(self, data: bytes): """ Writes data to the file. @@ -114,7 +118,7 @@ def write(self, data): else: return 0 - def read(self, size): + def read(self, size: int): """ Reads a specified amount of data from the file. @@ -135,7 +139,7 @@ def read(self, size): else: return b"" - def close(self, remove=False): + def close(self, remove: bool = False): """ Closes the file descriptor and optionally removes the file. @@ -152,7 +156,7 @@ def close(self, remove=False): if remove: try: os.remove(path=self.path) - except (PermissionError, FileNotFoundError) as err: + except (PermissionError, FileNotFoundError): pass if self.expected_size is not None: @@ -160,7 +164,7 @@ def close(self, remove=False): del self - def set_error(self, message): + def set_error(self, message: str): """ Sets an error message in the progress bar's description and modifies the progress bar to show only essential columns. @@ -173,10 +177,10 @@ def set_error(self, message): """ self.__progress.tasks[0].description = message - self.__progress.columns = [ + self.__progress.columns = ( TextColumn("[bold blue]{task.description}", justify="right"), BarColumn(bar_width=None), "•", DownloadColumn(), - ] + ) self.__progress.update(self.__task, advance=0) diff --git a/smbclientng/core/Logger.py b/smbclientng/core/Logger.py index 30115cf..3c0ec84 100644 --- a/smbclientng/core/Logger.py +++ b/smbclientng/core/Logger.py @@ -4,10 +4,15 @@ # Author : Podalirius (@podalirius_) # Date created : 24 June 2024 - +from __future__ import annotations import os import re from enum import Enum +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from typing import Optional + from smbclientng.core.Config import Config class LogLevel(Enum): @@ -35,8 +40,10 @@ class Logger(object): debug(message): Logs a message at the DEBUG level if debugging is enabled. error(message): Logs a message at the ERROR level. """ + config: Config + logfile: Optional[str] - def __init__(self, config, logfile=None): + def __init__(self, config: Config, logfile: Optional[str] = None): super(Logger, self).__init__() self.config = config self.logfile = logfile @@ -50,7 +57,7 @@ def __init__(self, config, logfile=None): open(self.logfile, "w").close() self.debug("Writting logs to logfile: '%s'" % self.logfile) - def print(self, message="", end='\n'): + def print(self, message: str = "", end: str = '\n'): """ Prints a message to stdout and logs it to a file if logging is enabled. @@ -67,7 +74,7 @@ def print(self, message="", end='\n'): print(message, end=end) self.write_to_logfile(nocolor_message, end=end) - def info(self, message): + def info(self, message: str): """ Logs a message at the INFO level. @@ -84,7 +91,7 @@ def info(self, message): print("[\x1b[1;92minfo\x1b[0m] %s" % message) self.write_to_logfile("[info] %s" % nocolor_message) - def debug(self, message): + def debug(self, message: str): """ Logs a message at the DEBUG level if debugging is enabled. @@ -102,7 +109,7 @@ def debug(self, message): print("[debug] %s" % message) self.write_to_logfile("[debug] %s" % nocolor_message) - def error(self, message): + def error(self, message: str): """ Logs an error message to the console and the log file. @@ -119,7 +126,7 @@ def error(self, message): print("[\x1b[1;91merror\x1b[0m] %s" % message) self.write_to_logfile("[error] %s" % nocolor_message) - def write_to_logfile(self, message, end='\n'): + def write_to_logfile(self, message: str, end: str = '\n'): """ Writes the provided message to the log file specified during Logger instance initialization. diff --git a/smbclientng/core/Module.py b/smbclientng/core/Module.py index da97f25..c544f11 100644 --- a/smbclientng/core/Module.py +++ b/smbclientng/core/Module.py @@ -4,9 +4,15 @@ # Author : Podalirius (@podalirius_) # Date created : 23 may 2024 - +from __future__ import annotations +import argparse import shlex +from typing import TYPE_CHECKING +if TYPE_CHECKING: + from smbclientng.core.SMBSession import SMBSession + from smbclientng.core.Logger import Logger + from smbclientng.core.Config import Config class Module(object): """ @@ -15,12 +21,12 @@ class Module(object): This class provides common attributes and methods that are shared among different modules. """ - name = "" - description = "" - smbSession = None - options = None + name: str = "" + description: str = "" + smbSession: SMBSession + options: argparse.Namespace - def __init__(self, smbSession, config, logger): + def __init__(self, smbSession: SMBSession, config: Config, logger: Logger): self.smbSession = smbSession self.config = config self.logger = logger @@ -36,7 +42,7 @@ def run(self): """ raise NotImplementedError("Subclasses must implement this method") - def processArguments(self, parser, arguments): + def processArguments(self, parser: argparse.ArgumentParser, arguments) -> argparse.Namespace: if type(arguments) == list: arguments = ' '.join(arguments) diff --git a/smbclientng/core/ModuleArgumentParser.py b/smbclientng/core/ModuleArgumentParser.py index 08c28a7..931a2a2 100644 --- a/smbclientng/core/ModuleArgumentParser.py +++ b/smbclientng/core/ModuleArgumentParser.py @@ -8,7 +8,12 @@ import argparse import sys +class ModuleArgumentParserError(Exception): + def __init__(self, message): + self.message = message + def __str__(self): + return self.message class ModuleArgumentParser(argparse.ArgumentParser): """ A custom argument parser for handling module-specific command-line arguments in the smbclientng application. @@ -24,14 +29,14 @@ class ModuleArgumentParser(argparse.ArgumentParser): Overrides the default error handling to provide a more informative error message and display the help text. """ - exit_on_error = False + exit_on_error: bool = False - def error(self, message): + def error(self, message: str): """ Overrides the default error handling of argparse.ArgumentParser to provide a custom error message and help display. - This method is called when ArgumentParser encounters an error. It writes the error message to stderr, - displays the help message, and then exits the program with a status code of 2. + This method is called when ArgumentParser encounters an error. It displays the help message and writes the error message + to stderr. Args: message (str): The error message to be displayed. @@ -39,4 +44,4 @@ def error(self, message): self.print_help() sys.stderr.write('\n[!] Error: %s\n' % message) - self.exit(2) + raise ModuleArgumentParserError(message) diff --git a/smbclientng/core/SMBSession.py b/smbclientng/core/SMBSession.py index db56a99..9ddfa37 100644 --- a/smbclientng/core/SMBSession.py +++ b/smbclientng/core/SMBSession.py @@ -4,9 +4,12 @@ # Author : Podalirius (@podalirius_) # Date created : 20 may 2024 - +from __future__ import annotations import io -import impacket.smbconnection +from typing import Optional +from impacket.smbconnection import SMBConnection, SessionError +from impacket.ldap import ldaptypes +from impacket.nt_errors import STATUS_OBJECT_NAME_COLLISION import ntpath import os import random @@ -15,7 +18,13 @@ import traceback from smbclientng.core.LocalFileIO import LocalFileIO from smbclientng.core.utils import b_filesize, STYPE_MASK, is_port_open, smb_entry_iterator +from typing import TYPE_CHECKING +if TYPE_CHECKING: + from smbclientng.core.Logger import Logger + from smbclientng.core.Config import Config + from smbclientng.core.Credentials import Credentials + from impacket.smb import SharedFile class SMBSession(object): """ @@ -52,8 +61,25 @@ class SMBSession(object): read_file(path): Reads a file from the SMB share. test_rights(sharename): Tests read and write access rights on a share. """ + config: Config + logger: Logger + host: str + port: int + timeout: int + advertisedName: Optional[str] + + # Credentials + credentials: Credentials + + smbClient: Optional[SMBConnection] = None + connected: bool = False - def __init__(self, host, port, timeout, credentials, advertisedName=None, config=None, logger=None): + available_shares: dict[str,dict] = {} + smb_share: Optional[str] = None + smb_cwd: str = "" + smb_tree_id: Optional[int] = None + + def __init__(self, host: str, port: int, timeout: int, credentials: Credentials, advertisedName: Optional[str], config: Config, logger: Logger): super(SMBSession, self).__init__() # Objects self.config = config @@ -70,14 +96,6 @@ def __init__(self, host, port, timeout, credentials, advertisedName=None, config # Credentials self.credentials = credentials - self.smbClient = None - self.connected = False - - self.available_shares = {} - self.smb_share = None - self.smb_cwd = "" - self.smb_tree_id = None - self.list_shares() # Connect and disconnect SMB session @@ -103,7 +121,7 @@ def close_smb_session(self): else: raise Exception("SMB client is not initialized.") - def init_smb_session(self): + def init_smb_session(self) -> bool: """ Initializes and establishes a session with the SMB server. @@ -123,7 +141,7 @@ def init_smb_session(self): try: result, error = is_port_open(self.host, self.port, self.timeout) if result: - self.smbClient = impacket.smbconnection.SMBConnection( + self.smbClient = SMBConnection( remoteName=self.host, remoteHost=self.host, myName=self.advertisedName, @@ -134,86 +152,85 @@ def init_smb_session(self): else: self.logger.error(f"Could not connect to '{self.host}:{self.port}', {error}.") self.connected = False - self.smbClient = None + return False except OSError as err: if self.config.debug: traceback.print_exc() self.logger.error("Could not connect to '%s:%d': %s" % (self.host, int(self.port), err)) self.connected = False - self.smbClient = None + return False - if self.smbClient is not None: - if self.credentials.use_kerberos: - self.logger.debug("[>] Authenticating as '%s\\%s' with kerberos ... " % (self.credentials.domain, self.credentials.username)) + if self.credentials.use_kerberos: + self.logger.debug("[>] Authenticating as '%s\\%s' with kerberos ... " % (self.credentials.domain, self.credentials.username)) + try: + self.connected = self.smbClient.kerberosLogin( + user=self.credentials.username, + password=self.credentials.password, + domain=self.credentials.domain, + lmhash=self.credentials.lm_hex, + nthash=self.credentials.nt_hex, + aesKey=self.credentials.aesKey, + kdcHost=self.credentials.kdcHost + ) + except SessionError as err: + if self.config.debug: + traceback.print_exc() + self.logger.error("Could not login: %s" % err) + self.connected = False + + else: + if len(self.credentials.lm_hex) != 0 and len(self.credentials.nt_hex) != 0: + self.logger.debug("[>] Authenticating as '%s\\%s' with NTLM with pass the hash ... " % (self.credentials.domain, self.credentials.username)) try: - self.connected = self.smbClient.kerberosLogin( + self.logger.debug(" | user = %s" % self.credentials.username) + self.logger.debug(" | password = %s" % self.credentials.password) + self.logger.debug(" | domain = %s" % self.credentials.domain) + self.logger.debug(" | lmhash = %s" % self.credentials.lm_hex) + self.logger.debug(" | nthash = %s" % self.credentials.nt_hex) + + self.connected = self.smbClient.login( user=self.credentials.username, password=self.credentials.password, domain=self.credentials.domain, lmhash=self.credentials.lm_hex, - nthash=self.credentials.nt_hex, - aesKey=self.credentials.aesKey, - kdcHost=self.credentials.kdcHost + nthash=self.credentials.nt_hex ) - except impacket.smbconnection.SessionError as err: + except SessionError as err: if self.config.debug: traceback.print_exc() self.logger.error("Could not login: %s" % err) self.connected = False else: - if len(self.credentials.lm_hex) != 0 and len(self.credentials.nt_hex) != 0: - self.logger.debug("[>] Authenticating as '%s\\%s' with NTLM with pass the hash ... " % (self.credentials.domain, self.credentials.username)) - try: - self.logger.debug(" | user = %s" % self.credentials.username) - self.logger.debug(" | password = %s" % self.credentials.password) - self.logger.debug(" | domain = %s" % self.credentials.domain) - self.logger.debug(" | lmhash = %s" % self.credentials.lm_hex) - self.logger.debug(" | nthash = %s" % self.credentials.nt_hex) - - self.connected = self.smbClient.login( - user=self.credentials.username, - password=self.credentials.password, - domain=self.credentials.domain, - lmhash=self.credentials.lm_hex, - nthash=self.credentials.nt_hex - ) - except impacket.smbconnection.SessionError as err: - if self.config.debug: - traceback.print_exc() - self.logger.error("Could not login: %s" % err) - self.connected = False + self.logger.debug("[>] Authenticating as '%s\\%s' with NTLM with password ... " % (self.credentials.domain, self.credentials.username)) + try: + self.logger.debug(" | user = %s" % self.credentials.username) + self.logger.debug(" | password = %s" % self.credentials.password) + self.logger.debug(" | domain = %s" % self.credentials.domain) + self.logger.debug(" | lmhash = %s" % self.credentials.lm_hex) + self.logger.debug(" | nthash = %s" % self.credentials.nt_hex) - else: - self.logger.debug("[>] Authenticating as '%s\\%s' with NTLM with password ... " % (self.credentials.domain, self.credentials.username)) - try: - self.logger.debug(" | user = %s" % self.credentials.username) - self.logger.debug(" | password = %s" % self.credentials.password) - self.logger.debug(" | domain = %s" % self.credentials.domain) - self.logger.debug(" | lmhash = %s" % self.credentials.lm_hex) - self.logger.debug(" | nthash = %s" % self.credentials.nt_hex) - - self.connected = self.smbClient.login( - user=self.credentials.username, - password=self.credentials.password, - domain=self.credentials.domain, - lmhash=self.credentials.lm_hex, - nthash=self.credentials.nt_hex - ) - except impacket.smbconnection.SessionError as err: - if self.config.debug: - traceback.print_exc() - self.logger.error("Could not login: %s" % err) - self.connected = False + self.connected = self.smbClient.login( + user=self.credentials.username, + password=self.credentials.password, + domain=self.credentials.domain, + lmhash=self.credentials.lm_hex, + nthash=self.credentials.nt_hex + ) + except SessionError as err: + if self.config.debug: + traceback.print_exc() + self.logger.error("Could not login: %s" % err) + self.connected = False - if self.connected: - self.logger.print("[+] Successfully authenticated to '%s' as '%s\\%s'!" % (self.host, self.credentials.domain, self.credentials.username)) - else: - self.logger.error("Failed to authenticate to '%s' as '%s\\%s'!" % (self.host, self.credentials.domain, self.credentials.username)) + if self.connected: + self.logger.print("[+] Successfully authenticated to '%s' as '%s\\%s'!" % (self.host, self.credentials.domain, self.credentials.username)) + else: + self.logger.error("Failed to authenticate to '%s' as '%s\\%s'!" % (self.host, self.credentials.domain, self.credentials.username)) return self.connected - def ping_smb_session(self): + def ping_smb_session(self) -> bool: """ Tests the connectivity to the SMB server by sending an echo command. @@ -237,7 +254,7 @@ def ping_smb_session(self): return self.connected # Operations - def get_file(self, path=None, keepRemotePath=False, localDownloadDir="./", is_recursive=False): + def get_file(self, path: Optional[str] = None, keepRemotePath: bool = False, localDownloadDir: str ="./", is_recursive: bool = False): """ Retrieves files or directories from the specified path(s) on the SMB share. @@ -348,7 +365,7 @@ def get_file(self, path=None, keepRemotePath=False, localDownloadDir="./", is_re self.logger.error(f"Failed to download '{path}': {e}") - def download_file(self, full_path, outputfile, keepRemotePath): + def download_file(self, full_path: str, outputfile: str, keepRemotePath: bool): """Downloads a single file.""" try: # Get the file entry @@ -361,9 +378,9 @@ def download_file(self, full_path, outputfile, keepRemotePath): f = LocalFileIO( mode="wb", path=outputfile, + logger=self.logger, expected_size=entry.get_filesize(), keepRemotePath=keepRemotePath, - logger=self.logger ) try: self.smbClient.getFile( @@ -377,7 +394,7 @@ def download_file(self, full_path, outputfile, keepRemotePath): self.logger.error(f"Failed to download '{full_path}': {e}") - def get_file_recursively(self, path=None, localDownloadDir="./"): + def get_file_recursively(self, path: Optional[str] = None, localDownloadDir: str = "./"): """ Recursively retrieves files from a specified path on the SMB share. @@ -417,9 +434,9 @@ def recurse_action(base_dir="", path=[], localDownloadDir="./"): f = LocalFileIO( mode="wb", path=downloadToPath, + logger=self.logger, expected_size=entry_file.get_filesize(), keepRemotePath=True, - logger=self.logger ) try: self.smbClient.getFile( @@ -442,7 +459,9 @@ def recurse_action(base_dir="", path=[], localDownloadDir="./"): base_dir=self.smb_cwd, path=path+[entry_directory.get_longname()], localDownloadDir=localDownloadDir - ) + ) + if path is None: + path = self.smb_cwd or '' # Entrypoint try: if path.startswith(ntpath.sep): @@ -457,12 +476,12 @@ def recurse_action(base_dir="", path=[], localDownloadDir="./"): path=[path], localDownloadDir=localDownloadDir ) - except (BrokenPipeError, KeyboardInterrupt) as e: + except (BrokenPipeError, KeyboardInterrupt): print("\x1b[v\x1b[o\r[!] Interrupted.") self.close_smb_session() self.init_smb_session() - def get_entry(self, path=None): + def get_entry(self, path: Optional[str] = None) -> Optional[SharedFile]: """ Retrieves information about a specific entry located at the provided path on the SMB share. @@ -488,7 +507,7 @@ def get_entry(self, path=None): else: return None - def info(self, share=True, server=True): + def info(self, share: bool = True, server: bool = True): """ Displays information about the server and optionally the shares. @@ -545,11 +564,11 @@ def info(self, share=True, server=True): self.logger.print(" └─") if share and self.smb_share is not None: - share_name = self.available_shares.get(self.smb_share.lower(), "")["name"] - share_comment = self.available_shares.get(self.smb_share.lower(), "")["comment"] - share_type = self.available_shares.get(self.smb_share.lower(), "")["type"] + share_name = self.available_shares.get(self.smb_share.lower(), {"name": ""})["name"] + share_comment = self.available_shares.get(self.smb_share.lower(), {"comment": ""})["comment"] + share_type = self.available_shares.get(self.smb_share.lower(), {"type": ""})["type"] share_type =', '.join([s.replace("STYPE_","") for s in share_type]) - share_rawtype = self.available_shares.get(self.smb_share.lower(), "")["rawtype"] + share_rawtype = self.available_shares.get(self.smb_share.lower(), {"rawtype": ""})["rawtype"] if self.config.no_colors: self.logger.print("\n[+] Share:") self.logger.print(" ├─ Name ──────────── : %s" % (share_name)) @@ -563,7 +582,7 @@ def info(self, share=True, server=True): self.logger.print(" ├─ \x1b[94mType\x1b[0m \x1b[90m────────────\x1b[0m : \x1b[93m%s\x1b[0m" % (share_type)) self.logger.print(" └─ \x1b[94mRaw type value\x1b[0m \x1b[90m──\x1b[0m : \x1b[93m%s\x1b[0m" % (share_rawtype)) - def list_contents(self, path=None): + def list_contents(self, path: Optional[str] = None) -> dict[str, SharedFile]: """ Lists the contents of a specified directory on the SMB share. @@ -595,7 +614,7 @@ def list_contents(self, path=None): return contents - def list_shares(self): + def list_shares(self) -> dict[str,dict]: """ Lists all the shares available on the connected SMB server. @@ -626,12 +645,13 @@ def list_shares(self): "rawtype": sharetype, "comment": sharecomment } + else: self.logger.error("Error: SMBSession.smbClient is None.") return self.available_shares - def mkdir(self, path=None): + def mkdir(self, path: str): """ Creates a directory at the specified path on the SMB share. @@ -640,42 +660,38 @@ def mkdir(self, path=None): the creation for that directory without raising an error. Args: - path (str, optional): The full path of the directory to create on the SMB share. Defaults to None. + path (str): The full path of the directory to create on the SMB share. Defaults to None. Note: The path should use forward slashes ('/') which will be converted to backslashes (ntpath.sep) for SMB compatibility. """ - if path is not None: - # Prepare path - path = path.replace('/',ntpath.sep) - if ntpath.sep in path: - path = path.strip(ntpath.sep).split(ntpath.sep) - else: - path = [path] - - # Create each dir in the path - for depth in range(1, len(path)+1): - tmp_path = ntpath.sep.join(path[:depth]) - try: - self.smbClient.createDirectory( - shareName=self.smb_share, - pathName=ntpath.normpath(self.smb_cwd + ntpath.sep + tmp_path + ntpath.sep) - ) - except impacket.smbconnection.SessionError as err: - if err.getErrorCode() == 0xc0000035: - # STATUS_OBJECT_NAME_COLLISION - # Remote directory already created, this is normal - # Src: https://github.com/fortra/impacket/blob/269ce69872f0e8f2188a80addb0c39fedfa6dcb8/impacket/nt_errors.py#L268C9-L268C19 - pass - else: - self.logger.error("Failed to create directory '%s': %s" % (tmp_path, err)) - if self.config.debug: - traceback.print_exc() + # Prepare path + split_path = path.replace('/',ntpath.sep) + if ntpath.sep in path: + split_path = path.strip(ntpath.sep).split(ntpath.sep) else: - pass + split_path = [split_path] + + # Create each dir in the path + for depth in range(1, len(split_path)+1): + tmp_path = ntpath.sep.join(split_path[:depth]) + try: + self.smbClient.createDirectory( + shareName=self.smb_share, + pathName=ntpath.normpath(self.smb_cwd + ntpath.sep + tmp_path + ntpath.sep) + ) + except SessionError as err: + if err.getErrorCode() == STATUS_OBJECT_NAME_COLLISION: + # Remote directory already created, this is normal + # Src: https://github.com/fortra/impacket/blob/269ce69872f0e8f2188a80addb0c39fedfa6dcb8/impacket/nt_errors.py#L268C9-L268C19 + pass + else: + self.logger.error("Failed to create directory '%s': %s" % (tmp_path, err)) + if self.config.debug: + traceback.print_exc() - def mount(self, local_mount_point, remote_path): + def mount(self, local_mount_point: str, remote_path: str): """ Generates the command to mount an SMB share on different platforms. @@ -720,7 +736,7 @@ def mount(self, local_mount_point, remote_path): self.logger.debug("Executing: %s" % command) os.system(command) - def path_exists(self, path=None): + def path_exists(self, path: Optional[str] = None): """ Checks if the specified path exists on the SMB share. @@ -748,7 +764,7 @@ def path_exists(self, path=None): else: return False - def path_isdir(self, pathFromRoot=None): + def path_isdir(self, pathFromRoot: Optional[str] = None): """ Checks if the specified path is a directory on the SMB share. @@ -791,7 +807,7 @@ def path_isdir(self, pathFromRoot=None): else: return False - def path_isfile(self, pathFromRoot=None): + def path_isfile(self, pathFromRoot: Optional[str] = None): """ Checks if the specified path is a file on the SMB share. @@ -830,7 +846,7 @@ def path_isfile(self, pathFromRoot=None): else: return False - def put_file(self, localpath=None): + def put_file(self, localpath: str): """ Uploads a single file to the SMB share. @@ -906,7 +922,7 @@ def put_file(self, localpath=None): # [!] The specified localpath does not exist. pass - def put_file_recursively(self, localpath=None): + def put_file_recursively(self, localpath: Optional[str] = None): """ Recursively uploads files from a specified local directory to the SMB share. @@ -919,6 +935,9 @@ def put_file_recursively(self, localpath=None): localpath (str, optional): The local directory path from which files will be uploaded. Defaults to None. """ + if localpath is None: + localpath = "./" + if os.path.exists(localpath): if os.path.isdir(localpath): # Iterate over all files and directories within the local path @@ -942,7 +961,7 @@ def put_file_recursively(self, localpath=None): f = LocalFileIO( mode="rb", path=local_dir_path + os.path.sep + local_file_path, - debug=self.config.debug + logger=self.logger ) self.smbClient.putFile( shareName=self.smb_share, @@ -966,7 +985,7 @@ def put_file_recursively(self, localpath=None): else: self.logger.error("The specified localpath does not exist.") - def read_file(self, path=None): + def read_file(self, path: Optional[str] = None): """ Reads a file from the SMB share. @@ -996,7 +1015,7 @@ def read_file(self, path=None): # opening the files in streams instead of mounting shares allows # for running the script from unprivileged containers self.smbClient.getFile(self.smb_share, tmp_file_path, fh.write) - except impacket.smbconnection.SessionError as e: + except SessionError as e: return None rawdata = fh.getvalue() fh.close() @@ -1004,7 +1023,7 @@ def read_file(self, path=None): else: return None - def rmdir(self, path=None): + def rmdir(self, path: str): """ Removes a directory from the SMB share at the specified path. @@ -1013,8 +1032,9 @@ def rmdir(self, path=None): the stack trace of the exception. Args: - path (str, optional): The path of the directory to be removed on the SMB share. Defaults to None. + path (str): The path of the directory to be removed on the SMB share. Defaults to None. """ + try: self.smbClient.deleteDirectory( shareName=self.smb_share, @@ -1025,7 +1045,7 @@ def rmdir(self, path=None): if self.config.debug: traceback.print_exc() - def rm(self, path=None): + def rm(self, path: str): """ Removes a file from the SMB share at the specified path. @@ -1034,7 +1054,7 @@ def rm(self, path=None): the stack trace of the exception. Args: - path (str, optional): The path of the file to be removed on the SMB share. Defaults to None. + path (str): The path of the file to be removed on the SMB share. Defaults to None. """ # Parse path @@ -1078,7 +1098,7 @@ def rm(self, path=None): if self.config.debug: traceback.print_exc() - def tree(self, path=None, quiet=False, outputfile=None): + def tree(self, path: Optional[str] = None, quiet: bool = False, outputfile: Optional[str] = None): """ Recursively lists the directory structure of the SMB share starting from the specified path. @@ -1170,7 +1190,7 @@ def tree(self, path=None, quiet=False, outputfile=None): except Exception as e: self.logger.error(f"Error during tree traversal: {e}") - def umount(self, local_mount_point): + def umount(self, local_mount_point: str): """ Unmounts the specified local mount point of the remote share. @@ -1203,7 +1223,7 @@ def umount(self, local_mount_point): # Other functions - def test_rights(self, sharename, test_write=False): + def test_rights(self, sharename: str, test_write: bool = False): """ Tests the read and write access rights of the current SMB session. @@ -1225,7 +1245,7 @@ def test_rights(self, sharename, test_write=False): try: self.smbClient.listPath(self.smb_share, '*', password=None) access_rights["readable"] = True - except impacket.smbconnection.SessionError as e: + except SessionError: access_rights["readable"] = False @@ -1236,7 +1256,7 @@ def test_rights(self, sharename, test_write=False): self.smbClient.createDirectory(self.smb_share, temp_dir) self.smbClient.deleteDirectory(self.smb_share, temp_dir) access_rights["writable"] = True - except impacket.smbconnection.SessionError as e: + except SessionError: access_rights["writable"] = False # Restore the current share @@ -1246,7 +1266,7 @@ def test_rights(self, sharename, test_write=False): # Setter / Getter - def set_share(self, shareName): + def set_share(self, shareName: str): """ Sets the current SMB share to the specified share name. @@ -1269,7 +1289,7 @@ def set_share(self, shareName): # Connects the tree try: self.smb_tree_id = self.smbClient.connectTree(self.smb_share) - except impacket.smbconnection.SessionError as err: + except SessionError as err: self.smb_share = None self.smb_cwd = "" self.logger.error("Could not access share '%s': %s" % (shareName, err)) @@ -1278,7 +1298,7 @@ def set_share(self, shareName): else: self.smb_share = None - def set_cwd(self, path=None): + def set_cwd(self, path: Optional[str] = None): """ Sets the current working directory on the SMB share to the specified path. diff --git a/smbclientng/core/SessionsManager.py b/smbclientng/core/SessionsManager.py index 0081f47..9dc04d6 100644 --- a/smbclientng/core/SessionsManager.py +++ b/smbclientng/core/SessionsManager.py @@ -4,12 +4,18 @@ # Author : Podalirius (@podalirius_) # Date created : 20 may 2024 +from __future__ import annotations import datetime +import time from smbclientng.core.Credentials import Credentials -from smbclientng.core.ModuleArgumentParser import ModuleArgumentParser +from smbclientng.core.ModuleArgumentParser import ModuleArgumentParser, ModuleArgumentParserError from smbclientng.core.SMBSession import SMBSession -import time +from typing import TYPE_CHECKING +if TYPE_CHECKING: + from typing import Optional + from smbclientng.core.Logger import Logger + from smbclientng.core.Config import Config class SessionsManager(object): """ @@ -24,21 +30,20 @@ class SessionsManager(object): sessions (dict): A dictionary of all active sessions, keyed by their session ID. """ - next_session_id = 1 - current_session = None - current_session_id = None sessions = {} + next_session_id: int = 1 + current_session: Optional[SMBSession] + current_session_id: Optional[int] + + config: Config + logger: Logger - def __init__(self, config, logger): - self.sessions = {} - self.next_session_id = 1 - self.current_session = None - self.current_session_id = None + def __init__(self, config: Config, logger: Logger): self.config = config self.logger = logger - def create_new_session(self, credentials, host, timeout, port=445, advertisedName=None): + def create_new_session(self, credentials: Credentials, host: str, timeout: int, port: int = 445, advertisedName: Optional[str] = None): """ Creates a new session with the given session information. @@ -68,7 +73,7 @@ def create_new_session(self, credentials, host, timeout, port=445, advertisedNam self.switch_session(self.next_session_id) self.next_session_id += 1 - def switch_session(self, session_id): + def switch_session(self, session_id: int) -> bool: """ Switches the current session to the session with the specified ID. @@ -86,7 +91,7 @@ def switch_session(self, session_id): else: return False - def delete_session(self, session_id): + def delete_session(self, session_id: int) -> bool: """ Deletes a session with the given session ID. @@ -106,7 +111,7 @@ def delete_session(self, session_id): return True return False - def process_command_line(self, arguments): + def process_command_line(self, arguments: list[str]): """ Processes command line arguments to manage SMB sessions. @@ -132,7 +137,7 @@ def process_command_line(self, arguments): group_target = mode_create.add_argument_group("Target") group_target.add_argument("--host", action="store", metavar="HOST", required=True, type=str, help="IP address or hostname of the SMB Server to connect to.") group_target.add_argument("--port", action="store", metavar="PORT", type=int, default=445, help="Port of the SMB Server to connect to. (default: 445)") - group_target.add_argument( "--timeout", dest="timeout", metavar="TIMEOUT", required=False, type=float, default=3, help="Timeout in seconds for SMB connections (default: 3)") + group_target.add_argument( "--timeout", dest="timeout", metavar="TIMEOUT", required=False, type=int, default=3, help="Timeout in seconds for SMB connections (default: 3)") authconn = mode_create.add_argument_group("Authentication & connection") authconn.add_argument("--kdcHost", dest="kdcHost", action="store", metavar="FQDN KDC", help="FQDN of KDC for Kerberos.") authconn.add_argument("-d", "--domain", dest="auth_domain", metavar="DOMAIN", action="store", default='.', help="(FQDN) domain to authenticate to.") @@ -152,11 +157,11 @@ def process_command_line(self, arguments): group_sessions.add_argument("-a", "--all", default=False, action="store_true", help="Delete all sessions.") # Execute - mode_execute = ModuleArgumentParser(add_help=False, description="Send a smbclient-ng command line in one or more sessions.") - group_sessions = mode_execute.add_mutually_exclusive_group(required=True) - group_sessions.add_argument("-i", "--session-id", type=int, default=[], action="append", help="One or more ID of sessions to target.") - group_sessions.add_argument("-a", "--all", default=False, action="store_true", help="Execute command in all sessions.") - mode_execute.add_argument("-c", "--command", type=str, required=True, help="Command to execute in the target sessions.") + #mode_execute = ModuleArgumentParser(add_help=False, description="Send a smbclient-ng command line in one or more sessions.") + #group_sessions = mode_execute.add_mutually_exclusive_group(required=True) + #group_sessions.add_argument("-i", "--session-id", type=int, default=[], action="append", help="One or more ID of sessions to target.") + #group_sessions.add_argument("-a", "--all", default=False, action="store_true", help="Execute command in all sessions.") + #mode_execute.add_argument("-c", "--command", type=str, required=True, help="Command to execute in the target sessions.") # List mode_list = ModuleArgumentParser(add_help=False, description="List the registered sessions.") @@ -166,12 +171,12 @@ def process_command_line(self, arguments): subparsers.add_parser("interact", parents=[mode_interact], help=mode_interact.description) subparsers.add_parser("create", parents=[mode_create], help=mode_create.description) subparsers.add_parser("delete", parents=[mode_delete], help=mode_delete.description) - subparsers.add_parser("execute", parents=[mode_execute], help=mode_execute.description) + #subparsers.add_parser("execute", parents=[mode_execute], help=mode_execute.description) subparsers.add_parser("list", parents=[mode_list], help=mode_list.description) try: options = parser.parse_args(arguments) - except SystemExit as e: + except (SystemExit, ModuleArgumentParserError) as e: return # Process actions @@ -218,19 +223,19 @@ def process_command_line(self, arguments): print("[+] Closing and deleting session #%d" % session_id) self.delete_session(session_id=session_id) - # - elif options.action == "execute": - if options.command is not None: - if len(options.session_id) != 0: - for session_id in session_id: - if session_id in self.sessions.keys(): - self.logger.info("Executing '%s to session #%d" % (options.command, options.session_id)) - else: - self.logger.error("No session with id #%d" % options.session_id) - elif options.all == True: - all_session_ids = list(self.sessions.keys()) - for session_id in all_session_ids: - pass + # FIXME: Not implemented + #elif options.action == "execute": + # if options.command is not None: + # if len(options.session_id) != 0: + # for session_id in session_id: + # if session_id in self.sessions.keys(): + # self.logger.info("Executing '%s to session #%d" % (options.command, options.session_id)) + # else: + # self.logger.error("No session with id #%d" % options.session_id) + # elif options.all == True: + # all_session_ids = list(self.sessions.keys()) + # for session_id in all_session_ids: + # pass # elif options.action == "list": @@ -244,4 +249,4 @@ def process_command_line(self, arguments): print(f"\x1b[48;2;50;50;50m=> #{sessionId:<2} - '\x1b[1;96m{session.credentials.domain}\x1b[0m\x1b[48;2;50;50;50m\\\x1b[1;96m{session.credentials.username}\x1b[0m\x1b[48;2;50;50;50m\x1b[1m' @ {session.host}:{session.port} created at [{created_at_str}]\x1b[0m\x1b[48;2;50;50;50m [\x1b[93mcurrent session\x1b[0m\x1b[48;2;50;50;50m]\x1b[0m") else: print(f"── #{sessionId:<2} - '\x1b[1;96m{session.credentials.domain}\x1b[0m\\\x1b[1;96m{session.credentials.username}\x1b[0m\x1b[1m' @ {session.host}:{session.port} created at [{created_at_str}]\x1b[0m") - \ No newline at end of file + diff --git a/smbclientng/core/utils.py b/smbclientng/core/utils.py index 27b853b..d03028b 100644 --- a/smbclientng/core/utils.py +++ b/smbclientng/core/utils.py @@ -4,18 +4,24 @@ # Author : Podalirius (@podalirius_) # Date created : 23 may 2024 - +from __future__ import annotations import datetime import fnmatch -import impacket import ntpath import os import re import socket import stat +from impacket.smbconnection import SessionError +from typing import TYPE_CHECKING +if TYPE_CHECKING: + from typing import Optional + from impacket.smb import SharedFile + from smbclientng.core.Config import Config + from smbclientng.core.SMBSession import SMBSession -def parse_lm_nt_hashes(lm_nt_hashes_string): +def parse_lm_nt_hashes(lm_nt_hashes_string: str) -> tuple[str, str]: """ Parse the input string containing LM and NT hash values and return them separately. @@ -51,7 +57,7 @@ def parse_lm_nt_hashes(lm_nt_hashes_string): return lm_hash_value, nt_hash_value -def b_filesize(l): +def b_filesize(l: int) -> str: """ Convert a file size from bytes to a more readable format using the largest appropriate unit. @@ -73,7 +79,7 @@ def b_filesize(l): return "%4.2f %s" % (round(l/(1024**(k)),2), units[k]) -def unix_permissions(entryname): +def unix_permissions(entryname: str) -> str: """ Generate a string representing the Unix-style permissions for a given file or directory. @@ -110,7 +116,7 @@ def unix_permissions(entryname): return ''.join(permissions) -def STYPE_MASK(stype_value): +def STYPE_MASK(stype_value: int) -> list[str]: """ Extracts the share type flags from a given share type value. @@ -146,7 +152,7 @@ def STYPE_MASK(stype_value): # A temporary share. "STYPE_TEMPORARY": 0x40000000 } - flags = [] + flags : list[str] = [] if (stype_value & 0b11) == known_flags["STYPE_DISKTREE"]: flags.append("STYPE_DISKTREE") elif (stype_value & 0b11) == known_flags["STYPE_PRINTQ"]: @@ -162,7 +168,7 @@ def STYPE_MASK(stype_value): return flags -def windows_ls_entry(entry, config, pathToPrint=None): +def windows_ls_entry(entry: SharedFile, config: Config, pathToPrint: Optional[str] =None): """ This function generates a metadata string based on the attributes of the provided entry object. @@ -207,7 +213,7 @@ def windows_ls_entry(entry, config, pathToPrint=None): return output_str -def local_tree(path, config): +def local_tree(path: str, config: Config): """ This function recursively lists the contents of a directory in a tree-like format. @@ -329,7 +335,7 @@ def recurse_action(base_dir="", path=[], prompt=[]): print("[!] Interrupted.") -def resolve_local_files(arguments): +def resolve_local_files(arguments: list[str]) -> list[str]: """ Resolves local file paths based on the provided arguments. @@ -344,7 +350,7 @@ def resolve_local_files(arguments): list: A list of resolved file paths that match the provided arguments. """ - resolved_files = [] + resolved_files: list[str] = [] for arg in arguments: if '*' in arg: try: @@ -361,7 +367,7 @@ def resolve_local_files(arguments): return resolved_files -def resolve_remote_files(smbSession, arguments): +def resolve_remote_files(smbSession: SMBSession, arguments: list[str]) -> list[str]: """ Resolves remote file paths based on the provided arguments using an SMB session. @@ -451,7 +457,7 @@ def resolve_remote_files(smbSession, arguments): return resolved_pathFromRoot_files -def is_port_open(target, port, timeout): +def is_port_open(target: str, port: int, timeout: float) -> Tuple[bool, Optional[str]]: """ Check if a specific port on a target host is open. @@ -477,7 +483,7 @@ def is_port_open(target, port, timeout): return False, str(e) -def smb_entry_iterator(smb_client, smb_share, start_paths, exclusion_rules=[], max_depth=None, min_depth=0, current_depth=0, filters=None): +def smb_entry_iterator(smb_client, smb_share: str, start_paths: list[str], exclusion_rules=[], max_depth: Optional[int] = None, min_depth: int = 0, current_depth: int = 0, filters: Optional[dict] = None): """ Iterates over SMB entries by traversing directories in a depth-first manner. @@ -500,7 +506,7 @@ def smb_entry_iterator(smb_client, smb_share, start_paths, exclusion_rules=[], m - depth (int): The current depth level of the entry within the traversal. - is_last_entry (bool): True if the entry is the last within its directory, False otherwise. """ - def entry_matches_filters(entry, filters): + def entry_matches_filters(entry, filters) -> bool: """ Checks if an entry matches the provided filters. @@ -542,7 +548,7 @@ def entry_matches_filters(entry, filters): return True - def size_matches_filter(size, size_filter): + def size_matches_filter(size: int, size_filter: str) -> bool: """ Checks if a size matches the size filter. @@ -653,7 +659,7 @@ def size_matches_filter(size, size_filter): # Yield the file yield entry, fullpath, current_depth, is_last_entry - except impacket.smbconnection.SessionError as err: + except SessionError as err: message = f"{err}. Base path: {base_path}" print("[\x1b[1;91merror\x1b[0m] %s" % message) continue