Skip to content

Commit

Permalink
Process comments
Browse files Browse the repository at this point in the history
  • Loading branch information
Schamper committed Jan 22, 2024
1 parent 675acc1 commit 471c5dc
Showing 1 changed file with 26 additions and 15 deletions.
41 changes: 26 additions & 15 deletions dissect/util/compression/xz.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@

from dissect.util.stream import OverlayStream

# Number of bytes read for the stream header (from the start of the file) and
# for the stream footer (from the end of the file); both are the same size.
HEADER_FOOTER_SIZE = 12
# Width in bytes of a stored CRC32 field; used to locate the checksum at the
# tail of a structure and to exclude it from the data being checksummed.
CRC_SIZE = 4


def repair_checksum(fh: BinaryIO) -> BinaryIO:
"""Repair CRC32 checksums for all headers in an XZ stream.
Expand All @@ -22,50 +25,52 @@ def repair_checksum(fh: BinaryIO) -> BinaryIO:
repaired = OverlayStream(fh, size)
fh.seek(0)

header = fh.read(12)
header = fh.read(HEADER_FOOTER_SIZE)
# Check header magic
if header[:6] != b"\xfd7zXZ\x00":
magic = b"\xfd7zXZ\x00"
if header[: len(magic)] != magic:
raise ValueError("Not an XZ file")

Check warning on line 32 in dissect/util/compression/xz.py

View check run for this annotation

Codecov / codecov/patch

dissect/util/compression/xz.py#L32

Added line #L32 was not covered by tests

# Add correct header CRC32
repaired.add(8, _crc32(header[6:8]))

fh.seek(-12, io.SEEK_END)
footer = fh.read(12)
footer_offset = fh.seek(-HEADER_FOOTER_SIZE, io.SEEK_END)
footer = fh.read(HEADER_FOOTER_SIZE)

# Check footer magic
if footer[10:12] != b"YZ":
footer_magic = b"YZ"
if footer[HEADER_FOOTER_SIZE - len(footer_magic) : HEADER_FOOTER_SIZE] != footer_magic:
raise ValueError("Not an XZ file")

Check warning on line 43 in dissect/util/compression/xz.py

View check run for this annotation

Codecov / codecov/patch

dissect/util/compression/xz.py#L43

Added line #L43 was not covered by tests

# Add correct footer CRC32
repaired.add(fh.tell() - 12, _crc32(footer[4:10]))
repaired.add(footer_offset, _crc32(footer[CRC_SIZE : HEADER_FOOTER_SIZE - len(footer_magic)]))

backward_size = (int.from_bytes(footer[4:8], "little") + 1) * 4
fh.seek(-12 - backward_size, io.SEEK_END)
fh.seek(-HEADER_FOOTER_SIZE - backward_size, io.SEEK_END)
index = fh.read(backward_size)

# Add correct index CRC32
repaired.add(fh.tell() - 4, _crc32(index[:-4]))
repaired.add(fh.tell() - CRC_SIZE, _crc32(index[:-CRC_SIZE]))

# Parse the index
isize, nb_records = _mbi(index[1:])
isize, num_records = _mbi(index[1:])
index = index[1 + isize : -4]
records = []
for _ in range(nb_records):
for _ in range(num_records):
if not index:
raise ValueError("index size")
raise ValueError("Missing index size")

Check warning on line 61 in dissect/util/compression/xz.py

View check run for this annotation

Codecov / codecov/patch

dissect/util/compression/xz.py#L61

Added line #L61 was not covered by tests

isize, unpadded_size = _mbi(index)
if not unpadded_size:
raise ValueError("index record unpadded size")
raise ValueError("Missing index record unpadded size")

Check warning on line 65 in dissect/util/compression/xz.py

View check run for this annotation

Codecov / codecov/patch

dissect/util/compression/xz.py#L65

Added line #L65 was not covered by tests

index = index[isize:]
if not index:
raise ValueError("index size")
raise ValueError("Missing index size")

Check warning on line 69 in dissect/util/compression/xz.py

View check run for this annotation

Codecov / codecov/patch

dissect/util/compression/xz.py#L69

Added line #L69 was not covered by tests

isize, uncompressed_size = _mbi(index)
if not uncompressed_size:
raise ValueError("index record uncompressed size")
raise ValueError("Missing index record uncompressed size")

Check warning on line 73 in dissect/util/compression/xz.py

View check run for this annotation

Codecov / codecov/patch

dissect/util/compression/xz.py#L73

Added line #L73 was not covered by tests

index = index[isize:]
records.append((unpadded_size, uncompressed_size))
Expand All @@ -81,14 +86,20 @@ def repair_checksum(fh: BinaryIO) -> BinaryIO:
block_header = fh.read(1)
block_header_size = (block_header[0] + 1) * 4
block_header += fh.read(block_header_size - 1)
repaired.add(fh.tell() - 4, _crc32(block_header[:-4]))
repaired.add(fh.tell() - CRC_SIZE, _crc32(block_header[:-4]))

block_start += (unpadded_size + 3) & ~3

return repaired


def _mbi(data: bytes) -> tuple[int, int]:
"""Decode a multibyte integer.
The encoding is similar to most other "varint" encodings. For each byte, the 7 least significant bits are used for
the integer value. The most significant bit is used to indicate if the integer continues in the next byte.
Bytes are ordered in little endian byte order, meaning the least significant byte comes first.
"""
value = 0
for size, byte in enumerate(data):
value |= (byte & 0x7F) << (size * 7)
Expand Down

0 comments on commit 471c5dc

Please sign in to comment.