Skip to content

Commit

Permalink
Relax requirements around PKCS#11 config
Browse files Browse the repository at this point in the history
 - Allow key label / id to be defaulted from the corresponding
   cert settings and vice-versa
 - For directly instantiated PKCS11Signers, delay throwing errors
   until key/cert lookup
   (allows even more flexibility at the cost of less informative error
    messages at this level of the API)

Fixes #386
  • Loading branch information
MatthiasValvekens committed Mar 7, 2024
1 parent 9936d8c commit 5041978
Show file tree
Hide file tree
Showing 5 changed files with 135 additions and 85 deletions.
28 changes: 18 additions & 10 deletions pyhanko/config/pkcs11.py
Original file line number Diff line number Diff line change
Expand Up @@ -224,24 +224,32 @@ def process_entries(cls, config_dict):
config_dict['key_id'] = _process_pkcs11_id_value(
config_dict['key_id']
)
elif 'key_label' not in config_dict and 'cert_label' not in config_dict:
raise ConfigurationError(
"Either 'key_id', 'key_label' or 'cert_label' must be provided "
"in PKCS#11 setup"
)

if 'cert_id' in config_dict:
config_dict['cert_id'] = _process_pkcs11_id_value(
config_dict['cert_id']
)
elif (

if 'key_label' not in config_dict and 'key_id' not in config_dict:
if 'cert_id' not in config_dict and 'cert_label' not in config_dict:
raise ConfigurationError(
"Either 'key_id', 'key_label', 'cert_label' or 'cert_id',"
"must be provided in PKCS#11 setup"
)
if 'cert_id' in config_dict:
config_dict['key_id'] = config_dict['cert_id']
if 'cert_label' in config_dict:
config_dict['key_label'] = config_dict['cert_label']

if (
'cert_label' not in config_dict
and 'cert_id' not in config_dict
and 'signing_certificate' not in config_dict
):
raise ConfigurationError(
"Either 'cert_id', 'cert_label' or 'signing_certificate' "
"must be provided in PKCS#11 setup"
)
if 'key_id' in config_dict:
config_dict['cert_id'] = config_dict['key_id']
if 'key_label' in config_dict:
config_dict['cert_label'] = config_dict['key_label']

config_dict['prompt_pin'] = get_and_apply(
config_dict,
Expand Down
7 changes: 7 additions & 0 deletions pyhanko/pdf_utils/misc.py
Original file line number Diff line number Diff line change
Expand Up @@ -555,3 +555,10 @@ def __init__(self, i: Iterable[X]):

def lift_iterable_async(i: Iterable[X]) -> CancelableAsyncIterator[X]:
return _LiftedIterable(i)


def coalesce(*args):
for arg in args:
if arg is not None:
return arg
return None
22 changes: 5 additions & 17 deletions pyhanko/sign/pkcs11.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
PKCS11SignatureConfig,
TokenCriteria,
)
from pyhanko.pdf_utils.misc import coalesce
from pyhanko.sign.general import SigningError, get_pyca_cryptography_hash
from pyhanko.sign.signers import Signer

Expand Down Expand Up @@ -516,23 +517,10 @@ def __init__(
"""
Initialise a PKCS11 signer.
"""
if signing_cert is None and cert_id is None and cert_label is None:
raise SigningError(
"Please specify a signer's certificate through the "
"'cert_id', 'signing_cert' and/or 'cert_label' options"
)

self.cert_label = cert_label
self.key_id = key_id
self.cert_id = cert_id
if key_id is None and key_label is None:
if cert_label is None:
raise SigningError(
"If 'cert_label' is None, then 'key_label' or 'key_id' "
"must be provided."
)
key_label = cert_label
self.key_label = key_label
self.cert_label = coalesce(cert_label, key_label)
self.key_id = coalesce(key_id, cert_id)
self.cert_id = coalesce(cert_id, key_id)
self.key_label = coalesce(key_label, cert_label)
self.pkcs11_session = pkcs11_session
self.other_certs = other_certs_to_pull
self._other_certs_loaded = False
Expand Down
127 changes: 84 additions & 43 deletions pyhanko_tests/test_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -718,20 +718,6 @@ def test_read_pkcs11_config_external_cert():
assert isinstance(setup.signing_certificate, x509.Certificate)


def test_read_pkcs11_config_no_key_spec():
cli_config = _parse_cli_config(
f"""
pkcs11-setups:
foo:
module-path: /path/to/libfoo.so
slot-no: 0
cert-id: "deadbeef"
"""
)
with pytest.raises(ConfigurationError, match="Either 'key_id'"):
ModuleConfigWrapper(cli_config).get_pkcs11_config('foo')


def test_read_pkcs11_config_bad_criteria_type():
cli_config = _parse_cli_config(
f"""
Expand Down Expand Up @@ -765,20 +751,75 @@ def test_read_pkcs11_config_bad_serial():
ModuleConfigWrapper(cli_config).get_pkcs11_config('foo')


def test_read_pkcs11_config_no_cert_spec():
def test_read_pkcs11_config_no_cert_spec_or_key_spec():
cli_config = _parse_cli_config(
f"""
pkcs11-setups:
foo:
module-path: /path/to/libfoo.so
slot-no: 0
key-label: signer
"""
)
with pytest.raises(ConfigurationError, match="Either 'cert_id'"):
with pytest.raises(ConfigurationError, match="Either 'key_id'"):
ModuleConfigWrapper(cli_config).get_pkcs11_config('foo')


def test_read_pkcs11_config_cert_label_from_key_label():
cli_config = _parse_cli_config(
f"""
pkcs11-setups:
foo:
module-path: /path/to/libfoo.so
slot-no: 0
key-label: signer
"""
)
cfg = ModuleConfigWrapper(cli_config).get_pkcs11_config('foo')
assert (cfg.cert_label, cfg.cert_id) == ('signer', None)


def test_read_pkcs11_config_cert_id_from_key_id():
cli_config = _parse_cli_config(
f"""
pkcs11-setups:
foo:
module-path: /path/to/libfoo.so
slot-no: 0
key-id: deadbeef
"""
)
cfg = ModuleConfigWrapper(cli_config).get_pkcs11_config('foo')
assert (cfg.cert_label, cfg.cert_id) == (None, b"\xde\xad\xbe\xef")


def test_read_pkcs11_config_key_id_from_cert_id():
cli_config = _parse_cli_config(
f"""
pkcs11-setups:
foo:
module-path: /path/to/libfoo.so
slot-no: 0
cert-id: "deadbeef"
"""
)
cfg = ModuleConfigWrapper(cli_config).get_pkcs11_config('foo')
assert (cfg.key_label, cfg.key_id) == (None, b"\xde\xad\xbe\xef")


def test_read_pkcs11_config_key_label_from_cert_label():
cli_config = _parse_cli_config(
f"""
pkcs11-setups:
foo:
module-path: /path/to/libfoo.so
slot-no: 0
cert-label: "signer"
"""
)
cfg = ModuleConfigWrapper(cli_config).get_pkcs11_config('foo')
assert (cfg.key_label, cfg.key_id) == ("signer", None)


@pytest.mark.parametrize(
'literal,exp_val',
[
Expand Down Expand Up @@ -1062,50 +1103,50 @@ class DemoConfigurableB(ConfigurableMixin):
[
(
"""
some_field:
field1: 1
field2: [1,2]
""",
some_field:
field1: 1
field2: [1,2]
""",
DemoConfigurableA(field1=1, field2=[1, 2]),
),
(
"""
some_field:
field1: 1
field2: [1,2]
field3: 5
""",
some_field:
field1: 1
field2: [1,2]
field3: 5
""",
DemoConfigurableA(field1=1, field2=[1, 2], field3=5),
),
(
"""
some_field:
field1: 1
field2: [1,2]
field3: 5
field4: [6,7,8]
""",
some_field:
field1: 1
field2: [1,2]
field3: 5
field4: [6,7,8]
""",
DemoConfigurableA(
field1=1, field2=[1, 2], field3=5, field4=[6, 7, 8]
),
),
(
"""
some_field:
field1: 1
field2: [1,2]
field5: xyz
""",
some_field:
field1: 1
field2: [1,2]
field5: xyz
""",
DemoConfigurableA(field1=1, field2=[1, 2], field5='xyz'),
),
(
"""
some_field:
field1: 1
field2: [1,2]
field5: 8
field6: xyz
""",
some_field:
field1: 1
field2: [1,2]
field5: 8
field6: xyz
""",
DemoConfigurableA(field1=1, field2=[1, 2], field5=8, field6='xyz'),
),
("{}", None),
Expand Down
36 changes: 21 additions & 15 deletions pyhanko_tests/test_pkcs11.py
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ def test_wrong_cert(bulk_fetch):
bulk_fetch=bulk_fetch,
cert_id=binascii.unhexlify(b'deadbeef'),
)
with pytest.raises(PKCS11Error, match='Could not find.*with ID'):
with pytest.raises(PKCS11Error, match='Could not find cert'):
signers.sign_pdf(w, meta, signer=signer)


Expand Down Expand Up @@ -283,25 +283,31 @@ def test_signer_pulled_others_provided(bulk_fetch):


@freeze_time('2020-11-01')
def test_unclear_key_label():
signer_cert = TESTING_CA.get_cert(CertLabel('signer1'))
def test_unclear_key_label_and_cert():
w = IncrementalPdfFileWriter(BytesIO(MINIMAL))
meta = signers.PdfSignatureMetadata(field_name='Sig1')
with _simple_sess() as sess:
with pytest.raises(SigningError, match='\'key_label\'.*must be prov'):
pkcs11.PKCS11Signer(
sess,
signing_cert=signer_cert,
other_certs_to_pull=default_other_certs,
)
with pytest.raises(PKCS11Error, match='Found more than one'):
signer = pkcs11.PKCS11Signer(sess)
signers.sign_pdf(w, meta, signer=signer)


@freeze_time('2020-11-01')
def test_unclear_signer_cert():
def test_auto_use_only_key_if_cert_is_known():
w = IncrementalPdfFileWriter(BytesIO(MINIMAL))
meta = signers.PdfSignatureMetadata(field_name='Sig1')
signer_cert = TESTING_CA.get_cert(CertLabel('signer1'))
with _simple_sess() as sess:
with pytest.raises(SigningError, match='Please specify'):
pkcs11.PKCS11Signer(
sess,
other_certs_to_pull=default_other_certs,
)
signer = pkcs11.PKCS11Signer(
sess,
signing_cert=signer_cert,
other_certs_to_pull=default_other_certs,
)
out = signers.sign_pdf(w, meta, signer=signer)

r = PdfFileReader(out)
emb = r.embedded_signatures[0]
val_trusted(emb)


@pytest.mark.parametrize('bulk_fetch', [True, False])
Expand Down

0 comments on commit 5041978

Please sign in to comment.