diff --git a/README.md b/README.md index e30f355..620cea9 100644 --- a/README.md +++ b/README.md @@ -50,7 +50,7 @@ Enter passphrase: ***** | Option | Description | | --- | --- | | `--comment` | Archive comment. | -| `--compression`, `-c` | Compression for all files: `bz2`, `gz` or `none` (Default: `gz`) | +| `--compression`, `-c` | Compression type: `bz2`, `gz` or `none` (Default: `gz`) | | `--mode` | Store file/directory modes. | | `--mtime` | Store file/directory modification times. | | `--recipient`, `-r` | Allow another public key/alias to extract. | diff --git a/src/icepack/__init__.py b/src/icepack/__init__.py index 4ca6db5..a7077c0 100644 --- a/src/icepack/__init__.py +++ b/src/icepack/__init__.py @@ -3,6 +3,7 @@ import json import os from pathlib import Path +import re from shutil import copyfileobj, rmtree from pydantic import ValidationError @@ -16,6 +17,7 @@ _BUFFER_SIZE = 64 * 1024 _MAX_ATTEMPTS = 3 +_METADATA_RE = re.compile(r'^metadata\.(\w+)\.(\w+)$') class IcepackBase(): @@ -32,22 +34,19 @@ def __init__(self, archive_path, key_path, mode=False, mtime=False): raise Exception(f'Missing public key: {self.public_key}') if not self.allowed_signers.is_file(): raise Exception(f'Missing allowed_signers: {self.allowed_signers}') - self._tempdir = None + self._tempdir = File.mktemp(directory=True) self._zipfile = None self._mode = mode self._mtime = mtime def close(self, silent=False): """Close the archive and delete all temporary files.""" - if self._tempdir is not None: - rmtree(self._tempdir, ignore_errors=True) + rmtree(self._tempdir, ignore_errors=True) if self._zipfile is not None: self._zipfile.close(silent=silent) def _mktemp(self): """Return the Path of a new temporary file.""" - if self._tempdir is None: - File.mktemp(directory=True) return File.mktemp(parent=self._tempdir) @@ -114,18 +113,26 @@ def _decrypt_path(self, src_path, dst_path): def _load_metadata(self): """Extract and validate the metadata.""" meta_path, sig_path = self._zipfile.extract_metadata() + if (m := _METADATA_RE.match(meta_path.name)) is None: + raise InvalidArchiveError('Invalid metadata filename.') + compression = m.group(1) + if compression != 'gz': + raise InvalidArchiveError('Unsupported metadata compression.') + encryption = m.group(2) + if encryption != 'age': + raise InvalidArchiveError('Unsupported metadata encryption.') try: SSH.verify(meta_path, sig_path, self.allowed_signers) except Exception: raise Exception('Failed to verify metadata signature.') for attempt in range(0, _MAX_ATTEMPTS): try: - bz2_bytes = Age.decrypt_bytes(meta_path, self.secret_key) + gz_bytes = Age.decrypt_bytes(meta_path, self.secret_key) break except Exception: if attempt == _MAX_ATTEMPTS - 1: raise Exception('Failed to decrypt metadata.') - json_bytes = bz2.decompress(bz2_bytes) + json_bytes = gzip.decompress(gz_bytes) try: self.metadata = Metadata.parse_raw(json_bytes) except ValidationError as e: @@ -222,10 +229,10 @@ def add_entry(self, source, base_path): def add_metadata(self): """Add the metadata file.""" json_bytes = self.metadata.json(exclude_none=True).encode() - bz2_bytes = bz2.compress(json_bytes) - meta_path = self._mktemp() + gz_bytes = gzip.compress(json_bytes) + meta_path = self._tempdir / 'metadata.gz.age' try: - Age.encrypt_bytes(bz2_bytes, meta_path, self._recipients) + Age.encrypt_bytes(gz_bytes, meta_path, self._recipients) except Exception: raise Exception('Failed to encrypt metadata.') for attempt in range(0, _MAX_ATTEMPTS): diff --git a/src/icepack/cli.py b/src/icepack/cli.py index 1dbd6a0..541ba49 100644 --- a/src/icepack/cli.py +++ b/src/icepack/cli.py @@ -47,7 +47,7 @@ def init(ctx): @click.option('--comment', help='Archive comment.') @click.option( '--compression', '-c', - help=f'Compression for all files. (Default: {Compression.GZ})', + help=f'Compression type. (Default: {Compression.GZ})', type=click.Choice([c.value for c in Compression]), default=Compression.GZ) @click.option( diff --git a/src/icepack/helper.py b/src/icepack/helper.py index 354933a..50ab45d 100644 --- a/src/icepack/helper.py +++ b/src/icepack/helper.py @@ -277,13 +277,16 @@ def __init__(self, path, mode='r'): self.path = path.resolve() self._mode = mode self._zipfile = ZipFile(path, mode=mode) - self._temp_dir = File.mktemp(directory=True) + self._tempdir = File.mktemp(directory=True) + self._entries = {} if mode == 'r': infolist = self._zipfile.infolist() self._validate_infolist(infolist) - self._entries = {i.filename: i for i in infolist} - else: - self._entries = {} + for i in infolist: + if i.filename.startswith('metadata'): + self._entries['metadata'] = i + else: + self._entries[i.filename] = i def __enter__(self): return self @@ -298,6 +301,8 @@ def add_entry(self, key, path): raise Exception('Not in write mode.') if key in self._entries: raise InvalidArchiveError(f'Duplicate key: {key}') + if key.startswith('metadata'): + raise InvalidArchiveError(f'Invalid key: {key}') if 'metadata' in self._entries: raise InvalidArchiveError(f'Metadata file already added.') info = ZipInfo(key) @@ -314,7 +319,9 @@ def add_metadata(self, data_path, sig_path): raise Exception('Not in write mode.') if 'metadata' in self._entries: raise InvalidArchiveError(f'Metadata file already added.') - info = ZipInfo('metadata') + if not data_path.name.startswith('metadata'): + raise InvalidArchiveError('Invalid metadata filename.') + info = ZipInfo(data_path.name) info.comment = sig_path.read_bytes() self._entries['metadata'] = info with self._zipfile.open(info, mode='w', force_zip64=True) as dst: @@ -323,7 +330,7 @@ def add_metadata(self, data_path, sig_path): def close(self, silent=False): """Close the Zip archive and delete all temporary files.""" - rmtree(self._temp_dir, ignore_errors=True) + rmtree(self._tempdir, ignore_errors=True) try: self._zipfile.close() except Exception: @@ -339,7 +346,7 @@ def extract_entry(self, key): if key not in self._entries: raise Exception(f'Invalid key: {key}') info = self._entries[key] - path = Path(self._zipfile.extract(info, path=self._temp_dir)) + path = Path(self._zipfile.extract(info, path=self._tempdir)) return path def extract_metadata(self): @@ -347,7 +354,7 @@ def extract_metadata(self): if self._mode != 'r': raise Exception('Not in read mode.') info = self._entries['metadata'] - meta_path = Path(self._zipfile.extract(info, path=self._temp_dir)) + meta_path = Path(self._zipfile.extract(info, path=self._tempdir)) sig_path = meta_path.parent / (meta_path.name + '.sig') sig_path.write_bytes(info.comment) return meta_path, sig_path @@ -357,12 +364,15 @@ def _validate_infolist(infolist): """Check the infolist for validity.""" if len(infolist) == 0: raise InvalidArchiveError('Empty Zip.') - if infolist[-1].filename != 'metadata': + if any(map(lambda i: i.compress_type != ZIP_STORED, infolist)): + raise InvalidArchiveError('Non-STORED Zip entry.') + if not infolist[-1].filename.startswith('metadata'): raise InvalidArchiveError('No metadata file at end of Zip.') if infolist[-1].comment is None: raise InvalidArchiveError('No metadata signature.') - filenames = {i.filename for i in infolist} - if len(filenames) != len(infolist): + files = infolist[:-1] + names = {f.filename for f in files} + if len(names) != len(files): raise InvalidArchiveError(f'Duplicate filename in Zip.') - if any(map(lambda i: i.compress_type != ZIP_STORED, infolist)): - raise InvalidArchiveError('Non-STORED Zip entry.') + if any(map(lambda f: f.filename.startswith('metadata'), files)): + raise InvalidArchiveError('Invalid filename.') diff --git a/tests/data/zips/zip-helper.zip b/tests/data/zips/zip-helper.zip index c7fc137..c0a7d64 100644 Binary files a/tests/data/zips/zip-helper.zip and b/tests/data/zips/zip-helper.zip differ diff --git a/tests/test_helper_zip.py b/tests/test_helper_zip.py index 4958507..c10bf3d 100644 --- a/tests/test_helper_zip.py +++ b/tests/test_helper_zip.py @@ -5,7 +5,14 @@ from icepack.error import InvalidArchiveError from icepack.helper import File, Zip -from helper import src_path, zip_path +from helper import src_path, dst_path, zip_path + + +@pytest.fixture +def meta_path(dst_path): + path = dst_path / 'metadata' + path.write_text('metadata') + return path @pytest.fixture @@ -16,10 +23,9 @@ def zip_file(zip_path): class TestWriteMode: """Test write operations.""" - def test_write_mode(self, src_path, zip_file): + def test_write_mode(self, src_path, meta_path, zip_file): zip_file.add_entry('foo', src_path / 'foo') zip_file.add_entry('bar', None) - meta_path = src_path / 'qux' / 'quux' zip_file.add_metadata(meta_path, meta_path) zip_file.close() with ZipFile(zip_file.path) as zip_file: @@ -28,34 +34,36 @@ def test_write_mode(self, src_path, zip_file): names = [i.filename for i in infolist] assert names == ['foo', 'bar', 'metadata'] sizes = [i.file_size for i in infolist] - assert sizes == [4, 0, 5] + assert sizes == [4, 0, 8] + + def test_no_metadata(self, src_path, zip_file): + zip_file.add_entry('foo', src_path / 'foo') + with pytest.raises(InvalidArchiveError): + zip_file.close() def test_add_entry_twice(self, src_path, zip_file): zip_file.add_entry('foo', src_path / 'foo') with pytest.raises(InvalidArchiveError): zip_file.add_entry('foo', src_path / 'foo') - def test_add_metadata_twice(self, src_path, zip_file): - foo = src_path / 'foo' - zip_file.add_metadata(foo, foo) + def test_add_metadata_twice(self, meta_path, zip_file): + zip_file.add_metadata(meta_path, meta_path) with pytest.raises(InvalidArchiveError): - zip_file.add_metadata(foo, foo) + zip_file.add_metadata(meta_path, meta_path) - def test_add_entry_after_metdata(self, src_path, zip_file): - foo = src_path / 'foo' - zip_file.add_entry('foo', foo) - zip_file.add_metadata(foo, foo) + def test_add_entry_after_metdata(self, meta_path, zip_file): + zip_file.add_entry('foo', meta_path) + zip_file.add_metadata(meta_path, meta_path) with pytest.raises(InvalidArchiveError): - zip_file.add_entry('bar', foo) + zip_file.add_entry('bar', meta_path) - def test_context_manager(self, src_path, zip_path): + def test_context_manager(self, meta_path, zip_path): with Zip(zip_path, mode='w') as zip_file: - zip_file.add_entry('foo', src_path / 'foo') - meta_path = src_path / 'qux' / 'quux' + zip_file.add_entry('foo', meta_path) zip_file.add_metadata(meta_path, meta_path) - temp_dir = zip_file._temp_dir - assert temp_dir.exists() - assert not temp_dir.exists() + tempdir = zip_file._tempdir + assert tempdir.exists() + assert not tempdir.exists() assert zip_path.exists() @@ -68,8 +76,8 @@ def test_read_mode(self, shared_datadir): path = zip_file.extract_entry('foo') assert File.sha256(path) == 'b5bb9d8014a0f9b1d61e21e796d78dccdf1352f23cd32812f4850b878ae4944c' # noqa path, sig = zip_file.extract_metadata() - assert File.sha256(path) == '49ae93732fcf8d63fe1cce759664982dbd5b23161f007dba8561862adc96d063' # noqa - assert File.sha256(sig) == '49ae93732fcf8d63fe1cce759664982dbd5b23161f007dba8561862adc96d063' # noqa + assert File.sha256(path) == '45447b7afbd5e544f7d0f1df0fccd26014d9850130abd3f020b89ff96b82079f' # noqa + assert File.sha256(sig) == '45447b7afbd5e544f7d0f1df0fccd26014d9850130abd3f020b89ff96b82079f' # noqa zip_file.close() def test_regular_zip(self, shared_datadir): @@ -82,6 +90,6 @@ def test_context_manager(self, shared_datadir): with Zip(path) as zip_file: path = zip_file.extract_entry('foo') assert File.sha256(path) == 'b5bb9d8014a0f9b1d61e21e796d78dccdf1352f23cd32812f4850b878ae4944c' # noqa - temp_dir = zip_file._temp_dir - assert temp_dir.exists() - assert not temp_dir.exists() + tempdir = zip_file._tempdir + assert tempdir.exists() + assert not tempdir.exists()