From c7a318ee44940429c2d64e3cb759b26ab897b876 Mon Sep 17 00:00:00 2001 From: "Remi GASCOU (Podalirius)" <79218792+p0dalirius@users.noreply.github.com> Date: Mon, 17 Jun 2024 21:12:00 +0200 Subject: [PATCH 1/9] Implemented shlex argument parsing and quotation on use command --- smbclientng/core/CommandCompleter.py | 38 ++-- smbclientng/core/InteractiveShell.py | 298 ++++++++++++++++----------- 2 files changed, 199 insertions(+), 137 deletions(-) diff --git a/smbclientng/core/CommandCompleter.py b/smbclientng/core/CommandCompleter.py index 280020e..aa358fc 100644 --- a/smbclientng/core/CommandCompleter.py +++ b/smbclientng/core/CommandCompleter.py @@ -7,6 +7,7 @@ import ntpath import os +import shlex class CommandCompleter(object): @@ -319,18 +320,24 @@ def complete(self, text, state): if command in self.commands.keys(): if command == "use": # Choose SMB Share to connect to - self.matches = [ - command + " " + s.lower() - for s in self.smbSession.list_shares().keys() - if s.lower().startswith(remainder.lower()) - ] + shares = self.smbSession.list_shares() + matching_entries = [] + for sharename in shares.keys(): + if sharename.lower().startswith(remainder.lower()): + matching_entries.append(shares[sharename]["name"]) + + # Add quoting for shlex + matching_entries = [shlex.quote(s) for s in matching_entries] + + # Final matches + self.matches = [command + " " + m for m in matching_entries] elif command in ["cd", "dir", "ls", "mkdir", "rmdir", "tree"]: # Choose remote directory path = "" if '\\' in remainder.strip() or '/' in remainder.strip(): - path = remainder.strip().replace('/', ntpath.sep) - path = ntpath.sep.join(path.split(ntpath.sep)[:-1]) + path = remainder.strip().replace(ntpath.sep, '/') + path = '/'.join(path.split('/')[:-1]) directory_contents = self.smbSession.list_contents(path=path).items() @@ -338,9 +345,12 @@ def complete(self, text, state): for _, entry in directory_contents: if entry.is_directory() and entry.get_longname() not in [".",".."]: if len(path) != 0: - matching_entries.append(path + ntpath.sep + entry.get_longname() + ntpath.sep) + matching_entries.append(path + '/' + entry.get_longname() + '/') else: - matching_entries.append(entry.get_longname() + ntpath.sep) + matching_entries.append(entry.get_longname() + '/') + + # Add quoting for shlex + matching_entries = [shlex.quote(s) for s in matching_entries] self.matches = [ command + " " + s @@ -352,8 +362,8 @@ def complete(self, text, state): # Choose local files and directories path = "" if '\\' in remainder.strip() or '/' in remainder.strip(): - path = remainder.strip().replace('/', ntpath.sep) - path = ntpath.sep.join(path.split(ntpath.sep)[:-1]) + path = remainder.strip().replace(ntpath.sep, '/') + path = '/'.join(path.split('/')[:-1]) directory_contents = self.smbSession.list_contents(path=path).items() @@ -362,12 +372,12 @@ def complete(self, text, state): if entry.get_longname() not in [".",".."]: if len(path) != 0: if entry.is_directory(): - matching_entries.append(path + ntpath.sep + entry.get_longname() + ntpath.sep) + matching_entries.append(path + '/' + entry.get_longname() + '/') else: - matching_entries.append(path + ntpath.sep + entry.get_longname()) + matching_entries.append(path + '/' + entry.get_longname()) else: if entry.is_directory(): - matching_entries.append(entry.get_longname() + ntpath.sep) + matching_entries.append(entry.get_longname() + '/') else: matching_entries.append(entry.get_longname()) diff --git a/smbclientng/core/InteractiveShell.py b/smbclientng/core/InteractiveShell.py index ef13be2..df261fd 100644 --- a/smbclientng/core/InteractiveShell.py +++ b/smbclientng/core/InteractiveShell.py @@ -13,6 +13,7 @@ import os import readline import shutil +import shlex import sys import traceback from rich.console import Console @@ -93,22 +94,31 @@ def run(self): running = True while running: try: - user_input = input(self.__prompt()).strip().split(" ") - command, arguments = user_input[0].lower(), user_input[1:] + user_input = input(self.__prompt()).strip() + tokens = shlex.split(user_input) + + if len(tokens) == 0: + command = "" + arguments = [] + elif len(tokens) == 1: + command = tokens[0].lower() + arguments = [] + else: + command = tokens[0].lower() + arguments = tokens[1:] # Exit the command line if command == "exit": running = False - + # Skip + elif command.strip() == "": + pass + # Execute the command elif command in self.commandCompleterObject.commands.keys(): self.process_command( command=command, arguments=arguments ) - - elif command.strip() == "": - pass - # Fallback to unknown command else: print("Unknown command. Type \"help\" for help.") @@ -262,7 +272,8 @@ def process_command(self, command, arguments=[]): def command_debug(self, arguments, command): try: - pass + print("[debug] command = '%s'" % command) + print("[debug] arguments = %s" % arguments) except Exception as e: traceback.print_exc() @@ -274,26 +285,28 @@ def command_bat(self, arguments, command): # Active SMB connection needed : Yes # SMB share needed : Yes - path = ' '.join(arguments) - try: - rawcontents = self.smbSession.read_file(path=path) - if rawcontents is not None: - encoding = charset_normalizer.detect(rawcontents)["encoding"] - if encoding is not None: - filecontent = rawcontents.decode(encoding).rstrip() - lexer = Syntax.guess_lexer(path=ntpath.basename(path), code=filecontent) - # Some trickery for the files undetected by the lexer - if lexer == "default": - if '' in filecontent: - lexer = "html" - syntax = Syntax(code=filecontent, line_numbers=True, lexer=lexer) - Console().print(syntax) - else: - print("[!] Could not detect charset of '%s'." % path) - except impacket.smbconnection.SessionError as e: - print("[!] SMB Error: %s" % e) + for path_to_file in arguments: + print("[>] %s" % path_to_file) + # Read the file + try: + rawcontents = self.smbSession.read_file(path=path_to_file) + if rawcontents is not None: + encoding = charset_normalizer.detect(rawcontents)["encoding"] + if encoding is not None: + filecontent = rawcontents.decode(encoding).rstrip() + lexer = Syntax.guess_lexer(path=ntpath.basename(path_to_file), code=filecontent) + # Some trickery for the files undetected by the lexer + if lexer == "default": + if '' in filecontent: + lexer = "html" + syntax = Syntax(code=filecontent, line_numbers=True, lexer=lexer) + Console().print(syntax) + else: + print("[!] Could not detect charset of '%s'." % path_to_file) + except impacket.smbconnection.SessionError as e: + print("[!] SMB Error: %s" % e) @command_arguments_required @active_smb_connection_needed @@ -303,9 +316,8 @@ def command_cd(self, arguments, command): # Active SMB connection needed : Yes # SMB share needed : Yes - path = ' '.join(arguments) try: - self.smbSession.set_cwd(path=path) + self.smbSession.set_cwd(path=arguments[0]) except impacket.smbconnection.SessionError as e: print("[!] SMB Error: %s" % e) @@ -317,18 +329,20 @@ def command_cat(self, arguments, command): # Active SMB connection needed : Yes # SMB share needed : Yes - path = ' '.join(arguments) - try: - rawcontents = self.smbSession.read_file(path=path) - if rawcontents is not None: - encoding = charset_normalizer.detect(rawcontents)["encoding"] - if encoding is not None: - filecontent = rawcontents.decode(encoding).rstrip() - print(filecontent) - else: - print("[!] Could not detect charset of '%s'." % path) - except impacket.smbconnection.SessionError as e: - print("[!] SMB Error: %s" % e) + for path_to_file in arguments: + print("[>] %s" % path_to_file) + # Read the file + try: + rawcontents = self.smbSession.read_file(path=path_to_file) + if rawcontents is not None: + encoding = charset_normalizer.detect(rawcontents)["encoding"] + if encoding is not None: + filecontent = rawcontents.decode(encoding).rstrip() + print(filecontent) + else: + print("[!] Could not detect charset of '%s'." % path_to_file) + except impacket.smbconnection.SessionError as e: + print("[!] SMB Error: %s" % e) def command_close(self, arguments, command): # Command arguments required : No @@ -395,36 +409,74 @@ def command_info(self, arguments, command): except impacket.smbconnection.SessionError as e: print("[!] SMB Error: %s" % e) + @command_arguments_required + def command_lbat(self, arguments, command): + # Command arguments required : Yes + # Active SMB connection needed : No + # SMB share needed : No + + for path_to_file in arguments: + print("[>] %s" % path_to_file) + # Read the file + try: + if os.path.exists(path=path_to_file): + f = open(path_to_file, 'rb') + rawcontents = f.read() + # + if rawcontents is not None: + encoding = charset_normalizer.detect(rawcontents)["encoding"] + if encoding is not None: + filecontent = rawcontents.decode(encoding).rstrip() + lexer = Syntax.guess_lexer(path=ntpath.basename(path_to_file), code=filecontent) + # Some trickery for the files undetected by the lexer + if lexer == "default": + if '' in filecontent: + lexer = "html" + syntax = Syntax(code=filecontent, line_numbers=True, lexer=lexer) + Console().print(syntax) + else: + print("[!] Could not detect charset of '%s'." % path_to_file) + else: + print("[!] Local file '%s' does not exist." % path_to_file) + except impacket.smbconnection.SessionError as e: + print("[!] SMB Error: %s" % e) + @command_arguments_required def command_lcat(self, arguments, command): # Command arguments required : Yes # Active SMB connection needed : No # SMB share needed : No - path = ' '.join(arguments) - try: - if os.path.exists(path=path): - f = open(path, 'rb') - rawcontents = f.read() - if rawcontents is not None: - encoding = charset_normalizer.detect(rawcontents)["encoding"] - if encoding is not None: - filecontent = rawcontents.decode(encoding).rstrip() - print(filecontent) - else: - print("[!] Could not detect charset of '%s'." % path) - else: - print("[!] Local file '%s' does not exist." % path) - except impacket.smbconnection.SessionError as e: - print("[!] SMB Error: %s" % e) + for path_to_file in arguments: + print("[>] %s" % path_to_file) + # Read the file + try: + if os.path.exists(path=path_to_file): + f = open(path_to_file, 'rb') + rawcontents = f.read() + # + if rawcontents is not None: + encoding = charset_normalizer.detect(rawcontents)["encoding"] + if encoding is not None: + filecontent = rawcontents.decode(encoding).rstrip() + print(filecontent) + else: + print("[!] Could not detect charset of '%s'." % path_to_file) + else: + print("[!] Local file '%s' does not exist." % path_to_file) + except impacket.smbconnection.SessionError as e: + print("[!] SMB Error: %s" % e) @command_arguments_required def command_lcd(self, arguments, command): # Command arguments required : Yes # Active SMB connection needed : No # SMB share needed : No + + path = arguments[0] - path = ' '.join(arguments) if os.path.exists(path=path): if os.path.isdir(s=path): os.chdir(path=path) @@ -452,37 +504,6 @@ def command_lcp(self, arguments, command): else: self.commandCompleterObject.print_help(command=command) - @command_arguments_required - def command_lbat(self, arguments, command): - # Command arguments required : Yes - # Active SMB connection needed : No - # SMB share needed : No - - path = ' '.join(arguments) - try: - if os.path.exists(path=path): - f = open(path, 'rb') - rawcontents = f.read() - if rawcontents is not None: - encoding = charset_normalizer.detect(rawcontents)["encoding"] - if encoding is not None: - filecontent = rawcontents.decode(encoding).rstrip() - lexer = Syntax.guess_lexer(path=ntpath.basename(path), code=filecontent) - # Some trickery for the files undetected by the lexer - if lexer == "default": - if '' in filecontent: - lexer = "html" - syntax = Syntax(code=filecontent, line_numbers=True, lexer=lexer) - Console().print(syntax) - else: - print("[!] Could not detect charset of '%s'." % path) - else: - print("[!] Local file '%s' does not exist." % path) - except impacket.smbconnection.SessionError as e: - print("[!] SMB Error: %s" % e) - def command_lls(self, arguments, command): # Command arguments required : No # Active SMB connection needed : No @@ -568,7 +589,8 @@ def command_lrm(self, arguments, command): # Active SMB connection needed : No # SMB share needed : No - path = ' '.join(arguments) + path = arguments[0] + if os.path.exists(path): if not os.path.isdir(s=path): try: @@ -586,7 +608,11 @@ def command_lrmdir(self, arguments, command): # Active SMB connection needed : No # SMB share needed : No - path = ' '.join(arguments) + if len(arguments) == 0: + path = '.' + else: + path = arguments[0] + if os.path.exists(path): if os.path.isdir(s=path): try: @@ -603,10 +629,15 @@ def command_ltree(self, arguments, command): # Active SMB connection needed : No # SMB share needed : No + if len(arguments) == 0: + path = '.' + else: + path = arguments[0] + if len(arguments) == 0: local_tree(path='.', config=self.config) else: - local_tree(path=' '.join(arguments), config=self.config) + local_tree(path=path, config=self.config) @active_smb_connection_needed @smb_share_is_set @@ -615,8 +646,13 @@ def command_ls(self, arguments, command): # Active SMB connection needed : Yes # SMB share needed : Yes + if len(arguments) == 0: + path = '.' + else: + path = arguments[0] + # Read the files - directory_contents = self.smbSession.list_contents(path=' '.join(arguments)) + directory_contents = self.smbSession.list_contents(path=path) for longname in sorted(directory_contents.keys(), key=lambda x:x.lower()): windows_ls_entry(directory_contents[longname], self.config) @@ -629,8 +665,7 @@ def command_mkdir(self, arguments, command): # Active SMB connection needed : Yes # SMB share needed : Yes - path = ' '.join(arguments) - self.smbSession.mkdir(path=path) + self.smbSession.mkdir(path=arguments[0]) @command_arguments_required @active_smb_connection_needed @@ -640,7 +675,8 @@ def command_module(self, arguments, command): if module_name in self.modules.keys(): module = self.modules[module_name](self.smbSession, self.config) - module.run(' '.join(arguments[1:])) + arguments_string = ' '.join(arguments[1:]) + module.run(arguments_string) else: print("[!] Module '%s' does not exist." % module_name) @@ -722,19 +758,22 @@ def command_rm(self, arguments, command): # Active SMB connection needed : Yes # SMB share needed : Yes - path = ' '.join(arguments) - if '*' in path: - self.smbSession.rm(path=path) - elif self.smbSession.path_exists(path): - if self.smbSession.path_isfile(path): - try: - self.smbSession.rm(path=path) - except Exception as e: - print("[!] Error removing file '%s' : %s" % path) + for path_to_file in arguments: + # Wildcard + if '*' in path_to_file: + self.smbSession.rm(path=path_to_file) + # File + elif self.smbSession.path_exists(path_to_file): + if self.smbSession.path_isfile(path_to_file): + try: + self.smbSession.rm(path=path_to_file) + except Exception as e: + print("[!] Error removing file '%s' : %s" % path_to_file) + else: + print("[!] Cannot delete '%s': This is a directory, use 'rmdir ' instead." % path_to_file) + # File does not exist else: - print("[!] Cannot delete '%s': This is a directory, use 'rmdir ' instead." % path) - else: - print("[!] Remote file '%s' does not exist." % path) + print("[!] Remote file '%s' does not exist." % path_to_file) @command_arguments_required @active_smb_connection_needed @@ -744,17 +783,17 @@ def command_rmdir(self, arguments, command): # Active SMB connection needed : Yes # SMB share needed : Yes - path = ' '.join(arguments) - if self.smbSession.path_exists(path): - if self.smbSession.path_isdir(path): - try: - self.smbSession.rmdir(path=path) - except Exception as e: - print("[!] Error removing directory '%s' : %s" % path) + for path_to_directory in arguments: + if self.smbSession.path_exists(path_to_directory): + if self.smbSession.path_isdir(path_to_directory): + try: + self.smbSession.rmdir(path=path_to_directory) + except Exception as e: + print("[!] Error removing directory '%s' : %s" % path_to_directory) + else: + print("[!] Cannot delete '%s': This is a file, use 'rm ' instead." % path_to_directory) else: - print("[!] Cannot delete '%s': This is a file, use 'rm ' instead." % path) - else: - print("[!] Remote directory '%s' does not exist." % path) + print("[!] Remote directory '%s' does not exist." % path_to_directory) @active_smb_connection_needed @smb_share_is_set @@ -775,13 +814,13 @@ def update(self, entry, fullpath, depth): self.print(end='\r') def print(self, end='\n'): - # + # Directory if self.entry.is_directory(): if self.config.no_colors: path = "%s\\" % self.entry.get_longname() else: path = "\x1b[1;96m%s\x1b[0m\\" % self.entry.get_longname() - # + # File else: if self.config.no_colors: path = "%s" % self.entry.get_longname() @@ -889,7 +928,7 @@ def command_tree(self, arguments, command): if len(arguments) == 0: self.smbSession.tree(path='.') else: - self.smbSession.tree(path=' '.join(arguments)) + self.smbSession.tree(path=arguments[0]) @command_arguments_required @active_smb_connection_needed @@ -913,10 +952,12 @@ def command_use(self, arguments, command): # Active SMB connection needed : Yes # SMB share needed : No - sharename = ' '.join(arguments) + sharename = arguments[0] + # Reload the list of shares shares = self.smbSession.list_shares() shares = [s.lower() for s in shares.keys()] + if sharename.lower() in shares: self.smbSession.set_share(sharename) else: @@ -925,6 +966,17 @@ def command_use(self, arguments, command): # Private functions ======================================================= def __load_modules(self): + """ + Dynamically loads all Python modules from the 'modules' directory and stores them in the 'modules' dictionary. + Each module is expected to be a Python file that contains a class with the same name as the file (minus the .py extension). + The class must have at least two attributes: 'name' and 'description'. + + This method clears any previously loaded modules, constructs the path to the modules directory, and iterates over + each file in that directory. If the file is a Python file (ends with .py and is not '__init__.py'), it attempts to + import the module and access the class within it to add to the 'modules' dictionary. + + If debug mode is enabled in the configuration, it prints debug information about the loading process and the loaded modules. + """ self.modules.clear() From c7810657c7603ecb7c6c1309d513c5301a4c58ea Mon Sep 17 00:00:00 2001 From: "Remi GASCOU (Podalirius)" <79218792+p0dalirius@users.noreply.github.com> Date: Tue, 18 Jun 2024 13:13:51 +0200 Subject: [PATCH 2/9] Added autocomplete types on commands --- smbclientng/core/CommandCompleter.py | 108 ++++++++++++++++++--------- 1 file changed, 72 insertions(+), 36 deletions(-) diff --git a/smbclientng/core/CommandCompleter.py b/smbclientng/core/CommandCompleter.py index aa358fc..e007f5b 100644 --- a/smbclientng/core/CommandCompleter.py +++ b/smbclientng/core/CommandCompleter.py @@ -32,252 +32,288 @@ class CommandCompleter(object): "Pretty prints the contents of a remote file.", "Syntax: 'bat '" ], - "subcommands": [] + "subcommands": [], + "autocomplete": ["file"] }, "cat": { "description": [ "Get the contents of a remote file.", "Syntax: 'cat '" ], - "subcommands": [] + "subcommands": [], + "autocomplete": ["file"] }, "cd": { "description": [ "Change the current working directory.", "Syntax: 'cd '" ], - "subcommands": [] + "subcommands": [], + "autocomplete": ["directory"] }, "close": { "description": [ "Closes the SMB connection to the remote machine.", "Syntax: 'close'" ], - "subcommands": [] + "subcommands": [], + "autocomplete": [] }, "connect": { "description": [ "Connect to the remote machine (useful if connection timed out).", "Syntax: 'connect'" ], - "subcommands": [] + "subcommands": [], + "autocomplete": [] }, "debug": { "description": [ "Command for dev debugging.", "Syntax: 'debug'" ], - "subcommands": [] + "subcommands": [], + "autocomplete": [] }, "dir": { "description": [ "List the contents of the current working directory.", "Syntax: 'dir'" ], - "subcommands": [] + "subcommands": [], + "autocomplete": [] }, "exit": { "description": [ "Exits the smbclient-ng script.", "Syntax: 'exit'" ], - "subcommands": [] + "subcommands": [], + "autocomplete": [] }, "get": { "description": [ "Get a remote file.", "Syntax: 'get [-r] '" ], - "subcommands": [] + "subcommands": [], + "autocomplete": ["file"] }, "help": { "description": [ "Displays this help message.", "Syntax: 'help'" ], - "subcommands": ["format"] + "subcommands": ["format"], + "autocomplete": [] }, "info": { "description": [ "Get information about the server and or the share.", "Syntax: 'info [server|share]'" ], - "subcommands": ["server", "share"] + "subcommands": ["server", "share"], + "autocomplete": [] }, "lbat": { "description": [ "Pretty prints the contents of a local file.", "Syntax: 'lbat '" ], - "subcommands": [] + "subcommands": [], + "autocomplete": ["file"] }, "lcat": { "description": [ "Print the contents of a local file.", "Syntax: 'lcat '" ], - "subcommands": [] + "subcommands": [], + "autocomplete": ["file"] }, "lcd": { "description": [ "Changes the current local directory.", "Syntax: 'lcd '" ], - "subcommands": [] + "subcommands": [], + "autocomplete": ["directory"] }, "lcp": { "description": [ "Create a copy of a local file.", "Syntax: 'lcp '" ], - "subcommands": [] + "subcommands": [], + "autocomplete": ["file"] }, "lls": { "description": [ "Lists the contents of the current local directory.", "Syntax: 'lls'" ], - "subcommands": [] + "subcommands": [], + "autocomplete": ["directory"] }, "lmkdir": { "description": [ "Creates a new local directory.", "Syntax: 'lmkdir '" ], - "subcommands": [] + "subcommands": [], + "autocomplete": ["directory"] }, "lpwd": { "description": [ "Shows the current local directory.", "Syntax: 'lpwd'" ], - "subcommands": [] + "subcommands": [], + "autocomplete": [] }, "lrename": { "description": [ "Renames a local file.", "Syntax: 'lrename '" ], - "subcommands": [] + "subcommands": [], + "autocomplete": ["file"] }, "lrm": { "description": [ "Removes a local file.", "Syntax: 'lrm '" ], - "subcommands": [] + "subcommands": [], + "autocomplete": ["file"] }, "lrmdir": { "description": [ "Removes a local directory.", "Syntax: 'lrmdir '" ], - "subcommands": [] + "subcommands": [], + "autocomplete": ["directory"] }, "ls": { "description": [ "List the contents of the current remote working directory.", "Syntax: 'ls'" ], - "subcommands": [] + "subcommands": [], + "autocomplete": ["directory"] }, "ltree": { "description": [ "Displays a tree view of the local directories.", "Syntax: 'ltree [directory]'" ], - "subcommands": [] + "subcommands": [], + "autocomplete": ["directory"] }, "mkdir": { "description": [ "Creates a new remote directory.", "Syntax: 'mkdir '" ], - "subcommands": [] + "subcommands": [], + "autocomplete": ["directory"] }, "module": { "description": [ "Loads a specific module for additional functionalities.", "Syntax: 'module '" ], - "subcommands": [] + "subcommands": [], + "autocomplete": [] }, "mount": { "description": [ "Creates a mount point of the remote share on the local machine.", "Syntax: 'mount '" ], - "subcommands": [] + "subcommands": [], + "autocomplete": ["directory"] }, "put": { "description": [ "Put a local file or directory in a remote directory.", "Syntax: 'put [-r] '" ], - "subcommands": [] + "subcommands": [], + "autocomplete": ["file"] }, "reconnect": { "description": [ "Reconnect to the remote machine (useful if connection timed out).", "Syntax: 'reconnect'" ], - "subcommands": [] + "subcommands": [], + "autocomplete": [] }, "reset": { "description": [ "Reset the TTY output, useful if it was broken after printing a binary file on stdout.", "Syntax: 'reset'" ], - "subcommands": [] + "subcommands": [], + "autocomplete": [] }, "rmdir": { "description": [ "Removes a remote directory.", "Syntax: 'rmdir '" ], - "subcommands": [] + "subcommands": [], + "autocomplete": ["directory"] }, "rm": { "description": [ "Removes a remote file.", "Syntax: 'rm '" ], - "subcommands": [] + "subcommands": [], + "autocomplete": ["file"] }, "sizeof": { "description": [ "Recursively compute the size of a folder.", "Syntax: 'sizeof [directory|file]'" ], - "subcommands": [] + "subcommands": [], + "autocomplete": ["directory"] }, "shares": { "description": [ "Lists the SMB shares served by the remote machine.", "Syntax: 'shares'" ], - "subcommands": ["rights"] + "subcommands": ["rights"], + "autocomplete": [] }, "tree": { "description": [ "Displays a tree view of the remote directories.", "Syntax: 'tree [directory]'" ], - "subcommands": [] + "subcommands": [], + "autocomplete": ["directory"] }, "umount": { "description": [ "Removes a mount point of the remote share on the local machine.", "Syntax: 'umount '" ], - "subcommands": [] + "subcommands": [], + "autocomplete": ["directory"] }, "use": { "description": [ "Use a SMB share.", "Syntax: 'use '" ], - "subcommands": [] + "subcommands": [], + "autocomplete": ["share"] }, } From bb5cb4493d1e4271036d2354800333060db244f4 Mon Sep 17 00:00:00 2001 From: "Remi GASCOU (Podalirius)" <79218792+p0dalirius@users.noreply.github.com> Date: Wed, 19 Jun 2024 17:19:07 +0200 Subject: [PATCH 3/9] Added autocomplete tags to autocomplete categories --- smbclientng/core/CommandCompleter.py | 123 +++++++++++++++++---------- 1 file changed, 76 insertions(+), 47 deletions(-) diff --git a/smbclientng/core/CommandCompleter.py b/smbclientng/core/CommandCompleter.py index e007f5b..d5f90c2 100644 --- a/smbclientng/core/CommandCompleter.py +++ b/smbclientng/core/CommandCompleter.py @@ -33,7 +33,7 @@ class CommandCompleter(object): "Syntax: 'bat '" ], "subcommands": [], - "autocomplete": ["file"] + "autocomplete": ["remote_file"] }, "cat": { "description": [ @@ -41,7 +41,7 @@ class CommandCompleter(object): "Syntax: 'cat '" ], "subcommands": [], - "autocomplete": ["file"] + "autocomplete": ["remote_file"] }, "cd": { "description": [ @@ -49,7 +49,7 @@ class CommandCompleter(object): "Syntax: 'cd '" ], "subcommands": [], - "autocomplete": ["directory"] + "autocomplete": ["remote_directory"] }, "close": { "description": [ @@ -81,7 +81,7 @@ class CommandCompleter(object): "Syntax: 'dir'" ], "subcommands": [], - "autocomplete": [] + "autocomplete": ["remote_directory"] }, "exit": { "description": [ @@ -97,7 +97,7 @@ class CommandCompleter(object): "Syntax: 'get [-r] '" ], "subcommands": [], - "autocomplete": ["file"] + "autocomplete": ["remote_file"] }, "help": { "description": [ @@ -121,7 +121,7 @@ class CommandCompleter(object): "Syntax: 'lbat '" ], "subcommands": [], - "autocomplete": ["file"] + "autocomplete": ["local_file"] }, "lcat": { "description": [ @@ -129,7 +129,7 @@ class CommandCompleter(object): "Syntax: 'lcat '" ], "subcommands": [], - "autocomplete": ["file"] + "autocomplete": ["local_file"] }, "lcd": { "description": [ @@ -137,7 +137,7 @@ class CommandCompleter(object): "Syntax: 'lcd '" ], "subcommands": [], - "autocomplete": ["directory"] + "autocomplete": ["local_directory"] }, "lcp": { "description": [ @@ -145,7 +145,7 @@ class CommandCompleter(object): "Syntax: 'lcp '" ], "subcommands": [], - "autocomplete": ["file"] + "autocomplete": ["remote_file"] }, "lls": { "description": [ @@ -153,7 +153,7 @@ class CommandCompleter(object): "Syntax: 'lls'" ], "subcommands": [], - "autocomplete": ["directory"] + "autocomplete": ["local_directory"] }, "lmkdir": { "description": [ @@ -161,7 +161,7 @@ class CommandCompleter(object): "Syntax: 'lmkdir '" ], "subcommands": [], - "autocomplete": ["directory"] + "autocomplete": ["local_directory"] }, "lpwd": { "description": [ @@ -177,7 +177,7 @@ class CommandCompleter(object): "Syntax: 'lrename '" ], "subcommands": [], - "autocomplete": ["file"] + "autocomplete": ["local_file"] }, "lrm": { "description": [ @@ -185,7 +185,7 @@ class CommandCompleter(object): "Syntax: 'lrm '" ], "subcommands": [], - "autocomplete": ["file"] + "autocomplete": ["local_file"] }, "lrmdir": { "description": [ @@ -193,7 +193,7 @@ class CommandCompleter(object): "Syntax: 'lrmdir '" ], "subcommands": [], - "autocomplete": ["directory"] + "autocomplete": ["local_directory"] }, "ls": { "description": [ @@ -201,7 +201,7 @@ class CommandCompleter(object): "Syntax: 'ls'" ], "subcommands": [], - "autocomplete": ["directory"] + "autocomplete": ["remote_directory"] }, "ltree": { "description": [ @@ -209,7 +209,7 @@ class CommandCompleter(object): "Syntax: 'ltree [directory]'" ], "subcommands": [], - "autocomplete": ["directory"] + "autocomplete": ["local_directory"] }, "mkdir": { "description": [ @@ -217,7 +217,7 @@ class CommandCompleter(object): "Syntax: 'mkdir '" ], "subcommands": [], - "autocomplete": ["directory"] + "autocomplete": ["remote_directory"] }, "module": { "description": [ @@ -233,7 +233,7 @@ class CommandCompleter(object): "Syntax: 'mount '" ], "subcommands": [], - "autocomplete": ["directory"] + "autocomplete": ["remote_directory"] }, "put": { "description": [ @@ -241,7 +241,7 @@ class CommandCompleter(object): "Syntax: 'put [-r] '" ], "subcommands": [], - "autocomplete": ["file"] + "autocomplete": ["local_file"] }, "reconnect": { "description": [ @@ -265,7 +265,7 @@ class CommandCompleter(object): "Syntax: 'rmdir '" ], "subcommands": [], - "autocomplete": ["directory"] + "autocomplete": ["remote_directory"] }, "rm": { "description": [ @@ -273,7 +273,7 @@ class CommandCompleter(object): "Syntax: 'rm '" ], "subcommands": [], - "autocomplete": ["file"] + "autocomplete": ["remote_file"] }, "sizeof": { "description": [ @@ -281,7 +281,7 @@ class CommandCompleter(object): "Syntax: 'sizeof [directory|file]'" ], "subcommands": [], - "autocomplete": ["directory"] + "autocomplete": ["remote_directory"] }, "shares": { "description": [ @@ -297,7 +297,7 @@ class CommandCompleter(object): "Syntax: 'tree [directory]'" ], "subcommands": [], - "autocomplete": ["directory"] + "autocomplete": ["remote_directory"] }, "umount": { "description": [ @@ -305,7 +305,7 @@ class CommandCompleter(object): "Syntax: 'umount '" ], "subcommands": [], - "autocomplete": ["directory"] + "autocomplete": ["remote_directory"] }, "use": { "description": [ @@ -344,7 +344,8 @@ def complete(self, text, state): # No text typed yet, need the list of commands available if len(text) == 0: self.matches = [s for s in self.commands.keys()] - + + # Parsing a command elif len(text) != 0: # This is for the main command if text.count(" ") == 0: @@ -353,8 +354,12 @@ def complete(self, text, state): # This is for subcommands elif text.count(" ") >= 1: command, remainder = text.split(" ", 1) + if command in self.commands.keys(): - if command == "use": + self.matches = [] + + # Autocomplete shares + if "share" in self.commands[command]["autocomplete"]: # Choose SMB Share to connect to shares = self.smbSession.list_shares() matching_entries = [] @@ -366,9 +371,10 @@ def complete(self, text, state): matching_entries = [shlex.quote(s) for s in matching_entries] # Final matches - self.matches = [command + " " + m for m in matching_entries] + self.matches += [command + " " + m for m in matching_entries] - elif command in ["cd", "dir", "ls", "mkdir", "rmdir", "tree"]: + # Autocomplete directory + if "remote_directory" in self.commands[command]["autocomplete"]: # Choose remote directory path = "" if '\\' in remainder.strip() or '/' in remainder.strip(): @@ -388,42 +394,41 @@ def complete(self, text, state): # Add quoting for shlex matching_entries = [shlex.quote(s) for s in matching_entries] - self.matches = [ + self.matches += [ command + " " + s for s in matching_entries if s.lower().startswith(remainder.lower()) ] - elif command in ["bat", "cat", "debug", "get", "rm"]: - # Choose local files and directories + # Autocomplete file + if "remote_file" in self.commands[command]["autocomplete"]: + # Choose remote file path = "" if '\\' in remainder.strip() or '/' in remainder.strip(): path = remainder.strip().replace(ntpath.sep, '/') - path = '/'.join(path.split('/')[:-1]) + path = '/'.join(path.split('/')[:-1]) directory_contents = self.smbSession.list_contents(path=path).items() matching_entries = [] for _, entry in directory_contents: - if entry.get_longname() not in [".",".."]: + if not entry.is_directory() and entry.get_longname() not in [".",".."]: if len(path) != 0: - if entry.is_directory(): - matching_entries.append(path + '/' + entry.get_longname() + '/') - else: - matching_entries.append(path + '/' + entry.get_longname()) + matching_entries.append(path + '/' + entry.get_longname()) else: - if entry.is_directory(): - matching_entries.append(entry.get_longname() + '/') - else: - matching_entries.append(entry.get_longname()) + matching_entries.append(entry.get_longname()) + + # Add quoting for shlex + matching_entries = [shlex.quote(s) for s in matching_entries] - self.matches = [ + self.matches += [ command + " " + s for s in matching_entries if s.lower().startswith(remainder.lower()) ] - elif command in ["lcd", "lcp", "lls", "lrm", "put", "lmkdir", "lrm", "lrmdir"]: + # Autocomplete local_directory + if "local_directory" in self.commands[command]["autocomplete"]: # Choose directory path = "" if os.path.sep in remainder.strip(): @@ -441,18 +446,42 @@ def complete(self, text, state): entry_path = path + os.path.sep + entry if os.path.isdir(entry_path): matching_entries.append(entry_path + os.path.sep) - else: - matching_entries.append(entry_path) - self.matches = [ + self.matches += [ command + " " + s for s in matching_entries if s.startswith(remainder) ] + + # Autocomplete local_file + if "local_file" in self.commands[command]["autocomplete"]: + # Choose file + path = "" + if os.path.sep in remainder.strip(): + path = path.split(os.path.sep)[:-1] + path = os.path.sep.join(path) + # Current dir + if len(path.strip()) == 0: + path = "." + + directory_contents = os.listdir(path=path + os.path.sep) + matching_entries = [] + for entry in directory_contents: + if entry not in [".",".."]: + entry_path = path + os.path.sep + entry + if not os.path.isdir(entry_path): + matching_entries.append(entry_path) + + self.matches += [ + command + " " + s + for s in matching_entries + if s.startswith(remainder) + ] + else: # Generic case for subcommands - self.matches = [ + self.matches += [ command + " " + s for s in self.commands[command]["subcommands"] if s.startswith(remainder) From 6a4f667ef811a7dec9a48a51d86b7e2227b80a98 Mon Sep 17 00:00:00 2001 From: "Remi GASCOU (Podalirius)" <79218792+p0dalirius@users.noreply.github.com> Date: Fri, 21 Jun 2024 10:22:09 +0200 Subject: [PATCH 4/9] Added multiple arguments to put command --- smbclientng/core/InteractiveShell.py | 41 ++++++++++++++++++++-------- smbclientng/core/SMBSession.py | 12 ++++++-- 2 files changed, 39 insertions(+), 14 deletions(-) diff --git a/smbclientng/core/InteractiveShell.py b/smbclientng/core/InteractiveShell.py index df261fd..f5decb5 100644 --- a/smbclientng/core/InteractiveShell.py +++ b/smbclientng/core/InteractiveShell.py @@ -712,20 +712,37 @@ def command_put(self, arguments, command): # Command arguments required : Yes # Active SMB connection needed : Yes # SMB share needed : Yes + + is_recursive = False + while '-r' in arguments: + is_recursive = True + arguments.remove('-r') + print("is_recursive: %s" % is_recursive) + + # Parse wildcards + files_and_directories = [] + for argument in arguments: + if argument.endswith('*'): + path = argument.rstrip('*') + if os.path.sep in path: + path = "." + os.path.sep + path + for entry in os.listdir(path=path): + files_and_directories.append(path.rstrip(os.path.sep) + os.path.sep + entry) + else: + files_and_directories.append(argument) - # Put files recursively - if arguments[0] == "-r": - localpath = ' '.join(arguments[1:]) - try: - self.smbSession.put_file_recursively(localpath=localpath) - except impacket.smbconnection.SessionError as e: - print("[!] SMB Error: %s" % e) - - # Put a single file - else: - localpath = ' '.join(arguments) + # + for localpath in files_and_directories: try: - self.smbSession.put_file(localpath=localpath) + print(localpath) + if is_recursive and os.path.isdir(s=localpath): + # Put files recursively + print("Put files recursively") + self.smbSession.put_file_recursively(localpath=localpath) + else: + # Put this single file + print("Put this single file") + self.smbSession.put_file(localpath=localpath) except impacket.smbconnection.SessionError as e: print("[!] SMB Error: %s" % e) diff --git a/smbclientng/core/SMBSession.py b/smbclientng/core/SMBSession.py index 9378d70..c61ab19 100644 --- a/smbclientng/core/SMBSession.py +++ b/smbclientng/core/SMBSession.py @@ -781,9 +781,17 @@ def put_file(self, localpath=None): # Parse path localpath = localpath.replace('/', os.path.sep) if os.path.sep in localpath: - tmp_search_path = os.path.normpath(os.getcwd() + os.path.sep + os.path.dirname(localpath)) + if localpath.startswith(os.path.sep): + # Absolute path + tmp_search_path = os.path.normpath(localpath) + else: + # Relative path + tmp_search_path = os.path.normpath(os.getcwd() + os.path.sep + os.path.dirname(localpath)) else: tmp_search_path = os.path.normpath(os.getcwd() + os.path.sep) + + print(localpath) + # Parse filename filename = os.path.basename(localpath) @@ -847,7 +855,7 @@ def put_file_recursively(self, localpath=None): """ if os.path.exists(localpath): - if os.path.isfile(localpath): + if os.path.isdir(localpath): # Iterate over all files and directories within the local path local_files = {} for root, dirs, files in os.walk(localpath): From b237aa008aa3425136aae29b084b09aeeedcbf28 Mon Sep 17 00:00:00 2001 From: "Remi GASCOU (Podalirius)" <79218792+p0dalirius@users.noreply.github.com> Date: Fri, 21 Jun 2024 10:33:27 +0200 Subject: [PATCH 5/9] Fixed recursive put --- smbclientng/core/LocalFileIO.py | 44 +++++++++++++++++++++++---------- smbclientng/core/SMBSession.py | 2 +- 2 files changed, 32 insertions(+), 14 deletions(-) diff --git a/smbclientng/core/LocalFileIO.py b/smbclientng/core/LocalFileIO.py index 77acf15..dd251e1 100644 --- a/smbclientng/core/LocalFileIO.py +++ b/smbclientng/core/LocalFileIO.py @@ -51,7 +51,10 @@ def __init__(self, mode, path=None, expected_size=None, keepRemotePath=False, de if self.debug: print("[debug] Openning local '%s' with mode '%s'" % (self.path, self.mode)) - self.fd = open(self.dir + os.path.sep + os.path.basename(self.path), self.mode) + try: + self.fd = open(self.dir + os.path.sep + os.path.basename(self.path), self.mode) + except PermissionError as err: + self.fd = None # Write to remote (read local) elif self.mode in ["rb"]: @@ -60,10 +63,15 @@ def __init__(self, mode, path=None, expected_size=None, keepRemotePath=False, de if self.debug: print("[debug] Openning local '%s' with mode '%s'" % (self.path, self.mode)) - self.fd = open(self.path, self.mode) + + try: + self.fd = open(self.path, self.mode) + except PermissionError as err: + self.fd = None - if self.expected_size is None: - self.expected_size = os.path.getsize(filename=self.path) + if self.fd is not None: + if self.expected_size is None: + self.expected_size = os.path.getsize(filename=self.path) # Create progress bar if self.expected_size is not None: @@ -99,9 +107,12 @@ def write(self, data): int: The number of bytes written. """ - if self.expected_size is not None: - self.__progress.update(self.__task, advance=len(data)) - return self.fd.write(data) + if self.fd is not None: + if self.expected_size is not None: + self.__progress.update(self.__task, advance=len(data)) + return self.fd.write(data) + else: + return 0 def read(self, size): """ @@ -116,10 +127,13 @@ def read(self, size): bytes: The data read from the file. """ - read_data = self.fd.read(size) - if self.expected_size is not None: - self.__progress.update(self.__task, advance=len(read_data)) - return read_data + if self.fd is not None: + read_data = self.fd.read(size) + if self.expected_size is not None: + self.__progress.update(self.__task, advance=len(read_data)) + return read_data + else: + return b"" def close(self, remove=False): """ @@ -132,10 +146,14 @@ def close(self, remove=False): remove (bool): If True, the file at the path will be removed after closing the file descriptor. """ - self.fd.close() + if self.fd is not None: + self.fd.close() if remove: - os.remove(path=self.path) + try: + os.remove(path=self.path) + except (PermissionError, FileNotFoundError) as err: + pass if self.expected_size is not None: self.__progress.stop() diff --git a/smbclientng/core/SMBSession.py b/smbclientng/core/SMBSession.py index c61ab19..a5f54a8 100644 --- a/smbclientng/core/SMBSession.py +++ b/smbclientng/core/SMBSession.py @@ -886,7 +886,7 @@ def put_file_recursively(self, localpath=None): ) f.close() - except BrokenPipeError as err: + except (BrokenPipeError, PermissionError) as err: f.set_error(message="[bold red]Failed uploading '%s': %s" % (f.path, err)) f.close(remove=True) break From 14e5820c73bbfe02565fff9edc1d8d5b9c7fcda6 Mon Sep 17 00:00:00 2001 From: "Remi GASCOU (Podalirius)" <79218792+p0dalirius@users.noreply.github.com> Date: Fri, 21 Jun 2024 12:22:29 +0200 Subject: [PATCH 6/9] Implemented multifiles and wildcards in get and put, fixes #18 --- smbclientng/core/InteractiveShell.py | 43 +++++++-------- smbclientng/core/utils.py | 79 +++++++++++++++++++++++++++- 2 files changed, 96 insertions(+), 26 deletions(-) diff --git a/smbclientng/core/InteractiveShell.py b/smbclientng/core/InteractiveShell.py index f5decb5..14955d5 100644 --- a/smbclientng/core/InteractiveShell.py +++ b/smbclientng/core/InteractiveShell.py @@ -20,7 +20,7 @@ from rich.table import Table from rich.syntax import Syntax from smbclientng.core.CommandCompleter import CommandCompleter -from smbclientng.core.utils import b_filesize, unix_permissions, windows_ls_entry, local_tree +from smbclientng.core.utils import b_filesize, unix_permissions, windows_ls_entry, local_tree, resolve_local_files, resolve_remote_files ## Decorators @@ -361,18 +361,23 @@ def command_get(self, arguments, command): # Active SMB connection needed : Yes # SMB share needed : Yes - # Get files recursively - if arguments[0] == "-r": - path = ' '.join(arguments[1:]).replace('/', ntpath.sep) - try: - self.smbSession.get_file_recursively(path=path) - except impacket.smbconnection.SessionError as e: - print("[!] SMB Error: %s" % e) - # Get a single file - else: - path = ' '.join(arguments).replace('/', ntpath.sep) + is_recursive = False + while '-r' in arguments: + is_recursive = True + arguments.remove('-r') + + # Parse wildcards + files_and_directories = resolve_remote_files(self.smbSession, arguments) + + # + for remotepath in files_and_directories: try: - self.smbSession.get_file(path=path) + if is_recursive and self.smbSession.path_isdir(remotepath): + # Get files recursively + self.smbSession.get_file_recursively(path=remotepath) + else: + # Get this single file + self.smbSession.get_file(path=remotepath) except impacket.smbconnection.SessionError as e: print("[!] SMB Error: %s" % e) @@ -717,19 +722,9 @@ def command_put(self, arguments, command): while '-r' in arguments: is_recursive = True arguments.remove('-r') - print("is_recursive: %s" % is_recursive) # Parse wildcards - files_and_directories = [] - for argument in arguments: - if argument.endswith('*'): - path = argument.rstrip('*') - if os.path.sep in path: - path = "." + os.path.sep + path - for entry in os.listdir(path=path): - files_and_directories.append(path.rstrip(os.path.sep) + os.path.sep + entry) - else: - files_and_directories.append(argument) + files_and_directories = resolve_local_files(arguments) # for localpath in files_and_directories: @@ -737,11 +732,9 @@ def command_put(self, arguments, command): print(localpath) if is_recursive and os.path.isdir(s=localpath): # Put files recursively - print("Put files recursively") self.smbSession.put_file_recursively(localpath=localpath) else: # Put this single file - print("Put this single file") self.smbSession.put_file(localpath=localpath) except impacket.smbconnection.SessionError as e: print("[!] SMB Error: %s" % e) diff --git a/smbclientng/core/utils.py b/smbclientng/core/utils.py index d5b15ad..e50b06c 100644 --- a/smbclientng/core/utils.py +++ b/smbclientng/core/utils.py @@ -6,8 +6,9 @@ import datetime -import os +import fnmatch import ntpath +import os import re import stat @@ -322,3 +323,79 @@ def recurse_action(base_dir="", path=[], prompt=[]): except (BrokenPipeError, KeyboardInterrupt) as e: print("[!] Interrupted.") + +def resolve_local_files(arguments): + """ + Resolves local file paths based on the provided arguments. + + This function takes a list of arguments, which can include wildcard patterns, and resolves them to actual file paths. + If an argument contains a wildcard ('*'), it attempts to match files in the specified directory against the pattern. + If the argument does not contain a wildcard, it is added to the list of resolved files as is. + + Args: + arguments (list): A list of file path arguments, which may include wildcard patterns. + + Returns: + list: A list of resolved file paths that match the provided arguments. + """ + + resolved_files = [] + for arg in arguments: + if '*' in arg: + try: + path = os.path.dirname(arg) or '.' + pattern = os.path.basename(arg) + for entry in os.listdir(path): + if fnmatch.fnmatch(entry, pattern): + resolved_files.append(os.path.join(path, entry)) + except FileNotFoundError as err: + pass + else: + resolved_files.append(arg) + resolved_files = sorted(list(set(resolved_files))) + return resolved_files + + +def resolve_remote_files(smbSession, arguments): + """ + Resolves remote file paths based on the provided arguments using an SMB session. + + This function takes a list of arguments, which can include wildcard patterns, and resolves them to actual remote file paths. + If an argument contains a wildcard ('*'), it attempts to match files in the specified remote directory against the pattern. + If the argument does not contain a wildcard, it is added to the list of resolved files as is. + + Args: + smbsession (SMBSession): The SMB session through which to access the files. + arguments (list): A list of file path arguments, which may include wildcard patterns. + + Returns: + list: A list of resolved remote file paths that match the provided arguments. + """ + + resolved_files = [] + for arg in arguments: + if '*' in arg: + if arg == '*': + path = smbSession.smb_cwd + elif arg.startswith(ntpath.sep): + path = ntpath.dirname(arg) + else: + path = ntpath.normpath(smbSession.smb_cwd + ntpath.sep + ntpath.dirname(arg)) + + try: + contents = smbSession.smbClient.listPath( + shareName=smbSession.smb_share, + path=path + ntpath.sep + '*' + ) + contents = [e for e in contents if e.get_longname() not in ['.', '..']] + + for entry in contents: + if fnmatch.fnmatch(entry.get_longname(), ntpath.basename(arg)): + resolved_files.append(ntpath.join(path, entry.get_longname())) + + except Exception as err: + pass + else: + resolved_files.append(arg) + resolved_files = sorted(list(set(resolved_files))) + return resolved_files From 421be0967c10fa41c1da3da7fec646f8b83853a6 Mon Sep 17 00:00:00 2001 From: "Remi GASCOU (Podalirius)" <79218792+p0dalirius@users.noreply.github.com> Date: Fri, 21 Jun 2024 13:08:58 +0200 Subject: [PATCH 7/9] Fixed check of file existence in remote --- smbclientng/core/InteractiveShell.py | 98 ++++++++++++++++------------ smbclientng/core/SMBSession.py | 74 ++++++++++++++++----- 2 files changed, 114 insertions(+), 58 deletions(-) diff --git a/smbclientng/core/InteractiveShell.py b/smbclientng/core/InteractiveShell.py index 14955d5..33bce9f 100644 --- a/smbclientng/core/InteractiveShell.py +++ b/smbclientng/core/InteractiveShell.py @@ -285,28 +285,33 @@ def command_bat(self, arguments, command): # Active SMB connection needed : Yes # SMB share needed : Yes - for path_to_file in arguments: - print("[>] %s" % path_to_file) - # Read the file - try: - rawcontents = self.smbSession.read_file(path=path_to_file) - if rawcontents is not None: - encoding = charset_normalizer.detect(rawcontents)["encoding"] - if encoding is not None: - filecontent = rawcontents.decode(encoding).rstrip() - lexer = Syntax.guess_lexer(path=ntpath.basename(path_to_file), code=filecontent) - # Some trickery for the files undetected by the lexer - if lexer == "default": - if '' in filecontent: - lexer = "html" - syntax = Syntax(code=filecontent, line_numbers=True, lexer=lexer) - Console().print(syntax) - else: - print("[!] Could not detect charset of '%s'." % path_to_file) - except impacket.smbconnection.SessionError as e: - print("[!] SMB Error: %s" % e) + # Parse wildcards + files_and_directories = resolve_local_files(arguments) + + for path_to_file in files_and_directories: + if self.smbSession.path_isfile(pathFromRoot=path_to_file): + # Read the file + try: + rawcontents = self.smbSession.read_file(path=path_to_file) + if rawcontents is not None: + encoding = charset_normalizer.detect(rawcontents)["encoding"] + if encoding is not None: + filecontent = rawcontents.decode(encoding).rstrip() + lexer = Syntax.guess_lexer(path=ntpath.basename(path_to_file), code=filecontent) + # Some trickery for the files undetected by the lexer + if lexer == "default": + if '' in filecontent: + lexer = "html" + syntax = Syntax(code=filecontent, line_numbers=True, lexer=lexer) + if len(files_and_directories) > 1: + print("\x1b[1;93m[>] %s\x1b[0m" % (path_to_file+' ').ljust(80,'=')) + Console().print(syntax) + else: + print("[!] Could not detect charset of '%s'." % path_to_file) + except impacket.smbconnection.SessionError as e: + print("[!] SMB Error: %s" % e) @command_arguments_required @active_smb_connection_needed @@ -329,20 +334,25 @@ def command_cat(self, arguments, command): # Active SMB connection needed : Yes # SMB share needed : Yes - for path_to_file in arguments: - print("[>] %s" % path_to_file) - # Read the file - try: - rawcontents = self.smbSession.read_file(path=path_to_file) - if rawcontents is not None: - encoding = charset_normalizer.detect(rawcontents)["encoding"] - if encoding is not None: - filecontent = rawcontents.decode(encoding).rstrip() - print(filecontent) - else: - print("[!] Could not detect charset of '%s'." % path_to_file) - except impacket.smbconnection.SessionError as e: - print("[!] SMB Error: %s" % e) + # Parse wildcards + files_and_directories = resolve_local_files(arguments) + + for path_to_file in files_and_directories: + if self.smbSession.path_isfile(pathFromRoot=path_to_file): + # Read the file + try: + rawcontents = self.smbSession.read_file(path=path_to_file) + if rawcontents is not None: + encoding = charset_normalizer.detect(rawcontents)["encoding"] + if encoding is not None: + filecontent = rawcontents.decode(encoding).rstrip() + if len(files_and_directories) > 1: + print("\x1b[1;93m[>] %s\x1b[0m" % (path_to_file+' ').ljust(80,'=')) + print(filecontent) + else: + print("[!] Could not detect charset of '%s'." % path_to_file) + except impacket.smbconnection.SessionError as e: + print("[!] SMB Error: %s" % e) def command_close(self, arguments, command): # Command arguments required : No @@ -420,8 +430,10 @@ def command_lbat(self, arguments, command): # Active SMB connection needed : No # SMB share needed : No - for path_to_file in arguments: - print("[>] %s" % path_to_file) + # Parse wildcards + files_and_directories = resolve_remote_files(self.smbSession, arguments) + + for path_to_file in files_and_directories: # Read the file try: if os.path.exists(path=path_to_file): @@ -440,6 +452,8 @@ def command_lbat(self, arguments, command): elif '' in filecontent: lexer = "html" syntax = Syntax(code=filecontent, line_numbers=True, lexer=lexer) + if len(files_and_directories) > 1: + print("\x1b[1;93m[>] %s\x1b[0m" % (path_to_file+' ').ljust(80,'=')) Console().print(syntax) else: print("[!] Could not detect charset of '%s'." % path_to_file) @@ -454,8 +468,10 @@ def command_lcat(self, arguments, command): # Active SMB connection needed : No # SMB share needed : No - for path_to_file in arguments: - print("[>] %s" % path_to_file) + # Parse wildcards + files_and_directories = resolve_remote_files(self.smbSession, arguments) + + for path_to_file in files_and_directories: # Read the file try: if os.path.exists(path=path_to_file): @@ -466,6 +482,8 @@ def command_lcat(self, arguments, command): encoding = charset_normalizer.detect(rawcontents)["encoding"] if encoding is not None: filecontent = rawcontents.decode(encoding).rstrip() + if len(files_and_directories) > 1: + print("\x1b[1;93m[>] %s\x1b[0m" % (path_to_file+' ').ljust(80,'=')) print(filecontent) else: print("[!] Could not detect charset of '%s'." % path_to_file) diff --git a/smbclientng/core/SMBSession.py b/smbclientng/core/SMBSession.py index a5f54a8..5ecdd71 100644 --- a/smbclientng/core/SMBSession.py +++ b/smbclientng/core/SMBSession.py @@ -170,12 +170,29 @@ def close_smb_session(self): # Operations def read_file(self, path=None): - if self.path_isfile(path=path): - tmp_file_path = self.smb_cwd + ntpath.sep + path - matches = self.smbClient.listPath( - shareName=self.smb_share, - path=tmp_file_path - ) + """ + Reads a file from the SMB share. + + This method attempts to read the contents of a file specified by the `path` parameter from the SMB share. + It constructs the full path to the file, checks if the path is a valid file, and then reads the file content + into a byte stream which is returned to the caller. + + Args: + path (str, optional): The path of the file to be read from the SMB share. Defaults to None. + + Returns: + bytes: The content of the file as a byte stream, or None if the file does not exist or an error occurs. + """ + + if self.path_isfile(pathFromRoot=path): + path = path.replace('/', ntpath.sep) + if path.startswith(ntpath.sep): + # Absolute path + tmp_file_path = ntpath.normpath(path) + else: + # Relative path + tmp_file_path = ntpath.normpath(self.smb_cwd + ntpath.sep + path) + tmp_file_path = tmp_file_path.lstrip(ntpath.sep) fh = io.BytesIO() try: @@ -188,9 +205,25 @@ def read_file(self, path=None): fh.close() return rawdata else: - print("[!] Remote path '%s' is not a file." % path) + return None def find(self, paths=[], callback=None): + """ + Finds files and directories on the SMB share based on the provided paths and executes a callback function on each entry. + + This method traverses the specified paths on the SMB share, recursively exploring directories and invoking the callback + function on each file or directory found. The callback function is called with three arguments: the entry object, the + full path of the entry, and the current depth of recursion. + + Args: + paths (list, optional): A list of paths to start the search from. Defaults to an empty list. + callback (function, optional): A function to be called on each entry found. The function should accept three arguments: + the entry object, the full path of the entry, and the current depth of recursion. Defaults to None. + + Note: + If the callback function is None, the method will print an error message and return without performing any action. + """ + def recurse_action(paths=[], depth=0, callback=None): if callback is None: return [] @@ -660,6 +693,7 @@ def path_exists(self, path=None): if path is not None: path = path.replace('*','') + path = path.replace('/', ntpath.sep) try: contents = self.smbClient.listPath( shareName=self.smb_share, @@ -685,12 +719,11 @@ def path_isdir(self, pathFromRoot=None): bool: True if the path is a directory, False otherwise or if an error occurs. """ - if pathFromRoot is not None: - # Replace slashes if any - path = pathFromRoot.replace('/', ntpath.sep) - + if pathFromRoot is not None: # Strip wildcards to avoid injections - path = path.replace('*','') + path = pathFromRoot.replace('*','') + # Replace slashes if any + path = path.replace('/', ntpath.sep) # Normalize path and strip leading backslash path = ntpath.normpath(path + ntpath.sep).lstrip(ntpath.sep) @@ -715,7 +748,7 @@ def path_isdir(self, pathFromRoot=None): else: return False - def path_isfile(self, path=None): + def path_isfile(self, pathFromRoot=None): """ Checks if the specified path is a file on the SMB share. @@ -729,14 +762,19 @@ def path_isfile(self, path=None): bool: True if the path is a file, False otherwise or if an error occurs. """ - if path is not None: - path = path.replace('*','') - search_dir = ntpath.normpath(self.smb_cwd + ntpath.sep + path) - search_dir = ntpath.dirname(search_dir) + ntpath.sep + '*' + if pathFromRoot is not None: + # Strip wildcards to avoid injections + path = pathFromRoot.replace('*','') + # Replace slashes if any + path = path.replace('/', ntpath.sep) + + # Normalize path and strip leading backslash + path = ntpath.normpath(path + ntpath.sep).lstrip(ntpath.sep) + try: contents = self.smbClient.listPath( shareName=self.smb_share, - path=search_dir + path=ntpath.dirname(path) + ntpath.sep + '*' ) # Filter on files contents = [ From 059254551f33d172a7c2c420f8be856002fb903a Mon Sep 17 00:00:00 2001 From: "Remi GASCOU (Podalirius)" <79218792+p0dalirius@users.noreply.github.com> Date: Fri, 21 Jun 2024 13:25:15 +0200 Subject: [PATCH 8/9] Fixed #18 --- smbclientng/core/InteractiveShell.py | 114 +++++++++++++++------------ smbclientng/core/SMBSession.py | 6 +- 2 files changed, 69 insertions(+), 51 deletions(-) diff --git a/smbclientng/core/InteractiveShell.py b/smbclientng/core/InteractiveShell.py index 33bce9f..9c98880 100644 --- a/smbclientng/core/InteractiveShell.py +++ b/smbclientng/core/InteractiveShell.py @@ -533,40 +533,44 @@ def command_lls(self, arguments, command): # SMB share needed : No if len(arguments) == 0: - path = '.' + arguments = ['.'] else: - path = ' '.join(arguments) - - # lls - if os.path.isdir(path): - directory_contents = os.listdir(path=path) - for entryname in sorted(directory_contents): - path_to_file = path + os.path.sep + entryname - rights_str = unix_permissions(path_to_file) - size_str = b_filesize(os.path.getsize(filename=path_to_file)) - date_str = datetime.datetime.fromtimestamp(os.path.getmtime(filename=path_to_file)).strftime("%Y-%m-%d %H:%M") - - if os.path.isdir(s=entryname): - if self.config.no_colors: - print("%s %10s %s %s%s" % (rights_str, size_str, date_str, entryname, os.path.sep)) + arguments = resolve_local_files(arguments) + + for path in arguments: + if len(arguments) > 1: + print("%s:" % path) + # lls + if os.path.isdir(path): + directory_contents = os.listdir(path=path) + for entryname in sorted(directory_contents): + path_to_file = path + os.path.sep + entryname + rights_str = unix_permissions(path_to_file) + size_str = b_filesize(os.path.getsize(filename=path_to_file)) + date_str = datetime.datetime.fromtimestamp(os.path.getmtime(filename=path_to_file)).strftime("%Y-%m-%d %H:%M") + + if os.path.isdir(s=entryname): + if self.config.no_colors: + print("%s %10s %s %s%s" % (rights_str, size_str, date_str, entryname, os.path.sep)) + else: + print("%s %10s %s \x1b[1;96m%s\x1b[0m%s" % (rights_str, size_str, date_str, entryname, os.path.sep)) else: - print("%s %10s %s \x1b[1;96m%s\x1b[0m%s" % (rights_str, size_str, date_str, entryname, os.path.sep)) + if self.config.no_colors: + print("%s %10s %s %s" % (rights_str, size_str, date_str, entryname)) + else: + print("%s %10s %s \x1b[1m%s\x1b[0m" % (rights_str, size_str, date_str, entryname)) + # lls + elif os.path.isfile(path): + rights_str = unix_permissions(path) + size_str = b_filesize(os.path.getsize(filename=path)) + date_str = datetime.datetime.fromtimestamp(os.path.getmtime(filename=path)).strftime("%Y-%m-%d %H:%M") + if self.config.no_colors: + print("%s %10s %s %s" % (rights_str, size_str, date_str, os.path.basename(path))) else: - if self.config.no_colors: - print("%s %10s %s %s" % (rights_str, size_str, date_str, entryname)) - else: - print("%s %10s %s \x1b[1m%s\x1b[0m" % (rights_str, size_str, date_str, entryname)) - # lls - elif os.path.isfile(path): - rights_str = unix_permissions(path) - size_str = b_filesize(os.path.getsize(filename=path)) - date_str = datetime.datetime.fromtimestamp(os.path.getmtime(filename=path)).strftime("%Y-%m-%d %H:%M") - if self.config.no_colors: - print("%s %10s %s %s" % (rights_str, size_str, date_str, os.path.basename(path))) - else: - print("%s %10s %s \x1b[1m%s\x1b[0m" % (rights_str, size_str, date_str, os.path.basename(path))) - else: - print("[!] No such file or directory.") + print("%s %10s %s \x1b[1m%s\x1b[0m" % (rights_str, size_str, date_str, os.path.basename(path))) + + if len(arguments) > 1: + print() @command_arguments_required def command_lmkdir(self, arguments, command): @@ -574,19 +578,17 @@ def command_lmkdir(self, arguments, command): # Active SMB connection needed : No # SMB share needed : No - path = ' '.join(arguments) - - # Split each dir - if os.path.sep in path: - path = path.strip(os.path.sep).split(os.path.sep) - else: - path = [path] + for path in arguments: + if os.path.sep in path: + path = path.strip(os.path.sep).split(os.path.sep) + else: + path = [path] - # Create each dir in the path - for depth in range(1, len(path)+1): - tmp_path = os.path.sep.join(path[:depth]) - if not os.path.exists(tmp_path): - os.mkdir(path=tmp_path) + # Create each dir in the path + for depth in range(1, len(path)+1): + tmp_path = os.path.sep.join(path[:depth]) + if not os.path.exists(tmp_path): + os.mkdir(path=tmp_path) def command_lpwd(self, arguments, command): # Command arguments required : No @@ -670,15 +672,29 @@ def command_ls(self, arguments, command): # SMB share needed : Yes if len(arguments) == 0: - path = '.' + arguments = ['.'] else: - path = arguments[0] + arguments = resolve_remote_files(self.smbSession, arguments) - # Read the files - directory_contents = self.smbSession.list_contents(path=path) + for path in arguments: + if len(arguments) > 1: + print("%s:" % path) - for longname in sorted(directory_contents.keys(), key=lambda x:x.lower()): - windows_ls_entry(directory_contents[longname], self.config) + if self.smbSession.path_isdir(pathFromRoot=path): + # Read the files + directory_contents = self.smbSession.list_contents(path=path) + else: + entry = self.smbSession.get_entry(path=path) + if entry is not None: + directory_contents = {entry.get_longname(): entry} + else: + directory_contents = {} + + for longname in sorted(directory_contents.keys(), key=lambda x:x.lower()): + windows_ls_entry(directory_contents[longname], self.config) + + if len(arguments) > 1: + print() @command_arguments_required @active_smb_connection_needed diff --git a/smbclientng/core/SMBSession.py b/smbclientng/core/SMBSession.py index 5ecdd71..9928d61 100644 --- a/smbclientng/core/SMBSession.py +++ b/smbclientng/core/SMBSession.py @@ -435,13 +435,15 @@ def get_entry(self, path=None): """ if self.path_exists(path=path): - matches = self.smbClient.listPath(shareName=self.smb_share, path=path) + matches = self.smbClient.listPath( + shareName=self.smb_share, + path=path + ) if len(matches) == 1: return matches[0] else: return None - else: return None From 952734d163010fae9a29829c9786db3069c3f38c Mon Sep 17 00:00:00 2001 From: "Remi GASCOU (Podalirius)" <79218792+p0dalirius@users.noreply.github.com> Date: Fri, 21 Jun 2024 13:27:12 +0200 Subject: [PATCH 9/9] Fixed #43, Fixed #42, Fixed #41, Fixed #40, Fixed #39 --- smbclientng/core/InteractiveShell.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/smbclientng/core/InteractiveShell.py b/smbclientng/core/InteractiveShell.py index 9c98880..003ec8c 100644 --- a/smbclientng/core/InteractiveShell.py +++ b/smbclientng/core/InteractiveShell.py @@ -472,7 +472,7 @@ def command_lcat(self, arguments, command): files_and_directories = resolve_remote_files(self.smbSession, arguments) for path_to_file in files_and_directories: - # Read the file + # Read the file try: if os.path.exists(path=path_to_file): f = open(path_to_file, 'rb')