From eefa2cc6233fd2357c362777276101a64a855a78 Mon Sep 17 00:00:00 2001 From: "Remi GASCOU (Podalirius)" <79218792+p0dalirius@users.noreply.github.com> Date: Mon, 24 Jun 2024 10:37:31 +0200 Subject: [PATCH 1/6] First version of Logger --- smbclientng/core/Logger.py | 90 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 90 insertions(+) create mode 100644 smbclientng/core/Logger.py diff --git a/smbclientng/core/Logger.py b/smbclientng/core/Logger.py new file mode 100644 index 0000000..0060fce --- /dev/null +++ b/smbclientng/core/Logger.py @@ -0,0 +1,90 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +# File name : LocalFileIO.py +# Author : Podalirius (@podalirius_) +# Date created : 24 June 2024 + + +import os +import re +from enum import Enum + + +class LogLevel(Enum): + INFO = 1 + DEBUG = 2 + WARNING = 3 + ERROR = 4 + CRITICAL = 5 + + +class Logger(object): + """ + A Logger class that provides logging functionalities with various levels such as INFO, DEBUG, WARNING, ERROR, and CRITICAL. + It supports color-coded output, which can be disabled, and can also log messages to a file. + + Attributes: + __debug (bool): If True, debug level messages will be printed and logged. + __nocolors (bool): If True, disables color-coded output. + logfile (str|None): Path to a file where logs will be written. If None, logging to a file is disabled. + + Methods: + __init__(debug=False, logfile=None, nocolors=False): Initializes the Logger instance. + print(message=""): Prints a message to stdout and logs it to a file if logging is enabled. + info(message): Logs a message at the INFO level. + debug(message): Logs a message at the DEBUG level if debugging is enabled. + error(message): Logs a message at the ERROR level. + """ + + def __init__(self, debug=False, logfile=None, no_colors=False): + super(Logger, self).__init__() + self.__debug = debug + self.no_colors = no_colors + self.logfile = logfile + # + if self.logfile is not None: + if os.path.exists(self.logfile): + k = 1 + while os.path.exists(self.logfile+(".%d"%k)): + k += 1 + self.logfile = self.logfile + (".%d" % k) + open(self.logfile, "w").close() + + def print(self, message=""): + nocolor_message = re.sub(r"\x1b[\[]([0-9;]+)m", "", message) + if self.no_colors: + print(nocolor_message) + else: + print(message) + self.__write_to_logfile(nocolor_message) + + def info(self, message): + nocolor_message = re.sub(r"\x1b[\[]([0-9;]+)m", "", message) + if self.no_colors: + print("[info] %s" % nocolor_message) + else: + print("[info] %s" % message) + self.__write_to_logfile("[info] %s" % nocolor_message) + + def debug(self, message): + if self.__debug == True: + nocolor_message = re.sub(r"\x1b[\[]([0-9;]+)m", "", message) + if self.no_colors: + print("[debug] %s" % nocolor_message) + else: + print("[debug] %s" % message) + self.__write_to_logfile("[debug] %s" % nocolor_message) + + def error(self, message): + nocolor_message = re.sub(r"\x1b[\[]([0-9;]+)m", "", message) + if self.no_colors: + print("[error] %s" % nocolor_message) + else: + print("[error] %s" % message) + self.__write_to_logfile("[error] %s" % nocolor_message) + + def __write_to_logfile(self, message): + if self.logfile is not None: + f = open(self.logfile, "a") + f.write(message + "\n") + f.close() \ No newline at end of file From 1855d81f734c97476deb0c137780859d942da1e3 Mon Sep 17 00:00:00 2001 From: "Remi GASCOU (Podalirius)" <79218792+p0dalirius@users.noreply.github.com> Date: Mon, 24 Jun 2024 11:41:06 +0200 Subject: [PATCH 2/6] Updated logger --- smbclientng/core/Logger.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/smbclientng/core/Logger.py b/smbclientng/core/Logger.py index 0060fce..6e3fd7b 100644 --- a/smbclientng/core/Logger.py +++ b/smbclientng/core/Logger.py @@ -63,7 +63,7 @@ def info(self, message): if self.no_colors: print("[info] %s" % nocolor_message) else: - print("[info] %s" % message) + print("[\x1b[1;92minfo\x1b[0m] %s" % message) self.__write_to_logfile("[info] %s" % nocolor_message) def debug(self, message): @@ -80,7 +80,7 @@ def error(self, message): if self.no_colors: print("[error] %s" % nocolor_message) else: - print("[error] %s" % message) + print("[\x1b[1;91merror\x1b[0m] %s" % message) self.__write_to_logfile("[error] %s" % nocolor_message) def __write_to_logfile(self, message): From 43100f60fb5702463ff8dcb9f1441af9c05b7f92 Mon Sep 17 00:00:00 2001 From: "Remi GASCOU (Podalirius)" <79218792+p0dalirius@users.noreply.github.com> Date: Mon, 24 Jun 2024 11:49:16 +0200 Subject: [PATCH 3/6] Passing config to Logger --- smbclientng/core/Logger.py | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/smbclientng/core/Logger.py b/smbclientng/core/Logger.py index 6e3fd7b..32008f1 100644 --- a/smbclientng/core/Logger.py +++ b/smbclientng/core/Logger.py @@ -36,10 +36,9 @@ class Logger(object): error(message): Logs a message at the ERROR level. """ - def __init__(self, debug=False, logfile=None, no_colors=False): + def __init__(self, config, logfile=None): super(Logger, self).__init__() - self.__debug = debug - self.no_colors = no_colors + self.config = config self.logfile = logfile # if self.logfile is not None: @@ -52,7 +51,7 @@ def __init__(self, debug=False, logfile=None, no_colors=False): def print(self, message=""): nocolor_message = re.sub(r"\x1b[\[]([0-9;]+)m", "", message) - if self.no_colors: + if self.config.no_colors: print(nocolor_message) else: print(message) @@ -60,16 +59,16 @@ def print(self, message=""): def info(self, message): nocolor_message = re.sub(r"\x1b[\[]([0-9;]+)m", "", message) - if self.no_colors: + if self.config.no_colors: print("[info] %s" % nocolor_message) else: print("[\x1b[1;92minfo\x1b[0m] %s" % message) self.__write_to_logfile("[info] %s" % nocolor_message) def debug(self, message): - if self.__debug == True: + if self.config.debug == True: nocolor_message = re.sub(r"\x1b[\[]([0-9;]+)m", "", message) - if self.no_colors: + if self.config.no_colors: print("[debug] %s" % nocolor_message) else: print("[debug] %s" % message) @@ -77,7 +76,7 @@ def debug(self, message): def error(self, message): nocolor_message = re.sub(r"\x1b[\[]([0-9;]+)m", "", message) - if self.no_colors: + if self.config.no_colors: print("[error] %s" % nocolor_message) else: print("[\x1b[1;91merror\x1b[0m] %s" % message) From e514ac54d247abb7074921920ebdcf535ff2627c Mon Sep 17 00:00:00 2001 From: "Remi GASCOU (Podalirius)" <79218792+p0dalirius@users.noreply.github.com> Date: Mon, 24 Jun 2024 13:19:54 +0200 Subject: [PATCH 4/6] Connected logger to other classes --- smbclientng/__main__.py | 8 ++++++-- smbclientng/core/InteractiveShell.py | 4 +++- smbclientng/core/Module.py | 3 ++- smbclientng/core/SMBSession.py | 3 ++- smbclientng/core/SessionsManager.py | 5 +++-- 5 files changed, 16 insertions(+), 7 deletions(-) diff --git a/smbclientng/__main__.py b/smbclientng/__main__.py index d36fd17..67767f2 100644 --- a/smbclientng/__main__.py +++ b/smbclientng/__main__.py @@ -9,6 +9,7 @@ from smbclientng.core.Config import Config from smbclientng.core.Credentials import Credentials from smbclientng.core.InteractiveShell import InteractiveShell +from smbclientng.core.Logger import Logger from smbclientng.core.SessionsManager import SessionsManager @@ -106,7 +107,9 @@ def main(): config.not_interactive = options.not_interactive config.startup_script = options.startup_script - sessionsManager = SessionsManager(config=config) + logger = Logger(config=config, logfile=None) + + sessionsManager = SessionsManager(config=config, logger=logger) if any([(options.auth_domain != '.'), (options.auth_username is not None), (options.auth_password is not None),(options.auth_hashes is not None)]): credentials = Credentials( @@ -127,7 +130,8 @@ def main(): # Start the main interactive command line shell = InteractiveShell( sessionsManager=sessionsManager, - config=config + config=config, + logger=logger ) shell.run() diff --git a/smbclientng/core/InteractiveShell.py b/smbclientng/core/InteractiveShell.py index b56cbe1..0137e01 100644 --- a/smbclientng/core/InteractiveShell.py +++ b/smbclientng/core/InteractiveShell.py @@ -81,10 +81,12 @@ class InteractiveShell(object): running = True modules = {} - def __init__(self, sessionsManager, config): + def __init__(self, sessionsManager, config, logger): # Objects self.sessionsManager = sessionsManager self.config = config + self.logger = logger + # Internals self.commandCompleterObject = CommandCompleter(smbSession=self.sessionsManager.current_session, config=self.config) readline.set_completer(self.commandCompleterObject.complete) readline.parse_and_bind("tab: complete") diff --git a/smbclientng/core/Module.py b/smbclientng/core/Module.py index 70b0777..9c6d402 100644 --- a/smbclientng/core/Module.py +++ b/smbclientng/core/Module.py @@ -20,9 +20,10 @@ class Module(object): smbSession = None options = None - def __init__(self, smbSession, config): + def __init__(self, smbSession, config, logger): self.smbSession = smbSession self.config = config + self.logger = logger def parseArgs(self): raise NotImplementedError("Subclasses must implement this method") diff --git a/smbclientng/core/SMBSession.py b/smbclientng/core/SMBSession.py index 3e0b848..dded3bf 100644 --- a/smbclientng/core/SMBSession.py +++ b/smbclientng/core/SMBSession.py @@ -53,10 +53,11 @@ class SMBSession(object): test_rights(sharename): Tests read and write access rights on a share. """ - def __init__(self, host, port, credentials, config=None): + def __init__(self, host, port, credentials, config=None, logger=None): super(SMBSession, self).__init__() # Objects self.config = config + self.logger = logger # Target server self.host = host diff --git a/smbclientng/core/SessionsManager.py b/smbclientng/core/SessionsManager.py index 57acda8..9ed070f 100644 --- a/smbclientng/core/SessionsManager.py +++ b/smbclientng/core/SessionsManager.py @@ -17,13 +17,14 @@ class SessionsManager(object): current_session_id = None sessions = {} - def __init__(self, config): + def __init__(self, config, logger): self.sessions = {} self.next_session_id = 1 self.current_session = None self.current_session_id = None - self.config = config + self.config = config, + self.logger = logger def create_new_session(self, credentials, host, port=445): """ From 02dd2cf6c8c29b7766e4aa69e2b327de6b1ebfbc Mon Sep 17 00:00:00 2001 From: "Remi GASCOU (Podalirius)" <79218792+p0dalirius@users.noreply.github.com> Date: Mon, 24 Jun 2024 17:50:46 +0200 Subject: [PATCH 5/6] Added logger everywhere --- smbclientng/core/CommandCompleter.py | 83 +++++------ smbclientng/core/InteractiveShell.py | 151 ++++++++++---------- smbclientng/core/LocalFileIO.py | 12 +- smbclientng/core/SMBSession.py | 197 +++++++++++++-------------- smbclientng/core/SessionsManager.py | 12 +- 5 files changed, 223 insertions(+), 232 deletions(-) diff --git a/smbclientng/core/CommandCompleter.py b/smbclientng/core/CommandCompleter.py index cf03f45..d70a7f0 100644 --- a/smbclientng/core/CommandCompleter.py +++ b/smbclientng/core/CommandCompleter.py @@ -325,10 +325,11 @@ class CommandCompleter(object): }, } - def __init__(self, smbSession, config): + def __init__(self, smbSession, config, logger): # Objects self.smbSession = smbSession self.config = config + self.logger = logger # Pre computing for some commands self.commands["help"]["subcommands"] = ["format"] + list(self.commands.keys()) self.commands["help"]["subcommands"].remove("help") @@ -532,54 +533,54 @@ def print_help(self, command=None): if command == "format": self.print_help_format() else: - print("│") + self.logger.print("│") if self.config.no_colors: command_str = command + "─"* (15 - len(command)) if len(self.commands[command]["description"]) == 0: - print("│ ■ %s┤ " % command_str) + self.logger.print("│ ■ %s┤ " % command_str) elif len(self.commands[command]["description"]) == 1: - print("│ ■ %s┤ %s " % (command_str, self.commands[command]["description"][0])) + self.logger.print("│ ■ %s┤ %s " % (command_str, self.commands[command]["description"][0])) else: - print("│ ■ %s┤ %s " % (command_str, self.commands[command]["description"][0])) + self.logger.print("│ ■ %s┤ %s " % (command_str, self.commands[command]["description"][0])) for line in self.commands[command]["description"][1:]: - print("│ %s│ %s " % (" "*(15+2), line)) + self.logger.print("│ %s│ %s " % (" "*(15+2), line)) else: command_str = command + " \x1b[90m" + "─"* (15 - len(command)) + "\x1b[0m" if len(self.commands[command]["description"]) == 0: - print("│ ■ %s\x1b[90m┤\x1b[0m " % command_str) + self.logger.print("│ ■ %s\x1b[90m┤\x1b[0m " % command_str) elif len(self.commands[command]["description"]) == 1: - print("│ ■ %s\x1b[90m┤\x1b[0m %s " % (command_str, self.commands[command]["description"][0])) + self.logger.print("│ ■ %s\x1b[90m┤\x1b[0m %s " % (command_str, self.commands[command]["description"][0])) else: - print("│ ■ %s\x1b[90m┤\x1b[0m %s " % (command_str, self.commands[command]["description"][0])) + self.logger.print("│ ■ %s\x1b[90m┤\x1b[0m %s " % (command_str, self.commands[command]["description"][0])) for line in self.commands[command]["description"][1:]: - print("│ %s\x1b[90m│\x1b[0m %s " % (" "*(15+3), line)) - print("│") + self.logger.print("│ %s\x1b[90m│\x1b[0m %s " % (" "*(15+3), line)) + self.logger.print("│") # Generic help else: - print("│") + self.logger.print("│") commands = sorted(self.commands.keys()) for command in commands: if self.config.no_colors: command_str = command + "─"* (15 - len(command)) if len(self.commands[command]["description"]) == 0: - print("│ ■ %s┤ " % command_str) + self.logger.print("│ ■ %s┤ " % command_str) elif len(self.commands[command]["description"]) == 1: - print("│ ■ %s┤ %s " % (command_str, self.commands[command]["description"][0])) + self.logger.print("│ ■ %s┤ %s " % (command_str, self.commands[command]["description"][0])) else: - print("│ ■ %s┤ %s " % (command_str, self.commands[command]["description"][0])) + self.logger.print("│ ■ %s┤ %s " % (command_str, self.commands[command]["description"][0])) for line in self.commands[command]["description"][1:]: - print("│ %s│ %s " % (" "*(15+2), line)) + self.logger.print("│ %s│ %s " % (" "*(15+2), line)) else: command_str = command + " \x1b[90m" + "─"* (15 - len(command)) + "\x1b[0m" if len(self.commands[command]["description"]) == 0: - print("│ ■ %s\x1b[90m┤\x1b[0m " % command_str) + self.logger.print("│ ■ %s\x1b[90m┤\x1b[0m " % command_str) elif len(self.commands[command]["description"]) == 1: - print("│ ■ %s\x1b[90m┤\x1b[0m %s " % (command_str, self.commands[command]["description"][0])) + self.logger.print("│ ■ %s\x1b[90m┤\x1b[0m %s " % (command_str, self.commands[command]["description"][0])) else: - print("│ ■ %s\x1b[90m┤\x1b[0m %s " % (command_str, self.commands[command]["description"][0])) + self.logger.print("│ ■ %s\x1b[90m┤\x1b[0m %s " % (command_str, self.commands[command]["description"][0])) for line in self.commands[command]["description"][1:]: - print("│ %s\x1b[90m│\x1b[0m %s " % (" "*(15+3), line)) - print("│") + self.logger.print("│ %s\x1b[90m│\x1b[0m %s " % (" "*(15+3), line)) + self.logger.print("│") def print_help_format(self): """ @@ -589,25 +590,25 @@ def print_help_format(self): of each character in the file attribute string, such as whether a file is read-only, hidden, or a directory. """ if self.config.no_colors: - print("File attributes format:\n") - print("dachnrst") - print("│││││││└──> Temporary") - print("││││││└───> System") - print("│││││└────> Read-Only") - print("││││└─────> Normal") - print("│││└──────> Hidden") - print("││└───────> Compressed") - print("│└────────> Archived") - print("└─────────> Directory") + self.logger.print("File attributes format:\n") + self.logger.print("dachnrst") + self.logger.print("│││││││└──> Temporary") + self.logger.print("││││││└───> System") + self.logger.print("│││││└────> Read-Only") + self.logger.print("││││└─────> Normal") + self.logger.print("│││└──────> Hidden") + self.logger.print("││└───────> Compressed") + self.logger.print("│└────────> Archived") + self.logger.print("└─────────> Directory") else: - print("File attributes format:\n") - print("dachnrst") - print("\x1b[90m│││││││└──>\x1b[0m Temporary") - print("\x1b[90m││││││└───>\x1b[0m System") - print("\x1b[90m│││││└────>\x1b[0m Read-Only") - print("\x1b[90m││││└─────>\x1b[0m Normal") - print("\x1b[90m│││└──────>\x1b[0m Hidden") - print("\x1b[90m││└───────>\x1b[0m Compressed") - print("\x1b[90m│└────────>\x1b[0m Archived") - print("\x1b[90m└─────────>\x1b[0m Directory") + self.logger.print("File attributes format:\n") + self.logger.print("dachnrst") + self.logger.print("\x1b[90m│││││││└──>\x1b[0m Temporary") + self.logger.print("\x1b[90m││││││└───>\x1b[0m System") + self.logger.print("\x1b[90m│││││└────>\x1b[0m Read-Only") + self.logger.print("\x1b[90m││││└─────>\x1b[0m Normal") + self.logger.print("\x1b[90m│││└──────>\x1b[0m Hidden") + self.logger.print("\x1b[90m││└───────>\x1b[0m Compressed") + self.logger.print("\x1b[90m│└────────>\x1b[0m Archived") + self.logger.print("\x1b[90m└─────────>\x1b[0m Directory") diff --git a/smbclientng/core/InteractiveShell.py b/smbclientng/core/InteractiveShell.py index 0137e01..953e9d6 100644 --- a/smbclientng/core/InteractiveShell.py +++ b/smbclientng/core/InteractiveShell.py @@ -100,19 +100,19 @@ def run(self): f = open(self.config.startup_script, 'r') for line in f.readlines(): try: - print("%s%s" % (self.__prompt(), line.strip())) + self.logger.print("%s%s" % (self.__prompt(), line.strip())) self.process_line(commandLine=line.strip()) except KeyboardInterrupt as e: - print() + self.logger.print() except EOFError as e: - print() + self.logger.print() running = False - except Exception as e: + except Exception as err: if self.config.debug: traceback.print_exc() - print("[!] Error: %s" % str(e)) + self.logger.error(str(err)) # Then interactive console if not self.config.not_interactive: @@ -121,16 +121,16 @@ def run(self): user_input = input(self.__prompt()).strip() self.process_line(commandLine=user_input) except KeyboardInterrupt as e: - print() + self.logger.print() except EOFError as e: - print() + self.logger.print() running = False - except Exception as e: + except Exception as err: if self.config.debug: traceback.print_exc() - print("[!] Error: %s" % str(e)) + self.logger.error(str(err)) def process_line(self, commandLine): # Split and parse the commandLine @@ -289,14 +289,14 @@ def process_line(self, commandLine): # Fallback to unknown command else: - print("Unknown command. Type \"help\" for help.") + self.logger.print("Unknown command. Type \"help\" for help.") # Commands ================================================================ def command_debug(self, arguments, command): try: - print("[debug] command = '%s'" % command) - print("[debug] arguments = %s" % arguments) + self.logger.print("[debug] command = '%s'" % command) + self.logger.print("[debug] arguments = %s" % arguments) except Exception as e: traceback.print_exc() @@ -329,12 +329,12 @@ def command_bat(self, arguments, command): lexer = "html" syntax = Syntax(code=filecontent, line_numbers=True, lexer=lexer) if len(files_and_directories) > 1: - print("\x1b[1;93m[>] %s\x1b[0m" % (path_to_file+' ').ljust(80,'=')) + self.logger.print("\x1b[1;93m[>] %s\x1b[0m" % (path_to_file+' ').ljust(80,'=')) Console().print(syntax) else: - print("[!] Could not detect charset of '%s'." % path_to_file) + self.logger.error("[!] Could not detect charset of '%s'." % path_to_file) except impacket.smbconnection.SessionError as e: - print("[!] SMB Error: %s" % e) + self.logger.error("[!] SMB Error: %s" % e) @command_arguments_required @active_smb_connection_needed @@ -347,7 +347,7 @@ def command_cd(self, arguments, command): try: self.sessionsManager.current_session.set_cwd(path=arguments[0]) except impacket.smbconnection.SessionError as e: - print("[!] SMB Error: %s" % e) + self.logger.error("[!] SMB Error: %s" % e) @command_arguments_required @active_smb_connection_needed @@ -370,12 +370,12 @@ def command_cat(self, arguments, command): if encoding is not None: filecontent = rawcontents.decode(encoding).rstrip() if len(files_and_directories) > 1: - print("\x1b[1;93m[>] %s\x1b[0m" % (path_to_file+' ').ljust(80,'=')) - print(filecontent) + self.logger.print("\x1b[1;93m[>] %s\x1b[0m" % (path_to_file+' ').ljust(80,'=')) + self.logger.print(filecontent) else: - print("[!] Could not detect charset of '%s'." % path_to_file) + self.logger.error("[!] Could not detect charset of '%s'." % path_to_file) except impacket.smbconnection.SessionError as e: - print("[!] SMB Error: %s" % e) + self.logger.error("[!] SMB Error: %s" % e) def command_close(self, arguments, command): # Command arguments required : No @@ -412,7 +412,7 @@ def command_get(self, arguments, command): # Get this single file self.sessionsManager.current_session.get_file(path=remotepath) except impacket.smbconnection.SessionError as e: - print("[!] SMB Error: %s" % e) + self.logger.error("[!] SMB Error: %s" % e) def command_help(self, arguments, command): # Command arguments required : No @@ -445,7 +445,7 @@ def command_info(self, arguments, command): server=print_server_info ) except impacket.smbconnection.SessionError as e: - print("[!] SMB Error: %s" % e) + self.logger.error("[!] SMB Error: %s" % e) @command_arguments_required def command_lbat(self, arguments, command): @@ -476,14 +476,14 @@ def command_lbat(self, arguments, command): lexer = "html" syntax = Syntax(code=filecontent, line_numbers=True, lexer=lexer) if len(files_and_directories) > 1: - print("\x1b[1;93m[>] %s\x1b[0m" % (path_to_file+' ').ljust(80,'=')) + self.logger.print("\x1b[1;93m[>] %s\x1b[0m" % (path_to_file+' ').ljust(80,'=')) Console().print(syntax) else: - print("[!] Could not detect charset of '%s'." % path_to_file) + self.logger.error("[!] Could not detect charset of '%s'." % path_to_file) else: - print("[!] Local file '%s' does not exist." % path_to_file) + self.logger.error("[!] Local file '%s' does not exist." % path_to_file) except impacket.smbconnection.SessionError as e: - print("[!] SMB Error: %s" % e) + self.logger.error("[!] SMB Error: %s" % e) @command_arguments_required def command_lcat(self, arguments, command): @@ -506,14 +506,14 @@ def command_lcat(self, arguments, command): if encoding is not None: filecontent = rawcontents.decode(encoding).rstrip() if len(files_and_directories) > 1: - print("\x1b[1;93m[>] %s\x1b[0m" % (path_to_file+' ').ljust(80,'=')) - print(filecontent) + self.logger.print("\x1b[1;93m[>] %s\x1b[0m" % (path_to_file+' ').ljust(80,'=')) + self.logger.print(filecontent) else: - print("[!] Could not detect charset of '%s'." % path_to_file) + self.logger.error("[!] Could not detect charset of '%s'." % path_to_file) else: - print("[!] Local file '%s' does not exist." % path_to_file) + self.logger.error("[!] Local file '%s' does not exist." % path_to_file) except impacket.smbconnection.SessionError as e: - print("[!] SMB Error: %s" % e) + self.logger.error("[!] SMB Error: %s" % e) @command_arguments_required def command_lcd(self, arguments, command): @@ -527,9 +527,9 @@ def command_lcd(self, arguments, command): if os.path.isdir(s=path): os.chdir(path=path) else: - print("[!] Path '%s' is not a directory." % path) + self.logger.error("Path '%s' is not a directory." % path) else: - print("[!] Directory '%s' does not exists." % path) + self.logger.error("Directory '%s' does not exists." % path) @command_arguments_required def command_lcp(self, arguments, command): @@ -544,9 +544,9 @@ def command_lcp(self, arguments, command): try: shutil.copyfile(src=src_path, dst=dst_path) except shutil.SameFileError as err: - print("[!] Error: %s" % err) + self.logger.error("[!] Error: %s" % err) else: - print("[!] File '%s' does not exists." % src_path) + self.logger.error("[!] File '%s' does not exists." % src_path) else: self.commandCompleterObject.print_help(command=command) @@ -562,7 +562,7 @@ def command_lls(self, arguments, command): for path in arguments: if len(arguments) > 1: - print("%s:" % path) + self.logger.print("%s:" % path) # lls if os.path.isdir(path): directory_contents = os.listdir(path=path) @@ -574,26 +574,26 @@ def command_lls(self, arguments, command): if os.path.isdir(s=entryname): if self.config.no_colors: - print("%s %10s %s %s%s" % (rights_str, size_str, date_str, entryname, os.path.sep)) + self.logger.print("%s %10s %s %s%s" % (rights_str, size_str, date_str, entryname, os.path.sep)) else: - print("%s %10s %s \x1b[1;96m%s\x1b[0m%s" % (rights_str, size_str, date_str, entryname, os.path.sep)) + self.logger.print("%s %10s %s \x1b[1;96m%s\x1b[0m%s" % (rights_str, size_str, date_str, entryname, os.path.sep)) else: if self.config.no_colors: - print("%s %10s %s %s" % (rights_str, size_str, date_str, entryname)) + self.logger.print("%s %10s %s %s" % (rights_str, size_str, date_str, entryname)) else: - print("%s %10s %s \x1b[1m%s\x1b[0m" % (rights_str, size_str, date_str, entryname)) + self.logger.print("%s %10s %s \x1b[1m%s\x1b[0m" % (rights_str, size_str, date_str, entryname)) # lls elif os.path.isfile(path): rights_str = unix_permissions(path) size_str = b_filesize(os.path.getsize(filename=path)) date_str = datetime.datetime.fromtimestamp(os.path.getmtime(filename=path)).strftime("%Y-%m-%d %H:%M") if self.config.no_colors: - print("%s %10s %s %s" % (rights_str, size_str, date_str, os.path.basename(path))) + self.logger.print("%s %10s %s %s" % (rights_str, size_str, date_str, os.path.basename(path))) else: - print("%s %10s %s \x1b[1m%s\x1b[0m" % (rights_str, size_str, date_str, os.path.basename(path))) + self.logger.print("%s %10s %s \x1b[1m%s\x1b[0m" % (rights_str, size_str, date_str, os.path.basename(path))) if len(arguments) > 1: - print() + self.logger.print() @command_arguments_required def command_lmkdir(self, arguments, command): @@ -618,7 +618,7 @@ def command_lpwd(self, arguments, command): # Active SMB connection needed : No # SMB share needed : No - print(os.getcwd()) + self.logger.print(os.getcwd()) @command_arguments_required def command_lrename(self, arguments, command): @@ -644,11 +644,11 @@ def command_lrm(self, arguments, command): try: os.remove(path=path) except Exception as e: - print("[!] Error removing file '%s' : %s" % path) + self.logger.error("Error removing file '%s' : %s" % path) else: - print("[!] Cannot delete '%s'. It is a directory, use 'lrmdir ' instead." % path) + self.logger.error("Cannot delete '%s'. It is a directory, use 'lrmdir ' instead." % path) else: - print("[!] Path '%s' does not exist." % path) + self.logger.error("Path '%s' does not exist." % path) @command_arguments_required def command_lrmdir(self, arguments, command): @@ -666,11 +666,11 @@ def command_lrmdir(self, arguments, command): try: shutil.rmtree(path=path) except Exception as e: - print("[!] Error removing directory '%s' : %s" % path) + self.logger.error("Error removing directory '%s' : %s" % path) else: - print("[!] Cannot delete '%s'. It is a file, use 'lrm ' instead." % path) + self.logger.error("Cannot delete '%s'. It is a file, use 'lrm ' instead." % path) else: - print("[!] Path '%s' does not exist." % path) + self.logger.error("Path '%s' does not exist." % path) def command_ltree(self, arguments, command): # Command arguments required : No @@ -701,7 +701,7 @@ def command_ls(self, arguments, command): for path in arguments: if len(arguments) > 1: - print("%s:" % path) + self.logger.print("%s:" % path) if self.sessionsManager.current_session.path_isdir(pathFromRoot=path): # Read the files @@ -717,7 +717,7 @@ def command_ls(self, arguments, command): windows_ls_entry(directory_contents[longname], self.config) if len(arguments) > 1: - print() + self.logger.print() @command_arguments_required @active_smb_connection_needed @@ -740,7 +740,7 @@ def command_module(self, arguments, command): arguments_string = ' '.join(arguments[1:]) module.run(arguments_string) else: - print("[!] Module '%s' does not exist." % module_name) + self.logger.error("Module '%s' does not exist." % module_name) @command_arguments_required @active_smb_connection_needed @@ -757,8 +757,7 @@ def command_mount(self, arguments, command): local_mount_point = arguments[1] - if self.config.debug: - print("[debug] Trying to mount remote '%s' onto local '%s'" % (remote_path, local_mount_point)) + self.logger.debug("Trying to mount remote '%s' onto local '%s'" % (remote_path, local_mount_point)) try: self.sessionsManager.current_session.mount(local_mount_point, remote_path) @@ -786,7 +785,7 @@ def command_put(self, arguments, command): # for localpath in files_and_directories: try: - print(localpath) + self.logger.print(localpath) if is_recursive and os.path.isdir(s=localpath): # Put files recursively self.sessionsManager.current_session.put_file_recursively(localpath=localpath) @@ -794,7 +793,7 @@ def command_put(self, arguments, command): # Put this single file self.sessionsManager.current_session.put_file(localpath=localpath) except impacket.smbconnection.SessionError as e: - print("[!] SMB Error: %s" % e) + self.logger.error("[!] SMB Error: %s" % e) def command_reconnect(self, arguments, command): # Command arguments required : No @@ -835,12 +834,12 @@ def command_rm(self, arguments, command): try: self.sessionsManager.current_session.rm(path=path_to_file) except Exception as e: - print("[!] Error removing file '%s' : %s" % path_to_file) + self.logger.error("Error removing file '%s' : %s" % path_to_file) else: - print("[!] Cannot delete '%s': This is a directory, use 'rmdir ' instead." % path_to_file) + self.logger.error("Cannot delete '%s': This is a directory, use 'rmdir ' instead." % path_to_file) # File does not exist else: - print("[!] Remote file '%s' does not exist." % path_to_file) + self.logger.error("Remote file '%s' does not exist." % path_to_file) @command_arguments_required @active_smb_connection_needed @@ -856,11 +855,11 @@ def command_rmdir(self, arguments, command): try: self.sessionsManager.current_session.rmdir(path=path_to_directory) except Exception as e: - print("[!] Error removing directory '%s' : %s" % path_to_directory) + self.logger.error("Error removing directory '%s' : %s" % path_to_directory) else: - print("[!] Cannot delete '%s': This is a file, use 'rm ' instead." % path_to_directory) + self.logger.error("Cannot delete '%s': This is a file, use 'rm ' instead." % path_to_directory) else: - print("[!] Remote directory '%s' does not exist." % path_to_directory) + self.logger.error("Remote directory '%s' does not exist." % path_to_directory) @active_smb_connection_needed @smb_share_is_set @@ -893,7 +892,7 @@ def print(self, end='\n'): path = "%s" % self.entry.get_longname() else: path = "\x1b[1m%s\x1b[0m" % self.entry.get_longname() - print("%10s %s" % (b_filesize(self.size), path), end=end) + self.logger.print("%10s %s" % (b_filesize(self.size), path), end=end) entries = [] if len(arguments) == 0: @@ -905,7 +904,7 @@ def print(self, end='\n'): if entry is not None: entries = [entry] else: - print("[!] Path '%s' does not exist." % ' '.join(arguments)) + self.logger.print("[!] Path '%s' does not exist." % ' '.join(arguments)) total = 0 for entry in entries: @@ -924,8 +923,8 @@ def print(self, end='\n'): total += rsop.size if len(entries) > 1: - print("──────────────────────") - print(" total %s" % b_filesize(total)) + self.logger.print("──────────────────────") + self.logger.print(" total %s" % b_filesize(total)) @active_smb_connection_needed def command_shares(self, arguments, command): @@ -983,7 +982,7 @@ def command_shares(self, arguments, command): Console().print(table) else: - print("[!] No share served on '%s'" % self.sessionsManager.current_session.host) + self.logger.error("No share served on '%s'" % self.sessionsManager.current_session.host) @active_smb_connection_needed @smb_share_is_set @@ -1007,8 +1006,7 @@ def command_umount(self, arguments, command): local_mount_point = arguments[0] - if self.config.debug: - print("[debug] Trying to unmount local mount point '%s'" % (local_mount_point)) + self.logger.debug("Trying to unmount local mount point '%s'" % (local_mount_point)) self.sessionsManager.current_session.umount(local_mount_point) @@ -1028,7 +1026,7 @@ def command_use(self, arguments, command): if sharename.lower() in shares: self.sessionsManager.current_session.set_share(sharename) else: - print("[!] No share named '%s' on '%s'" % (sharename, self.sessionsManager.current_session.host)) + self.logger.error("No share named '%s' on '%s'" % (sharename, self.sessionsManager.current_session.host)) # Private functions ======================================================= @@ -1048,8 +1046,7 @@ def __load_modules(self): self.modules.clear() modules_dir = os.path.normpath(os.path.dirname(__file__) + os.path.sep + ".." + os.path.sep + "modules") - if self.config.debug: - print("[debug] Loading modules from %s ..." % modules_dir) + self.logger.debug("Loading modules from %s ..." % modules_dir) sys.path.extend([modules_dir]) for file in os.listdir(modules_dir): @@ -1065,13 +1062,13 @@ def __load_modules(self): if self.config.debug: if len(self.modules.keys()) == 0: - print("[debug] Loaded 0 modules.") + self.logger.debug("Loaded 0 modules.") elif len(self.modules.keys()) == 1: - print("[debug] Loaded 1 module:") + self.logger.debug("Loaded 1 module:") else: - print("[debug] Loaded %d modules:" % len(self.modules.keys())) + self.logger.debug("Loaded %d modules:" % len(self.modules.keys())) for modulename in sorted(self.modules.keys()): - print("[debug] %s : \"%s\" (%s)" % (self.modules[modulename].name, self.modules[modulename].description, self.modules[modulename])) + self.logger.debug("%s : \"%s\" (%s)" % (self.modules[modulename].name, self.modules[modulename].description, self.modules[modulename])) if self.commandCompleterObject is not None: self.commandCompleterObject.commands["module"]["subcommands"] = list(self.modules.keys()) diff --git a/smbclientng/core/LocalFileIO.py b/smbclientng/core/LocalFileIO.py index dd251e1..176260d 100644 --- a/smbclientng/core/LocalFileIO.py +++ b/smbclientng/core/LocalFileIO.py @@ -27,8 +27,9 @@ class LocalFileIO(object): read(self, size): Reads data from the file up to the specified size and updates the progress bar if expected size is provided. """ - def __init__(self, mode, path=None, expected_size=None, keepRemotePath=False, debug=False): + def __init__(self, mode, path=None, expected_size=None, keepRemotePath=False, logger=None): super(LocalFileIO, self).__init__() + self.logger = logger self.mode = mode # Convert remote path format to local operating system path format self.path = path.replace(ntpath.sep, os.path.sep) @@ -44,12 +45,10 @@ def __init__(self, mode, path=None, expected_size=None, keepRemotePath=False, de self.dir += os.path.dirname(self.path) if not os.path.exists(self.dir): - if self.debug: - print("[debug] Creating local directory '%s'" % self.dir) + self.logger.debug("[debug] Creating local directory '%s'" % self.dir) os.makedirs(self.dir) - if self.debug: - print("[debug] Openning local '%s' with mode '%s'" % (self.path, self.mode)) + self.logger.debug("[debug] Openning local '%s' with mode '%s'" % (self.path, self.mode)) try: self.fd = open(self.dir + os.path.sep + os.path.basename(self.path), self.mode) @@ -61,8 +60,7 @@ def __init__(self, mode, path=None, expected_size=None, keepRemotePath=False, de if ntpath.sep in self.path: self.dir = os.path.dirname(self.path) - if self.debug: - print("[debug] Openning local '%s' with mode '%s'" % (self.path, self.mode)) + self.logger.debug("[debug] Openning local '%s' with mode '%s'" % (self.path, self.mode)) try: self.fd = open(self.path, self.mode) diff --git a/smbclientng/core/SMBSession.py b/smbclientng/core/SMBSession.py index dded3bf..63037d4 100644 --- a/smbclientng/core/SMBSession.py +++ b/smbclientng/core/SMBSession.py @@ -94,11 +94,9 @@ def close_smb_session(self): if self.connected: self.smbClient.close() self.connected = False - if self.config.debug: - print("[+] SMB connection closed successfully.") + self.logger.debug("[+] SMB connection closed successfully.") else: - if self.config.debug: - print("[!] No active SMB connection to close.") + self.logger.debug("[!] No active SMB connection to close.") else: raise Exception("SMB client is not initialized.") @@ -117,8 +115,8 @@ def init_smb_session(self): self.connected = False - if self.config.debug: - print("[debug] [>] Connecting to remote SMB server '%s' ... " % self.host) + self.logger.debug("[>] Connecting to remote SMB server '%s' ... " % self.host) + try: if is_port_open(self.host, self.port): self.smbClient = impacket.smbconnection.SMBConnection( @@ -131,13 +129,12 @@ def init_smb_session(self): except OSError as err: if self.config.debug: traceback.print_exc() - print("[!] %s" % err) + self.logger.error(err) self.smbClient = None if self.smbClient is not None: if self.credentials.use_kerberos: - if self.config.debug: - print("[debug] [>] Authenticating as '%s\\%s' with kerberos ... " % (self.credentials.domain, self.credentials.username)) + self.logger.debug("[>] Authenticating as '%s\\%s' with kerberos ... " % (self.credentials.domain, self.credentials.username)) try: self.connected = self.smbClient.kerberosLogin( user=self.credentials.username, @@ -151,12 +148,12 @@ def init_smb_session(self): except impacket.smbconnection.SessionError as err: if self.config.debug: traceback.print_exc() - print("[!] Could not login: %s" % err) + self.logger.error("Could not login: %s" % err) self.connected = False else: - if self.config.debug: - print("[debug] [>] Authenticating as '%s\\%s' with NTLM ... " % (self.credentials.domain, self.credentials.username)) + self.logger.debug("[>] Authenticating as '%s\\%s' with NTLM ... " % (self.credentials.domain, self.credentials.username)) + try: self.connected = self.smbClient.login( user=self.credentials.username, @@ -168,13 +165,13 @@ def init_smb_session(self): except impacket.smbconnection.SessionError as err: if self.config.debug: traceback.print_exc() - print("[!] Could not login: %s" % err) + self.logger.error("Could not login: %s" % err) self.connected = False if self.connected: - print("[+] Successfully authenticated to '%s' as '%s\\%s'!" % (self.host, self.credentials.domain, self.credentials.username)) + self.logger.print("[+] Successfully authenticated to '%s' as '%s\\%s'!" % (self.host, self.credentials.domain, self.credentials.username)) else: - print("[!] Failed to authenticate to '%s' as '%s\\%s'!" % (self.host, self.credentials.domain, self.credentials.username)) + self.logger.error("Failed to authenticate to '%s' as '%s\\%s'!" % (self.host, self.credentials.domain, self.credentials.username)) return self.connected @@ -261,7 +258,7 @@ def recurse_action(paths=[], depth=0, callback=None): ) depth = depth + 1 else: - print("[!] SMBSession.find(), callback function cannot be None.") + self.logger.error("SMBSession.find(), callback function cannot be None.") def get_file(self, path=None, keepRemotePath=False): """ @@ -310,8 +307,7 @@ def get_file(self, path=None, keepRemotePath=False): for entry in matching_entries: if entry.is_directory(): - if self.config.debug: - print("[debug] [>] Skipping '%s' because it is a directory." % (tmp_search_path + ntpath.sep + entry.get_longname())) + self.logger.debug("[>] Skipping '%s' because it is a directory." % (tmp_search_path + ntpath.sep + entry.get_longname())) else: try: if ntpath.sep in path: @@ -370,7 +366,7 @@ def recurse_action(base_dir="", path=[]): # Files if len(files) != 0: - print("[>] Retrieving files of '%s'" % remote_smb_path) + self.logger.print("[>] Retrieving files of '%s'" % remote_smb_path) for entry_file in files: if not entry_file.is_directory(): f = LocalFileIO( @@ -455,45 +451,45 @@ def info(self, share=True, server=True): if server: if self.config.no_colors: - print("[+] Server:") - print(" ├─NetBIOS:") - print(" │ ├─ NetBIOS Hostname ──────── : %s" % (self.smbClient.getServerName())) - print(" │ └─ NetBIOS Domain ────────── : %s" % (self.smbClient.getServerDomain())) - print(" ├─DNS:") - print(" │ ├─ DNS Hostname ──────────── : %s" % (self.smbClient.getServerDNSHostName())) - print(" │ └─ DNS Domain ────────────── : %s" % (self.smbClient.getServerDNSDomainName())) - print(" ├─OS:") - print(" │ ├─ OS Name ───────────────── : %s" % (self.smbClient.getServerOS())) - print(" │ └─ OS Version ────────────── : %s.%s.%s" % (self.smbClient.getServerOSMajor(), self.smbClient.getServerOSMinor(), self.smbClient.getServerOSBuild())) - print(" ├─Server:") - print(" │ ├─ Signing Required ──────── : %s" % (self.smbClient.isSigningRequired())) - print(" │ ├─ Login Required ────────── : %s" % (self.smbClient.isLoginRequired())) - print(" │ ├─ Supports NTLMv2 ───────── : %s" % (self.smbClient.doesSupportNTLMv2())) + self.logger.print("[+] Server:") + self.logger.print(" ├─NetBIOS:") + self.logger.print(" │ ├─ NetBIOS Hostname ──────── : %s" % (self.smbClient.getServerName())) + self.logger.print(" │ └─ NetBIOS Domain ────────── : %s" % (self.smbClient.getServerDomain())) + self.logger.print(" ├─DNS:") + self.logger.print(" │ ├─ DNS Hostname ──────────── : %s" % (self.smbClient.getServerDNSHostName())) + self.logger.print(" │ └─ DNS Domain ────────────── : %s" % (self.smbClient.getServerDNSDomainName())) + self.logger.print(" ├─OS:") + self.logger.print(" │ ├─ OS Name ───────────────── : %s" % (self.smbClient.getServerOS())) + self.logger.print(" │ └─ OS Version ────────────── : %s.%s.%s" % (self.smbClient.getServerOSMajor(), self.smbClient.getServerOSMinor(), self.smbClient.getServerOSBuild())) + self.logger.print(" ├─Server:") + self.logger.print(" │ ├─ Signing Required ──────── : %s" % (self.smbClient.isSigningRequired())) + self.logger.print(" │ ├─ Login Required ────────── : %s" % (self.smbClient.isLoginRequired())) + self.logger.print(" │ ├─ Supports NTLMv2 ───────── : %s" % (self.smbClient.doesSupportNTLMv2())) MaxReadSize = self.smbClient.getIOCapabilities()["MaxReadSize"] - print(" │ ├─ Max size of read chunk ── : %d bytes (%s)" % (MaxReadSize, b_filesize(MaxReadSize))) + self.logger.print(" │ ├─ Max size of read chunk ── : %d bytes (%s)" % (MaxReadSize, b_filesize(MaxReadSize))) MaxWriteSize = self.smbClient.getIOCapabilities()["MaxWriteSize"] - print(" │ └─ Max size of write chunk ─ : %d bytes (%s)" % (MaxWriteSize, b_filesize(MaxWriteSize))) - print(" └─") + self.logger.print(" │ └─ Max size of write chunk ─ : %d bytes (%s)" % (MaxWriteSize, b_filesize(MaxWriteSize))) + self.logger.print(" └─") else: - print("[+] Server:") - print(" ├─NetBIOS:") - print(" │ ├─ \x1b[94mNetBIOS Hostname\x1b[0m \x1b[90m────────\x1b[0m : \x1b[93m%s\x1b[0m" % (self.smbClient.getServerName())) - print(" │ └─ \x1b[94mNetBIOS Domain\x1b[0m \x1b[90m──────────\x1b[0m : \x1b[93m%s\x1b[0m" % (self.smbClient.getServerDomain())) - print(" ├─DNS:") - print(" │ ├─ \x1b[94mDNS Hostname\x1b[0m \x1b[90m────────────\x1b[0m : \x1b[93m%s\x1b[0m" % (self.smbClient.getServerDNSHostName())) - print(" │ └─ \x1b[94mDNS Domain\x1b[0m \x1b[90m──────────────\x1b[0m : \x1b[93m%s\x1b[0m" % (self.smbClient.getServerDNSDomainName())) - print(" ├─OS:") - print(" │ ├─ \x1b[94mOS Name\x1b[0m \x1b[90m─────────────────\x1b[0m : \x1b[93m%s\x1b[0m" % (self.smbClient.getServerOS())) - print(" │ └─ \x1b[94mOS Version\x1b[0m \x1b[90m──────────────\x1b[0m : \x1b[93m%s.%s.%s\x1b[0m" % (self.smbClient.getServerOSMajor(), self.smbClient.getServerOSMinor(), self.smbClient.getServerOSBuild())) - print(" ├─Server:") - print(" │ ├─ \x1b[94mSigning Required\x1b[0m \x1b[90m────────\x1b[0m : \x1b[93m%s\x1b[0m" % (self.smbClient.isSigningRequired())) - print(" │ ├─ \x1b[94mLogin Required\x1b[0m \x1b[90m──────────\x1b[0m : \x1b[93m%s\x1b[0m" % (self.smbClient.isLoginRequired())) - print(" │ ├─ \x1b[94mSupports NTLMv2\x1b[0m \x1b[90m─────────\x1b[0m : \x1b[93m%s\x1b[0m" % (self.smbClient.doesSupportNTLMv2())) + self.logger.print("[+] Server:") + self.logger.print(" ├─NetBIOS:") + self.logger.print(" │ ├─ \x1b[94mNetBIOS Hostname\x1b[0m \x1b[90m────────\x1b[0m : \x1b[93m%s\x1b[0m" % (self.smbClient.getServerName())) + self.logger.print(" │ └─ \x1b[94mNetBIOS Domain\x1b[0m \x1b[90m──────────\x1b[0m : \x1b[93m%s\x1b[0m" % (self.smbClient.getServerDomain())) + self.logger.print(" ├─DNS:") + self.logger.print(" │ ├─ \x1b[94mDNS Hostname\x1b[0m \x1b[90m────────────\x1b[0m : \x1b[93m%s\x1b[0m" % (self.smbClient.getServerDNSHostName())) + self.logger.print(" │ └─ \x1b[94mDNS Domain\x1b[0m \x1b[90m──────────────\x1b[0m : \x1b[93m%s\x1b[0m" % (self.smbClient.getServerDNSDomainName())) + self.logger.print(" ├─OS:") + self.logger.print(" │ ├─ \x1b[94mOS Name\x1b[0m \x1b[90m─────────────────\x1b[0m : \x1b[93m%s\x1b[0m" % (self.smbClient.getServerOS())) + self.logger.print(" │ └─ \x1b[94mOS Version\x1b[0m \x1b[90m──────────────\x1b[0m : \x1b[93m%s.%s.%s\x1b[0m" % (self.smbClient.getServerOSMajor(), self.smbClient.getServerOSMinor(), self.smbClient.getServerOSBuild())) + self.logger.print(" ├─Server:") + self.logger.print(" │ ├─ \x1b[94mSigning Required\x1b[0m \x1b[90m────────\x1b[0m : \x1b[93m%s\x1b[0m" % (self.smbClient.isSigningRequired())) + self.logger.print(" │ ├─ \x1b[94mLogin Required\x1b[0m \x1b[90m──────────\x1b[0m : \x1b[93m%s\x1b[0m" % (self.smbClient.isLoginRequired())) + self.logger.print(" │ ├─ \x1b[94mSupports NTLMv2\x1b[0m \x1b[90m─────────\x1b[0m : \x1b[93m%s\x1b[0m" % (self.smbClient.doesSupportNTLMv2())) MaxReadSize = self.smbClient.getIOCapabilities()["MaxReadSize"] - print(" │ ├─ \x1b[94mMax size of read chunk\x1b[0m \x1b[90m──\x1b[0m : \x1b[93m%d bytes (%s)\x1b[0m" % (MaxReadSize, b_filesize(MaxReadSize))) + self.logger.print(" │ ├─ \x1b[94mMax size of read chunk\x1b[0m \x1b[90m──\x1b[0m : \x1b[93m%d bytes (%s)\x1b[0m" % (MaxReadSize, b_filesize(MaxReadSize))) MaxWriteSize = self.smbClient.getIOCapabilities()["MaxWriteSize"] - print(" │ └─ \x1b[94mMax size of write chunk\x1b[0m \x1b[90m─\x1b[0m : \x1b[93m%d bytes (%s)\x1b[0m" % (MaxWriteSize, b_filesize(MaxWriteSize))) - print(" └─") + self.logger.print(" │ └─ \x1b[94mMax size of write chunk\x1b[0m \x1b[90m─\x1b[0m : \x1b[93m%d bytes (%s)\x1b[0m" % (MaxWriteSize, b_filesize(MaxWriteSize))) + self.logger.print(" └─") if share and self.smb_share is not None: share_name = self.available_shares.get(self.smb_share.lower(), "")["name"] @@ -502,17 +498,17 @@ def info(self, share=True, server=True): share_type =', '.join([s.replace("STYPE_","") for s in share_type]) share_rawtype = self.available_shares.get(self.smb_share.lower(), "")["rawtype"] if self.config.no_colors: - print("\n[+] Share:") - print(" ├─ Name ──────────── : %s" % (share_name)) - print(" ├─ Description ───── : %s" % (share_comment)) - print(" ├─ Type ──────────── : %s" % (share_type)) - print(" └─ Raw type value ── : %s" % (share_rawtype)) + self.logger.print("\n[+] Share:") + self.logger.print(" ├─ Name ──────────── : %s" % (share_name)) + self.logger.print(" ├─ Description ───── : %s" % (share_comment)) + self.logger.print(" ├─ Type ──────────── : %s" % (share_type)) + self.logger.print(" └─ Raw type value ── : %s" % (share_rawtype)) else: - print("\n[+] Share:") - print(" ├─ \x1b[94mName\x1b[0m \x1b[90m────────────\x1b[0m : \x1b[93m%s\x1b[0m" % (share_name)) - print(" ├─ \x1b[94mDescription\x1b[0m \x1b[90m─────\x1b[0m : \x1b[93m%s\x1b[0m" % (share_comment)) - print(" ├─ \x1b[94mType\x1b[0m \x1b[90m────────────\x1b[0m : \x1b[93m%s\x1b[0m" % (share_type)) - print(" └─ \x1b[94mRaw type value\x1b[0m \x1b[90m──\x1b[0m : \x1b[93m%s\x1b[0m" % (share_rawtype)) + self.logger.print("\n[+] Share:") + self.logger.print(" ├─ \x1b[94mName\x1b[0m \x1b[90m────────────\x1b[0m : \x1b[93m%s\x1b[0m" % (share_name)) + self.logger.print(" ├─ \x1b[94mDescription\x1b[0m \x1b[90m─────\x1b[0m : \x1b[93m%s\x1b[0m" % (share_comment)) + self.logger.print(" ├─ \x1b[94mType\x1b[0m \x1b[90m────────────\x1b[0m : \x1b[93m%s\x1b[0m" % (share_type)) + self.logger.print(" └─ \x1b[94mRaw type value\x1b[0m \x1b[90m──\x1b[0m : \x1b[93m%s\x1b[0m" % (share_rawtype)) def list_contents(self, path=None): """ @@ -578,7 +574,7 @@ def list_shares(self): "comment": sharecomment } else: - print("[!] Error: SMBSession.smbClient is None.") + self.logger.error("Error: SMBSession.smbClient is None.") return self.available_shares @@ -620,7 +616,7 @@ def mkdir(self, path=None): # Src: https://github.com/fortra/impacket/blob/269ce69872f0e8f2188a80addb0c39fedfa6dcb8/impacket/nt_errors.py#L268C9-L268C19 pass else: - print("[!] Failed to create directory '%s': %s" % (tmp_path, err)) + self.logger.error("Failed to create directory '%s': %s" % (tmp_path, err)) if self.config.debug: traceback.print_exc() else: @@ -664,11 +660,11 @@ def mount(self, local_mount_point, remote_path): else: command = None - print("[!] Unsupported platform for mounting SMB share.") + self.logger.error("Unsupported platform for mounting SMB share.") if command is not None: if self.config.debug: - print("[debug] Executing: %s" % command) + self.logger.debug("Executing: %s" % command) os.system(command) def path_exists(self, path=None): @@ -841,7 +837,7 @@ def put_file(self, localpath=None): f.close() except (BrokenPipeError, KeyboardInterrupt) as err: - print("[!] Interrupted.") + self.logger.error("Interrupted.") self.close_smb_session() self.init_smb_session() @@ -880,7 +876,7 @@ def put_file_recursively(self, localpath=None): # Iterate over the found files for local_dir_path in sorted(local_files.keys()): - print("[>] Putting files of '%s'" % local_dir_path) + self.logger.print("[>] Putting files of '%s'" % local_dir_path) # Create remote directory remote_dir_path = local_dir_path.replace(os.path.sep, ntpath.sep) @@ -903,7 +899,7 @@ def put_file_recursively(self, localpath=None): f.close() except (BrokenPipeError, KeyboardInterrupt) as err: - print("[!] Interrupted.") + self.logger.error("Interrupted.") self.close_smb_session() self.init_smb_session() @@ -913,9 +909,9 @@ def put_file_recursively(self, localpath=None): if self.config.debug: traceback.print_exc() else: - print("[!] The specified localpath is a file. Use 'put ' instead.") + self.logger.error("The specified localpath is a file. Use 'put ' instead.") else: - print("[!] The specified localpath does not exist.") + self.logger.error("The specified localpath does not exist.") def read_file(self, path=None): """ @@ -972,7 +968,7 @@ def rmdir(self, path=None): pathName=ntpath.normpath(self.smb_cwd + ntpath.sep + path), ) except Exception as err: - print("[!] Failed to remove directory '%s': %s" % (path, err)) + self.logger.error("Failed to remove directory '%s': %s" % (path, err)) if self.config.debug: traceback.print_exc() @@ -1025,7 +1021,7 @@ def rm(self, path=None): pathName=ntpath.normpath(tmp_search_path + ntpath.sep + entry.get_longname()), ) except Exception as err: - print("[!] Failed to remove file '%s': %s" % (path, err)) + self.logger.error("Failed to remove file '%s': %s" % (path, err)) if self.config.debug: traceback.print_exc() @@ -1057,9 +1053,9 @@ def recurse_action(base_dir="", path=[], prompt=[]): code, const, text = err.getErrorCode(), err.getErrorString()[0], err.getErrorString()[1] errmsg = "Error 0x%08x (%s): %s" % (code, const, text) if self.config.no_colors: - print("%s%s" % (''.join(prompt+[bars[2]]), errmsg)) + self.logger.print("%s%s" % (''.join(prompt+[bars[2]]), errmsg)) else: - print("%s\x1b[1;91m%s\x1b[0m" % (''.join(prompt+[bars[2]]), errmsg)) + self.logger.print("%s\x1b[1;91m%s\x1b[0m" % (''.join(prompt+[bars[2]]), errmsg)) return entries = [e for e in entries if e.get_longname() not in [".", ".."]] @@ -1074,9 +1070,9 @@ def recurse_action(base_dir="", path=[], prompt=[]): if index == 0: if entry.is_directory(): if self.config.no_colors: - print("%s%s\\" % (''.join(prompt+[bars[1]]), entry.get_longname())) + self.logger.print("%s%s\\" % (''.join(prompt+[bars[1]]), entry.get_longname())) else: - print("%s\x1b[1;96m%s\x1b[0m\\" % (''.join(prompt+[bars[1]]), entry.get_longname())) + self.logger.print("%s\x1b[1;96m%s\x1b[0m\\" % (''.join(prompt+[bars[1]]), entry.get_longname())) recurse_action( base_dir=base_dir, path=path+[entry.get_longname()], @@ -1084,17 +1080,17 @@ def recurse_action(base_dir="", path=[], prompt=[]): ) else: if self.config.no_colors: - print("%s%s" % (''.join(prompt+[bars[1]]), entry.get_longname())) + self.logger.print("%s%s" % (''.join(prompt+[bars[1]]), entry.get_longname())) else: - print("%s\x1b[1m%s\x1b[0m" % (''.join(prompt+[bars[1]]), entry.get_longname())) + self.logger.print("%s\x1b[1m%s\x1b[0m" % (''.join(prompt+[bars[1]]), entry.get_longname())) # This is the last entry elif index == len(entries): if entry.is_directory(): if self.config.no_colors: - print("%s%s\\" % (''.join(prompt+[bars[2]]), entry.get_longname())) + self.logger.print("%s%s\\" % (''.join(prompt+[bars[2]]), entry.get_longname())) else: - print("%s\x1b[1;96m%s\x1b[0m\\" % (''.join(prompt+[bars[2]]), entry.get_longname())) + self.logger.print("%s\x1b[1;96m%s\x1b[0m\\" % (''.join(prompt+[bars[2]]), entry.get_longname())) recurse_action( base_dir=base_dir, path=path+[entry.get_longname()], @@ -1102,17 +1098,17 @@ def recurse_action(base_dir="", path=[], prompt=[]): ) else: if self.config.no_colors: - print("%s%s" % (''.join(prompt+[bars[2]]), entry.get_longname())) + self.logger.print("%s%s" % (''.join(prompt+[bars[2]]), entry.get_longname())) else: - print("%s\x1b[1m%s\x1b[0m" % (''.join(prompt+[bars[2]]), entry.get_longname())) + self.logger.print("%s\x1b[1m%s\x1b[0m" % (''.join(prompt+[bars[2]]), entry.get_longname())) # These are entries in the middle else: if entry.is_directory(): if self.config.no_colors: - print("%s%s\\" % (''.join(prompt+[bars[1]]), entry.get_longname())) + self.logger.print("%s%s\\" % (''.join(prompt+[bars[1]]), entry.get_longname())) else: - print("%s\x1b[1;96m%s\x1b[0m\\" % (''.join(prompt+[bars[1]]), entry.get_longname())) + self.logger.print("%s\x1b[1;96m%s\x1b[0m\\" % (''.join(prompt+[bars[1]]), entry.get_longname())) recurse_action( base_dir=base_dir, path=path+[entry.get_longname()], @@ -1120,18 +1116,18 @@ def recurse_action(base_dir="", path=[], prompt=[]): ) else: if self.config.no_colors: - print("%s%s" % (''.join(prompt+[bars[1]]), entry.get_longname())) + self.logger.print("%s%s" % (''.join(prompt+[bars[1]]), entry.get_longname())) else: - print("%s\x1b[1m%s\x1b[0m" % (''.join(prompt+[bars[1]]), entry.get_longname())) + self.logger.print("%s\x1b[1m%s\x1b[0m" % (''.join(prompt+[bars[1]]), entry.get_longname())) # elif len(entries) == 1: entry = entries[0] if entry.is_directory(): if self.config.no_colors: - print("%s%s\\" % (''.join(prompt+[bars[2]]), entry.get_longname())) + self.logger.print("%s%s\\" % (''.join(prompt+[bars[2]]), entry.get_longname())) else: - print("%s\x1b[1;96m%s\x1b[0m\\" % (''.join(prompt+[bars[2]]), entry.get_longname())) + self.logger.print("%s\x1b[1;96m%s\x1b[0m\\" % (''.join(prompt+[bars[2]]), entry.get_longname())) recurse_action( base_dir=base_dir, path=path+[entry.get_longname()], @@ -1139,23 +1135,23 @@ def recurse_action(base_dir="", path=[], prompt=[]): ) else: if self.config.no_colors: - print("%s%s" % (''.join(prompt+[bars[2]]), entry.get_longname())) + self.logger.print("%s%s" % (''.join(prompt+[bars[2]]), entry.get_longname())) else: - print("%s\x1b[1m%s\x1b[0m" % (''.join(prompt+[bars[2]]), entry.get_longname())) + self.logger.print("%s\x1b[1m%s\x1b[0m" % (''.join(prompt+[bars[2]]), entry.get_longname())) # Entrypoint try: if self.config.no_colors: - print("%s\\" % path) + self.logger.print("%s\\" % path) else: - print("\x1b[1;96m%s\x1b[0m\\" % path) + self.logger.print("\x1b[1;96m%s\x1b[0m\\" % path) recurse_action( base_dir=self.smb_cwd, path=[path], prompt=[""] ) except (BrokenPipeError, KeyboardInterrupt) as e: - print("[!] Interrupted.") + self.logger.error("Interrupted.") self.close_smb_session() self.init_smb_session() @@ -1182,14 +1178,13 @@ def umount(self, local_mount_point): else: command = None - print("[!] Unsupported platform for unmounting SMB share.") + self.logger.error("Unsupported platform for unmounting SMB share.") if command is not None: - if self.config.debug: - print("[debug] Executing: %s" % command) + self.logger.debug("Executing: %s" % command) os.system(command) else: - print("[!] Cannot unmount a non existing path.") + self.logger.error("Cannot unmount a non existing path.") # Other functions @@ -1254,7 +1249,7 @@ def set_share(self, shareName): # Connects the tree self.smb_tree_id = self.smbClient.connectTree(self.smb_share) else: - print("[!] Could not set share '%s', it does not exist remotely." % shareName) + self.logger.error("Could not set share '%s', it does not exist remotely." % shareName) else: self.smb_share = None @@ -1299,4 +1294,4 @@ def set_cwd(self, path=None): self.smb_cwd = ntpath.normpath(path) else: # Path does not exists or is not a directory on the remote - print("[!] Remote directory '%s' does not exist." % path) + self.logger.error("Remote directory '%s' does not exist." % path) diff --git a/smbclientng/core/SessionsManager.py b/smbclientng/core/SessionsManager.py index 9ed070f..8165b14 100644 --- a/smbclientng/core/SessionsManager.py +++ b/smbclientng/core/SessionsManager.py @@ -156,10 +156,10 @@ def process_command_line(self, arguments): if options.action == "interact": if options.session_id is not None: if options.session_id in self.sessions.keys(): - print("[+] Switching to session #%d" % options.session_id) + self.logger.info("Switching to session #%d" % options.session_id) self.switch_session(session_id=options.session_id) else: - print("[!] No session with id #%d" % options.session_id) + self.logger.error("No session with id #%d" % options.session_id) # elif options.action == "create": @@ -183,19 +183,19 @@ def process_command_line(self, arguments): if options.session_id is not None: for session_id in options.session_id: if session_id in self.sessions.keys(): - print("[+] Closing and deleting session #%d" % session_id) + self.logger.info("Closing and deleting session #%d" % session_id) self.delete_session(session_id=session_id) else: - print("[!] No session with id #%d" % session_id) + self.logger.error("No session with id #%d" % session_id) # elif options.action == "execute": if options.command is not None: if options.session_id is not None: if options.session_id in self.sessions.keys(): - print("[+] Executing '%s to session #%d" % (options.command, options.session_id)) + self.logger.info("Executing '%s to session #%d" % (options.command, options.session_id)) else: - print("[!] No session with id #%d" % options.session_id) + self.logger.error("No session with id #%d" % options.session_id) elif options.all == True: for session_id in self.sessions.keys(): pass From dfa66afe607d131ce321a958d3c6cefd5b309792 Mon Sep 17 00:00:00 2001 From: "Remi GASCOU (Podalirius)" <79218792+p0dalirius@users.noreply.github.com> Date: Mon, 24 Jun 2024 17:59:59 +0200 Subject: [PATCH 6/6] Finished Logger class, fixes #56 --- smbclientng/core/Logger.py | 45 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 45 insertions(+) diff --git a/smbclientng/core/Logger.py b/smbclientng/core/Logger.py index 32008f1..71e75c3 100644 --- a/smbclientng/core/Logger.py +++ b/smbclientng/core/Logger.py @@ -50,6 +50,15 @@ def __init__(self, config, logfile=None): open(self.logfile, "w").close() def print(self, message=""): + """ + Prints a message to stdout and logs it to a file if logging is enabled. + + This method prints the provided message to the standard output and also logs it to a file if a log file path is specified during the Logger instance initialization. The message can include color codes for color-coded output, which can be disabled by setting the `nocolors` attribute to True. + + Args: + message (str): The message to be printed and logged. + """ + nocolor_message = re.sub(r"\x1b[\[]([0-9;]+)m", "", message) if self.config.no_colors: print(nocolor_message) @@ -58,6 +67,15 @@ def print(self, message=""): self.__write_to_logfile(nocolor_message) def info(self, message): + """ + Logs a message at the INFO level. + + This method logs the provided message at the INFO level. The message can include color codes for color-coded output, which can be disabled by setting the `nocolors` attribute to True. The message is also logged to a file if a log file path is specified during the Logger instance initialization. + + Args: + message (str): The message to be logged at the INFO level. + """ + nocolor_message = re.sub(r"\x1b[\[]([0-9;]+)m", "", message) if self.config.no_colors: print("[info] %s" % nocolor_message) @@ -66,6 +84,15 @@ def info(self, message): self.__write_to_logfile("[info] %s" % nocolor_message) def debug(self, message): + """ + Logs a message at the DEBUG level if debugging is enabled. + + This method logs the provided message at the DEBUG level if the `debug` attribute is set to True during the Logger instance initialization. The message can include color codes for color-coded output, which can be disabled by setting the `nocolors` attribute to True. + + Args: + message (str): The message to be logged. + """ + if self.config.debug == True: nocolor_message = re.sub(r"\x1b[\[]([0-9;]+)m", "", message) if self.config.no_colors: @@ -75,6 +102,15 @@ def debug(self, message): self.__write_to_logfile("[debug] %s" % nocolor_message) def error(self, message): + """ + Logs an error message to the console and the log file. + + This method logs the provided error message to the standard error output and also logs it to a file if a log file path is specified during the Logger instance initialization. The message can include color codes for color-coded output, which can be disabled by setting the `nocolors` attribute to True. + + Args: + message (str): The error message to be logged. + """ + nocolor_message = re.sub(r"\x1b[\[]([0-9;]+)m", "", message) if self.config.no_colors: print("[error] %s" % nocolor_message) @@ -83,6 +119,15 @@ def error(self, message): self.__write_to_logfile("[error] %s" % nocolor_message) def __write_to_logfile(self, message): + """ + Writes the provided message to the log file specified during Logger instance initialization. + + This method appends the provided message to the log file specified by the `logfile` attribute. If no log file path is specified, this method does nothing. + + Args: + message (str): The message to be written to the log file. + """ + if self.logfile is not None: f = open(self.logfile, "a") f.write(message + "\n")