diff --git a/smbclientng/core/CommandCompleter.py b/smbclientng/core/CommandCompleter.py index 3ffba05..7c2aa28 100644 --- a/smbclientng/core/CommandCompleter.py +++ b/smbclientng/core/CommandCompleter.py @@ -213,6 +213,14 @@ class CommandCompleter(object): "subcommands": [], "autocomplete": ["remote_directory"] }, + "acls": { + "description": [ + "List ACLs of files and folders in cwd.", + "Syntax: 'acls'" + ], + "subcommands": [], + "autocomplete": ["remote_directory"] + }, "ltree": { "description": [ "Displays a tree view of the local directories.", diff --git a/smbclientng/core/InteractiveShell.py b/smbclientng/core/InteractiveShell.py index 3958601..828d7ec 100644 --- a/smbclientng/core/InteractiveShell.py +++ b/smbclientng/core/InteractiveShell.py @@ -9,6 +9,8 @@ import datetime import impacket from importlib import import_module +import impacket.smbconnection +from impacket.smb3structs import * import ntpath import os import readline @@ -201,6 +203,10 @@ def process_line(self, commandLine): elif command in ["ls", "dir"]: self.command_ls(arguments, command) + # List directory contents in a share + elif command in ["acls"]: + self.command_acls(arguments, command) + # Shows the content of a local file elif command == "lcat": self.command_lcat(arguments, command) @@ -753,7 +759,41 @@ def command_ls(self, arguments, command): if len(arguments) > 1: self.logger.print() - + + @active_smb_connection_needed + @smb_share_is_set + def command_acls(self, arguments: list[str], command: str): + # Command arguments required : No + # Active SMB connection needed : Yes + # SMB share needed : Yes + + if len(arguments) == 0: + arguments = ['.'] + + smbClient = self.sessionsManager.current_session.smbClient + sharename = self.sessionsManager.current_session.smb_share + foldername = self.sessionsManager.current_session.smb_cwd + tree_id = smbClient.connectTree(sharename) + + for entry in smbClient.listPath(sharename, ntpath.join(foldername, '*')): + self.logger.print(windows_ls_entry(entry, self.config)) + filename = entry.get_longname() + + if filename in [".",".."]: + continue + filename = ntpath.join(foldername, filename) + try: + file_id = smbClient.getSMBServer().create(tree_id, filename, READ_CONTROL | FILE_READ_ATTRIBUTES, 0, FILE_DIRECTORY_FILE if entry.is_directory() else FILE_NON_DIRECTORY_FILE, FILE_OPEN, 0) + except Exception as err: + self.logger.debug(f"Could not get attributes for file {filename}: {str(err)}") + continue + + file_info = smbClient.getSMBServer().queryInfo(tree_id, file_id, infoType=SMB2_0_INFO_SECURITY, fileInfoClass=SMB2_SEC_INFO_00, additionalInformation=OWNER_SECURITY_INFORMATION | DACL_SECURITY_INFORMATION | GROUP_SECURITY_INFORMATION, flags=0) + + self.sessionsManager.current_session.printSecurityDescriptorTable(file_info, filename) + + self.logger.print() + @command_arguments_required @active_smb_connection_needed @smb_share_is_set @@ -970,8 +1010,6 @@ def command_sizeof(self, arguments, command): self.logger.error(f"Failed to access '{path}': {e}") except (BrokenPipeError, KeyboardInterrupt): self.logger.error("Interrupted.") - self.close_smb_session() - self.init_smb_session() return except Exception as e: self.logger.error(f"Error while processing '{path}': {e}") @@ -994,17 +1032,18 @@ def command_shares(self, arguments, command): do_check_rights = True test_write = False - self.logger.print("WARNING: Checking WRITE access to shares in offensive tools implies creating a folder and trying to delete it.") - self.logger.print("| If you have CREATE_CHILD rights but no DELETE_CHILD rights, the folder cannot be deleted and will remain on the target.") - self.logger.print("| Do you want to continue? [N/y] ", end='') - user_response = input() - self.logger.write_to_logfile(user_response) - while user_response.lower().strip() not in ['y', 'n']: - self.logger.print("| Invalid response, Do you want to continue? [N/y] ", end='') + if do_check_rights: + self.logger.print("WARNING: Checking WRITE access to shares in offensive tools implies creating a folder and trying to delete it.") + self.logger.print("| If you have CREATE_CHILD rights but no DELETE_CHILD rights, the folder cannot be deleted and will remain on the target.") + self.logger.print("| Do you want to continue? [N/y] ", end='') user_response = input() self.logger.write_to_logfile(user_response) - if user_response.lower().strip() == 'y': - test_write = True + while user_response.lower().strip() not in ['y', 'n']: + self.logger.print("| Invalid response, Do you want to continue? [N/y] ", end='') + user_response = input() + self.logger.write_to_logfile(user_response) + if user_response.lower().strip() == 'y': + test_write = True shares = self.sessionsManager.current_session.list_shares() if len(shares.keys()) != 0: @@ -1016,6 +1055,10 @@ def command_shares(self, arguments, command): if do_check_rights: table.add_column("Rights") + security_descriptor = list(shares.values())[0].get("security_descriptor") + if security_descriptor is not None: + table.add_column("Security Descriptor") + for sharename in sorted(shares.keys()): types = ', '.join([s.replace("STYPE_","") for s in shares[sharename]["type"]]) @@ -1032,22 +1075,34 @@ def command_shares(self, arguments, command): str_comment = "[bold bright_yellow]" + shares[sharename]["comment"] + "[/bold bright_yellow]" if do_check_rights: - access_rights = self.sessionsManager.current_session.test_rights(sharename=shares[sharename]["name"], test_write=test_write) - str_access_rights = "[bold yellow]NO ACCESS[/bold yellow]" - if access_rights["readable"] and access_rights["writable"]: - str_access_rights = "[bold green]READ[/bold green], [bold red]WRITE[/bold red]" - elif access_rights["readable"]: - str_access_rights = "[bold green]READ[/bold green]" - elif access_rights["writable"]: - # Without READ?? This should not happen IMHO - str_access_rights = "[bold red]WRITE[/bold red]" - else: + try: + access_rights = self.sessionsManager.current_session.test_rights(sharename=shares[sharename]["name"], test_write=test_write) str_access_rights = "[bold yellow]NO ACCESS[/bold yellow]" + if access_rights["readable"] and access_rights["writable"]: + str_access_rights = "[bold green]READ[/bold green], [bold red]WRITE[/bold red]" + elif access_rights["readable"]: + str_access_rights = "[bold green]READ[/bold green]" + elif access_rights["writable"]: + # Without READ?? This should not happen IMHO + str_access_rights = "[bold red]WRITE[/bold red]" + else: + str_access_rights = "[bold yellow]NO ACCESS[/bold yellow]" + except: + str_access_rights = "" + + if security_descriptor is not None: + sd_table = self.sessionsManager.current_session.securityDescriptorTable(b''.join(shares[sharename].get("security_descriptor")), "sharename", prefix="", table_colors=True) if do_check_rights: - table.add_row(str_sharename, str_hidden, str_types, str_comment, str_access_rights) + if security_descriptor is not None: + table.add_row(str_sharename, str_hidden, str_types, str_comment, str_access_rights, sd_table) + else: + table.add_row(str_sharename, str_hidden, str_types, str_comment, str_access_rights) else: - table.add_row(str_sharename, str_hidden, str_types, str_comment) + if security_descriptor is not None: + table.add_row(str_sharename, str_hidden, str_types, str_comment, sd_table) + else: + table.add_row(str_sharename, str_hidden, str_types, str_comment) Console().print(table) else: diff --git a/smbclientng/core/SIDResolver.py b/smbclientng/core/SIDResolver.py new file mode 100644 index 0000000..7d93d5e --- /dev/null +++ b/smbclientng/core/SIDResolver.py @@ -0,0 +1,57 @@ +from __future__ import annotations +from impacket.dcerpc.v5 import transport, lsat, lsad, rpcrt +from impacket.dcerpc.v5.dtypes import MAXIMUM_ALLOWED +from impacket.dcerpc.v5.lsat import DCERPCSessionError +from impacket.nt_errors import STATUS_SOME_NOT_MAPPED, STATUS_NONE_MAPPED +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from impacket.smbconnection import SMBConnection + +class SIDResolver: + def __init__(self, smbConnection: SMBConnection): + self.__smbConnection = smbConnection + self.__dce = self.__get_lsarpc_binding() + self.__dce.connect() + self.__dce.bind(lsat.MSRPC_UUID_LSAT) + self.cache = dict() + + def close(self): + self.__dce.disconnect() + + def __get_lsarpc_binding(self) -> rpcrt.DCERPC_v5: + rpctransport = transport.SMBTransport(445, filename = "lsarpc") + rpctransport.set_smb_connection(self.__smbConnection) + return rpctransport.get_dce_rpc() + + def resolve_sids(self, sids: set[str]) -> None: + unresolved_sids = list(sids.difference(self.cache.keys())) + if len(unresolved_sids) == 0: + return + resp = lsad.hLsarOpenPolicy2(self.__dce, MAXIMUM_ALLOWED | lsat.POLICY_LOOKUP_NAMES) + policyHandle = resp['PolicyHandle'] + try: + resp = lsat.hLsarLookupSids(self.__dce, policyHandle, unresolved_sids, lsat.LSAP_LOOKUP_LEVEL.LsapLookupWksta) + except DCERPCSessionError as err: + #Catch error when some could not be resolved: + if err.error_code == STATUS_SOME_NOT_MAPPED and err.packet != None: + resp = err.packet + elif err.error_code == STATUS_NONE_MAPPED: + return + else: + raise err + for i, item in enumerate(resp['TranslatedNames']['Names']): + domain = resp['ReferencedDomains']['Domains'][item['DomainIndex']]['Name'] + if len(item['Name']) == 0: + domain = domain + "\\" if len(domain) != 0 else "" + cur_sid = f"{domain}Unknown (RID={unresolved_sids[i].split('-')[-1]})" + elif len(domain) == 0: + cur_sid = item['Name'] + else: + cur_sid = "{}\\{}".format(domain, item['Name']) + self.cache[unresolved_sids[i]] = cur_sid + + def get_sid(self, sid: str) -> str: + if not sid in self.cache: + self.resolve_sids({sid}) + return self.cache.get(sid) or sid \ No newline at end of file diff --git a/smbclientng/core/SMBSession.py b/smbclientng/core/SMBSession.py index db56a99..63e83c5 100644 --- a/smbclientng/core/SMBSession.py +++ b/smbclientng/core/SMBSession.py @@ -7,6 +7,11 @@ import io import impacket.smbconnection +from typing import Optional +from impacket.smbconnection import SMBConnection, SessionError +from impacket.dcerpc.v5 import transport, rpcrt, srvs +from impacket.ldap import ldaptypes +from impacket.nt_errors import STATUS_OBJECT_NAME_COLLISION import ntpath import os import random @@ -15,6 +20,8 @@ import traceback from smbclientng.core.LocalFileIO import LocalFileIO from smbclientng.core.utils import b_filesize, STYPE_MASK, is_port_open, smb_entry_iterator +from smbclientng.core.SIDResolver import SIDResolver +from typing import TYPE_CHECKING class SMBSession(object): @@ -53,6 +60,8 @@ class SMBSession(object): test_rights(sharename): Tests read and write access rights on a share. """ + dce_srvsvc: Optional[rpcrt.DCERPC_v5] = None + sid_resolver: SIDResolver def __init__(self, host, port, timeout, credentials, advertisedName=None, config=None, logger=None): super(SMBSession, self).__init__() # Objects @@ -211,6 +220,20 @@ def init_smb_session(self): else: self.logger.error("Failed to authenticate to '%s' as '%s\\%s'!" % (self.host, self.credentials.domain, self.credentials.username)) + if self.connected: + try: + self.sid_resolver = SIDResolver(self.smbClient) + except Exception as err: + self.logger.error(f"SIDResolver could not be initialized: {err}") + try: + rpctransport = transport.SMBTransport(self.smbClient.getRemoteName(), self.smbClient.getRemoteHost(), filename=r'\srvsvc', + smb_connection=self.smbClient) + self.dce_srvsvc = rpctransport.get_dce_rpc() + self.dce_srvsvc.connect() + self.dce_srvsvc.bind(srvs.MSRPC_UUID_SRVS) + except Exception as err: + self.logger.error(f"Could not initialize connection to srvsvc: {err}") + return self.connected def ping_smb_session(self): @@ -594,6 +617,85 @@ def list_contents(self, path=None): contents[entry.get_longname()] = entry return contents + + def printSecurityDescriptorTable(self, security_descriptor: str, subject: str, prefix: str = " "*13, table_colors: bool = False): + self.logger.print(self.securityDescriptorTable(security_descriptor, subject, prefix, table_colors)) + + def securityDescriptorTable(self, security_descriptor: str, subject: str, prefix: str = " "*13, table_colors: bool = False) -> str: + if security_descriptor is not None and len(security_descriptor) == 0: + return "" + out_sd = "" + sd = ldaptypes.SR_SECURITY_DESCRIPTOR() + sd.fromString(security_descriptor) + try: + self.sid_resolver.resolve_sids(set( + ([sd['OwnerSid'].formatCanonical()] if len(sd['OwnerSid']) != 0 else []) + + ([sd['GroupSid'].formatCanonical()] if len(sd['GroupSid']) != 0 else []) + + [acl['Ace']['Sid'].formatCanonical() for acl in sd['Dacl']['Data'] if len(acl['Ace']['Sid']) != 0])) + except Exception as err: + self.logger.debug(f"Could not resolve SID for {subject}: {str(err)}") + traceback.print_exc() + max_resolved_sid_length = max([len(i) for i in self.sid_resolver.cache.values()] + [0]) + + if len(sd['OwnerSid']) != 0: + resolved_owner_sid = self.sid_resolver.get_sid(sd['OwnerSid'].formatCanonical()) + resolved_group_sid = self.sid_resolver.get_sid(sd['GroupSid'].formatCanonical()) + + if self.config.no_colors: + out_sd += f"{prefix}Owner: {resolved_owner_sid}\n" + out_sd += f"{prefix}Group: {resolved_group_sid}" + else: + if table_colors: + out_sd += f"{prefix}Owner: [bold yellow]{resolved_owner_sid}[/bold yellow]\n" + out_sd += f"{prefix}Group: [bold yellow]{resolved_group_sid}[/bold yellow]" + else: + out_sd += f"{prefix}Owner: \x1b[1m{resolved_owner_sid}\x1b[0m\n" + out_sd += f"{prefix}Group: \x1b[1m{resolved_group_sid}\x1b[0m" + + for i, acl in enumerate(sd['Dacl']['Data']): + resolved_sid = acl['Ace']['Sid'].formatCanonical() if len(acl['Ace']['Sid']) != 0 else "" + if resolved_sid in ["S-1-5-32-544", "S-1-5-18"]: + continue + + flags = [] + for flag in ["GENERIC_READ", "GENERIC_WRITE", "GENERIC_EXECUTE", "GENERIC_ALL", "MAXIMUM_ALLOWED", "ACCESS_SYSTEM_SECURITY", "WRITE_OWNER", "WRITE_DACL", "DELETE", "READ_CONTROL", "SYNCHRONIZE"]: + if len(acl['Ace']['Mask']) != 0 and acl['Ace']['Mask'].hasPriv(getattr(ldaptypes.ACCESS_MASK, flag)): + flags.append(flag) + if len(flags) == 0: + continue + try: + resolved_sid = self.sid_resolver.get_sid(resolved_sid) if resolved_sid else "" + except Exception as err: + self.logger.debug(f"Could not resolve SID {resolved_sid} for {subject}: {str(err)}") + + acl_string = prefix + inbetween = "" + if len(resolved_sid) < max_resolved_sid_length+1: + inbetween = " "*(max_resolved_sid_length+1-len(resolved_sid)) + + if self.config.no_colors: + acl_string += f"{resolved_sid}" + ' | '.join(flags) + else: + acl_string += "Allowed: " if acl['TypeName'] == "ACCESS_ALLOWED_ACE" else "Denied: " + if table_colors: + acl_string += f"[bold yellow]{resolved_sid}[/bold yellow]" + else: + acl_string += f"\x1b[1m{resolved_sid}\x1b[0m" + acl_string += inbetween + acl_string += ' | '.join(flags) + out_sd += "\n" + acl_string + return out_sd.lstrip("\n") + + def listSharesDetailed(self) -> dict: + """ + get a list of available shares at the connected target + + :return: a list containing dict entries for each share + :raise SessionError: if error + """ + # Get the shares through RPC + resp = srvs.hNetrShareEnum(self.dce_srvsvc, 502, serverName="\\\\" + self.smbClient.getRemoteHost()) + return resp['InfoStruct']['ShareInfo']['Level502']['Buffer'] def list_shares(self): """ @@ -611,21 +713,41 @@ def list_shares(self): if self.connected: if self.smbClient is not None: - resp = self.smbClient.listShares() - - for share in resp: - # SHARE_INFO_1 structure (lmshare.h) - # https://learn.microsoft.com/en-us/windows/win32/api/lmshare/ns-lmshare-share_info_1 - sharename = share["shi1_netname"][:-1] - sharecomment = share["shi1_remark"][:-1] - sharetype = share["shi1_type"] - - self.available_shares[sharename.lower()] = { - "name": sharename, - "type": STYPE_MASK(sharetype), - "rawtype": sharetype, - "comment": sharecomment - } + try: + resp = self.listSharesDetailed() + for share in resp: + # SHARE_INFO_502 structure (lmshare.h) + # https://learn.microsoft.com/en-us/windows/win32/api/lmshare/ns-lmshare-share_info_502 + sharename = share["shi502_netname"][:-1] + sharecomment = share["shi502_remark"][:-1] + sharetype = share["shi502_type"] + sharesd = share["shi502_security_descriptor"] + + self.available_shares[sharename.lower()] = { + "name": sharename, + "type": STYPE_MASK(sharetype), + "rawtype": sharetype, + "comment": sharecomment, + "security_descriptor": sharesd + } + except Exception as err: + self.logger.debug(f"Could not get detailed share info: {str(err)}") + resp = self.smbClient.listShares() + + for share in resp: + # SHARE_INFO_1 structure (lmshare.h) + # https://learn.microsoft.com/en-us/windows/win32/api/lmshare/ns-lmshare-share_info_1 + sharename = share["shi1_netname"][:-1] + sharecomment = share["shi1_remark"][:-1] + sharetype = share["shi1_type"] + + self.available_shares[sharename.lower()] = { + "name": sharename, + "type": STYPE_MASK(sharetype), + "rawtype": sharetype, + "comment": sharecomment + } + else: self.logger.error("Error: SMBSession.smbClient is None.")