Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[enhancement] Added acls command, show share ACLs #118

Merged
merged 1 commit into from
Nov 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions smbclientng/core/CommandCompleter.py
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,14 @@ class CommandCompleter(object):
"subcommands": [],
"autocomplete": ["remote_directory"]
},
"acls": {
"description": [
"List ACLs of files and folders in cwd.",
"Syntax: 'acls'"
],
"subcommands": [],
"autocomplete": ["remote_directory"]
},
"ltree": {
"description": [
"Displays a tree view of the local directories.",
Expand Down
103 changes: 79 additions & 24 deletions smbclientng/core/InteractiveShell.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
import datetime
import impacket
from importlib import import_module
import impacket.smbconnection
from impacket.smb3structs import *
import ntpath
import os
import readline
Expand Down Expand Up @@ -201,6 +203,10 @@ def process_line(self, commandLine):
elif command in ["ls", "dir"]:
self.command_ls(arguments, command)

# List directory contents in a share
elif command in ["acls"]:
self.command_acls(arguments, command)

# Shows the content of a local file
elif command == "lcat":
self.command_lcat(arguments, command)
Expand Down Expand Up @@ -753,7 +759,41 @@ def command_ls(self, arguments, command):

if len(arguments) > 1:
self.logger.print()


@active_smb_connection_needed
@smb_share_is_set
def command_acls(self, arguments: list[str], command: str):
# Command arguments required : No
# Active SMB connection needed : Yes
# SMB share needed : Yes

if len(arguments) == 0:
arguments = ['.']

smbClient = self.sessionsManager.current_session.smbClient
sharename = self.sessionsManager.current_session.smb_share
foldername = self.sessionsManager.current_session.smb_cwd
tree_id = smbClient.connectTree(sharename)

for entry in smbClient.listPath(sharename, ntpath.join(foldername, '*')):
self.logger.print(windows_ls_entry(entry, self.config))
filename = entry.get_longname()

if filename in [".",".."]:
continue
filename = ntpath.join(foldername, filename)
try:
file_id = smbClient.getSMBServer().create(tree_id, filename, READ_CONTROL | FILE_READ_ATTRIBUTES, 0, FILE_DIRECTORY_FILE if entry.is_directory() else FILE_NON_DIRECTORY_FILE, FILE_OPEN, 0)
except Exception as err:
self.logger.debug(f"Could not get attributes for file {filename}: {str(err)}")
continue

file_info = smbClient.getSMBServer().queryInfo(tree_id, file_id, infoType=SMB2_0_INFO_SECURITY, fileInfoClass=SMB2_SEC_INFO_00, additionalInformation=OWNER_SECURITY_INFORMATION | DACL_SECURITY_INFORMATION | GROUP_SECURITY_INFORMATION, flags=0)

self.sessionsManager.current_session.printSecurityDescriptorTable(file_info, filename)

self.logger.print()

@command_arguments_required
@active_smb_connection_needed
@smb_share_is_set
Expand Down Expand Up @@ -970,8 +1010,6 @@ def command_sizeof(self, arguments, command):
self.logger.error(f"Failed to access '{path}': {e}")
except (BrokenPipeError, KeyboardInterrupt):
self.logger.error("Interrupted.")
self.close_smb_session()
self.init_smb_session()
return
except Exception as e:
self.logger.error(f"Error while processing '{path}': {e}")
Expand All @@ -994,17 +1032,18 @@ def command_shares(self, arguments, command):
do_check_rights = True
test_write = False

self.logger.print("WARNING: Checking WRITE access to shares in offensive tools implies creating a folder and trying to delete it.")
self.logger.print("| If you have CREATE_CHILD rights but no DELETE_CHILD rights, the folder cannot be deleted and will remain on the target.")
self.logger.print("| Do you want to continue? [N/y] ", end='')
user_response = input()
self.logger.write_to_logfile(user_response)
while user_response.lower().strip() not in ['y', 'n']:
self.logger.print("| Invalid response, Do you want to continue? [N/y] ", end='')
if do_check_rights:
self.logger.print("WARNING: Checking WRITE access to shares in offensive tools implies creating a folder and trying to delete it.")
self.logger.print("| If you have CREATE_CHILD rights but no DELETE_CHILD rights, the folder cannot be deleted and will remain on the target.")
self.logger.print("| Do you want to continue? [N/y] ", end='')
user_response = input()
self.logger.write_to_logfile(user_response)
if user_response.lower().strip() == 'y':
test_write = True
while user_response.lower().strip() not in ['y', 'n']:
self.logger.print("| Invalid response, Do you want to continue? [N/y] ", end='')
user_response = input()
self.logger.write_to_logfile(user_response)
if user_response.lower().strip() == 'y':
test_write = True

shares = self.sessionsManager.current_session.list_shares()
if len(shares.keys()) != 0:
Expand All @@ -1016,6 +1055,10 @@ def command_shares(self, arguments, command):
if do_check_rights:
table.add_column("Rights")

security_descriptor = list(shares.values())[0].get("security_descriptor")
if security_descriptor is not None:
table.add_column("Security Descriptor")

for sharename in sorted(shares.keys()):
types = ', '.join([s.replace("STYPE_","") for s in shares[sharename]["type"]])

Expand All @@ -1032,22 +1075,34 @@ def command_shares(self, arguments, command):
str_comment = "[bold bright_yellow]" + shares[sharename]["comment"] + "[/bold bright_yellow]"

if do_check_rights:
access_rights = self.sessionsManager.current_session.test_rights(sharename=shares[sharename]["name"], test_write=test_write)
str_access_rights = "[bold yellow]NO ACCESS[/bold yellow]"
if access_rights["readable"] and access_rights["writable"]:
str_access_rights = "[bold green]READ[/bold green], [bold red]WRITE[/bold red]"
elif access_rights["readable"]:
str_access_rights = "[bold green]READ[/bold green]"
elif access_rights["writable"]:
# Without READ?? This should not happen IMHO
str_access_rights = "[bold red]WRITE[/bold red]"
else:
try:
access_rights = self.sessionsManager.current_session.test_rights(sharename=shares[sharename]["name"], test_write=test_write)
str_access_rights = "[bold yellow]NO ACCESS[/bold yellow]"
if access_rights["readable"] and access_rights["writable"]:
str_access_rights = "[bold green]READ[/bold green], [bold red]WRITE[/bold red]"
elif access_rights["readable"]:
str_access_rights = "[bold green]READ[/bold green]"
elif access_rights["writable"]:
# Without READ?? This should not happen IMHO
str_access_rights = "[bold red]WRITE[/bold red]"
else:
str_access_rights = "[bold yellow]NO ACCESS[/bold yellow]"
except:
str_access_rights = ""

if security_descriptor is not None:
sd_table = self.sessionsManager.current_session.securityDescriptorTable(b''.join(shares[sharename].get("security_descriptor")), "sharename", prefix="", table_colors=True)

if do_check_rights:
table.add_row(str_sharename, str_hidden, str_types, str_comment, str_access_rights)
if security_descriptor is not None:
table.add_row(str_sharename, str_hidden, str_types, str_comment, str_access_rights, sd_table)
else:
table.add_row(str_sharename, str_hidden, str_types, str_comment, str_access_rights)
else:
table.add_row(str_sharename, str_hidden, str_types, str_comment)
if security_descriptor is not None:
table.add_row(str_sharename, str_hidden, str_types, str_comment, sd_table)
else:
table.add_row(str_sharename, str_hidden, str_types, str_comment)

Console().print(table)
else:
Expand Down
57 changes: 57 additions & 0 deletions smbclientng/core/SIDResolver.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
from __future__ import annotations
from impacket.dcerpc.v5 import transport, lsat, lsad, rpcrt
from impacket.dcerpc.v5.dtypes import MAXIMUM_ALLOWED
from impacket.dcerpc.v5.lsat import DCERPCSessionError
from impacket.nt_errors import STATUS_SOME_NOT_MAPPED, STATUS_NONE_MAPPED
from typing import TYPE_CHECKING

if TYPE_CHECKING:
from impacket.smbconnection import SMBConnection

class SIDResolver:
def __init__(self, smbConnection: SMBConnection):
self.__smbConnection = smbConnection
self.__dce = self.__get_lsarpc_binding()
self.__dce.connect()
self.__dce.bind(lsat.MSRPC_UUID_LSAT)
self.cache = dict()

def close(self):
self.__dce.disconnect()

def __get_lsarpc_binding(self) -> rpcrt.DCERPC_v5:
rpctransport = transport.SMBTransport(445, filename = "lsarpc")
rpctransport.set_smb_connection(self.__smbConnection)
return rpctransport.get_dce_rpc()

def resolve_sids(self, sids: set[str]) -> None:
unresolved_sids = list(sids.difference(self.cache.keys()))
if len(unresolved_sids) == 0:
return
resp = lsad.hLsarOpenPolicy2(self.__dce, MAXIMUM_ALLOWED | lsat.POLICY_LOOKUP_NAMES)
policyHandle = resp['PolicyHandle']
try:
resp = lsat.hLsarLookupSids(self.__dce, policyHandle, unresolved_sids, lsat.LSAP_LOOKUP_LEVEL.LsapLookupWksta)
except DCERPCSessionError as err:
#Catch error when some could not be resolved:
if err.error_code == STATUS_SOME_NOT_MAPPED and err.packet != None:
resp = err.packet
elif err.error_code == STATUS_NONE_MAPPED:
return
else:
raise err
for i, item in enumerate(resp['TranslatedNames']['Names']):
domain = resp['ReferencedDomains']['Domains'][item['DomainIndex']]['Name']
if len(item['Name']) == 0:
domain = domain + "\\" if len(domain) != 0 else ""
cur_sid = f"{domain}Unknown (RID={unresolved_sids[i].split('-')[-1]})"
elif len(domain) == 0:
cur_sid = item['Name']
else:
cur_sid = "{}\\{}".format(domain, item['Name'])
self.cache[unresolved_sids[i]] = cur_sid

def get_sid(self, sid: str) -> str:
if not sid in self.cache:
self.resolve_sids({sid})
return self.cache.get(sid) or sid
Loading
Loading