Skip to content

Commit

Permalink
Denote metadata file compression/encryption
Browse files Browse the repository at this point in the history
  • Loading branch information
alxndr42 committed Mar 5, 2022
1 parent 318799b commit f1e732f
Show file tree
Hide file tree
Showing 6 changed files with 74 additions and 49 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ Enter passphrase: *****
| Option | Description |
| --- | --- |
| `--comment` | Archive comment. |
| `--compression`, `-c` | Compression for all files: `bz2`, `gz` or `none` (Default: `gz`) |
| `--compression`, `-c` | Compression type: `bz2`, `gz` or `none` (Default: `gz`) |
| `--mode` | Store file/directory modes. |
| `--mtime` | Store file/directory modification times. |
| `--recipient`, `-r` | Allow another public key/alias to extract. |
Expand Down
27 changes: 17 additions & 10 deletions src/icepack/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import json
import os
from pathlib import Path
import re
from shutil import copyfileobj, rmtree

from pydantic import ValidationError
Expand All @@ -16,6 +17,7 @@

_BUFFER_SIZE = 64 * 1024
_MAX_ATTEMPTS = 3
_METADATA_RE = re.compile(r'^metadata\.(\w+)\.(\w+)$')


class IcepackBase():
Expand All @@ -32,22 +34,19 @@ def __init__(self, archive_path, key_path, mode=False, mtime=False):
raise Exception(f'Missing public key: {self.public_key}')
if not self.allowed_signers.is_file():
raise Exception(f'Missing allowed_signers: {self.allowed_signers}')
self._tempdir = None
self._tempdir = File.mktemp(directory=True)
self._zipfile = None
self._mode = mode
self._mtime = mtime

def close(self, silent=False):
"""Close the archive and delete all temporary files."""
if self._tempdir is not None:
rmtree(self._tempdir, ignore_errors=True)
rmtree(self._tempdir, ignore_errors=True)
if self._zipfile is not None:
self._zipfile.close(silent=silent)

def _mktemp(self):
"""Return the Path of a new temporary file."""
if self._tempdir is None:
File.mktemp(directory=True)
return File.mktemp(parent=self._tempdir)


