diff --git a/.github/workflows/build_and_test.yml b/.github/workflows/build_and_test.yml index 039805d..b3ba36f 100644 --- a/.github/workflows/build_and_test.yml +++ b/.github/workflows/build_and_test.yml @@ -17,10 +17,10 @@ jobs: - name: Checkout pysap uses: actions/checkout@v3 - - name: Setup Python 3.10 + - name: Setup Python 3.12 uses: actions/setup-python@v4 with: - python-version: "3.10" + python-version: "3.12" - name: Install Python dependencies run: | @@ -46,7 +46,7 @@ jobs: fail-fast: false matrix: os: [ubuntu-latest] - python-version: ["3.7", "3.8", "3.9", "3.10"] + python-version: ["3.7", "3.8", "3.9", "3.10", "3.12"] experimental: [false] continue-on-error: ${{ matrix.experimental }} @@ -83,7 +83,7 @@ jobs: runs-on: ubuntu-latest strategy: matrix: - python-version: ["3.10"] + python-version: ["3.12"] steps: - name: Checkout pysap diff --git a/docs/protocols/SAPRouter.ipynb b/docs/protocols/SAPRouter.ipynb index 2ad10a2..fe6f482 100755 --- a/docs/protocols/SAPRouter.ipynb +++ b/docs/protocols/SAPRouter.ipynb @@ -129,7 +129,7 @@ "outputs": [], "source": [ "router_string = [SAPRouterRouteHop(hostname=\"8.8.8.8\", port=3299),\n", - " SAPRouterRouteHop(hostname=\"10.0.0.1\", port=3200, password=\"S3cr3t\")]\n", + " SAPRouterRouteHop(hostname=\"127.0.0.1\", port=3200, password=\"S3cr3t\")]\n", "router_string_lens = map(len, map(str, router_string))\n", "p = SAPRouter(type=SAPRouter.SAPROUTER_ROUTE,\n", " route_entries=len(router_string),\n", diff --git a/examples/diag_capturer.py b/examples/diag_capturer.py index ac3db78..59db1c7 100755 --- a/examples/diag_capturer.py +++ b/examples/diag_capturer.py @@ -106,7 +106,7 @@ def parse_fields(self, pkt): if atom.etype in [121, 122, 123, 130, 131, 132]: text = atom.field1_text or atom.field2_text text = text.strip() - if "@\Q" in text: + if "@\\Q" in text: parts = text.split("@") try: text = "%s (hint: %s)" % (parts[2], parts[1]) diff --git a/examples/diag_login_brute_force.py b/examples/diag_login_brute_force.py index a0dda54..a52237d 100755 --- a/examples/diag_login_brute_force.py +++ b/examples/diag_login_brute_force.py @@ -19,7 +19,7 @@ # Standard imports import logging -from string import letters +from string import ascii_letters from random import choice from argparse import ArgumentParser # External imports @@ -124,7 +124,7 @@ def make_login(username, password, client): def get_rand(length): - return ''.join(choice(letters) for _ in range(length)) + return ''.join(choice(ascii_letters) for _ in range(length)) def is_duplicate_login(response): diff --git a/examples/diag_render_login_screen.py b/examples/diag_render_login_screen.py index 30305ba..72171b1 100755 --- a/examples/diag_render_login_screen.py +++ b/examples/diag_render_login_screen.py @@ -1,4 +1,4 @@ -#!/usr/bin/env python2 +#!/usr/bin/env python3 # encoding: utf-8 # pysap - Python library for crafting SAP's network protocols packets # @@ -21,9 +21,11 @@ import logging from collections import defaultdict from argparse import ArgumentParser + # External imports from scapy.config import conf from scapy.packet import bind_layers + # Custom imports import pysap from pysap.SAPNI import SAPNI @@ -31,13 +33,10 @@ from pysap.SAPDiag import SAPDiag, SAPDiagDP from pysap.SAPDiagClient import SAPDiagConnection -# Try to import wx for failing gracefully if not found -try: - import wx # TODO: Change wx to Tkinter - has_wx = True -except ImportError: - has_wx = False - +# Tkinter imports +import tkinter as tk +from tkinter import ttk +from tkinter import messagebox # Bind the SAPDiag layer bind_layers(SAPNI, SAPDiag,) @@ -46,33 +45,24 @@ bind_layers(SAPDiag, SAPDiagItem,) bind_layers(SAPDiagItem, SAPDiagItem,) - # Set the verbosity to 0 conf.verb = 0 - # Command line options parser def parse_options(): - - description = "This example script renders the login screen provided by an SAP Netweaver Application Server using "\ - "wxPython." - + description = "This example script renders the login screen provided by an SAP Netweaver Application Server using Tkinter." usage = "%(prog)s [options] -d " parser = ArgumentParser(usage=usage, description=description, epilog=pysap.epilog) target = parser.add_argument_group("Target") - target.add_argument("-d", "--remote-host", dest="remote_host", - help="Remote host") - target.add_argument("-p", "--remote-port", dest="remote_port", type=int, default=3200, - help="Remote port [%(default)d]") - target.add_argument("--route-string", dest="route_string", - help="Route string for connecting through a SAP Router") + target.add_argument("-d", "--remote-host", dest="remote_host", help="Remote host") + target.add_argument("-p", "--remote-port", dest="remote_port", type=int, default=3200, help="Remote port [%(default)d]") + target.add_argument("--route-string", dest="route_string", help="Route string for connecting through a SAP Router") misc = parser.add_argument_group("Misc options") misc.add_argument("-v", "--verbose", dest="verbose", action="store_true", help="Verbose output") - misc.add_argument("--terminal", dest="terminal", default=None, - help="Terminal name") + misc.add_argument("--terminal", dest="terminal", default=None, help="Terminal name") options = parser.parse_args() @@ -81,87 +71,92 @@ def parse_options(): return options +class DiagScreen(tk.Tk): + def __init__(self, windows_title, height, width, session_title, dbname, cpuname): + super().__init__() -class DiagScreen(wx.Frame): - def __init__(self, parent, windows_title, height, width, session_title, dbname, cpuname): - wx.Frame.__init__(self, parent, title=windows_title) - - self.maincontainer = wx.BoxSizer(wx.VERTICAL) - - self.session_title = wx.StaticBox(self, label=session_title) + self.title(windows_title) + self.geometry(f"{width}x{height}") - self.container = wx.StaticBoxSizer(self.session_title, wx.VERTICAL) - self.maincontainer.Add(self.container, flag=wx.EXPAND | wx.ALL, border=10) + self.session_title = ttk.Label(self, text=session_title) + self.session_title.pack(pady=10) - self.buttonbar = wx.ToolBar(self) - self.container.Add(self.buttonbar, flag=wx.EXPAND | wx.ALL, border=10) + self.content = ttk.Frame(self) + self.content.pack(expand=True, fill=tk.BOTH, padx=10, pady=10) - self.content = wx.GridBagSizer() - self.container.Add(self.content) - self.SetSizer(self.container) + self.buttonbar = ttk.Frame(self) + self.buttonbar.pack(fill=tk.X, padx=10, pady=5) - self.menubar = wx.MenuBar() - self.SetMenuBar(self.menubar) + self.menubar = tk.Menu(self) + self.config(menu=self.menubar) - self.toolbar = self.CreateToolBar() - self.toolbar.Realize() + self.statusbar = ttk.Label(self, text=f"{dbname} | {cpuname}") + self.statusbar.pack(side=tk.BOTTOM, fill=tk.X) - self.statusbar = self.CreateStatusBar() - self.statusbar.SetFields(["", dbname, cpuname]) - - self.menus = defaultdict(defaultdict) + self.menus = defaultdict(dict) def add_text(self, x, y, maxlength, text, tooltip=None): - text_control = wx.StaticText(self, label=text) + label = ttk.Label(self.content, text=text) + label.grid(row=y, column=x, padx=5, pady=5, sticky='w') if tooltip: - text_control.SetTooltip(tooltip) - self.content.Add(text_control, pos=(y, x), flag=wx.TOP | wx.LEFT | wx.BOTTOM, border=5) + ToolTip(label, tooltip) def add_text_box(self, x, y, maxlength, text, invisible=0): + entry = ttk.Entry(self.content) + entry.grid(row=y, column=x, padx=5, pady=5) + entry.insert(0, text) if invisible: - textbox_control = wx.TextCtrl(self, style=wx.TE_PASSWORD) - else: - textbox_control = wx.TextCtrl(self) - textbox_control.SetMaxLength(maxlength) - textbox_control.SetValue(text) - self.content.Add(textbox_control, pos=(y, x), flag=wx.TOP | wx.LEFT | wx.BOTTOM, border=5) + entry.config(show="*") def add_button(self, text): - button = wx.Button(self.buttonbar, wx.ID_ANY, text) - self.buttonbar.AddControl(button) - - def add_toolbar(self, text): - toolbar = wx.Button(self.toolbar, wx.ID_ANY, text) - self.toolbar.AddControl(toolbar) + ttk.Button(self.buttonbar, text=text).pack(side=tk.LEFT, padx=2, pady=2) def add_menu(self, pos1, text): - self.menus[pos1][0] = wx.Menu() - self.menubar.Append(self.menus[pos1][0], text) + menu = tk.Menu(self.menubar, tearoff=0) + self.menubar.add_cascade(label=text, menu=menu) + self.menus[pos1][0] = menu def add_child_menu(self, text, pos1, pos2=0, pos3=0, pos4=0, sel=0, men=0, sep=0): - # XXX: Support menus of level 4, need to use another structure for storing the menus and their handles if pos4 > 0: return if sep: - self.menus[pos1][0].AppendSeparator() + self.menus[pos1][0].add_separator() else: if men: - self.menus[pos1][pos2] = wx.Menu() - item = self.menus[pos1][0].AppendMenu(wx.ID_ANY, text, self.menus[pos1][pos2]) + submenu = tk.Menu(self.menus[pos1][0], tearoff=0) + self.menus[pos1][0].add_cascade(label=text, menu=submenu) + self.menus[pos1][pos2] = submenu else: if pos3 > 0: - item = self.menus[pos1][pos2].Append(wx.ID_ANY, text) + self.menus[pos1][pos2].add_command(label=text, state=tk.NORMAL if sel == 1 else tk.DISABLED) else: - item = self.menus[pos1][0].Append(wx.ID_ANY, text) - item.Enable(sel == 1) - + self.menus[pos1][0].add_command(label=text, state=tk.NORMAL if sel == 1 else tk.DISABLED) + +class ToolTip: + def __init__(self, widget, text): + self.widget = widget + self.text = text + self.widget.bind("", self.enter) + self.widget.bind("", self.leave) + self.tooltip = None + + def enter(self, event=None): + x = y = 0 + x, y, _, _ = self.widget.bbox("insert") + x += self.widget.winfo_rootx() + 25 + y += self.widget.winfo_rooty() + 25 + self.tooltip = tk.Toplevel(self.widget) + self.tooltip.wm_overrideredirect(True) + self.tooltip.wm_geometry(f"+{x}+{y}") + label = ttk.Label(self.tooltip, text=self.text, background="#ffffe0", relief="solid", borderwidth=1) + label.pack(ipadx=1) + + def leave(self, event=None): + if self.tooltip: + self.tooltip.destroy() + self.tooltip = None def render_diag_screen(screen, verbose): - """ - Renders the Dynt Atom items of a message - - """ - def get_item_value(screen, item_type, item_id, item_sid, i=0): item = screen.get_item(item_type, item_id, item_sid) if item: @@ -181,15 +176,14 @@ def get_item_value(screen, item_type, item_id, item_sid, i=0): toolbars = get_item_value(screen, "APPL4", "MNUENTRY", "MENU_KYB") if verbose: - print("[*] DB Name: " + dbname) - print("[*] CPU Name: " + cpuname) - print("[*] Client: " + client) - print("[*] Session Icon: " + session_icon) - print("[*] Session Title: " + session_title) - print("[*] Window Size: " + areasize.window_height + " x " + areasize.window_width) + print(f"[*] DB Name: {dbname}") + print(f"[*] CPU Name: {cpuname}") + print(f"[*] Client: {client}") + print(f"[*] Session Icon: {session_icon}") + print(f"[*] Session Title: {session_title}") + print(f"[*] Window Size: {areasize.window_height} x {areasize.window_width}") - app = wx.App(False) - login_frame = DiagScreen(None, "%s (%s)" % (session_icon, client), areasize.window_height, areasize.window_width, session_title, dbname, cpuname) + app = DiagScreen(f"{session_icon} ({client})", areasize.window_height, areasize.window_width, session_title, dbname, cpuname) # Render the atoms (control boxes and labels) atoms = screen.get_item(["APPL", "APPL4"], "DYNT", "DYNT_ATOM") @@ -207,18 +201,18 @@ def get_item_value(screen, item_type, item_id, item_sid, i=0): if text is not None: if atom_item.etype in [123, 132]: # DIAG_DGOTYP_KEYWORD_1 or DIAG_DGOTYP_KEYWORD_2 - if text.find("@\Q") >= 0: + if text.find("@\\Q") >= 0: tooltip = text.split("@")[1][2:] text = text.split("@")[2] else: tooltip = None if verbose: - print("[*] Found text label at %d,%d: \"%s\" (maxlength=%d) (tooltip=\"%s\")" % (atom_item.col, atom_item.row, text.strip(), maxnrchars, tooltip)) - login_frame.add_text(atom_item.col, atom_item.row, maxnrchars, text) + print(f"[*] Found text label at {atom_item.col},{atom_item.row}: \"{text.strip()}\" (maxlength={maxnrchars}) (tooltip=\"{tooltip}\")") + app.add_text(atom_item.col, atom_item.row, maxnrchars, text, tooltip) elif atom_item.etype in [121, 130]: # DIAG_DGOTYP_EFIELD_1 or DIAG_DGOTYP_EFIELD_2 if verbose: - print("[*] Found text box at %d,%d: \"%s\" (maxlength=%d)" % (atom_item.col, atom_item.row, text.strip(), maxnrchars)) - login_frame.add_text_box(atom_item.col, atom_item.row, maxnrchars, text.strip(), atom_item.attr_DIAG_BSD_INVISIBLE == 1) + print(f"[*] Found text box at {atom_item.col},{atom_item.row}: \"{text.strip()}\" (maxlength={maxnrchars})") + app.add_text_box(atom_item.col, atom_item.row, maxnrchars, text.strip(), atom_item.attr_DIAG_BSD_INVISIBLE == 1) else: print("[*] Found label without text") @@ -226,47 +220,40 @@ def get_item_value(screen, item_type, item_id, item_sid, i=0): if menus: for menu in menus.entries: if verbose: - print("[*] Found menu item: \"%s\"" % menu.text) - login_frame.add_menu(menu.position_1, menu.text) + print(f"[*] Found menu item: \"{menu.text}\"") + app.add_menu(menu.position_1, menu.text) # Render the submenus if menudetails: for menu in menudetails.entries: if verbose: - print("[*] Found child menu item: \"%s\", pos %d, %d, %d, %d" % (menu.text, menu.position_1, menu.position_2, menu.position_3, menu.position_4)) - login_frame.add_child_menu(menu.text, menu.position_1, menu.position_2, menu.position_3, menu.position_4, menu.flag_TERM_SEL, menu.flag_TERM_MEN, menu.flag_TERM_SEP) + print(f"[*] Found child menu item: \"{menu.text}\", pos {menu.position_1}, {menu.position_2}, {menu.position_3}, {menu.position_4}") + app.add_child_menu(menu.text, menu.position_1, menu.position_2, menu.position_3, menu.position_4, menu.flag_TERM_SEL, menu.flag_TERM_MEN, menu.flag_TERM_SEP) # Render the buttonbar if buttonbars: for button in buttonbars.entries: if verbose: - print("[*] Found button item: \"%s\"" % button.text) - login_frame.add_button(button.text) + print(f"[*] Found button item: \"{button.text}\"") + app.add_button(button.text) # Render the toolbar if toolbars: for toolbar in toolbars.entries: if verbose: - print("[*] Found toolbar item: \"%s\"" % toolbar.text) - login_frame.add_toolbar(toolbar.text) - - login_frame.Show(True) - app.MainLoop() + print(f"[*] Found toolbar item: \"{toolbar.text}\"") + # Note: Toolbar rendering is not implemented in this version + app.mainloop() -# Main function def main(): options = parse_options() - if not has_wx: - print("[-] Required library not found. Please install it from https://wxpython.org/") - return - if options.verbose: logging.basicConfig(level=logging.DEBUG) # Create the connection to the SAP Netweaver server - print("[*] Connecting to %s port %d" % (options.remote_host, options.remote_port)) + print(f"[*] Connecting to {options.remote_host} port {options.remote_port}") connection = SAPDiagConnection(options.remote_host, options.remote_port, terminal=options.terminal, @@ -281,6 +268,9 @@ def main(): # Close the connection connection.close() - if __name__ == "__main__": - main() + try: + main() + except KeyboardInterrupt: + print("[*] Canceled by the user ...") + exit(0) \ No newline at end of file diff --git a/pysap/SAPCAR.py b/pysap/SAPCAR.py index 68ea5d7..53b2f6e 100644 --- a/pysap/SAPCAR.py +++ b/pysap/SAPCAR.py @@ -28,7 +28,7 @@ from scapy.packet import Packet from scapy.fields import (ByteField, ByteEnumField, LEIntField, FieldLenField, PacketField, StrFixedLenField, PacketListField, - ConditionalField, LESignedIntField, StrField, LELongField) + ConditionalField, LESignedIntField, StrField, LELongField, MultipleTypeField) # Custom imports from pysap.utils.fields import (PacketNoPadded, StrNullFixedLenField, PacketListStopField) from pysapcompress import (decompress, compress, ALG_LZH, CompressError, @@ -59,9 +59,15 @@ class SAPCARCompressedBlobFormat(PacketNoPadded): ByteEnumField("algorithm", 0x12, {0x12: "LZH", 0x10: "LZC"}), StrFixedLenField("magic_bytes", b"\x1f\x9d", 2), ByteField("special", 2), - ConditionalField(StrField("blob", None, remain=4), lambda x: x.compressed_length <= 8), - ConditionalField(StrFixedLenField("blob", None, length_from=lambda x: x.compressed_length - 8), - lambda x: x.compressed_length > 8), + MultipleTypeField( + [ + (StrField("blob", None, remain=4), + lambda pkt: pkt.compressed_length <= 8), + (StrFixedLenField("blob", None, length_from=lambda pkt: pkt.compressed_length - 8), + lambda pkt: pkt.compressed_length > 8), + ], + StrField("blob", None) + ) ] @@ -582,7 +588,7 @@ def open(self, enforce_checksum=False): # Validate the checksum if required if enforce_checksum: - if checksum != self.calculate_checksum(out_file.getvalue()): + if checksum != self.calculate_checksum(out_file.getvalue().decode()): raise SAPCARInvalidChecksumException("Invalid checksum found") out_file.seek(0) @@ -690,7 +696,7 @@ def version(self, version): if version not in list(sapcar_archive_file_versions.keys()): raise ValueError("Invalid version") # If version is different, we should convert each file - if version != self._sapcar.version.decode(): + if version != self._sapcar.version: fils = [] for fil in list(self.files.values()): new_file = SAPCARArchiveFile.from_archive_file(fil, version=version) diff --git a/pysap/SAPCredv2.py b/pysap/SAPCredv2.py index d549093..96aad26 100644 --- a/pysap/SAPCredv2.py +++ b/pysap/SAPCredv2.py @@ -48,7 +48,7 @@ log_cred = logging.getLogger("pysap.cred") -cred_key_fmt = "240657rsga&/%srwthgrtawe45hhtrtrsr35467b2dx3456j67mv67f89656f75" +cred_key_fmt = b"240657rsga&/%srwthgrtawe45hhtrtrsr35467b2dx3456j67mv67f89656f75" """Fixed key embedded in CommonCryptoLib for encrypted credentials""" @@ -170,14 +170,19 @@ def lps_type_str(self): @property def cipher_format_version(self): cipher = self.cipher.val_readable - if len(cipher) >= 36 and ord(cipher[0]) in [0, 1]: - return ord(cipher[0]) + if len(cipher) >= 36: + first_byte = cipher[0] + if isinstance(first_byte, int): + return first_byte if first_byte in [0, 1] else 0 + elif isinstance(first_byte, str): + return ord(first_byte) if ord(first_byte) in [0, 1] else 0 return 0 @property def cipher_algorithm(self): if self.cipher_format_version == 1: - return ord(self.cipher.val_readable[1]) + second_byte = self.cipher.val_readable[1] + return ord(second_byte) if isinstance(second_byte, str) else second_byte return 0 def decrypt(self, username): @@ -213,7 +218,7 @@ def decrypt_simple(self, username): # Construct the key using the key format and the username key = (cred_key_fmt % username)[:24] # Set empty IV - iv = "\x00" * 8 + iv = b"\x00" * 8 # Decrypt the cipher text with the derived key and IV decryptor = Cipher(algorithms.TripleDES(key), modes.CBC(iv), backend=default_backend()).decryptor() @@ -225,12 +230,15 @@ def xor(self, string, start): """XOR a given string using a fixed key and a starting number.""" key = 0x15a4e35 x = start - y = "" + result = bytearray() for c in string: x *= key x += 1 - y += chr(ord(c) ^ (x & 0xff)) - return y + if isinstance(c, int): + result.append(c ^ (x & 0xff)) + else: + result.append(ord(c) ^ (x & 0xff)) + return bytes(result) def derive_key(self, key, blob, header, username): """Derive a key using SAP's algorithm. The key is derived using SHA256 and xor from an @@ -240,10 +248,22 @@ def derive_key(self, key, blob, header, username): digest.update(key) digest.update(blob[0:4]) digest.update(header.salt) - digest.update(self.xor(username, ord(header.salt[0]))) - digest.update("" * 0x20) + + # Handle both string and integer types for salt[0] + salt_value = header.salt[0] + if isinstance(salt_value, str): + salt_value = ord(salt_value) + + digest.update(self.xor(username, salt_value)) + digest.update(b"" * 0x20) hashed = digest.finalize() - derived_key = self.xor(hashed, ord(header.salt[1])) + + # Handle both string and integer types for salt[1] + salt_value = header.salt[1] + if isinstance(salt_value, str): + salt_value = ord(salt_value) + + derived_key = self.xor(hashed, salt_value) # Validate and select proper algorithm if header.algorithm == CIPHER_ALGORITHM_3DES: @@ -340,7 +360,8 @@ def pse_file_path(self): @property def lps_type(self): - return ord(self.cipher.val_readable[1]) + value = self.cipher.val_readable[1] + return value if isinstance(value, int) else ord(value) @property def lps_type_str(self): @@ -352,7 +373,8 @@ def lps_type_str(self): @property def cipher_format_version(self): - return ord(self.cipher.val_readable[0]) + value = self.cipher.val_readable[0] + return value if isinstance(value, int) else ord(value) @property def cipher_algorithm(self): @@ -377,7 +399,11 @@ def decrypt(self, username=None): plain = cipher.decrypt() # Get the pin from the raw data - plain_size = ord(plain[0]) + if isinstance(plain[0], int): + plain_size = plain[0] + else: + plain_size = ord(plain[0]) + pin = plain[plain_size + 1:] # Create a plain credential container diff --git a/pysap/SAPDiag.py b/pysap/SAPDiag.py index d969685..80c0e4b 100644 --- a/pysap/SAPDiag.py +++ b/pysap/SAPDiag.py @@ -62,16 +62,16 @@ class SAPDiagDP(Packet): ByteField("new_stat", 0), SignedIntField("unused1", -1), SignedShortField("rq_id", -1), - StrFixedLenField("unused2", "\x20" * 40, 40), - StrFixedLenField("terminal", "\x00" * 15, 15), - StrFixedLenField("unused3", "\x00" * 10, 10), - StrFixedLenField("unused4", "\x20" * 20, 20), + StrFixedLenField("unused2", b"\x20" * 40, 40), + StrFixedLenField("terminal", b"\x00" * 15, 15), + StrFixedLenField("unused3", b"\x00" * 10, 10), + StrFixedLenField("unused4", b"\x20" * 20, 20), IntField("unused5", 0), IntField("unused6", 0), SignedIntField("unused7", -1), IntField("unused8", 0), ByteField("unused9", 0x01), - StrFixedLenField("unused10", "\x00" * 57, 57)] + StrFixedLenField("unused10", b"\x00" * 57, 57)] # Diag Item Types @@ -637,7 +637,7 @@ class SAPDiagError(PacketNoPadded): name = "SAP Diag Error" # TODO: Need to figure out the meaning of the packets fields_desc = [ - StrNullFixedLenField("msg", "**DPTMMSG**", length=12), + StrNullFixedLenField("msg", b"**DPTMMSG**", length=12), StrField("padd", None), ] diff --git a/pysap/SAPDiagItems.py b/pysap/SAPDiagItems.py index 8ff1e0c..337aaa9 100644 --- a/pysap/SAPDiagItems.py +++ b/pysap/SAPDiagItems.py @@ -314,7 +314,6 @@ class SAPDiagSupportBits(Packet): bind_diagitem(SAPDiagSupportBits, "APPL", 0x04, 0x0b) bind_diagitem(SAPDiagSupportBits, "APPL", 0x06, 0x11) - # Support Bits for common SAP Software versions # # SAPGUI 7.02 Java rev 5: ff7ffe2ddab737d674087e1305971597eff23f8d0770ff0f0000000000000000 @@ -331,14 +330,22 @@ class SAPDiagSupportBits(Packet): # SAP NetWeaver AS ABAP 7.50 SP02: ff7ffe2dd8b737f674087e9305971597ebf2bf8f4b71ff9f8606000000000000 # SAP NetWeaver AS ABAP 7.52 SP01: ff7ffa0d78b737de76186e9325b71597eb73feebdb51fd91ce24010000000000 -support_data_sapgui_701_win = SAPDiagSupportBits(unhex("ff7ffa0d78b737def6196e9325bf1593ef73feebdb5501000000000000000000")) -support_data_sapgui_702_win = SAPDiagSupportBits(unhex("ff7ffa0d78b737def6196e9325bf1593ef73feebdb51ed010000000000000000")) -support_data_sapgui_730_win = SAPDiagSupportBits(unhex("ff7ffa0d78b737def6196e9325bf1597ef73feebdb51ed910200000000000000")) -support_data_sapgui_740_win = SAPDiagSupportBits(unhex("ff7ffa0d78b737def6196e9325bf1597ef73feebdb51ed91ca00000000000000")) -support_data_sapgui_750_win = SAPDiagSupportBits(unhex("ff7ffa0d78b737def6196e9325bf1597ef73feebdb51fd91ce2c010000000000")) -support_data_sapgui_702_java2 = SAPDiagSupportBits(unhex("ff7ffe2ddab737d674087e1305971597eff23f8d0770ff030000000000000000")) -support_data_sapgui_702_java5 = SAPDiagSupportBits(unhex("ff7ffe2ddab737d674087e1305971597eff23f8d0770ff0f0000000000000000")) -support_data_sapgui_740_java8 = SAPDiagSupportBits(unhex("ff7ffe2ddab737f674087e9305971597eff2bf8f4f71ff9f8606000000000000")) +support_data_sapgui_701_win = SAPDiagSupportBits( + unhex("ff7ffa0d78b737def6196e9325bf1593ef73feebdb5501000000000000000000")) +support_data_sapgui_702_win = SAPDiagSupportBits( + unhex("ff7ffa0d78b737def6196e9325bf1593ef73feebdb51ed010000000000000000")) +support_data_sapgui_730_win = SAPDiagSupportBits( + unhex("ff7ffa0d78b737def6196e9325bf1597ef73feebdb51ed910200000000000000")) +support_data_sapgui_740_win = SAPDiagSupportBits( + unhex("ff7ffa0d78b737def6196e9325bf1597ef73feebdb51ed91ca00000000000000")) +support_data_sapgui_750_win = SAPDiagSupportBits( + unhex("ff7ffa0d78b737def6196e9325bf1597ef73feebdb51fd91ce2c010000000000")) +support_data_sapgui_702_java2 = SAPDiagSupportBits( + unhex("ff7ffe2ddab737d674087e1305971597eff23f8d0770ff030000000000000000")) +support_data_sapgui_702_java5 = SAPDiagSupportBits( + unhex("ff7ffe2ddab737d674087e1305971597eff23f8d0770ff0f0000000000000000")) +support_data_sapgui_740_java8 = SAPDiagSupportBits( + unhex("ff7ffe2ddab737f674087e9305971597eff2bf8f4f71ff9f8606000000000000")) support_data_sapnw_701 = SAPDiagSupportBits(unhex("ff7ffe2dd8b737d674087e1305971597ebf22f8d03300f000000000000000000")) support_data_sapnw_702 = SAPDiagSupportBits(unhex("ff7ffe2dd8b737d674087e1305971597ebf23f8d0370ff0f0000000000000000")) support_data_sapnw_750 = SAPDiagSupportBits(unhex("ff7ffe2dd8b737f674087e9305971597ebf2bf8f4b71ff9f8606000000000000")) @@ -349,7 +356,6 @@ class SAPDiagSupportBits(Packet): item_sid="SUPPORTDATA", item_value=support_data_sapgui_702_java5) - # Dynt Atom item types diag_atom_etypes = { 101: "DIAG_DGOTYP_EFIELD", @@ -402,61 +408,70 @@ class SAPDiagDyntAtomItem(PacketNoPadded): ShortField("col", 0), # Attr flags BitField("attr_DIAG_BSD_COMBOSTYLE", 0, 1), # 80 - BitField("attr_DIAG_BSD_YES3D", 0, 1), # 40 - BitField("attr_DIAG_BSD_PROPFONT", 0, 1), # 20 - BitField("attr_DIAG_BSD_MATCHCODE", 0, 1), # 10 - BitField("attr_DIAG_BSD_JUSTRIGHT", 0, 1), # 08 - BitField("attr_DIAG_BSD_INTENSIFY", 0, 1), # 04 - BitField("attr_DIAG_BSD_INVISIBLE", 0, 1), # 02 - BitField("attr_DIAG_BSD_PROTECTED", 0, 1), # 01 + BitField("attr_DIAG_BSD_YES3D", 0, 1), # 40 + BitField("attr_DIAG_BSD_PROPFONT", 0, 1), # 20 + BitField("attr_DIAG_BSD_MATCHCODE", 0, 1), # 10 + BitField("attr_DIAG_BSD_JUSTRIGHT", 0, 1), # 08 + BitField("attr_DIAG_BSD_INTENSIFY", 0, 1), # 04 + BitField("attr_DIAG_BSD_INVISIBLE", 0, 1), # 02 + BitField("attr_DIAG_BSD_PROTECTED", 0, 1), # 01 # DIAG_DGOTYP_FNAME - ConditionalField(StrLenField("name_text", "", length_from=lambda pkt:pkt.atom_length - 13), lambda pkt:pkt.etype == 114), + ConditionalField(StrLenField("name_text", "", length_from=lambda pkt: pkt.atom_length - 13), + lambda pkt: pkt.etype == 114), # DIAG_DGOTYP_PUSHBUTTON_2 */ - ConditionalField(ByteField("pushbutton_v_length", 0), lambda pkt:pkt.etype in [115]), - ConditionalField(ByteField("pushbutton_v_height", 0), lambda pkt:pkt.etype in [115]), - ConditionalField(ShortField("pushbutton_function_code_offset", 0), lambda pkt:pkt.etype in [115]), - ConditionalField(ShortField("pushbutton_text_offset", 0), lambda pkt:pkt.etype in [115]), - ConditionalField(StrField("pushbutton_text", ""), lambda pkt:pkt.etype in [115]), - ConditionalField(StrField("pushbutton_function_code", ""), lambda pkt:pkt.etype in [115]), + ConditionalField(ByteField("pushbutton_v_length", 0), lambda pkt: pkt.etype in [115]), + ConditionalField(ByteField("pushbutton_v_height", 0), lambda pkt: pkt.etype in [115]), + ConditionalField(ShortField("pushbutton_function_code_offset", 0), lambda pkt: pkt.etype in [115]), + ConditionalField(ShortField("pushbutton_text_offset", 0), lambda pkt: pkt.etype in [115]), + ConditionalField(StrField("pushbutton_text", ""), lambda pkt: pkt.etype in [115]), + ConditionalField(StrField("pushbutton_function_code", ""), lambda pkt: pkt.etype in [115]), # DIAG_DGOTYP_TABSTRIP_BUTTON - ConditionalField(ByteField("tabstripbutton_v_length", 0), lambda pkt:pkt.etype in [116]), - ConditionalField(ByteField("tabstripbutton_v_height", 0), lambda pkt:pkt.etype in [116]), - ConditionalField(ByteField("tabstripbutton_page_id", 0), lambda pkt:pkt.etype in [116]), - ConditionalField(ShortField("tabstripbutton_function_code_offset", 0), lambda pkt:pkt.etype in [116]), - ConditionalField(ShortField("tabstripbutton_text_offset", 0), lambda pkt:pkt.etype in [116]), - ConditionalField(ShortField("tabstripbutton_id_offset", 0), lambda pkt:pkt.etype in [116]), - ConditionalField(StrNullField("tabstripbutton_text", ""), lambda pkt:pkt.etype in [116]), - ConditionalField(StrNullField("tabstripbutton_function_code", ""), lambda pkt:pkt.etype in [116]), - ConditionalField(StrNullField("tabstripbutton_id", ""), lambda pkt:pkt.etype in [116]), + ConditionalField(ByteField("tabstripbutton_v_length", 0), lambda pkt: pkt.etype in [116]), + ConditionalField(ByteField("tabstripbutton_v_height", 0), lambda pkt: pkt.etype in [116]), + ConditionalField(ByteField("tabstripbutton_page_id", 0), lambda pkt: pkt.etype in [116]), + ConditionalField(ShortField("tabstripbutton_function_code_offset", 0), lambda pkt: pkt.etype in [116]), + ConditionalField(ShortField("tabstripbutton_text_offset", 0), lambda pkt: pkt.etype in [116]), + ConditionalField(ShortField("tabstripbutton_id_offset", 0), lambda pkt: pkt.etype in [116]), + ConditionalField(StrNullField("tabstripbutton_text", ""), lambda pkt: pkt.etype in [116]), + ConditionalField(StrNullField("tabstripbutton_function_code", ""), lambda pkt: pkt.etype in [116]), + ConditionalField(StrNullField("tabstripbutton_id", ""), lambda pkt: pkt.etype in [116]), # DIAG_DGOTYP_XMLPROP - ConditionalField(StrLenField("xmlprop_text", "", length_from=lambda pkt:pkt.atom_length - 13), lambda pkt:pkt.etype == 120), + ConditionalField(StrLenField("xmlprop_text", "", length_from=lambda pkt: pkt.atom_length - 13), + lambda pkt: pkt.etype == 120), # DIAG_DGOTYP_EFIELD_1 or DIAG_DGOTYP_OFIELD_1 or DIAG_DGOTYP_KEYWORD_1 - ConditionalField(ByteField("field1_flag1", 0), lambda pkt:pkt.etype in [121, 122, 123]), - ConditionalField(FieldLenField("field1_dlen", None, fmt="B", length_of="field1_text"), lambda pkt:pkt.etype in [121, 122, 123]), - ConditionalField(ByteField("field1_mlen", 0), lambda pkt:pkt.etype in [121, 122, 123]), - ConditionalField(ShortField("field1_maxnrchars", 0), lambda pkt:pkt.etype in [121, 122, 123]), - ConditionalField(StrLenField("field1_text", "", length_from=lambda pkt:pkt.field1_dlen), lambda pkt:pkt.etype in [121, 122, 123]), + ConditionalField(ByteField("field1_flag1", 0), lambda pkt: pkt.etype in [121, 122, 123]), + ConditionalField(FieldLenField("field1_dlen", None, fmt="B", length_of="field1_text"), + lambda pkt: pkt.etype in [121, 122, 123]), + ConditionalField(ByteField("field1_mlen", 0), lambda pkt: pkt.etype in [121, 122, 123]), + ConditionalField(ShortField("field1_maxnrchars", 0), lambda pkt: pkt.etype in [121, 122, 123]), + ConditionalField(StrLenField("field1_text", "", length_from=lambda pkt: pkt.field1_dlen), + lambda pkt: pkt.etype in [121, 122, 123]), # DIAG_DGOTYP_FRAME_1 - ConditionalField(ShortField("frame_drows", 0), lambda pkt:pkt.etype in [127]), - ConditionalField(ShortField("frame_dcols", 0), lambda pkt:pkt.etype in [127]), - ConditionalField(StrLenField("frame_text", "", length_from=lambda pkt:pkt.atom_length - 17), lambda pkt:pkt.etype in [127]), + ConditionalField(ShortField("frame_drows", 0), lambda pkt: pkt.etype in [127]), + ConditionalField(ShortField("frame_dcols", 0), lambda pkt: pkt.etype in [127]), + ConditionalField(StrLenField("frame_text", "", length_from=lambda pkt: pkt.atom_length - 17), + lambda pkt: pkt.etype in [127]), # DIAG_DGOTYP_RADIOBUTTON_3 - ConditionalField(ByteField("radiobutton_button", 0), lambda pkt:pkt.etype in [129]), - ConditionalField(ShortField("radiobutton_visible_label_length", 0), lambda pkt:pkt.etype in [129]), - ConditionalField(ShortField("radiobutton_event_id_off", 0), lambda pkt:pkt.etype in [129]), - ConditionalField(ByteField("radiobutton_event_id_len", 0), lambda pkt:pkt.etype in [129]), - ConditionalField(ShortField("radiobutton_text_off", 0), lambda pkt:pkt.etype in [129]), - ConditionalField(ShortField("radiobutton_text_length", 0), lambda pkt:pkt.etype in [129]), - ConditionalField(StrLenField("radiobutton_text", "", length_from=lambda pkt:pkt.radiobutton_event_id_len + pkt.radiobutton_text_length), lambda pkt:pkt.etype in [129]), + ConditionalField(ByteField("radiobutton_button", 0), lambda pkt: pkt.etype in [129]), + ConditionalField(ShortField("radiobutton_visible_label_length", 0), lambda pkt: pkt.etype in [129]), + ConditionalField(ShortField("radiobutton_event_id_off", 0), lambda pkt: pkt.etype in [129]), + ConditionalField(ByteField("radiobutton_event_id_len", 0), lambda pkt: pkt.etype in [129]), + ConditionalField(ShortField("radiobutton_text_off", 0), lambda pkt: pkt.etype in [129]), + ConditionalField(ShortField("radiobutton_text_length", 0), lambda pkt: pkt.etype in [129]), + ConditionalField(StrLenField("radiobutton_text", "", length_from=lambda + pkt: pkt.radiobutton_event_id_len + pkt.radiobutton_text_length), lambda pkt: pkt.etype in [129]), # DIAG_DGOTYP_EFIELD_2 or DIAG_DGOTYP_OFIELD_2 or DIAG_DGOTYP_KEYWORD_2 - ConditionalField(ShortField("field2_flag1", 0), lambda pkt:pkt.etype in [130, 131, 132]), - ConditionalField(FieldLenField("field2_dlen", None, fmt="B", length_of="field2_text"), lambda pkt:pkt.etype in [130, 131, 132]), - ConditionalField(ByteField("field2_mlen", 0), lambda pkt:pkt.etype in [130, 131, 132]), - ConditionalField(ShortField("field2_maxnrchars", 0), lambda pkt:pkt.etype in [130, 131, 132]), - ConditionalField(StrLenField("field2_text", "", length_from=lambda pkt:pkt.field2_dlen), lambda pkt:pkt.etype in [130, 131, 132]), + ConditionalField(ShortField("field2_flag1", 0), lambda pkt: pkt.etype in [130, 131, 132]), + ConditionalField(FieldLenField("field2_dlen", None, fmt="B", length_of="field2_text"), + lambda pkt: pkt.etype in [130, 131, 132]), + ConditionalField(ByteField("field2_mlen", 0), lambda pkt: pkt.etype in [130, 131, 132]), + ConditionalField(ShortField("field2_maxnrchars", 0), lambda pkt: pkt.etype in [130, 131, 132]), + ConditionalField(StrLenField("field2_text", "", length_from=lambda pkt: pkt.field2_dlen), + lambda pkt: pkt.etype in [130, 131, 132]), # Remaining types - ConditionalField(StrLenField("value", "", length_from=lambda pkt:pkt.atom_length - 13), lambda pkt:pkt.etype not in [114, 115, 116, 120, 121, 122, 123, 127, 129, 130, 131, 132]), + ConditionalField(StrLenField("value", "", length_from=lambda pkt: pkt.atom_length - 13), + lambda pkt: pkt.etype not in [114, 115, 116, 120, 121, 122, 123, 127, 129, 130, 131, 132]), ] def post_build(self, p, pay): @@ -517,14 +532,14 @@ class SAPDiagMenuEntry(PacketNoPadded): ByteField("position_3", 0), ByteField("position_4", 0), # Menu Entry Flags - BitField("flag_TERM_??8", 0, 1), # 80 - BitField("flag_TERM_??7", 0, 1), # 40 - BitField("flag_TERM_??6", 0, 1), # 20 + BitField("flag_TERM_8", 0, 1), # 80 "flag_TERM_??8" + BitField("flag_TERM_7", 0, 1), # 40 "flag_TERM_??7" + BitField("flag_TERM_6", 0, 1), # 20 "flag_TERM_??6" BitField("flag_TERM_VKEY", 0, 1), # 10 BitField("flag_TERM_SEP", 0, 1), # 8 BitField("flag_TERM_MEN", 0, 1), # 4 BitField("flag_TERM_SEL", 0, 1), # 2 - BitField("flag_TERM_??1", 0, 1), # 1 + BitField("flag_TERM_1", 0, 1), # 1 "flag_TERM_??1" ByteField("virtual_key", 0), ByteField("return_code_1", 0), ByteField("return_code_2", 0), @@ -557,7 +572,6 @@ class SAPDiagMenuEntries(Packet): bind_diagitem(SAPDiagMenuEntries, "APPL", 0x0b, 0x03) bind_diagitem(SAPDiagMenuEntries, "APPL", 0x0b, 0x04) - # Diag UI Event Type values diag_ui_event_type_values = { 0x01: "SELECT", @@ -626,12 +640,13 @@ class SAPDiagUIEventSource(PacketNoPadded): BitField("valid_menu_pos", 0, 1), ShortEnumKeysField("event_type", 0, diag_ui_event_type_values), ShortEnumKeysField("control_type", 0, diag_ui_event_control_values), - ConditionalField(ByteEnumKeysField("navigation_data", 0, diag_ui_event_navigation_data_values), lambda pkt:pkt.valid_navigation_data), - ConditionalField(ByteField("event_data", 0), lambda pkt:not pkt.valid_navigation_data), + ConditionalField(ByteEnumKeysField("navigation_data", 0, diag_ui_event_navigation_data_values), + lambda pkt: pkt.valid_navigation_data), + ConditionalField(ByteField("event_data", 0), lambda pkt: not pkt.valid_navigation_data), ShortField("control_row", 0), ShortField("control_col", 0), FieldLenField("container_nrs", None, count_of="containers"), - FieldListField("containers", None, ByteField("container", 0), count_from=lambda x:x.container_nrs) + FieldListField("containers", None, ByteField("container", 0), count_from=lambda x: x.container_nrs) ] diff --git a/pysap/SAPLPS.py b/pysap/SAPLPS.py index de24523..d2ddc7f 100644 --- a/pysap/SAPLPS.py +++ b/pysap/SAPLPS.py @@ -34,7 +34,7 @@ log_lps = logging.getLogger("pysap.lps") -cred_key_lps_fallback = "\xe7\x6a\xd2\xce\x4b\xa7\xc7\x9e\xf9\x79\x5f\xa8\x2e\x6e\xaa\x1d\x76\x02\x2e\xcd\xd7\x74\x38\x51" +cred_key_lps_fallback = b"\xe7\x6a\xd2\xce\x4b\xa7\xc7\x9e\xf9\x79\x5f\xa8\x2e\x6e\xaa\x1d\x76\x02\x2e\xcd\xd7\x74\x38\x51" """Fixed key embedded in CommonCryptoLib for encrypted credentials using LPS in fallback mode""" @@ -110,7 +110,7 @@ def decrypt(self): raise SAPLPSDecryptionError("Invalid LPS decryption method") # Decrypt the cipher text with the encryption key - iv = "\x00" * 16 + iv = b"\x00" * 16 decryptor = Cipher(algorithms.AES(encryption_key), modes.CBC(iv)).decryptor() plain = decryptor.update(self.encrypted_data) + decryptor.finalize() @@ -148,7 +148,7 @@ def decrypt_encryption_key_fallback(self): hmac.update(self.context) default_key = hmac.finalize()[:16] - iv = "\x00" * 16 + iv = b"\x00" * 16 decryptor = Cipher(algorithms.AES(default_key), modes.CBC(iv)).decryptor() encryption_key = decryptor.update(self.encrypted_key) + decryptor.finalize() diff --git a/pysap/SAPMS.py b/pysap/SAPMS.py index 8b39a18..34b63e4 100644 --- a/pysap/SAPMS.py +++ b/pysap/SAPMS.py @@ -23,7 +23,7 @@ IPField, ShortField, IntField, StrField, PacketListField, FieldLenField, PacketField, StrLenField, IntEnumField, ByteEnumKeysField, ShortEnumKeysField, Field, - PacketLenField, XByteField, SignedIntField) + PacketLenField, XByteField, SignedIntField, MultipleTypeField) from scapy.layers.inet6 import IP6Field # Custom imports from pysap.SAPNI import SAPNI @@ -551,19 +551,29 @@ class SAPMSAdmRecord(PacketNoPadded): to execute. """ name = "SAP Message Server Adm Record" + fields_desc = [ ByteEnumKeysField("opcode", 0x00, ms_adm_opcode_values), ByteField("serial_number", 0x00), ByteField("executed", 0x00), ByteField("errorno", 0x00), # TODO: Look for error names - ConditionalField(StrFixedLenField("record", None, 75), lambda pkt:pkt.opcode not in [0x01, 0x15, 0x2e] and pkt.executed == 0x01), - ConditionalField(StrFixedLenField("record_pad", None, 25), lambda pkt:pkt.opcode not in [0x01, 0x15, 0x2e] and pkt.executed == 0x01), - ConditionalField(StrFixedLenField("record", None, 100), lambda pkt:pkt.opcode not in [0x01, 0x15, 0x2e] and pkt.executed == 0x00), + + MultipleTypeField( + [ + (StrFixedLenField("record", None, 75), + lambda pkt: pkt.opcode not in [0x01, 0x15, 0x2e] and pkt.executed == 0x01), + (StrFixedLenField("record_pad", None, 25), + lambda pkt: pkt.opcode not in [0x01, 0x15, 0x2e] and pkt.executed == 0x01), + (StrFixedLenField("record", None, 100), + lambda pkt: pkt.opcode not in [0x01, 0x15, 0x2e] and pkt.executed == 0x00), + ], + StrFixedLenField("record", None, 100) # Default field + ), # TODO: Add more opcodes fields # AD_PROFILE and AD_SHARED_PARAMETER fields - ConditionalField(StrNullFixedLenPaddedField("parameter", "", 100), lambda pkt:pkt.opcode in [0x01, 0x2e]), + ConditionalField(StrNullFixedLenPaddedField("parameter", b"", 100), lambda pkt:pkt.opcode in [0x01, 0x2e]), # AD_RZL_STRG opcode ConditionalField(ByteEnumKeysField("rzl_strg_type", 1, ms_adm_rzl_strg_type_values), lambda pkt:pkt.opcode in [0x15]), @@ -681,43 +691,43 @@ class SAPMSStat3(PacketNoPadded): ByteField("version", 0), ShortField("unused1", 0), IntField("no_requests", 0), - StrFixedLenField("no_requests_padd", "", 12), + StrFixedLenField("no_requests_padd", b"", 12), IntField("no_error", 0), - StrFixedLenField("no_error_padd", "", 4), + StrFixedLenField("no_error_padd", b"", 4), IntField("no_login", 0), - StrFixedLenField("no_login_padd", "", 4), + StrFixedLenField("no_login_padd", b"", 4), IntField("no_logout", 0), - StrFixedLenField("no_logout_padd", "", 4), + StrFixedLenField("no_logout_padd", b"", 4), IntField("no_send_by_name", 0), - StrFixedLenField("no_send_by_name_padd", "", 4), + StrFixedLenField("no_send_by_name_padd", b"", 4), IntField("no_send_by_type", 0), - StrFixedLenField("no_send_by_type_padd", "", 4), + StrFixedLenField("no_send_by_type_padd", b"", 4), IntField("no_adm_messages", 0), - StrFixedLenField("no_adm_messages_padd", "", 4), + StrFixedLenField("no_adm_messages_padd", b"", 4), IntField("no_adms", 0), - StrFixedLenField("no_amds_padd", "", 4), - StrFixedLenField("no_adm_type", "", 648), + StrFixedLenField("no_amds_padd", b"", 4), + StrFixedLenField("no_adm_type", b"", 648), IntField("no_mod_types", 0), - StrFixedLenField("no_mod_types_padd", "", 4), + StrFixedLenField("no_mod_types_padd", b"", 4), IntField("no_opcodes_rcvd", 0), - StrFixedLenField("no_opcodes_rcvd_padd", "", 4), + StrFixedLenField("no_opcodes_rcvd_padd", b"", 4), IntField("no_opcodes_send", 0), - StrFixedLenField("no_opcodes_send_padd", "", 4), + StrFixedLenField("no_opcodes_send_padd", b"", 4), IntField("no_opcodes", 0), - StrFixedLenField("no_opcode_type", "", 408), + StrFixedLenField("no_opcode_type", b"", 408), IntField("no_keepalive_send", 0), - StrFixedLenField("no_keepalive_send_padd", "", 4), + StrFixedLenField("no_keepalive_send_padd", b"", 4), IntField("no_keepalive_rcvd", 0), - StrFixedLenField("no_keepalive_rcvd_padd", "", 4), + StrFixedLenField("no_keepalive_rcvd_padd", b"", 4), IntField("no_keepalive_disc", 0), - StrFixedLenField("no_keepalive_disc_padd", "", 4), + StrFixedLenField("no_keepalive_disc_padd", b"", 4), IntField("no_bytes_read", 0), - StrFixedLenField("no_bytes_read_padd", "", 12), + StrFixedLenField("no_bytes_read_padd", b"", 12), IntField("no_bytes_written", 0), - StrFixedLenField("no_bytes_written_padd", "", 12), + StrFixedLenField("no_bytes_written_padd", b"", 12), IntField("no_clients", 0), - StrFixedLenField("sta_time", "", 30), - StrFixedLenField("act_time", "", 30), + StrFixedLenField("sta_time", b"", 30), + StrFixedLenField("act_time", b"", 30), ] @@ -728,7 +738,7 @@ class SAPMSCounter(PacketNoPadded): """ name = "SAP Message Server Counter" fields_desc = [ - StrFixedLenField("uuid", "", 40), + StrFixedLenField("uuid", b"", 40), IntField("count", 0), IntField("no", 0), ] @@ -745,13 +755,13 @@ class SAPMSLogon(PacketNoPadded): ShortField("port", 0), IPField("address", "0.0.0.0"), FieldLenField("logonname_length", None, length_of="logonname", fmt="!H"), # <= 80h bytes - StrLenField("logonname", "", length_from=lambda pkt:pkt.logonname_length), + StrLenField("logonname", b"", length_from=lambda pkt:pkt.logonname_length), FieldLenField("prot_length", None, length_of="prot", fmt="!H"), # <= 80h bytes - StrLenField("prot", "", length_from=lambda pkt:pkt.prot_length), + StrLenField("prot", b"", length_from=lambda pkt:pkt.prot_length), FieldLenField("host_length", None, length_of="host", fmt="!H"), # <= 100h bytes - StrLenField("host", "", length_from=lambda pkt:pkt.host_length), + StrLenField("host", b"", length_from=lambda pkt:pkt.host_length), FieldLenField("misc_length", None, length_of="misc", fmt="!H"), # <= 100h bytes - StrLenField("misc", "", length_from=lambda pkt:pkt.misc_length), + StrLenField("misc", b"", length_from=lambda pkt:pkt.misc_length), FieldLenField("address6_length", 16, length_of="address6", fmt="!h"), # == 16 bytes ConditionalField(IP6Field("address6", "::"), lambda pkt:pkt.address6_length > 0), ConditionalField(SignedIntField("end", -1), lambda pkt:pkt.address6_length > 0), @@ -772,7 +782,7 @@ class SAPMSProperty(Packet): ConditionalField(ShortEnumKeysField("logon", 0, ms_logon_type_values), lambda pkt:pkt.id in [0x02]), ConditionalField(StrFixedLenField("pad", None, 12), lambda pkt:pkt.id in [0x02]), ConditionalField(ShortField("len", 0), lambda pkt:pkt.id in [0x02]), - ConditionalField(StrLenField("value", "", length_from=lambda pkt: pkt.len), lambda pkt:pkt.id in [0x02]), + ConditionalField(StrLenField("value", b"", length_from=lambda pkt: pkt.len), lambda pkt:pkt.id in [0x02]), ConditionalField(ShortField("pad2", 0xffff), lambda pkt:pkt.id in [0x02]), # MS_PROPERTY_IPADR @@ -781,18 +791,18 @@ class SAPMSProperty(Packet): # MS_PROPERTY_PARAM ConditionalField(FieldLenField("param_len", 0, length_of="param", fmt="I"), lambda pkt:pkt.id in [0x04]), - ConditionalField(StrLenField("param", "", length_from=lambda pkt: pkt.param_len), lambda pkt:pkt.id in [0x04]), - ConditionalField(StrLenField("param_padding", "", length_from=lambda pkt: 100 - pkt.param_len), lambda pkt:pkt.id in [0x04]), + ConditionalField(StrLenField("param", b"", length_from=lambda pkt: pkt.param_len), lambda pkt:pkt.id in [0x04]), + ConditionalField(StrLenField("param_padding", b"", length_from=lambda pkt: 100 - pkt.param_len), lambda pkt:pkt.id in [0x04]), ConditionalField(ShortField("pad3", 0), lambda pkt:pkt.id in [0x04]), ConditionalField(FieldLenField("value_len", 0x0, length_of="value", fmt="H"), lambda pkt:pkt.id in [0x04]), - ConditionalField(StrLenField("value", "", length_from=lambda pkt:pkt.value_len), lambda pkt:pkt.id in [0x04]), + ConditionalField(StrLenField("value", b"", length_from=lambda pkt:pkt.value_len), lambda pkt:pkt.id in [0x04]), # MS_PROPERTY_SERVICE ConditionalField(ShortField("service", 0), lambda pkt:pkt.id in [0x05]), ConditionalField(ByteField("value", 0), lambda pkt:pkt.id in [0x05]), # Release Information fields - ConditionalField(StrNullFixedLenField("release", "720", length=10), lambda pkt:pkt.id in [0x07]), + ConditionalField(StrNullFixedLenField("release", b"720", length=10), lambda pkt:pkt.id in [0x07]), ConditionalField(IntField("patchno", 0), lambda pkt:pkt.id in [0x07]), ConditionalField(IntField("supplvl", 0), lambda pkt:pkt.id in [0x07]), ConditionalField(IntField("platform", 0), lambda pkt:pkt.id in [0x07]), @@ -814,15 +824,15 @@ class SAPMSJ2EECluster(Packet): IntField("cluster_id", cluster_no_id), IntField("group_id", cluster_no), IntField("join_port", icm_port), - StrFixedLenField("name", "J2EE{}".format(cluster_no_id), 32), - StrFixedLenField("host", "localhost", 32), + StrFixedLenField("name", "J2EE{}".format(cluster_no_id).encode(), 32), + StrFixedLenField("host", b"localhost", 32), IPField("hostaddrv4", "127.0.0.1"), ByteField("type", 0x02), ByteField("state", 0x00), - StrFixedLenField("service_mask", 32 * "\xff", 32), + StrFixedLenField("service_mask", 32 * b"\xff", 32), ByteField("version", 0x02), ByteField("modifiers", 0x02), - StrFixedLenField("reserved", 4 * "\x00", 4), + StrFixedLenField("reserved", 4 * b"\x00", 4), IP6Field("hostaddrv6", "::1") ] @@ -859,7 +869,7 @@ class SAPMSJ2EEService(PacketNoPadded): fields_desc = [ ByteField("service_id", 0x00), ByteField("attached_nodes", 0x00), - StrFixedLenField("name", "\x00", 50), + StrFixedLenField("name", b"\x00", 50), ] @@ -878,7 +888,7 @@ class SAPDPInfo1(Packet): ShortField("dp_padd3", 0x0), ByteField("dp_padd4", 0x0), ByteEnumKeysField("dp_type_from", 0x2, dp_type_values), - StrFixedLenField("dp_fromname", " "*40, 40), + StrFixedLenField("dp_fromname", b" "*40, 40), ShortField("dp_padd41", 0x0), ByteField("dp_padd42", 0x0), ByteEnumKeysField("dp_agent_type_from", 0x6, dp_agent_type_values), @@ -899,7 +909,7 @@ class SAPDPInfo1(Packet): ShortField("dp_padd10", 0x0), ByteField("dp_padd11", 0x0), ByteEnumKeysField("dp_type_to", 0x2, dp_type_values), - StrFixedLenField("dp_toname", " "*40, 40), + StrFixedLenField("dp_toname", b" "*40, 40), ShortField("dp_padd51", 0x0), ByteField("dp_padd52", 0x0), @@ -932,21 +942,21 @@ class SAPDPInfo1(Packet): Field("dp_blob_worker_from_num", 0, ' 0), ConditionalField(ShortField("cpic_packet_size", 0x0), lambda pkt: pkt.version == 0x6 and 'F_V_SEND_DATA' in str(pkt.vector)), # chipik diff --git a/pysap/SAPRouter.py b/pysap/SAPRouter.py index 41cb46c..fcac06a 100644 --- a/pysap/SAPRouter.py +++ b/pysap/SAPRouter.py @@ -28,7 +28,7 @@ IntField, StrNullField, PacketListField, FieldLenField, FieldListField, SignedIntEnumField, StrFixedLenField, PacketField, BitField, LongField, - ByteEnumKeysField) + ByteEnumKeysField, MultipleTypeField) # Custom imports from pysap.SAPSNC import SAPSNCFrame from pysap.SAPNI import (SAPNI, SAPNIStreamSocket, SAPNIProxy, @@ -160,12 +160,17 @@ def from_string(cls, route_string): (/H/host/S/service/P/pass)* :param route_string: route string - :type route_string: C{string} + :type route_string: C{string} or C{bytes} :return: route hops in the route string :rtype: ``list`` of :class:`SAPRouterRouteHop` """ result = [] + + # Convert bytes to string if necessary + if isinstance(route_string, bytes): + route_string = route_string.decode('utf-8') + for route_hop in [x.groupdict() for x in cls.regex.finditer(route_string)]: result.append(cls(hostname=route_hop["hostname"], port=route_hop["port"], @@ -182,14 +187,15 @@ def from_hops(cls, route_hops): :return: route string :rtype: C{string} """ - result = "" - for route_hop in route_hops: - result += "/H/{}".format(route_hop.hostname) - if route_hop.port: - result += "/S/{}".format(route_hop.port) - if route_hop.password: - result += "/W/{}".format(route_hop.password) - return result + route_string = "" + for hop in route_hops: + route_string += "/H/" + hop.hostname.decode('utf-8') if isinstance(hop.hostname, bytes) else hop.hostname + if hop.port: + route_string += "/S/" + hop.port.decode('utf-8') if isinstance(hop.port, bytes) else hop.port + if hop.password: + route_string += "/W/" + hop.password.decode('utf-8') if isinstance(hop.password, + bytes) else hop.password + return route_string class SAPRouterInfoClient(PacketNoPadded): @@ -248,7 +254,7 @@ class SAPRouterError(PacketNoPadded): This packet is used to describe an error returned by SAP Router. """ name = "SAP Router Error Text" - fields_desc = [ + ields_desc = [ StrNullField("eyecatcher", "*ERR*"), StrNullField("counter", "1"), StrNullField("error", ""), @@ -269,7 +275,8 @@ class SAPRouterError(PacketNoPadded): StrNullField("XXX6", ""), StrNullField("XXX7", ""), StrNullField("XXX8", ""), - StrNullField("eyecatcher", "*ERR*"), + ConditionalField(StrNullField("eyecatcher", "*ERR*"), + lambda pkt: pkt.fields.get("eyecatcher") == "*ERR*") ] time_format = "%a %b %d %H:%M:%S %Y" @@ -449,11 +456,17 @@ class SAPRouter(Packet): # Info Request fields ConditionalField(StrNullFixedLenField("adm_password", "", 19), lambda pkt:router_is_admin(pkt) and pkt.adm_command in [2]), - # Cancel Route fields - ConditionalField(FieldLenField("adm_client_count", None, count_of="adm_client_ids", fmt="H"), lambda pkt:router_is_admin(pkt) and pkt.adm_command in [6]), - # Trace Connection fields - ConditionalField(FieldLenField("adm_client_count", None, count_of="adm_client_ids", fmt="I"), lambda pkt:router_is_admin(pkt) and pkt.adm_command in [12, 13]), - + MultipleTypeField( + [ + # Cancel Route fields + (FieldLenField("adm_client_count", None, count_of="adm_client_ids", fmt="H"), + lambda pkt: router_is_admin(pkt) and pkt.adm_command == 6), + # Trace Connection fields + (FieldLenField("adm_client_count", None, count_of="adm_client_ids", fmt="I"), + lambda pkt: router_is_admin(pkt) and pkt.adm_command in [12, 13]), + ], + FieldLenField("adm_client_count", None, count_of="adm_client_ids", fmt="I") + ), # Cancel Route or Trace Connection fields ConditionalField(FieldListField("adm_client_ids", [0x00], IntField("", 0), count_from=lambda pkt:pkt.adm_client_count), lambda pkt:router_is_admin(pkt) and pkt.adm_command in [6, 12, 13]), @@ -572,7 +585,7 @@ def route_to(self, route, talk_mode): # Build the route request packet talk_mode = talk_mode or ROUTER_TALK_MODE_NI_MSG_IO router_strings = list(map(str, route)) - target = "%s:%d" % (route[-1].hostname, int(route[-1].port)) + target = "%s:%d" % (route[-1].hostname.decode(), int(route[-1].port)) router_strings_lens = list(map(len, router_strings)) route_request = SAPRouter(type=SAPRouter.SAPROUTER_ROUTE, route_ni_version=self.router_version, diff --git a/pysap/SAPSSFS.py b/pysap/SAPSSFS.py index 2391cee..063bd00 100644 --- a/pysap/SAPSSFS.py +++ b/pysap/SAPSSFS.py @@ -33,7 +33,7 @@ log_ssfs = logging.getLogger("pysap.ssfs") -ssfs_hmac_key_unobscured = "\xe3\xa0\x61\x11\x85\x41\x68\x99\xf3\x0e\xda\x87\x7a\x80\xcc\x69" +ssfs_hmac_key_unobscured = b"\xe3\xa0\x61\x11\x85\x41\x68\x99\xf3\x0e\xda\x87\x7a\x80\xcc\x69" """Fixed key embedded in rsecssfx binaries for validating integrity of records""" @@ -43,7 +43,7 @@ class SAPSSFSLKY(Packet): """ name = "SAP SSFS LKY" fields_desc = [ - StrFixedLenField("preamble", "RSecSSFsLKY", 11), + StrFixedLenField("preamble", b"RSecSSFsLKY", 11), ] @@ -53,7 +53,7 @@ class SAPSSFSLock(Packet): """ name = "SAP SSFS Lock" fields_desc = [ - StrFixedLenField("preamble", "RSecSSFsLock", 12), + StrFixedLenField("preamble", b"RSecSSFsLock", 12), ByteField("file_type", 0), ByteField("type", 0), TimestampField("timestamp", None), @@ -69,7 +69,7 @@ class SAPSSFSKey(Packet): """ name = "SAP SSFS Key" fields_desc = [ - StrFixedLenField("preamble", "RSecSSFsKey", 11), + StrFixedLenField("preamble", b"RSecSSFsKey", 11), ByteField("type", 1), StrFixedLenField("key", None, 24), TimestampField("timestamp", None), @@ -87,7 +87,7 @@ class SAPSSFSDecryptedPayload(PacketNoPadded): fields_desc = [ # Record Header - StrFixedLenField("preamble", "\x00"*8, 8), + StrFixedLenField("preamble", b"\x00"*8, 8), LenField("length", 0, fmt="I"), # Max record length supported is 0x18150 StrFixedLenField("hash", None, 20), # Data Header @@ -98,7 +98,7 @@ class SAPSSFSDecryptedPayload(PacketNoPadded): @property def valid(self): """Returns whether the SHA1 value is valid for the given payload""" - blob = str(self) + blob = self digest = Hash(SHA1()) digest.update(blob[:8]) @@ -121,15 +121,15 @@ class SAPSSFSDataRecord(PacketNoPadded): fields_desc = [ # Record Header - StrFixedLenField("preamble", "RSecSSFsData", 12), + StrFixedLenField("preamble", b"RSecSSFsData", 12), LenField("length", 0, fmt="I"), # Max record length supported is 0x18150 ByteField("type", 1), # Record type "1" supported StrFixedLenField("filler1", None, 7), # Data Header - StrFixedLenPaddedField("key_name", None, 64, padd=" "), + StrFixedLenPaddedField("key_name", b"", 64, padd=" "), TimestampField("timestamp", None), - StrFixedLenPaddedField("user", None, 24, padd=" "), - StrFixedLenPaddedField("host", None, 24, padd=" "), + StrFixedLenPaddedField("user", "", 24, padd=" "), + StrFixedLenPaddedField("host", "", 24, padd=" "), YesNoByteField("is_deleted", 0), YesNoByteField("is_stored_as_plaintext", 0), YesNoByteField("is_binary_data", 0), @@ -150,16 +150,20 @@ def decrypt_data(self, key): log_ssfs.debug("Decrypting record {}".format(self.key_name)) decrypted_data = rsec_decrypt(self.data, key.key) decrypted_payload = SAPSSFSDecryptedPayload(decrypted_data) - log_ssfs.warn("Decrypted payload integrity is {}".format(decrypted_payload.valid)) + log_ssfs.warning("Decrypted payload integrity is {}".format(decrypted_payload.valid)) return decrypted_payload.data @property def valid(self): """Returns whether the HMAC-SHA1 value is valid for the given payload""" - # Calculate the HMAC-SHA1 h = HMAC(ssfs_hmac_key_unobscured, SHA1()) - h.update(str(self)[24:156]) # Entire Data header without the HMAC field + header = None + try: + header = bytes(self)[24:156] + except: + header = str(self)[24:156].encode() + h.update(header) h.update(self.data) # Validate the signature @@ -195,7 +199,7 @@ def has_record(self, key_name): :rtype: bool """ for record in self.records: - if record.key_name.rstrip(" ") == key_name: + if record.key_name.rstrip(b" ") == key_name: return True return False @@ -209,7 +213,7 @@ def get_records(self, key_name): :rtype: SAPSSFSDataRecord """ for record in self.records: - if record.key_name.rstrip(" ") == key_name: + if record.key_name.rstrip(b" ") == key_name: yield record def get_record(self, key_name): @@ -238,4 +242,4 @@ def get_value(self, key_name, key=None): try: return self.get_record(key_name).get_plain_data(key) except AttributeError: - return None + return None \ No newline at end of file diff --git a/pysap/utils/crypto/__init__.py b/pysap/utils/crypto/__init__.py index b287426..f4ecc17 100644 --- a/pysap/utils/crypto/__init__.py +++ b/pysap/utils/crypto/__init__.py @@ -111,15 +111,17 @@ def derive(self, password): v = self._algorithm.block_size # Step 1 - Concatenate v/8 copies of ID - d = chr(self._id) * v + d = bytes([self._id]) * v def concatenate_string(inp): + if isinstance(inp, str): + inp = inp.encode('utf-8') s = b'' if inp != b'': s_len = v * int(math.ceil(float(len(inp)) / v)) while len(s) < s_len: s += inp - s = s[0:s_len] + s = s[:s_len] return s # Step 2 - Concatenate copies of the salt @@ -137,6 +139,8 @@ def concatenate_string(inp): # Step 6 def digest(inp): + if isinstance(inp, str): + inp = inp.encode('utf-8') h = Hash(self._algorithm()) h.update(inp) return h.finalize() @@ -144,13 +148,10 @@ def digest(inp): def to_int(value): if value == b'': return 0 - return int(value.encode("hex"), 16) + return int.from_bytes(value, 'big') def to_bytes(value): - value = "%x" % value - if len(value) & 1: - value = "0" + value - return value.decode("hex") + return value.to_bytes((value.bit_length() + 7) // 8, 'big') a = b'\x00' * (c * u) for n in range(1, c + 1): @@ -258,16 +259,14 @@ def decrypt(self, cipher_text): class SCRAM(object): - """Base interface for implementing SCRAM password schemes. - """ + """Base interface for implementing SCRAM password schemes.""" CLIENT_PROOF_SIZE = 32 CLIENT_KEY_SIZE = 64 ALGORITHM = None def get_client_key(self): - """Returns a client key to be used during the handshake. - """ + """Returns a client key to be used during the handshake.""" return os.urandom(self.CLIENT_KEY_SIZE) def salt_key(self, password, salt, rounds): @@ -276,8 +275,7 @@ def salt_key(self, password, salt, rounds): return hmac.finalize() def scramble_salt(self, password, salt, server_key, client_key, rounds=None): - """Scrambles a given salt using the specified server key. - """ + """Scrambles a given salt using the specified server key.""" msg = salt + server_key + client_key hmac_digest = self.salt_key(password, salt, rounds) @@ -307,15 +305,12 @@ def xor(self, a, b): class SCRAM_SHA256(SCRAM): - """SCRAM scheme using SHA256 as the hashing algorithm. - """ + """SCRAM scheme using SHA256 as the hashing algorithm.""" ALGORITHM = SHA256 class SCRAM_MD5(SCRAM): - """SCRAM scheme using MD5 as the hashing algorithm. - """ - + """SCRAM scheme using MD5 as the hashing algorithm.""" ALGORITHM = MD5 @@ -335,10 +330,10 @@ def rsec_decrypt(blob, key): hence implemented in the crypto library instead of the particular layer. :param blob: encrypted blob to decrypt - :type blob: bytes + :type blob: bytes or str :param key: key to use to decrypt - :type key: bytes + :type key: bytes or str :return: decrypted blob :rtype: bytes @@ -348,8 +343,17 @@ def rsec_decrypt(blob, key): if len(key) != 24: raise Exception("Wrong key length") - blob = [ord(i) for i in blob] - key = [ord(i) for i in key] + # Convert input to list of integers if they are strings or bytes + if isinstance(blob, str): + blob = [ord(i) for i in blob] + elif isinstance(blob, bytes): + blob = list(blob) + + if isinstance(key, str): + key = [ord(i) for i in key] + elif isinstance(key, bytes): + key = list(key) + key1 = key[0:8] key2 = key[8:16] key3 = key[16:24] @@ -359,4 +363,4 @@ def rsec_decrypt(blob, key): round_2 = cipher.crypt(RSECCipher.MODE_ENCODE, round_1, key2, len(round_1)) round_3 = cipher.crypt(RSECCipher.MODE_DECODE, round_2, key1, len(round_2)) - return ''.join([chr(i) for i in round_3]) + return bytes(round_3) \ No newline at end of file diff --git a/pysap/utils/crypto/rsec.py b/pysap/utils/crypto/rsec.py index 5c0ff34..dee375c 100644 --- a/pysap/utils/crypto/rsec.py +++ b/pysap/utils/crypto/rsec.py @@ -354,7 +354,7 @@ def _generate_keys(self, key): if array_2[self.pc2[n] - 1]: tmp = self.byte_bit[n % 6] tmp = tmp >> 2 - keys[8 * j + n / 6] |= tmp + keys[8 * j + n // 6] |= tmp # Use floor division here return keys def crypt(self, mode, blob, key, length): diff --git a/pysap/utils/fields.py b/pysap/utils/fields.py index 0a75ae1..0f2caab 100644 --- a/pysap/utils/fields.py +++ b/pysap/utils/fields.py @@ -355,7 +355,7 @@ class TimestampField(LongField): """Timestamp field""" def i2h(self, pkt, x): - dt = datetime.utcfromtimestamp(x) + dt = datetime.utcfromtimestamp return dt.strftime("%Y-%m-%d %H:%M:%S UTC") diff --git a/requirements.txt b/requirements.txt index fbb8f23..5e89b52 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,2 +1,2 @@ -scapy==2.4.5 -cryptography==3.3.2 +scapy==2.5.0 +cryptography==42.0.4 diff --git a/tests/test_crypto.py b/tests/test_crypto.py index d45414d..2735dc0 100644 --- a/tests/test_crypto.py +++ b/tests/test_crypto.py @@ -30,7 +30,7 @@ def test_scram_sha256_scramble_salt(self): Values are taken from https://github.com/SAP/PyHDB/blob/master/tests/test_auth.py """ - password = "secret" + password = b"secret" salt = b"\x80\x96\x4f\xa8\x54\x28\xae\x3a\x81\xac" \ b"\xd3\xe6\x86\xa2\x79\x33" server_key = b"\x41\x06\x51\x50\x11\x7e\x45\x5f\xec\x2f\x03\xf6" \ @@ -58,7 +58,7 @@ def test_scram_pbkdf2sha256_scramble_salt(self): Values are taken from https://github.com/SAP/go-hdb/blob/master/internal/protocol/authentication_test.go """ - password = "Toor1234" + password = b"Toor1234" rounds = 15000 salt = b"3\xb2\xd5\xd5\\R\xc2(Px\xc5[\xa6C\x17?" server_key = b" [\xa5\x12\x9eM\x86E\x80\x9dE\xd1/!\xab\xa48\xac\xe5\x00\x99\x03A\x1d\xef\xd2\xba\x86Q \x1d\x89\xef\xa7'\x01\xabuU\x8am&*M+*RF" diff --git a/tests/test_sapcredv2.py b/tests/test_sapcredv2.py index be05ebd..5c00a4e 100644 --- a/tests/test_sapcredv2.py +++ b/tests/test_sapcredv2.py @@ -30,19 +30,19 @@ class PySAPCredV2Test(unittest.TestCase): - decrypt_username = "username" - decrypt_pin = "1234567890" - cert_name = "CN=PSEOwner" - common_name = "PSEOwner" - subject_str = "/CN=PSEOwner" + decrypt_username = b"username" + decrypt_pin = b"1234567890" + cert_name = b"CN=PSEOwner" + common_name = b"PSEOwner" + subject_str = b"/CN=PSEOwner" subject = [ X509_RDN(rdn=[ X509_AttributeTypeAndValue(type=ASN1_OID("2.5.4.3"), value=ASN1_PRINTABLE_STRING(common_name)) ]) ] - pse_path = "/secudir/pse-v2-noreq-DSA-1024-SHA1.pse" - pse_path_win = "C:\\secudir\\pse-v2-noreq-DSA-1024-SHA1.pse" + pse_path = b"/secudir/pse-v2-noreq-DSA-1024-SHA1.pse" + pse_path_win = b"C:\\secudir\\pse-v2-noreq-DSA-1024-SHA1.pse" def validate_credv2_lps_off_fields(self, creds, number, lps_type, cipher_format_version, cipher_algorithm, cert_name=None, pse_path=None): @@ -56,9 +56,9 @@ def validate_credv2_lps_off_fields(self, creds, number, lps_type, cipher_format_ self.assertEqual(cred.cipher_algorithm, cipher_algorithm) self.assertEqual(cred.cert_name, cert_name or self.cert_name) - self.assertEqual(cred.unknown1, "") + self.assertEqual(cred.unknown1, b"") self.assertEqual(cred.pse_path, pse_path or self.pse_path) - self.assertEqual(cred.unknown2, "") + self.assertEqual(cred.unknown2, b"") def validate_credv2_plain(self, cred, decrypt_username=None, decrypt_pin=None): plain = cred.decrypt(decrypt_username or self.decrypt_username) @@ -96,7 +96,7 @@ def test_credv2_lps_off_v0_dp_3des_decrypt(self): cred = SAPCredv2(s).creds[0].cred plain = cred.decrypt(self.decrypt_username) - self.assertEqual(plain.option1, SAPCredv2_Cred_Plain.PROVIDER_MSCryptProtect) + self.assertEqual(plain.option1, SAPCredv2_Cred_Plain.PROVIDER_MSCryptProtect.encode()) def test_credv2_lps_off_v1_3des(self): """Test parsing of a version 1 3DES encrypted credential with LPS off""" @@ -143,7 +143,7 @@ def test_credv2_lps_on_v2_int_aes256(self): self.assertEqual(len(creds), 1) cred = creds[0].cred - self.assertEqual(cred.common_name, self.subject_str) + self.assertEqual(cred.common_name.encode(), self.subject_str) self.assertEqual(cred.subject, self.subject) self.assertEqual(cred.subject[0].rdn[0].type.val, "2.5.4.3") self.assertEqual(cred.subject[0].rdn[0].value.val, self.common_name) @@ -173,7 +173,7 @@ def test_credv2_lps_on_v2_dp_aes256(self): self.assertEqual(len(creds), 1) cred = creds[0].cred - self.assertEqual(cred.common_name, self.subject_str) + self.assertEqual(cred.common_name.encode(), self.subject_str) self.assertEqual(cred.subject, self.subject) self.assertEqual(cred.subject[0].rdn[0].type.val, "2.5.4.3") self.assertEqual(cred.subject[0].rdn[0].value.val, self.common_name) @@ -196,20 +196,20 @@ def test_credv2_lps_on_v2_int_aes256_composed_subject(self): creds = SAPCredv2(s).creds self.assertEqual(len(creds), 1) - subject_str = "/C=AR/CN=PSEOwner" + subject_str = b"/C=AR/CN=PSEOwner" subject = [ X509_RDN(rdn=[ X509_AttributeTypeAndValue(type=ASN1_OID("2.5.4.6"), - value=ASN1_PRINTABLE_STRING("AR"))]), + value=ASN1_PRINTABLE_STRING(b"AR"))]), X509_RDN(rdn=[ X509_AttributeTypeAndValue(type=ASN1_OID("2.5.4.3"), value=ASN1_PRINTABLE_STRING(self.common_name))]), ] cred = creds[0].cred - self.assertEqual(cred.common_name, subject_str) + self.assertEqual(cred.common_name.encode(), subject_str) self.assertEqual(cred.subject, subject) self.assertEqual(cred.subject[0].rdn[0].type.val, "2.5.4.6") - self.assertEqual(cred.subject[0].rdn[0].value.val, "AR") + self.assertEqual(cred.subject[0].rdn[0].value.val, b"AR") self.assertEqual(cred.subject[1].rdn[0].type.val, "2.5.4.3") self.assertEqual(cred.subject[1].rdn[0].value.val, self.common_name) @@ -221,6 +221,5 @@ def test_credv2_lps_on_v2_int_aes256_composed_subject(self): self.validate_credv2_plain(cred) - if __name__ == "__main__": unittest.main(verbosity=1) diff --git a/tests/test_sapdiag.py b/tests/test_sapdiag.py index 905ca81..496f81b 100755 --- a/tests/test_sapdiag.py +++ b/tests/test_sapdiag.py @@ -55,7 +55,7 @@ def test_sapdiag_header_dissection_plain(self): diag_header_plain = SAPDiag(compress=0) diag_header_plain.message.append(diag_item) - new_diag_header_plain = SAPDiag(str(diag_header_plain)) + new_diag_header_plain = SAPDiag(diag_header_plain) self.assertEqual(str(diag_header_plain), str(new_diag_header_plain)) @@ -66,7 +66,7 @@ def test_sapdiag_header_dissection_compressed(self): diag_header_compr = SAPDiag(compress=1) diag_header_compr.message.append(diag_item) - new_diag_header_compr = SAPDiag(str(diag_header_compr)) + new_diag_header_compr = SAPDiag(diag_header_compr) self.assertEqual(str(diag_header_compr.message[0]), str(new_diag_header_compr.message[0])) @@ -138,7 +138,7 @@ def test_sapdiag_items_lookup(self): self.assertNotIn(sapdiag_appl_item, sapdiag.get_item(["APPL"], ["ST_USER"], ["CONNECT"])) # Insert a wrong item and observe that the lookup still works - sapdiag.message.append(Raw("\x00" * 10)) + sapdiag.message.append(Raw(b"\x00" * 10)) self.assertIn(sapdiag_ses_item, sapdiag.get_item("SES")) self.assertIn(sapdiag_appl_item, sapdiag.get_item(["APPL"], "ST_USER", ["RFC_PARENT_UUID", "CONNECT"])) @@ -150,11 +150,11 @@ class SAPDiagItemTest(Packet): item_string = "strfield" item_value = SAPDiagItemTest(strfield=item_string) - item = SAPDiagItem("\x10\x99\xff" + pack("!H", len(item_string)) + item_string) + item = SAPDiagItem(b"\x10\x99\xff" + pack("!H", len(item_string)) + item_string.encode()) self.assertEqual(item.item_value, item_value) self.assertEqual(item.item_length, len(item_string)) - self.assertEqual(item.item_value.strfield, item_string) + self.assertEqual(item.item_value.strfield, item_string.encode()) self.assertEqual(str(item.item_value), str(item_value)) self.assertIs(diag_item_get_class(item, "APPL", 0x99, 0xff), SAPDiagItemTest) diff --git a/tests/test_sapni.py b/tests/test_sapni.py index 7519b62..b86d608 100755 --- a/tests/test_sapni.py +++ b/tests/test_sapni.py @@ -19,6 +19,7 @@ # Standard imports import sys import socket +import time import unittest from threading import Thread from struct import pack, unpack @@ -53,13 +54,13 @@ def stop_server(self): class PySAPNITest(unittest.TestCase): - test_string = "LALA" * 10 + test_string = b"LALA" * 10 def test_sapni_building(self): """Test SAPNI length field building""" sapni = SAPNI() / self.test_string - - (sapni_length, ) = unpack("!I", str(sapni)[:4]) + sapni_bytes = bytes(sapni) # Convert to bytes + (sapni_length,) = unpack("!I", sapni_bytes[:4]) self.assertEqual(sapni_length, len(self.test_string)) self.assertEqual(sapni.payload.load, self.test_string) @@ -91,21 +92,21 @@ class SAPNITestHandlerKeepAlive(SAPNITestHandler): def handle(self): SAPNITestHandler.handle(self) - self.request.sendall("\x00\x00\x00\x08NI_PING\x00") + self.request.sendall(b"\x00\x00\x00\x08NI_PING\x00") class SAPNITestHandlerClose(SAPNITestHandler): """Basic SAP NI server that closes the connection""" def handle(self): - self.request.send("") + self.request.send(b"") class PySAPNIStreamSocketTest(PySAPBaseServerTest): test_port = 8005 test_address = "127.0.0.1" - test_string = "TEST" * 10 + test_string = b"TEST" * 10 def test_sapnistreamsocket(self): """Test SAPNIStreamSocket""" @@ -184,7 +185,7 @@ def test_sapnistreamsocket_without_keep_alive(self): self.assertIn(SAPNI, packet) self.assertEqual(packet[SAPNI].length, len(SAPNI.SAPNI_PING)) - self.assertEqual(packet.payload.load, SAPNI.SAPNI_PING) + self.assertEqual(packet.payload.load, SAPNI.SAPNI_PING.encode()) self.client.close() self.stop_server() @@ -208,6 +209,7 @@ def test_sapnistreamsocket_with_keep_alive(self): self.assertEqual(packet.payload.load, self.test_string) # Then we should get a connection reset if we try to receive from the server + self.client.recv() self.assertRaises(socket.error, self.client.recv) self.client.close() @@ -239,7 +241,7 @@ class PySAPNIServerTest(PySAPBaseServerTest): test_port = 8005 test_address = "127.0.0.1" - test_string = "TEST" * 10 + test_string = b"TEST" * 10 handler_cls = SAPNIServerTestHandler def test_sapniserver(self): @@ -268,7 +270,7 @@ class PySAPNIProxyTest(PySAPBaseServerTest): test_proxyport = 8005 test_serverport = 8006 test_address = "127.0.0.1" - test_string = "TEST" * 10 + test_string = b"TEST" * 10 proxyhandler_cls = SAPNIProxyHandler serverhandler_cls = SAPNIServerTestHandler @@ -326,7 +328,7 @@ def process_server(self, packet): sock.connect((self.test_address, self.test_proxyport)) sock.sendall(pack("!I", len(self.test_string)) + self.test_string) - expected_reponse = self.test_string + "Client" + "Server" + expected_reponse = self.test_string + b"Client" + b"Server" response = sock.recv(4) self.assertEqual(len(response), 4) diff --git a/tests/test_sappse.py b/tests/test_sappse.py index 0f31b1f..b6ea8eb 100644 --- a/tests/test_sappse.py +++ b/tests/test_sappse.py @@ -46,10 +46,8 @@ def test_pse_v2_lps_off_pbes1_3des_sha1(self): def test_pse_v2_lps_off_pbes1_3des_sha1_decrypt(self): """Test decryption of a v2 PBES1 encrypted PSE with LPS off""" - with open(data_filename("pse_v2_lps_off_pbes1_3des_sha1.pse"), "rb") as fd: s = fd.read() - pse = SAPPSEFile(s) self.assertRaisesRegex(ValueError, "Invalid PIN supplied", pse.decrypt, "Some Invalid PIN") pse.decrypt(self.decrypt_pin) diff --git a/tests/test_saprouter.py b/tests/test_saprouter.py index 9c12ee2..c3f277a 100755 --- a/tests/test_saprouter.py +++ b/tests/test_saprouter.py @@ -22,6 +22,7 @@ import unittest from struct import unpack from threading import Thread + # External imports # Custom imports @@ -34,60 +35,73 @@ class PySAPRouterTest(unittest.TestCase): def check_route(self, route_string, route_hops): """Check from string to hops and back again""" + # Convert route_string to bytes if it's a string + if isinstance(route_string, str): + route_string = route_string.encode('utf-8') + hops = SAPRouterRouteHop.from_string(route_string) self.assertListEqual(hops, route_hops) string = SAPRouterRouteHop.from_hops(hops) - route_string = route_string.replace("/h/", "/H/").replace("/s/", "/S/").replace("/p/", "/P/").replace("/w/", "/W/") - self.assertEqual(string, route_string) + normalized_route_string = route_string.replace(b"/h/", b"/H/").replace(b"/s/", b"/S/").replace(b"/p/", + b"/P/").replace( + b"/w/", b"/W/") + + # Convert both strings to the same type for comparison + if isinstance(string, str): + normalized_route_string = normalized_route_string.decode('utf-8') + else: + string = string.encode('utf-8') + + self.assertEqual(string, normalized_route_string) def test_saprouter_route_string(self): """Test construction of SAPRouterRouteHop items""" # Two hops with full details self.check_route("/H/host1/S/service1/W/pass1/H/host2/S/service2/W/pass2", - [SAPRouterRouteHop(hostname="host1", - port="service1", - password="pass1"), - SAPRouterRouteHop(hostname="host2", - port="service2", - password="pass2")]) + [SAPRouterRouteHop(hostname=b"host1", + port=b"service1", + password=b"pass1"), + SAPRouterRouteHop(hostname=b"host2", + port=b"service2", + password=b"pass2")]) # One intermediate hop with service/password - self.check_route("/H/host1/H/host2/S/service2/W/pass2/H/host3", - [SAPRouterRouteHop(hostname="host1"), - SAPRouterRouteHop(hostname="host2", - port="service2", - password="pass2"), - SAPRouterRouteHop(hostname="host3")]) + self.check_route(b"/H/host1/H/host2/S/service2/W/pass2/H/host3", + [SAPRouterRouteHop(hostname=b"host1"), + SAPRouterRouteHop(hostname=b"host2", + port=b"service2", + password=b"pass2"), + SAPRouterRouteHop(hostname=b"host3")]) # Example in SAP Help - self.check_route("/H/sap_rout/H/your_rout/W/pass_to_app/H/yourapp/S/sapsrv", - [SAPRouterRouteHop(hostname="sap_rout"), - SAPRouterRouteHop(hostname="your_rout", - password="pass_to_app"), - SAPRouterRouteHop(hostname="yourapp", - port="sapsrv")]) + self.check_route(b"/H/sap_rout/H/your_rout/W/pass_to_app/H/yourapp/S/sapsrv", + [SAPRouterRouteHop(hostname=b"sap_rout"), + SAPRouterRouteHop(hostname=b"your_rout", + password=b"pass_to_app"), + SAPRouterRouteHop(hostname=b"yourapp", + port=b"sapsrv")]) # Hostname with FQDN - self.check_route("/H/some.valid.domain.com/S/3299", - [SAPRouterRouteHop(hostname="some.valid.domain.com", - port="3299")]) + self.check_route(b"/H/some.valid.domain.com/S/3299", + [SAPRouterRouteHop(hostname=b"some.valid.domain.com", + port=b"3299")]) # Hostname with IP addresses - self.check_route("/H/127.0.0.1/S/3299", + self.check_route(b"/H/127.0.0.1/S/3299", [SAPRouterRouteHop(hostname="127.0.0.1", port="3299")]) # Lowercase hostname and service - self.check_route("/h/127.0.0.1/s/3299/w/Password", + self.check_route(b"/h/127.0.0.1/s/3299/w/Password", [SAPRouterRouteHop(hostname="127.0.0.1", port="3299", - password="Password")]) + password=b"Password")]) # Invalid route strings - self.assertListEqual(SAPRouterRouteHop.from_string("/S/service"), []) - self.assertListEqual(SAPRouterRouteHop.from_string("/P/password"), []) + self.assertListEqual(SAPRouterRouteHop.from_string(b"/S/service"), []) + self.assertListEqual(SAPRouterRouteHop.from_string(b"/P/password"), []) class SAPRouterServerTestHandler(SAPNIServerHandler): @@ -107,7 +121,7 @@ def handle_data(self): route_request = self.packet[SAPRouter] if router_is_route(route_request): - if route_request.route_string[1].hostname == "10.0.0.1" and \ + if route_request.route_string[1].hostname == "127.0.0.1" and \ route_request.route_string[1].port == "3200": self.routed = True self.request.send(SAPRouter(type=SAPRouter.SAPROUTER_PONG)) @@ -122,7 +136,7 @@ class PySAPRoutedStreamSocketTest(unittest.TestCase): test_port = 8005 test_address = "127.0.0.1" - test_string = "TEST" * 10 + test_string = b"TEST" * 10 def start_server(self, handler_cls): self.server = SAPNIServerThreaded((self.test_address, self.test_port), @@ -148,9 +162,9 @@ def test_saproutedstreamsocket(self): sock.connect((self.test_address, self.test_port)) route = [SAPRouterRouteHop(hostname=self.test_address, - port=self.test_port), - SAPRouterRouteHop(hostname="10.0.0.1", - port="3200")] + port=str(self.test_port)), + SAPRouterRouteHop(hostname="127.0.0.1", + port=3200)] self.client = SAPRoutedStreamSocket(sock, route=route, router_version=40) @@ -158,7 +172,7 @@ def test_saproutedstreamsocket(self): self.assertIn(SAPNI, packet) self.assertEqual(packet[SAPNI].length, len(self.test_string) + 4) - self.assertEqual(unpack("!I", packet[SAPNI].payload.load[:4]), (len(self.test_string), )) + self.assertEqual(unpack("!I", packet[SAPNI].payload.load[:4]), (len(self.test_string),)) self.assertEqual(packet[SAPNI].payload.load[4:], self.test_string) self.client.close() @@ -172,10 +186,10 @@ def test_saproutedstreamsocket_route_error(self): sock = socket.socket() sock.connect((self.test_address, self.test_port)) - route = [SAPRouterRouteHop(hostname=self.test_address, - port=self.test_port), - SAPRouterRouteHop(hostname="10.0.0.2", - port="3200")] + route = [SAPRouterRouteHop(hostname=self.test_address.encode(), + port=str(self.test_port).encode()), + SAPRouterRouteHop(hostname="127.0.0.1", + port=3200)] with self.assertRaises(SAPRouteException): self.client = SAPRoutedStreamSocket(sock, route=route, @@ -202,43 +216,42 @@ def test_saproutedstreamsocket_getnisocket(self): self.start_server(SAPRouterServerTestHandler) # Test using a complete route - route = [SAPRouterRouteHop(hostname=self.test_address, - port=self.test_port), - SAPRouterRouteHop(hostname="10.0.0.1", - port="3200")] + route = [SAPRouterRouteHop(hostname=self.test_address.encode(), + port=str(self.test_port).encode()), + SAPRouterRouteHop(hostname="127.0.0.1", + port=3200)] self.client = SAPRoutedStreamSocket.get_nisocket(route=route, router_version=40) packet = self.client.sr(self.test_string) self.assertIn(SAPNI, packet) self.assertEqual(packet[SAPNI].length, len(self.test_string) + 4) - self.assertEqual(unpack("!I", packet[SAPNI].payload.load[:4]), (len(self.test_string), )) + self.assertEqual(unpack("!I", packet[SAPNI].payload.load[:4]), (len(self.test_string),)) self.assertEqual(packet[SAPNI].payload.load[4:], self.test_string) # Test using a route and a target host/port - route = [SAPRouterRouteHop(hostname=self.test_address, - port=self.test_port)] - self.client = SAPRoutedStreamSocket.get_nisocket("10.0.0.1", - "3200", + route = [SAPRouterRouteHop(hostname=self.test_address.encode(), + port=str(self.test_port).encode())] + self.client = SAPRoutedStreamSocket.get_nisocket("127.0.0.1", + 3200, route=route, router_version=40) packet = self.client.sr(self.test_string) self.assertIn(SAPNI, packet) self.assertEqual(packet[SAPNI].length, len(self.test_string) + 4) - self.assertEqual(unpack("!I", packet[SAPNI].payload.load[:4]), (len(self.test_string), )) + self.assertEqual(unpack("!I", packet[SAPNI].payload.load[:4]), (len(self.test_string),)) self.assertEqual(packet[SAPNI].payload.load[4:], self.test_string) # Test using a route string - route = "/H/%s/S/%s/H/10.0.0.1/S/3200" % (self.test_address, - self.test_port) - self.client = SAPRoutedStreamSocket.get_nisocket(route=route, + route = f"/H/{self.test_address}/S/{self.test_port}/H/127.0.0.1/S/3200" + self.client = SAPRoutedStreamSocket.get_nisocket(route=route.encode(), router_version=40) packet = self.client.sr(self.test_string) self.assertIn(SAPNI, packet) self.assertEqual(packet[SAPNI].length, len(self.test_string) + 4) - self.assertEqual(unpack("!I", packet[SAPNI].payload.load[:4]), (len(self.test_string), )) + self.assertEqual(unpack("!I", packet[SAPNI].payload.load[:4]), (len(self.test_string),)) self.assertEqual(packet[SAPNI].payload.load[4:], self.test_string) self.client.close() @@ -246,4 +259,4 @@ def test_saproutedstreamsocket_getnisocket(self): if __name__ == "__main__": - unittest.main(verbosity=1) + unittest.main(verbosity=1) \ No newline at end of file diff --git a/tests/test_sapssfs.py b/tests/test_sapssfs.py index 08c1854..c17e571 100755 --- a/tests/test_sapssfs.py +++ b/tests/test_sapssfs.py @@ -26,8 +26,8 @@ class PySAPSSFSKeyTest(unittest.TestCase): - USERNAME = "SomeUser " - HOST = "ubuntu " + USERNAME = b"SomeUser " + HOST = b"ubuntu " def test_ssfs_key_parsing(self): """Test parsing of a SSFS Key file""" @@ -37,7 +37,7 @@ def test_ssfs_key_parsing(self): key = SAPSSFSKey(s) - self.assertEqual(key.preamble, "RSecSSFsKey") + self.assertEqual(key.preamble, b"RSecSSFsKey") self.assertEqual(key.type, 1) self.assertEqual(key.user, self.USERNAME) self.assertEqual(key.host, self.HOST) @@ -45,12 +45,12 @@ def test_ssfs_key_parsing(self): class PySAPSSFSDataTest(unittest.TestCase): - USERNAME = "SomeUser " - HOST = "ubuntu " + USERNAME = b"SomeUser " + HOST = b"ubuntu " - PLAIN_VALUES = {"HDB/KEYNAME/DB_CON_ENV": "Env", - "HDB/KEYNAME/DB_DATABASE_NAME": "Database", - "HDB/KEYNAME/DB_USER": "SomeUser", + PLAIN_VALUES = {b"HDB/KEYNAME/DB_CON_ENV": b"Env", + b"HDB/KEYNAME/DB_DATABASE_NAME": b"Database", + b"HDB/KEYNAME/DB_USER": b"SomeUser", } def test_ssfs_data_parsing(self): @@ -63,7 +63,7 @@ def test_ssfs_data_parsing(self): self.assertEqual(len(data.records), 4) for record in data.records: - self.assertEqual(record.preamble, "RSecSSFsData") + self.assertEqual(record.preamble, b"RSecSSFsData") self.assertEqual(record.length, len(record)) self.assertEqual(record.type, 1) self.assertEqual(record.user, self.USERNAME) @@ -77,9 +77,9 @@ def test_ssfs_data_record_lookup(self): data = SAPSSFSData(s) - self.assertFalse(data.has_record("HDB/KEYNAME/UNEXISTENT")) - self.assertIsNone(data.get_record("HDB/KEYNAME/UNEXISTENT")) - self.assertIsNone(data.get_value("HDB/KEYNAME/UNEXISTENT")) + self.assertFalse(data.has_record(b"HDB/KEYNAME/UNEXISTENT")) + self.assertIsNone(data.get_record(b"HDB/KEYNAME/UNEXISTENT")) + self.assertIsNone(data.get_value(b"HDB/KEYNAME/UNEXISTENT")) for key, value in list(self.PLAIN_VALUES.items()): self.assertTrue(data.has_record(key)) @@ -108,14 +108,14 @@ def test_ssfs_data_record_hmac(self): # Now tamper with the data orginal_data = record.data - record.data = orginal_data + "AddedDataBytes" + record.data = orginal_data + b"AddedDataBytes" self.assertFalse(record.valid) record.data = orginal_data self.assertTrue(record.valid) # Now tamper with the HMAC orginal_hmac = record.hmac - record.hmac = orginal_hmac[:-1] + "A" + record.hmac = orginal_hmac[:-1] + b"A" self.assertFalse(record.valid) record.hmac = orginal_hmac self.assertTrue(record.valid) @@ -137,14 +137,14 @@ def test_ssfs_data_record_decrypt(self): data = SAPSSFSData(s) for name, value in list(self.ENCRYPTED_VALUES.items()): - self.assertTrue(data.has_record(name)) - self.assertIsNotNone(data.get_record(name)) - self.assertEqual(data.get_value(name, key), value) + self.assertTrue(data.has_record(name.encode())) + self.assertIsNotNone(data.get_record(name.encode())) + self.assertEqual(data.get_value(name, key), value.encode()) - record = data.get_record(name) + record = data.get_record(name.encode()) self.assertFalse(record.is_stored_as_plaintext) self.assertTrue(record.valid) if __name__ == "__main__": - unittest.main(verbosity=1) + unittest.main(verbosity=1) \ No newline at end of file