Skip to content

Commit

Permalink
Issue #1271 added LZWDecode compression
Browse files Browse the repository at this point in the history
  • Loading branch information
opposss committed Oct 15, 2024
1 parent ab0bd4d commit 22709f8
Showing 1 changed file with 126 additions and 1 deletion.
127 changes: 126 additions & 1 deletion fpdf/image_parsing.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ class ImageSettings:


LOGGER = logging.getLogger(__name__)
SUPPORTED_IMAGE_FILTERS = ("AUTO", "FlateDecode", "DCTDecode", "JPXDecode")
SUPPORTED_IMAGE_FILTERS = ("AUTO", "FlateDecode", "DCTDecode", "JPXDecode", "LZWDecode")
SETTINGS = ImageSettings()

# fmt: off
Expand Down Expand Up @@ -256,6 +256,11 @@ def get_img_info(filename, img=None, image_filter="AUTO", dims=None):
img = img.convert("RGBA")
img_altered = True

if img.mode in ("P", "RGBA") and image_filter == "LZWDecode":
img = img.convert("RGB")
elif img.mode in ("LA") and image_filter == "LZWDecode":
img = img.convert("L")

w, h = img.size
info = RasterImageInfo()

Expand Down Expand Up @@ -527,13 +532,133 @@ def __getitem__(self, tag):
return newimgio.read(length)


def _to_lzwdata(img, remove_slice=None, select_slice=None):
CLEAR_TABLE_MARKER = 256 # Special code to indicate table reset
EOD_MARKER = 257 # End-of-data marker
INITIAL_BITS_PER_CODE = 9 # Initial code bit width
MAX_BITS_PER_CODE = 12 # Maximum code bit width

data = bytearray(img.tobytes())

if remove_slice:
del data[remove_slice]
if select_slice:
data = data[select_slice]

if img.mode == "1":
row_size = ceil(img.size[0] / 8)
data_with_padding = data
else:
channels_count = len(data) // (img.size[0] * img.size[1])
row_size = img.size[0] * channels_count
data_with_padding = bytearray()
for i in range(0, len(data), row_size):
data_with_padding.extend(b"\0")
data_with_padding.extend(data[i : i + row_size])

data = data_with_padding

def clear_table():
"""
Reset the encoding table and coding state to initial conditions.
"""

table = {bytes([i]): i for i in range(256)}
next_code = EOD_MARKER + 1
bits_per_code = INITIAL_BITS_PER_CODE
max_code_value = (1 << bits_per_code) - 1
return table, next_code, bits_per_code, max_code_value

def pack_codes_into_bytes(codes):
"""
Convert the list of result codes into a continuous byte stream, with codes packed as per the code bit-width.
The bit-width starts at 9 bits and expands as needed.
"""

(
_,
next_code,
bits_per_code,
max_code_value,
) = clear_table()
buffer = 0
bits_in_buffer = 0
output = bytearray()

for code in codes:
buffer = (buffer << bits_per_code) | code
bits_in_buffer += bits_per_code

while bits_in_buffer >= 8:
bits_in_buffer -= 8
output.append((buffer >> bits_in_buffer) & 0xFF)

if code == CLEAR_TABLE_MARKER:
_, next_code, bits_per_code, max_code_value = clear_table()
elif code != EOD_MARKER:
next_code += 1
if next_code > max_code_value and bits_per_code < MAX_BITS_PER_CODE:
bits_per_code += 1
max_code_value = (1 << bits_per_code) - 1

if bits_in_buffer > 0:
output.append((buffer << (8 - bits_in_buffer)) & 0xFF)

return bytes(output)

# Start compression
result_codes = [
CLEAR_TABLE_MARKER
] # The encoder shall begin by issuing a clear-table code
table, next_code, bits_per_code, max_code_value = clear_table()

current_sequence = b""
for byte in data:
next_sequence = current_sequence + bytes([byte])

if next_sequence in table:
# Extend current sequence if already in the table
current_sequence = next_sequence
else:
# Output code for the current sequence
result_codes.append(table[current_sequence])

# Add the new sequence to the table if there's room
if next_code <= (1 << MAX_BITS_PER_CODE) - 1:
table[next_sequence] = next_code
next_code += 1
if next_code > max_code_value and bits_per_code < MAX_BITS_PER_CODE:
bits_per_code += 1
max_code_value = (1 << bits_per_code) - 1
else:
# If the table is full, emit a clear-table command
result_codes.append(CLEAR_TABLE_MARKER)
table, next_code, bits_per_code, max_code_value = clear_table()

# Start new sequence
current_sequence = bytes([byte])

# Ensure everything actually is encoded
if current_sequence:
result_codes.append(table[current_sequence])

result_codes.append(EOD_MARKER)

return pack_codes_into_bytes(result_codes)


def _to_data(img, image_filter, **kwargs):
if image_filter == "FlateDecode":
return _to_zdata(img, **kwargs)

if image_filter == "CCITTFaxDecode":
return transcode_monochrome(img)

if image_filter == "LZWDecode":
return _to_lzwdata(img, **kwargs)

if img.mode == "LA":
img = img.convert("L")

Expand Down

0 comments on commit 22709f8

Please sign in to comment.