Skip to content

Commit

Permalink
Added acls command to list ACLs of files and folders, and show securi…
Browse files Browse the repository at this point in the history
…ty descriptor of shares if available
  • Loading branch information
rtpt-romankarwacik committed Nov 26, 2024
1 parent ebcf32f commit 0a42a34
Show file tree
Hide file tree
Showing 4 changed files with 284 additions and 39 deletions.
8 changes: 8 additions & 0 deletions smbclientng/core/CommandCompleter.py
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,14 @@ class CommandCompleter(object):
"subcommands": [],
"autocomplete": ["remote_directory"]
},
"acls": {
"description": [
"List ACLs of files and folders in cwd.",
"Syntax: 'acls'"
],
"subcommands": [],
"autocomplete": ["remote_directory"]
},
"ltree": {
"description": [
"Displays a tree view of the local directories.",
Expand Down
106 changes: 82 additions & 24 deletions smbclientng/core/InteractiveShell.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
import datetime
import impacket
from importlib import import_module
import impacket.smbconnection
from impacket.smb3structs import *
import ntpath
import os
import readline
Expand Down Expand Up @@ -201,6 +203,10 @@ def process_line(self, commandLine):
elif command in ["ls", "dir"]:
self.command_ls(arguments, command)

# List directory contents in a share
elif command in ["acls"]:
self.command_acls(arguments, command)

# Shows the content of a local file
elif command == "lcat":
self.command_lcat(arguments, command)
Expand Down Expand Up @@ -753,7 +759,41 @@ def command_ls(self, arguments, command):

if len(arguments) > 1:
self.logger.print()


@active_smb_connection_needed
@smb_share_is_set
def command_acls(self, arguments: list[str], command: str):
# Command arguments required : No
# Active SMB connection needed : Yes
# SMB share needed : Yes

if len(arguments) == 0:
arguments = ['.']

smbClient = self.sessionsManager.current_session.smbClient
sharename = self.sessionsManager.current_session.smb_share
foldername = self.sessionsManager.current_session.smb_cwd
tree_id = smbClient.connectTree(sharename)

for entry in smbClient.listPath(sharename, ntpath.join(foldername, '*')):
self.logger.print(windows_ls_entry(entry, self.config))
filename = entry.get_longname()

if filename in [".",".."]:
continue
filename = ntpath.join(foldername, filename)
try:
file_id = smbClient.getSMBServer().create(tree_id, filename, READ_CONTROL | FILE_READ_ATTRIBUTES, 0, FILE_DIRECTORY_FILE if entry.is_directory() else FILE_NON_DIRECTORY_FILE, FILE_OPEN, 0)
except Exception as err:
self.logger.debug(f"Could not get attributes for file {filename}: {str(err)}")
continue

file_info = smbClient.getSMBServer().queryInfo(tree_id, file_id, infoType=SMB2_0_INFO_SECURITY, fileInfoClass=SMB2_SEC_INFO_00, additionalInformation=OWNER_SECURITY_INFORMATION | DACL_SECURITY_INFORMATION | GROUP_SECURITY_INFORMATION, flags=0)

self.sessionsManager.current_session.printSecurityDescriptorTable(file_info, filename)

self.logger.print()

@command_arguments_required
@active_smb_connection_needed
@smb_share_is_set
Expand Down Expand Up @@ -970,8 +1010,6 @@ def command_sizeof(self, arguments, command):
self.logger.error(f"Failed to access '{path}': {e}")
except (BrokenPipeError, KeyboardInterrupt):
self.logger.error("Interrupted.")
self.close_smb_session()
self.init_smb_session()
return
except Exception as e:
self.logger.error(f"Error while processing '{path}': {e}")
Expand All @@ -994,17 +1032,18 @@ def command_shares(self, arguments, command):
do_check_rights = True
test_write = False

self.logger.print("WARNING: Checking WRITE access to shares in offensive tools implies creating a folder and trying to delete it.")
self.logger.print("| If you have CREATE_CHILD rights but no DELETE_CHILD rights, the folder cannot be deleted and will remain on the target.")
self.logger.print("| Do you want to continue? [N/y] ", end='')
user_response = input()
self.logger.write_to_logfile(user_response)
while user_response.lower().strip() not in ['y', 'n']:
self.logger.print("| Invalid response, Do you want to continue? [N/y] ", end='')
if do_check_rights:
self.logger.print("WARNING: Checking WRITE access to shares in offensive tools implies creating a folder and trying to delete it.")
self.logger.print("| If you have CREATE_CHILD rights but no DELETE_CHILD rights, the folder cannot be deleted and will remain on the target.")
self.logger.print("| Do you want to continue? [N/y] ", end='')
user_response = input()
self.logger.write_to_logfile(user_response)
if user_response.lower().strip() == 'y':
test_write = True
while user_response.lower().strip() not in ['y', 'n']:
self.logger.print("| Invalid response, Do you want to continue? [N/y] ", end='')
user_response = input()
self.logger.write_to_logfile(user_response)
if user_response.lower().strip() == 'y':
test_write = True

shares = self.sessionsManager.current_session.list_shares()
if len(shares.keys()) != 0:
Expand All @@ -1016,6 +1055,13 @@ def command_shares(self, arguments, command):
if do_check_rights:
table.add_column("Rights")

if do_check_rights:
table.add_column("Rights")

security_descriptor = list(shares.values())[0].get("security_descriptor")
if security_descriptor is not None:
table.add_column("Security Descriptor")

for sharename in sorted(shares.keys()):
types = ', '.join([s.replace("STYPE_","") for s in shares[sharename]["type"]])

Expand All @@ -1032,22 +1078,34 @@ def command_shares(self, arguments, command):
str_comment = "[bold bright_yellow]" + shares[sharename]["comment"] + "[/bold bright_yellow]"

if do_check_rights:
access_rights = self.sessionsManager.current_session.test_rights(sharename=shares[sharename]["name"], test_write=test_write)
str_access_rights = "[bold yellow]NO ACCESS[/bold yellow]"
if access_rights["readable"] and access_rights["writable"]:
str_access_rights = "[bold green]READ[/bold green], [bold red]WRITE[/bold red]"
elif access_rights["readable"]:
str_access_rights = "[bold green]READ[/bold green]"
elif access_rights["writable"]:
# Without READ?? This should not happen IMHO
str_access_rights = "[bold red]WRITE[/bold red]"
else:
try:
access_rights = self.sessionsManager.current_session.test_rights(sharename=shares[sharename]["name"], test_write=test_write)
str_access_rights = "[bold yellow]NO ACCESS[/bold yellow]"
if access_rights["readable"] and access_rights["writable"]:
str_access_rights = "[bold green]READ[/bold green], [bold red]WRITE[/bold red]"
elif access_rights["readable"]:
str_access_rights = "[bold green]READ[/bold green]"
elif access_rights["writable"]:
# Without READ?? This should not happen IMHO
str_access_rights = "[bold red]WRITE[/bold red]"
else:
str_access_rights = "[bold yellow]NO ACCESS[/bold yellow]"
except:
str_access_rights = ""

if security_descriptor is not None:
sd_table = self.sessionsManager.current_session.securityDescriptorTable(b''.join(shares[sharename].get("security_descriptor")), "sharename", prefix="", table_colors=True)

if do_check_rights:
table.add_row(str_sharename, str_hidden, str_types, str_comment, str_access_rights)
if security_descriptor is not None:
table.add_row(str_sharename, str_hidden, str_types, str_comment, str_access_rights, sd_table)
else:
table.add_row(str_sharename, str_hidden, str_types, str_comment, str_access_rights)
else:
table.add_row(str_sharename, str_hidden, str_types, str_comment)
if security_descriptor is not None:
table.add_row(str_sharename, str_hidden, str_types, str_comment, sd_table)
else:
table.add_row(str_sharename, str_hidden, str_types, str_comment)

Console().print(table)
else:
Expand Down
57 changes: 57 additions & 0 deletions smbclientng/core/SIDResolver.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
from __future__ import annotations
from impacket.dcerpc.v5 import transport, lsat, lsad, rpcrt
from impacket.dcerpc.v5.dtypes import MAXIMUM_ALLOWED
from impacket.dcerpc.v5.lsat import DCERPCSessionError
from impacket.nt_errors import STATUS_SOME_NOT_MAPPED, STATUS_NONE_MAPPED
from typing import TYPE_CHECKING

if TYPE_CHECKING:
from impacket.smbconnection import SMBConnection

class SIDResolver:
def __init__(self, smbConnection: SMBConnection):
self.__smbConnection = smbConnection
self.__dce = self.__get_lsarpc_binding()
self.__dce.connect()
self.__dce.bind(lsat.MSRPC_UUID_LSAT)
self.cache = dict()

def close(self):
self.__dce.disconnect()

def __get_lsarpc_binding(self) -> rpcrt.DCERPC_v5:
rpctransport = transport.SMBTransport(445, filename = "lsarpc")
rpctransport.set_smb_connection(self.__smbConnection)
return rpctransport.get_dce_rpc()

def resolve_sids(self, sids: set[str]) -> None:
unresolved_sids = list(sids.difference(self.cache.keys()))
if len(unresolved_sids) == 0:
return
resp = lsad.hLsarOpenPolicy2(self.__dce, MAXIMUM_ALLOWED | lsat.POLICY_LOOKUP_NAMES)
policyHandle = resp['PolicyHandle']
try:
resp = lsat.hLsarLookupSids(self.__dce, policyHandle, unresolved_sids, lsat.LSAP_LOOKUP_LEVEL.LsapLookupWksta)
except DCERPCSessionError as err:
#Catch error when some could not be resolved:
if err.error_code == STATUS_SOME_NOT_MAPPED and err.packet != None:
resp = err.packet
elif err.error_code == STATUS_NONE_MAPPED:
return
else:
raise err
for i, item in enumerate(resp['TranslatedNames']['Names']):
domain = resp['ReferencedDomains']['Domains'][item['DomainIndex']]['Name']
if len(item['Name']) == 0:
domain = domain + "\\" if len(domain) != 0 else ""
cur_sid = f"{domain}Unknown (RID={unresolved_sids[i].split('-')[-1]})"
elif len(domain) == 0:
cur_sid = item['Name']
else:
cur_sid = "{}\\{}".format(domain, item['Name'])
self.cache[unresolved_sids[i]] = cur_sid

def get_sid(self, sid: str) -> str:
if not sid in self.cache:
self.resolve_sids({sid})
return self.cache.get(sid) or sid
Loading

0 comments on commit 0a42a34

Please sign in to comment.