From 87b496f25f698f8ccfcf00b2becf7cced47685f6 Mon Sep 17 00:00:00 2001 From: Seena Fallah Date: Fri, 19 Jul 2024 23:07:03 +0200 Subject: [PATCH 1/2] BucketPolicy: add test for sse-c in conditions Ref. https://github.com/ceph/ceph/pull/58689 Signed-off-by: Seena Fallah --- s3tests_boto3/functional/test_s3.py | 42 ++++++++++++++++++++++++++--- 1 file changed, 38 insertions(+), 4 deletions(-) diff --git a/s3tests_boto3/functional/test_s3.py b/s3tests_boto3/functional/test_s3.py index 98b3cddb..306bfca8 100644 --- a/s3tests_boto3/functional/test_s3.py +++ b/s3tests_boto3/functional/test_s3.py @@ -10077,6 +10077,44 @@ def test_encryption_sse_c_post_object_authenticated_request(): body = _get_body(response) assert body == 'bar' + +@pytest.mark.encryption +@pytest.mark.fails_on_dbstore +def test_encryption_sse_c_enforced_with_bucket_policy(): + bucket_name = get_new_bucket() + client = get_client() + + deny_incorrect_algo = { + "StringNotEquals": { + "s3:x-amz-server-side-encryption-customer-algorithm": "AES256" + } + } + + deny_unencrypted_obj = { + "Null" : { + "s3:x-amz-server-side-encryption-customer-algorithm": "true" + } + } + + p = Policy() + resource = _make_arn_resource("{}/{}".format(bucket_name, "*")) + + s1 = Statement("s3:PutObject", resource, effect="Deny", condition=deny_incorrect_algo) + s2 = Statement("s3:PutObject", resource, effect="Deny", condition=deny_unencrypted_obj) + policy_document = p.add_statement(s1).add_statement(s2).to_json() + + client.put_bucket_policy(Bucket=bucket_name, Policy=policy_document) + + check_access_denied(client.put_object, Bucket=bucket_name, Key='foo', Body='bar') + + client.put_object( + Bucket=bucket_name, Key='foo', Body='bar', + SSECustomerAlgorithm='AES256', + SSECustomerKey='pO3upElrwuEXSoFwCfnZPdSsmt/xWeFa0N9KgDijwVs=', + SSECustomerKeyMD5='DWygnHRtgiJ77HCm+1rvHw==' + ) + + @pytest.mark.encryption @pytest.mark.fails_on_dbstore def _test_sse_kms_customer_write(file_size, key_id = 'testkey-1'): @@ -10102,10 +10140,6 @@ def _test_sse_kms_customer_write(file_size, key_id = 'testkey-1'): assert body == data - - - - @pytest.mark.encryption @pytest.mark.fails_on_dbstore def test_sse_kms_method_head(): From c9aded48e550e439f8d2112ebc92a77a4a6d8536 Mon Sep 17 00:00:00 2001 From: Seena Fallah Date: Tue, 23 Jul 2024 20:47:22 +0200 Subject: [PATCH 2/2] BucketPolicy: decouple encryption tests from invalid algo and unencrypted Signed-off-by: Seena Fallah --- s3tests_boto3/functional/test_s3.py | 95 ++++++++++++++++++----------- 1 file changed, 61 insertions(+), 34 deletions(-) diff --git a/s3tests_boto3/functional/test_s3.py b/s3tests_boto3/functional/test_s3.py index 306bfca8..cc7283c6 100644 --- a/s3tests_boto3/functional/test_s3.py +++ b/s3tests_boto3/functional/test_s3.py @@ -10084,12 +10084,6 @@ def test_encryption_sse_c_enforced_with_bucket_policy(): bucket_name = get_new_bucket() client = get_client() - deny_incorrect_algo = { - "StringNotEquals": { - "s3:x-amz-server-side-encryption-customer-algorithm": "AES256" - } - } - deny_unencrypted_obj = { "Null" : { "s3:x-amz-server-side-encryption-customer-algorithm": "true" @@ -10099,9 +10093,8 @@ def test_encryption_sse_c_enforced_with_bucket_policy(): p = Policy() resource = _make_arn_resource("{}/{}".format(bucket_name, "*")) - s1 = Statement("s3:PutObject", resource, effect="Deny", condition=deny_incorrect_algo) - s2 = Statement("s3:PutObject", resource, effect="Deny", condition=deny_unencrypted_obj) - policy_document = p.add_statement(s1).add_statement(s2).to_json() + s = Statement("s3:PutObject", resource, effect="Deny", condition=deny_unencrypted_obj) + policy_document = p.add_statement(s).to_json() client.put_bucket_policy(Bucket=bucket_name, Policy=policy_document) @@ -10115,6 +10108,36 @@ def test_encryption_sse_c_enforced_with_bucket_policy(): ) +@pytest.mark.encryption +@pytest.mark.fails_on_dbstore +def test_encryption_sse_c_deny_algo_with_bucket_policy(): + bucket_name = get_new_bucket() + client = get_client() + + deny_incorrect_algo = { + "StringNotEquals": { + "s3:x-amz-server-side-encryption-customer-algorithm": "AES256" + } + } + + p = Policy() + resource = _make_arn_resource("{}/{}".format(bucket_name, "*")) + + s = Statement("s3:PutObject", resource, effect="Deny", condition=deny_incorrect_algo) + policy_document = p.add_statement(s).to_json() + + client.put_bucket_policy(Bucket=bucket_name, Policy=policy_document) + + check_access_denied(client.put_object, Bucket=bucket_name, Key='foo', Body='bar', SSECustomerAlgorithm='AES192') + + client.put_object( + Bucket=bucket_name, Key='foo', Body='bar', + SSECustomerAlgorithm='AES256', + SSECustomerKey='pO3upElrwuEXSoFwCfnZPdSsmt/xWeFa0N9KgDijwVs=', + SSECustomerKeyMD5='DWygnHRtgiJ77HCm+1rvHw==' + ) + + @pytest.mark.encryption @pytest.mark.fails_on_dbstore def _test_sse_kms_customer_write(file_size, key_id = 'testkey-1'): @@ -11701,12 +11724,6 @@ def test_bucket_policy_put_obj_s3_noenc(): bucket_name = get_new_bucket() client = get_v2_client() - deny_incorrect_algo = { - "StringNotEquals": { - "s3:x-amz-server-side-encryption": "AES256" - } - } - deny_unencrypted_obj = { "Null" : { "s3:x-amz-server-side-encryption": "true" @@ -11716,36 +11733,46 @@ def test_bucket_policy_put_obj_s3_noenc(): p = Policy() resource = _make_arn_resource("{}/{}".format(bucket_name, "*")) - s1 = Statement("s3:PutObject", resource, effect="Deny", condition=deny_incorrect_algo) - s2 = Statement("s3:PutObject", resource, effect="Deny", condition=deny_unencrypted_obj) - policy_document = p.add_statement(s1).add_statement(s2).to_json() - - # boto3.set_stream_logger(name='botocore') + s = Statement("s3:PutObject", resource, effect="Deny", condition=deny_unencrypted_obj) + policy_document = p.add_statement(s).to_json() client.put_bucket_policy(Bucket=bucket_name, Policy=policy_document) key1_str ='testobj' - #response = client.get_bucket_policy(Bucket=bucket_name) - #print response + check_access_denied(client.put_object, Bucket=bucket_name, Key=key1_str, Body=key1_str) + + response = client.put_object(Bucket=bucket_name, Key=key1_str, ServerSideEncryption='AES256') + assert response['ResponseMetadata']['HTTPHeaders']['x-amz-server-side-encryption'] == 'AES256' + + +@pytest.mark.encryption +@pytest.mark.bucket_policy +@pytest.mark.sse_s3 +@pytest.mark.fails_on_dbstore +def test_bucket_policy_put_obj_s3_incorrect_algo_sse_s3(): + bucket_name = get_new_bucket() + client = get_v2_client() + + deny_incorrect_algo = { + "StringNotEquals": { + "s3:x-amz-server-side-encryption": "AES256" + } + } + p = Policy() + resource = _make_arn_resource("{}/{}".format(bucket_name, "*")) - # doing this here breaks the next request w/ 400 (non-sse bug). Do it last. - #check_access_denied(client.put_object, Bucket=bucket_name, Key=key1_str, Body=key1_str) + s = Statement("s3:PutObject", resource, effect="Deny", condition=deny_incorrect_algo) + policy_document = p.add_statement(s).to_json() - #TODO: why is this a 400 and not passing, it appears boto3 is not parsing the 200 response the rgw sends back properly - # DEBUGGING: run the boto2 and compare the requests - # DEBUGGING: try to run this with v2 auth (figure out why get_v2_client isn't working) to make the requests similar to what boto2 is doing - # DEBUGGING: try to add other options to put_object to see if that makes the response better + client.put_bucket_policy(Bucket=bucket_name, Policy=policy_document) + key1_str ='testobj' + + check_access_denied(client.put_object, Bucket=bucket_name, Key=key1_str, Body=key1_str, ServerSideEncryption='AES192') - # first validate that writing a sse-s3 object works response = client.put_object(Bucket=bucket_name, Key=key1_str, ServerSideEncryption='AES256') - response['ResponseMetadata']['HTTPHeaders']['x-amz-server-side-encryption'] assert response['ResponseMetadata']['HTTPHeaders']['x-amz-server-side-encryption'] == 'AES256' - # then validate that a non-encrypted object fails. - # (this also breaks the connection--non-sse bug, probably because the server - # errors out before it consumes the data...) - check_access_denied(client.put_object, Bucket=bucket_name, Key=key1_str, Body=key1_str) @pytest.mark.encryption @pytest.mark.bucket_policy