-
Notifications
You must be signed in to change notification settings - Fork 478
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add support for specifying 'RSAPublicKey' instance instead of raw bytes #1477
Add support for specifying 'RSAPublicKey' instance instead of raw bytes #1477
Conversation
CLA Assistant Lite bot All contributors have signed the CLA ✍️ ✅ |
The following is an example of how to use this functionality to authenticate using a key stored in an Azure Key Vault – the from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric.rsa import RSAPrivateKey
from cryptography.hazmat.primitives.asymmetric.rsa import RSAPublicKey
from cryptography.hazmat.primitives.asymmetric.rsa import RSAPublicNumbers
from cryptography.hazmat.primitives.asymmetric.rsa import AsymmetricPadding
from cryptography.hazmat.primitives.asymmetric.padding import PKCS1v15
from cryptography.hazmat.primitives.asymmetric import utils as asym_utils
class AzureManagedKey(RSAPrivateKey):
_algorithm_mapping = {
hashes.SHA256: SignatureAlgorithm.rs256,
hashes.SHA384: SignatureAlgorithm.rs384,
hashes.SHA512: SignatureAlgorithm.rs512,
}
_key_name = KEY_NAME
_kvclient = KEY_CLIENT
def decrypt(self, *args) -> bytes:
raise NotImplementedError()
@property
def key_size(self) -> int:
raise NotImplementedError()
def sign(
self,
data: bytes,
padding: AsymmetricPadding,
algorithm: Union[asym_utils.Prehashed, hashes.HashAlgorithm],
) -> bytes:
if not isinstance(padding, PKCS1v15):
raise ValueError("Unsupported padding: %s" % padding.name)
sig_algorithm = self._algorithm_mapping.get(type(algorithm))
if sig_algorithm is None:
raise ValueError("Unsupported algorithm: %s" % algorithm.name)
cc = self._kvclient.get_cryptography_client(self._key_name)
digest = hashes.Hash(algorithm)
digest.update(data)
result = cc.sign(sig_algorithm, digest.finalize())
return result.signature
def private_numbers(self) -> "RSAPrivateNumbers":
raise NotImplementedError()
def private_bytes(self, *args) -> bytes:
raise NotImplementedError()
def public_key(self) -> RSAPublicKey:
pubkey = self._kvclient.get_key(self._key_name)
e = int.from_bytes(pubkey.key.e, "big")
n = int.from_bytes(pubkey.key.n, "big")
rsakey = RSAPublicNumbers(e, n)
return rsakey.public_key() Note that without the changes presented in this pull request, we'd need to write a plugin to achieve the same. The following authentication plugin import datetime
import jwt
from snowflake.connector.auth.keypair import AuthByKeyPair
class AuthByAzureKeyVault(AuthByKeyPair):
def __init__(self, managed_key: AzureManagedKey):
super().__init__(b"")
self._managed_key = managed_key
def reset_secrets(self) -> None:
self._managed_key = None
def prepare(
self,
*,
account: str,
user: str,
**kwargs: Any,
) -> str:
if ".global" in account:
account = account.partition("-")[0]
else:
account = account.partition(".")[0]
account = account.upper()
user = user.upper()
now = datetime.datetime.utcnow()
public_key = self._managed_key.public_key()
public_key_fp = self.calculate_public_key_fingerprint(self._managed_key)
self._jwt_token_exp = now + self._lifetime
payload = {
self.ISSUER: f"{account}.{user}.{public_key_fp}",
self.SUBJECT: f"{account}.{user}",
self.ISSUE_TIME: now,
self.EXPIRE_TIME: self._jwt_token_exp,
}
_jwt_token = jwt.encode(payload, self._managed_key, algorithm=self.ALGORITHM)
# jwt.encode() returns bytes in pyjwt 1.x and a string
# in pyjwt 2.x
if isinstance(_jwt_token, bytes):
self._jwt_token = _jwt_token.decode("utf-8")
else:
self._jwt_token = _jwt_token
return self._jwt_token While this is not too terrible, it's brittle code and there's quite a bit of duplicated code due to lack of extension points. |
I have read the CLA Document and I hereby sign the CLA. |
recheck |
Would be wonderful to get this merged! |
I have updated my previous comment, adding an example of a custom plugin implementation The main points being addressed here are:
|
elif isinstance(self._private_key, RSAPrivateKey): | ||
private_key = self._private_key | ||
else: | ||
raise TypeError(self._private_key) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: let's be more explicit on the error message: f"Expected bytes or RSAPrivateKey, got {type(self._private_key)}"
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed in f779da2.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's add an unhappy path test to cover this :) Otherwise LGTM!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed in 48e5aca.
Test PR in #1681 |
Hey can you fix the lint failures? Also, it would be great if you could add this commit for changelog: 6f459f9 (will need a rebase to main). |
This can be used to externalize the JWT encoding process.
Note that while this method does not require a private key, the change is inconsequential because we're anyway expecting something that implements a private key at the class level (either bytes or an abstract implementation)
028ef0a
to
8d5e6b8
Compare
@sfc-gh-sfan hope I did the right thing with that change log entry. |
Can you take a look at the test failure? |
"abcd", | ||
1234, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think removing the test is wrong. We should adapt the test to cover these cases, right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think so – it's not an integration test concern I would say, and we've got the unit test to cover it. One has to keep in mind that these are regression tests, not a complete certification. We're trying to avoid regressions.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about we move these two cases to the unit test as well, in addition to class Bad
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Seems reasonable – good suggestion, I have pushed a change to the previous commit now in 818c3e8, moving those tests instead. Should help with continuity.
8bbb9cb
to
818c3e8
Compare
Merged. Thanks for your contribution :) |
This can be used to externalize the JWT encoding process (see linked issue for motivation and details).
For further motivation, see https://github.blog/2023-03-23-we-updated-our-rsa-ssh-host-key/ – managed keys (software- or hardware protected) are the answer to this problem.
What GitHub issue is this PR addressing? Make sure that there is an accompanying issue to your PR.
Fixes SNOW-676645: Add connector support for keypair authentication using hardware security modules or cloud key managers #1276.
Fill out the following pre-review checklist:
Please describe how your code solves the related issue.
This change makes use of the fact that
RSAPrivateKey
is already an interface (ABC) and the JWT library is fully compatible with a custom implementation, for example one that externalizes the signing process to a cloud-based service such as Azure Key Vault.Note that we do not provide any implementations of this interface, nor any new dependencies.