Expand Down Expand Up @@ -114,18 +113,26 @@ def _decrypt_path(self, src_path, dst_path):
def _load_metadata(self):
"""Extract and validate the metadata."""
meta_path, sig_path = self._zipfile.extract_metadata()
if (m := _METADATA_RE.match(meta_path.name)) is None:
raise InvalidArchiveError('Invalid metadata filename.')
compression = m.group(1)
if compression != 'gz':
raise InvalidArchiveError('Unsupported metadata compression.')
encryption = m.group(2)
if encryption != 'age':
raise InvalidArchiveError('Unsupported metadata encryption.')
try:
SSH.verify(meta_path, sig_path, self.allowed_signers)
except Exception:
raise Exception('Failed to verify metadata signature.')
for attempt in range(0, _MAX_ATTEMPTS):
try:
bz2_bytes = Age.decrypt_bytes(meta_path, self.secret_key)
gz_bytes = Age.decrypt_bytes(meta_path, self.secret_key)
break
except Exception:
if attempt == _MAX_ATTEMPTS - 1:
raise Exception('Failed to decrypt metadata.')
json_bytes = bz2.decompress(bz2_bytes)
json_bytes = gzip.decompress(gz_bytes)
try:
self.metadata = Metadata.parse_raw(json_bytes)
except ValidationError as e:
Expand Down Expand Up @@ -222,10 +229,10 @@ def add_entry(self, source, base_path):
def add_metadata(self):
"""Add the metadata file."""
json_bytes = self.metadata.json(exclude_none=True).encode()
bz2_bytes = bz2.compress(json_bytes)
meta_path = self._mktemp()
gz_bytes = gzip.compress(json_bytes)
meta_path = self._tempdir / 'metadata.gz.age'
try:
Age.encrypt_bytes(bz2_bytes, meta_path, self._recipients)
Age.encrypt_bytes(gz_bytes, meta_path, self._recipients)
except Exception:
raise Exception('Failed to encrypt metadata.')
for attempt in range(0, _MAX_ATTEMPTS):
Expand Down
2 changes: 1 addition & 1 deletion src/icepack/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ def init(ctx):
@click.option('--comment', help='Archive comment.')
@click.option(
'--compression', '-c',
help=f'Compression for all files. (Default: {Compression.GZ})',
help=f'Compression type. (Default: {Compression.GZ})',
type=click.Choice([c.value for c in Compression]),
default=Compression.GZ)
@click.option(
Expand Down
36 changes: 23 additions & 13 deletions src/icepack/helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -277,13 +277,16 @@ def __init__(self, path, mode='r'):
self.path = path.resolve()
self._mode = mode
self._zipfile = ZipFile(path, mode=mode)
self._temp_dir = File.mktemp(directory=True)
self._tempdir = File.mktemp(directory=True)
self._entries = {}
if mode == 'r':
infolist = self._zipfile.infolist()
self._validate_infolist(infolist)
self._entries = {i.filename: i for i in infolist}
else:
self._entries = {}
for i in infolist:
if i.filename.startswith('metadata'):
self._entries['metadata'] = i
else:
self._entries[i.filename] = i

def __enter__(self):
return self
Expand All @@ -298,6 +301,8 @@ def add_entry(self, key, path):
raise Exception('Not in write mode.')
if key in self._entries:
raise InvalidArchiveError(f'Duplicate key: {key}')
if key.startswith('metadata'):
raise InvalidArchiveError(f'Invalid key: {key}')
if 'metadata' in self._entries:
raise InvalidArchiveError(f'Metadata file already added.')
info = ZipInfo(key)
Expand All @@ -314,7 +319,9 @@ def add_metadata(self, data_path, sig_path):
raise Exception('Not in write mode.')
if 'metadata' in self._entries:
raise InvalidArchiveError(f'Metadata file already added.')
info = ZipInfo('metadata')
if not data_path.name.startswith('metadata'):
raise InvalidArchiveError('Invalid metadata filename.')
info = ZipInfo(data_path.name)
info.comment = sig_path.read_bytes()
self._entries['metadata'] = info
with self._zipfile.open(info, mode='w', force_zip64=True) as dst:
Expand All @@ -323,7 +330,7 @@ def add_metadata(self, data_path, sig_path):

def close(self, silent=False):
"""Close the Zip archive and delete all temporary files."""
rmtree(self._temp_dir, ignore_errors=True)
rmtree(self._tempdir, ignore_errors=True)
try:
self._zipfile.close()
except Exception:
Expand All @@ -339,15 +346,15 @@ def extract_entry(self, key):
if key not in self._entries:
raise Exception(f'Invalid key: {key}')
info = self._entries[key]
path = Path(self._zipfile.extract(info, path=self._temp_dir))
path = Path(self._zipfile.extract(info, path=self._tempdir))
return path

def extract_metadata(self):
"""Return a tuple of temporary Paths for metadata and signature."""
if self._mode != 'r':
raise Exception('Not in read mode.')
info = self._entries['metadata']
meta_path = Path(self._zipfile.extract(info, path=self._temp_dir))
meta_path = Path(self._zipfile.extract(info, path=self._tempdir))
sig_path = meta_path.parent / (meta_path.name + '.sig')
sig_path.write_bytes(info.comment)
return meta_path, sig_path
Expand All @@ -357,12 +364,15 @@ def _validate_infolist(infolist):
"""Check the infolist for validity."""
if len(infolist) == 0:
raise InvalidArchiveError('Empty Zip.')
if infolist[-1].filename != 'metadata':
if any(map(lambda i: i.compress_type != ZIP_STORED, infolist)):
raise InvalidArchiveError('Non-STORED Zip entry.')
if not infolist[-1].filename.startswith('metadata'):
raise InvalidArchiveError('No metadata file at end of Zip.')
if infolist[-1].comment is None:
raise InvalidArchiveError('No metadata signature.')
filenames = {i.filename for i in infolist}
if len(filenames) != len(infolist):
files = infolist[:-1]
names = {f.filename for f in files}
if len(names) != len(files):
raise InvalidArchiveError(f'Duplicate filename in Zip.')
if any(map(lambda i: i.compress_type != ZIP_STORED, infolist)):
raise InvalidArchiveError('Non-STORED Zip entry.')
if any(map(lambda f: f.filename.startswith('metadata'), files)):
raise InvalidArchiveError('Invalid filename.')
Binary file modified tests/data/zips/zip-helper.zip
Binary file not shown.
56 changes: 32 additions & 24 deletions tests/test_helper_zip.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,14 @@
from icepack.error import InvalidArchiveError
from icepack.helper import File, Zip

from helper import src_path, zip_path
from helper import src_path, dst_path, zip_path


@pytest.fixture
def meta_path(dst_path):
path = dst_path / 'metadata'
path.write_text('metadata')
return path


@pytest.fixture
Expand All @@ -16,10 +23,9 @@ def zip_file(zip_path):
class TestWriteMode:
"""Test write operations."""

def test_write_mode(self, src_path, zip_file):
def test_write_mode(self, src_path, meta_path, zip_file):
zip_file.add_entry('foo', src_path / 'foo')
zip_file.add_entry('bar', None)
meta_path = src_path / 'qux' / 'quux'
zip_file.add_metadata(meta_path, meta_path)
zip_file.close()
with ZipFile(zip_file.path) as zip_file:
Expand All @@ -28,34 +34,36 @@ def test_write_mode(self, src_path, zip_file):
names = [i.filename for i in infolist]
assert names == ['foo', 'bar', 'metadata']
sizes = [i.file_size for i in infolist]
assert sizes == [4, 0, 5]
assert sizes == [4, 0, 8]

def test_no_metadata(self, src_path, zip_file):
zip_file.add_entry('foo', src_path / 'foo')
with pytest.raises(InvalidArchiveError):
zip_file.close()

def test_add_entry_twice(self, src_path, zip_file):
zip_file.add_entry('foo', src_path / 'foo')
with pytest.raises(InvalidArchiveError):
zip_file.add_entry('foo', src_path / 'foo')

def test_add_metadata_twice(self, src_path, zip_file):
foo = src_path / 'foo'
zip_file.add_metadata(foo, foo)
def test_add_metadata_twice(self, meta_path, zip_file):
zip_file.add_metadata(meta_path, meta_path)
with pytest.raises(InvalidArchiveError):
zip_file.add_metadata(foo, foo)
zip_file.add_metadata(meta_path, meta_path)

def test_add_entry_after_metdata(self, src_path, zip_file):
foo = src_path / 'foo'
zip_file.add_entry('foo', foo)
zip_file.add_metadata(foo, foo)
def test_add_entry_after_metdata(self, meta_path, zip_file):
zip_file.add_entry('foo', meta_path)
zip_file.add_metadata(meta_path, meta_path)
with pytest.raises(InvalidArchiveError):
zip_file.add_entry('bar', foo)
zip_file.add_entry('bar', meta_path)

def test_context_manager(self, src_path, zip_path):
def test_context_manager(self, meta_path, zip_path):
with Zip(zip_path, mode='w') as zip_file:
zip_file.add_entry('foo', src_path / 'foo')
meta_path = src_path / 'qux' / 'quux'
zip_file.add_entry('foo', meta_path)
zip_file.add_metadata(meta_path, meta_path)
temp_dir = zip_file._temp_dir
assert temp_dir.exists()
assert not temp_dir.exists()
tempdir = zip_file._tempdir
assert tempdir.exists()
assert not tempdir.exists()
assert zip_path.exists()


Expand All @@ -68,8 +76,8 @@ def test_read_mode(self, shared_datadir):
path = zip_file.extract_entry('foo')
assert File.sha256(path) == 'b5bb9d8014a0f9b1d61e21e796d78dccdf1352f23cd32812f4850b878ae4944c' # noqa
path, sig = zip_file.extract_metadata()
assert File.sha256(path) == '49ae93732fcf8d63fe1cce759664982dbd5b23161f007dba8561862adc96d063' # noqa
assert File.sha256(sig) == '49ae93732fcf8d63fe1cce759664982dbd5b23161f007dba8561862adc96d063' # noqa
assert File.sha256(path) == '45447b7afbd5e544f7d0f1df0fccd26014d9850130abd3f020b89ff96b82079f' # noqa
assert File.sha256(sig) == '45447b7afbd5e544f7d0f1df0fccd26014d9850130abd3f020b89ff96b82079f' # noqa
zip_file.close()

def test_regular_zip(self, shared_datadir):
Expand All @@ -82,6 +90,6 @@ def test_context_manager(self, shared_datadir):
with Zip(path) as zip_file:
path = zip_file.extract_entry('foo')
assert File.sha256(path) == 'b5bb9d8014a0f9b1d61e21e796d78dccdf1352f23cd32812f4850b878ae4944c' # noqa
temp_dir = zip_file._temp_dir
assert temp_dir.exists()
assert not temp_dir.exists()
tempdir = zip_file._tempdir
assert tempdir.exists()
assert not tempdir.exists()

0 comments on commit f1e732f

Please sign in to comment.