diff --git a/smbclientng/core/CommandCompleter.py b/smbclientng/core/CommandCompleter.py index 241bfde..baf4178 100644 --- a/smbclientng/core/CommandCompleter.py +++ b/smbclientng/core/CommandCompleter.py @@ -220,6 +220,14 @@ class CommandCompleter(object): "subcommands": [], "autocomplete": ["remote_directory"] }, + "acls": { + "description": [ + "List ACLs of files and folders in cwd.", + "Syntax: 'acls'" + ], + "subcommands": [], + "autocomplete": ["remote_directory"] + }, "ltree": { "description": [ "Displays a tree view of the local directories.", diff --git a/smbclientng/core/InteractiveShell.py b/smbclientng/core/InteractiveShell.py index 4108d85..9e7cf32 100644 --- a/smbclientng/core/InteractiveShell.py +++ b/smbclientng/core/InteractiveShell.py @@ -11,6 +11,8 @@ from impacket.smbconnection import SessionError as SMBConnectionSessionError from impacket.smb3 import SessionError as SMB3SessionError from importlib import import_module +import impacket.smbconnection +from impacket.smb3structs import * import ntpath import os import readline @@ -216,6 +218,10 @@ def process_line(self, commandLine: str): elif command in ["ls", "dir"]: self.command_ls(arguments, command) + # List directory contents in a share + elif command in ["acls"]: + self.command_acls(arguments, command) + # Shows the content of a local file elif command == "lcat": self.command_lcat(arguments, command) @@ -505,7 +511,7 @@ def command_lbat(self, arguments: list[str], command: str): # SMB share needed : No # Parse wildcards - files_and_directories = resolve_remote_files(self.sessionsManager.current_session, arguments) + files_and_directories = resolve_local_files(arguments) for path_to_file in files_and_directories: # Read the file @@ -543,7 +549,7 @@ def command_lcat(self, arguments: list[str], command: str): # SMB share needed : No # Parse wildcards - files_and_directories = resolve_remote_files(self.sessionsManager.current_session, arguments) + files_and_directories = resolve_local_files(arguments) for path_to_file in files_and_directories: # Read the file @@ -769,7 +775,41 @@ def command_ls(self, arguments: list[str], command: str): if len(arguments) > 1: self.logger.print() - + + @active_smb_connection_needed + @smb_share_is_set + def command_acls(self, arguments: list[str], command: str): + # Command arguments required : No + # Active SMB connection needed : Yes + # SMB share needed : Yes + + if len(arguments) == 0: + arguments = ['.'] + + smbClient = self.sessionsManager.current_session.smbClient + sharename = self.sessionsManager.current_session.smb_share + foldername = self.sessionsManager.current_session.smb_cwd + tree_id = smbClient.connectTree(sharename) + + for entry in smbClient.listPath(sharename, ntpath.join(foldername, '*')): + self.logger.print(windows_ls_entry(entry, self.config)) + filename = entry.get_longname() + + if filename in [".",".."]: + continue + filename = ntpath.join(foldername, filename) + try: + file_id = smbClient.getSMBServer().create(tree_id, filename, READ_CONTROL | FILE_READ_ATTRIBUTES, 0, FILE_DIRECTORY_FILE if entry.is_directory() else FILE_NON_DIRECTORY_FILE, FILE_OPEN, 0) + except Exception as err: + self.logger.debug(f"Could not get attributes for file {filename}: {str(err)}") + continue + + file_info = smbClient.getSMBServer().queryInfo(tree_id, file_id, infoType=SMB2_0_INFO_SECURITY, fileInfoClass=SMB2_SEC_INFO_00, additionalInformation=OWNER_SECURITY_INFORMATION | DACL_SECURITY_INFORMATION | GROUP_SECURITY_INFORMATION, flags=0) + + self.sessionsManager.current_session.printSecurityDescriptorTable(file_info, filename) + + self.logger.print() + @command_arguments_required @active_smb_connection_needed @smb_share_is_set @@ -1009,17 +1049,18 @@ def command_shares(self, arguments: list[str], command: str): do_check_rights = True test_write = False - self.logger.print("WARNING: Checking WRITE access to shares in offensive tools implies creating a folder and trying to delete it.") - self.logger.print("| If you have CREATE_CHILD rights but no DELETE_CHILD rights, the folder cannot be deleted and will remain on the target.") - self.logger.print("| Do you want to continue? [N/y] ", end='') - user_response = input() - self.logger.write_to_logfile(user_response) - while user_response.lower().strip() not in ['y', 'n']: - self.logger.print("| Invalid response, Do you want to continue? [N/y] ", end='') + if do_check_rights: + self.logger.print("WARNING: Checking WRITE access to shares in offensive tools implies creating a folder and trying to delete it.") + self.logger.print("| If you have CREATE_CHILD rights but no DELETE_CHILD rights, the folder cannot be deleted and will remain on the target.") + self.logger.print("| Do you want to continue? [N/y] ", end='') user_response = input() self.logger.write_to_logfile(user_response) - if user_response.lower().strip() == 'y': - test_write = True + while user_response.lower().strip() not in ['y', 'n']: + self.logger.print("| Invalid response, Do you want to continue? [N/y] ", end='') + user_response = input() + self.logger.write_to_logfile(user_response) + if user_response.lower().strip() == 'y': + test_write = True shares = self.sessionsManager.current_session.list_shares() if len(shares.keys()) != 0: @@ -1031,6 +1072,10 @@ def command_shares(self, arguments: list[str], command: str): if do_check_rights: table.add_column("Rights") + security_descriptor = list(shares.values())[0].get("security_descriptor") + if security_descriptor is not None: + table.add_column("Security Descriptor") + for sharename in sorted(shares.keys()): types = ', '.join([s.replace("STYPE_","") for s in shares[sharename]["type"]]) @@ -1047,22 +1092,34 @@ def command_shares(self, arguments: list[str], command: str): str_comment = "[bold bright_yellow]" + shares[sharename]["comment"] + "[/bold bright_yellow]" if do_check_rights: - access_rights = self.sessionsManager.current_session.test_rights(sharename=shares[sharename]["name"], test_write=test_write) - str_access_rights = "[bold yellow]NO ACCESS[/bold yellow]" - if access_rights["readable"] and access_rights["writable"]: - str_access_rights = "[bold green]READ[/bold green], [bold red]WRITE[/bold red]" - elif access_rights["readable"]: - str_access_rights = "[bold green]READ[/bold green]" - elif access_rights["writable"]: - # Without READ?? This should not happen IMHO - str_access_rights = "[bold red]WRITE[/bold red]" - else: + try: + access_rights = self.sessionsManager.current_session.test_rights(sharename=shares[sharename]["name"], test_write=test_write) str_access_rights = "[bold yellow]NO ACCESS[/bold yellow]" + if access_rights["readable"] and access_rights["writable"]: + str_access_rights = "[bold green]READ[/bold green], [bold red]WRITE[/bold red]" + elif access_rights["readable"]: + str_access_rights = "[bold green]READ[/bold green]" + elif access_rights["writable"]: + # Without READ?? This should not happen IMHO + str_access_rights = "[bold red]WRITE[/bold red]" + else: + str_access_rights = "[bold yellow]NO ACCESS[/bold yellow]" + except: + str_access_rights = "" + + if security_descriptor is not None: + sd_table = self.sessionsManager.current_session.securityDescriptorTable(b''.join(shares[sharename].get("security_descriptor")), "sharename", prefix="", table_colors=True) if do_check_rights: - table.add_row(str_sharename, str_hidden, str_types, str_comment, str_access_rights) + if security_descriptor is not None: + table.add_row(str_sharename, str_hidden, str_types, str_comment, str_access_rights, sd_table) + else: + table.add_row(str_sharename, str_hidden, str_types, str_comment, str_access_rights) else: - table.add_row(str_sharename, str_hidden, str_types, str_comment) + if security_descriptor is not None: + table.add_row(str_sharename, str_hidden, str_types, str_comment, sd_table) + else: + table.add_row(str_sharename, str_hidden, str_types, str_comment) Console().print(table) else: diff --git a/smbclientng/core/SIDResolver.py b/smbclientng/core/SIDResolver.py new file mode 100644 index 0000000..7d93d5e --- /dev/null +++ b/smbclientng/core/SIDResolver.py @@ -0,0 +1,57 @@ +from __future__ import annotations +from impacket.dcerpc.v5 import transport, lsat, lsad, rpcrt +from impacket.dcerpc.v5.dtypes import MAXIMUM_ALLOWED +from impacket.dcerpc.v5.lsat import DCERPCSessionError +from impacket.nt_errors import STATUS_SOME_NOT_MAPPED, STATUS_NONE_MAPPED +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from impacket.smbconnection import SMBConnection + +class SIDResolver: + def __init__(self, smbConnection: SMBConnection): + self.__smbConnection = smbConnection + self.__dce = self.__get_lsarpc_binding() + self.__dce.connect() + self.__dce.bind(lsat.MSRPC_UUID_LSAT) + self.cache = dict() + + def close(self): + self.__dce.disconnect() + + def __get_lsarpc_binding(self) -> rpcrt.DCERPC_v5: + rpctransport = transport.SMBTransport(445, filename = "lsarpc") + rpctransport.set_smb_connection(self.__smbConnection) + return rpctransport.get_dce_rpc() + + def resolve_sids(self, sids: set[str]) -> None: + unresolved_sids = list(sids.difference(self.cache.keys())) + if len(unresolved_sids) == 0: + return + resp = lsad.hLsarOpenPolicy2(self.__dce, MAXIMUM_ALLOWED | lsat.POLICY_LOOKUP_NAMES) + policyHandle = resp['PolicyHandle'] + try: + resp = lsat.hLsarLookupSids(self.__dce, policyHandle, unresolved_sids, lsat.LSAP_LOOKUP_LEVEL.LsapLookupWksta) + except DCERPCSessionError as err: + #Catch error when some could not be resolved: + if err.error_code == STATUS_SOME_NOT_MAPPED and err.packet != None: + resp = err.packet + elif err.error_code == STATUS_NONE_MAPPED: + return + else: + raise err + for i, item in enumerate(resp['TranslatedNames']['Names']): + domain = resp['ReferencedDomains']['Domains'][item['DomainIndex']]['Name'] + if len(item['Name']) == 0: + domain = domain + "\\" if len(domain) != 0 else "" + cur_sid = f"{domain}Unknown (RID={unresolved_sids[i].split('-')[-1]})" + elif len(domain) == 0: + cur_sid = item['Name'] + else: + cur_sid = "{}\\{}".format(domain, item['Name']) + self.cache[unresolved_sids[i]] = cur_sid + + def get_sid(self, sid: str) -> str: + if not sid in self.cache: + self.resolve_sids({sid}) + return self.cache.get(sid) or sid \ No newline at end of file diff --git a/smbclientng/core/SMBSession.py b/smbclientng/core/SMBSession.py index 9ddfa37..0b01d89 100644 --- a/smbclientng/core/SMBSession.py +++ b/smbclientng/core/SMBSession.py @@ -6,8 +6,10 @@ from __future__ import annotations import io +import impacket.smbconnection from typing import Optional from impacket.smbconnection import SMBConnection, SessionError +from impacket.dcerpc.v5 import transport, rpcrt, srvs from impacket.ldap import ldaptypes from impacket.nt_errors import STATUS_OBJECT_NAME_COLLISION import ntpath @@ -18,6 +20,7 @@ import traceback from smbclientng.core.LocalFileIO import LocalFileIO from smbclientng.core.utils import b_filesize, STYPE_MASK, is_port_open, smb_entry_iterator +from smbclientng.core.SIDResolver import SIDResolver from typing import TYPE_CHECKING if TYPE_CHECKING: @@ -73,13 +76,16 @@ class SMBSession(object): smbClient: Optional[SMBConnection] = None connected: bool = False - + available_shares: dict[str,dict] = {} smb_share: Optional[str] = None smb_cwd: str = "" smb_tree_id: Optional[int] = None - def __init__(self, host: str, port: int, timeout: int, credentials: Credentials, advertisedName: Optional[str], config: Config, logger: Logger): + dce_srvsvc: Optional[rpcrt.DCERPC_v5] = None + sid_resolver: SIDResolver + + def __init__(self, host, port, timeout, credentials, advertisedName=None, config=None, logger=None): super(SMBSession, self).__init__() # Objects self.config = config @@ -228,6 +234,20 @@ def init_smb_session(self) -> bool: else: self.logger.error("Failed to authenticate to '%s' as '%s\\%s'!" % (self.host, self.credentials.domain, self.credentials.username)) + if self.connected: + try: + self.sid_resolver = SIDResolver(self.smbClient) + except Exception as err: + self.logger.error(f"SIDResolver could not be initialized: {err}") + try: + rpctransport = transport.SMBTransport(self.smbClient.getRemoteName(), self.smbClient.getRemoteHost(), filename=r'\srvsvc', + smb_connection=self.smbClient) + self.dce_srvsvc = rpctransport.get_dce_rpc() + self.dce_srvsvc.connect() + self.dce_srvsvc.bind(srvs.MSRPC_UUID_SRVS) + except Exception as err: + self.logger.error(f"Could not initialize connection to srvsvc: {err}") + return self.connected def ping_smb_session(self) -> bool: @@ -613,6 +633,85 @@ def list_contents(self, path: Optional[str] = None) -> dict[str, SharedFile]: contents[entry.get_longname()] = entry return contents + + def printSecurityDescriptorTable(self, security_descriptor: str, subject: str, prefix: str = " "*13, table_colors: bool = False): + self.logger.print(self.securityDescriptorTable(security_descriptor, subject, prefix, table_colors)) + + def securityDescriptorTable(self, security_descriptor: str, subject: str, prefix: str = " "*13, table_colors: bool = False) -> str: + if security_descriptor is not None and len(security_descriptor) == 0: + return "" + out_sd = "" + sd = ldaptypes.SR_SECURITY_DESCRIPTOR() + sd.fromString(security_descriptor) + try: + self.sid_resolver.resolve_sids(set( + ([sd['OwnerSid'].formatCanonical()] if len(sd['OwnerSid']) != 0 else []) + + ([sd['GroupSid'].formatCanonical()] if len(sd['GroupSid']) != 0 else []) + + [acl['Ace']['Sid'].formatCanonical() for acl in sd['Dacl']['Data'] if len(acl['Ace']['Sid']) != 0])) + except Exception as err: + self.logger.debug(f"Could not resolve SID for {subject}: {str(err)}") + traceback.print_exc() + max_resolved_sid_length = max([len(i) for i in self.sid_resolver.cache.values()] + [0]) + + if len(sd['OwnerSid']) != 0: + resolved_owner_sid = self.sid_resolver.get_sid(sd['OwnerSid'].formatCanonical()) + resolved_group_sid = self.sid_resolver.get_sid(sd['GroupSid'].formatCanonical()) + + if self.config.no_colors: + out_sd += f"{prefix}Owner: {resolved_owner_sid}\n" + out_sd += f"{prefix}Group: {resolved_group_sid}" + else: + if table_colors: + out_sd += f"{prefix}Owner: [bold yellow]{resolved_owner_sid}[/bold yellow]\n" + out_sd += f"{prefix}Group: [bold yellow]{resolved_group_sid}[/bold yellow]" + else: + out_sd += f"{prefix}Owner: \x1b[1m{resolved_owner_sid}\x1b[0m\n" + out_sd += f"{prefix}Group: \x1b[1m{resolved_group_sid}\x1b[0m" + + for i, acl in enumerate(sd['Dacl']['Data']): + resolved_sid = acl['Ace']['Sid'].formatCanonical() if len(acl['Ace']['Sid']) != 0 else "" + if resolved_sid in ["S-1-5-32-544", "S-1-5-18"]: + continue + + flags = [] + for flag in ["GENERIC_READ", "GENERIC_WRITE", "GENERIC_EXECUTE", "GENERIC_ALL", "MAXIMUM_ALLOWED", "ACCESS_SYSTEM_SECURITY", "WRITE_OWNER", "WRITE_DACL", "DELETE", "READ_CONTROL", "SYNCHRONIZE"]: + if len(acl['Ace']['Mask']) != 0 and acl['Ace']['Mask'].hasPriv(getattr(ldaptypes.ACCESS_MASK, flag)): + flags.append(flag) + if len(flags) == 0: + continue + try: + resolved_sid = self.sid_resolver.get_sid(resolved_sid) if resolved_sid else "" + except Exception as err: + self.logger.debug(f"Could not resolve SID {resolved_sid} for {subject}: {str(err)}") + + acl_string = prefix + inbetween = "" + if len(resolved_sid) < max_resolved_sid_length+1: + inbetween = " "*(max_resolved_sid_length+1-len(resolved_sid)) + + if self.config.no_colors: + acl_string += f"{resolved_sid}" + ' | '.join(flags) + else: + acl_string += "Allowed: " if acl['TypeName'] == "ACCESS_ALLOWED_ACE" else "Denied: " + if table_colors: + acl_string += f"[bold yellow]{resolved_sid}[/bold yellow]" + else: + acl_string += f"\x1b[1m{resolved_sid}\x1b[0m" + acl_string += inbetween + acl_string += ' | '.join(flags) + out_sd += "\n" + acl_string + return out_sd.lstrip("\n") + + def listSharesDetailed(self) -> dict: + """ + get a list of available shares at the connected target + + :return: a list containing dict entries for each share + :raise SessionError: if error + """ + # Get the shares through RPC + resp = srvs.hNetrShareEnum(self.dce_srvsvc, 502, serverName="\\\\" + self.smbClient.getRemoteHost()) + return resp['InfoStruct']['ShareInfo']['Level502']['Buffer'] def list_shares(self) -> dict[str,dict]: """ @@ -630,22 +729,41 @@ def list_shares(self) -> dict[str,dict]: if self.connected: if self.smbClient is not None: - resp = self.smbClient.listShares() - - for share in resp: - # SHARE_INFO_1 structure (lmshare.h) - # https://learn.microsoft.com/en-us/windows/win32/api/lmshare/ns-lmshare-share_info_1 - sharename = share["shi1_netname"][:-1] - sharecomment = share["shi1_remark"][:-1] - sharetype = share["shi1_type"] - - self.available_shares[sharename.lower()] = { - "name": sharename, - "type": STYPE_MASK(sharetype), - "rawtype": sharetype, - "comment": sharecomment - } - + try: + resp = self.listSharesDetailed() + for share in resp: + # SHARE_INFO_502 structure (lmshare.h) + # https://learn.microsoft.com/en-us/windows/win32/api/lmshare/ns-lmshare-share_info_502 + sharename = share["shi502_netname"][:-1] + sharecomment = share["shi502_remark"][:-1] + sharetype = share["shi502_type"] + sharesd = share["shi502_security_descriptor"] + + self.available_shares[sharename.lower()] = { + "name": sharename, + "type": STYPE_MASK(sharetype), + "rawtype": sharetype, + "comment": sharecomment, + "security_descriptor": sharesd + } + except Exception as err: + self.logger.debug(f"Could not get detailed share info: {str(err)}") + resp = self.smbClient.listShares() + + for share in resp: + # SHARE_INFO_1 structure (lmshare.h) + # https://learn.microsoft.com/en-us/windows/win32/api/lmshare/ns-lmshare-share_info_1 + sharename = share["shi1_netname"][:-1] + sharecomment = share["shi1_remark"][:-1] + sharetype = share["shi1_type"] + + self.available_shares[sharename.lower()] = { + "name": sharename, + "type": STYPE_MASK(sharetype), + "rawtype": sharetype, + "comment": sharecomment + } + else: self.logger.error("Error: SMBSession.smbClient is None.")