From c9aded48e550e439f8d2112ebc92a77a4a6d8536 Mon Sep 17 00:00:00 2001
From: Seena Fallah <seenafallah@gmail.com>
Date: Tue, 23 Jul 2024 20:47:22 +0200
Subject: [PATCH] BucketPolicy: decouple encryption tests from invalid algo and
 unencrypted

Signed-off-by: Seena Fallah <seenafallah@gmail.com>
---
 s3tests_boto3/functional/test_s3.py | 95 ++++++++++++++++++-----------
 1 file changed, 61 insertions(+), 34 deletions(-)

diff --git a/s3tests_boto3/functional/test_s3.py b/s3tests_boto3/functional/test_s3.py
index 306bfca8..cc7283c6 100644
--- a/s3tests_boto3/functional/test_s3.py
+++ b/s3tests_boto3/functional/test_s3.py
@@ -10084,12 +10084,6 @@ def test_encryption_sse_c_enforced_with_bucket_policy():
     bucket_name = get_new_bucket()
     client = get_client()
 
-    deny_incorrect_algo = {
-        "StringNotEquals": {
-          "s3:x-amz-server-side-encryption-customer-algorithm": "AES256"
-        }
-    }
-
     deny_unencrypted_obj = {
         "Null" : {
           "s3:x-amz-server-side-encryption-customer-algorithm": "true"
@@ -10099,9 +10093,8 @@ def test_encryption_sse_c_enforced_with_bucket_policy():
     p = Policy()
     resource = _make_arn_resource("{}/{}".format(bucket_name, "*"))
 
-    s1 = Statement("s3:PutObject", resource, effect="Deny", condition=deny_incorrect_algo)
-    s2 = Statement("s3:PutObject", resource, effect="Deny", condition=deny_unencrypted_obj)
-    policy_document = p.add_statement(s1).add_statement(s2).to_json()
+    s = Statement("s3:PutObject", resource, effect="Deny", condition=deny_unencrypted_obj)
+    policy_document = p.add_statement(s).to_json()
 
     client.put_bucket_policy(Bucket=bucket_name, Policy=policy_document)
 
@@ -10115,6 +10108,36 @@ def test_encryption_sse_c_enforced_with_bucket_policy():
     )
 
 
+@pytest.mark.encryption
+@pytest.mark.fails_on_dbstore
+def test_encryption_sse_c_deny_algo_with_bucket_policy():
+    bucket_name = get_new_bucket()
+    client = get_client()
+
+    deny_incorrect_algo = {
+        "StringNotEquals": {
+          "s3:x-amz-server-side-encryption-customer-algorithm": "AES256"
+        }
+    }
+
+    p = Policy()
+    resource = _make_arn_resource("{}/{}".format(bucket_name, "*"))
+
+    s = Statement("s3:PutObject", resource, effect="Deny", condition=deny_incorrect_algo)
+    policy_document = p.add_statement(s).to_json()
+
+    client.put_bucket_policy(Bucket=bucket_name, Policy=policy_document)
+
+    check_access_denied(client.put_object, Bucket=bucket_name, Key='foo', Body='bar', SSECustomerAlgorithm='AES192')
+
+    client.put_object(
+        Bucket=bucket_name, Key='foo', Body='bar',
+        SSECustomerAlgorithm='AES256',
+        SSECustomerKey='pO3upElrwuEXSoFwCfnZPdSsmt/xWeFa0N9KgDijwVs=',
+        SSECustomerKeyMD5='DWygnHRtgiJ77HCm+1rvHw=='
+    )
+
+
 @pytest.mark.encryption
 @pytest.mark.fails_on_dbstore
 def _test_sse_kms_customer_write(file_size, key_id = 'testkey-1'):
@@ -11701,12 +11724,6 @@ def test_bucket_policy_put_obj_s3_noenc():
     bucket_name = get_new_bucket()
     client = get_v2_client()
 
-    deny_incorrect_algo = {
-        "StringNotEquals": {
-          "s3:x-amz-server-side-encryption": "AES256"
-        }
-    }
-
     deny_unencrypted_obj = {
         "Null" : {
           "s3:x-amz-server-side-encryption": "true"
@@ -11716,36 +11733,46 @@ def test_bucket_policy_put_obj_s3_noenc():
     p = Policy()
     resource = _make_arn_resource("{}/{}".format(bucket_name, "*"))
 
-    s1 = Statement("s3:PutObject", resource, effect="Deny", condition=deny_incorrect_algo)
-    s2 = Statement("s3:PutObject", resource, effect="Deny", condition=deny_unencrypted_obj)
-    policy_document = p.add_statement(s1).add_statement(s2).to_json()
-
-    # boto3.set_stream_logger(name='botocore')
+    s = Statement("s3:PutObject", resource, effect="Deny", condition=deny_unencrypted_obj)
+    policy_document = p.add_statement(s).to_json()
 
     client.put_bucket_policy(Bucket=bucket_name, Policy=policy_document)
     key1_str ='testobj'
 
-    #response = client.get_bucket_policy(Bucket=bucket_name)
-    #print response
+    check_access_denied(client.put_object, Bucket=bucket_name, Key=key1_str, Body=key1_str)
+
+    response = client.put_object(Bucket=bucket_name, Key=key1_str, ServerSideEncryption='AES256')
+    assert response['ResponseMetadata']['HTTPHeaders']['x-amz-server-side-encryption'] == 'AES256'
+
+
+@pytest.mark.encryption
+@pytest.mark.bucket_policy
+@pytest.mark.sse_s3
+@pytest.mark.fails_on_dbstore
+def test_bucket_policy_put_obj_s3_incorrect_algo_sse_s3():
+    bucket_name = get_new_bucket()
+    client = get_v2_client()
+
+    deny_incorrect_algo = {
+        "StringNotEquals": {
+          "s3:x-amz-server-side-encryption": "AES256"
+        }
+    }
 
+    p = Policy()
+    resource = _make_arn_resource("{}/{}".format(bucket_name, "*"))
 
-    # doing this here breaks the next request w/ 400 (non-sse bug).  Do it last.
-    #check_access_denied(client.put_object, Bucket=bucket_name, Key=key1_str, Body=key1_str)
+    s = Statement("s3:PutObject", resource, effect="Deny", condition=deny_incorrect_algo)
+    policy_document = p.add_statement(s).to_json()
 
-    #TODO: why is this a 400 and not passing, it appears boto3 is not parsing the 200 response the rgw sends back properly
-    # DEBUGGING: run the boto2 and compare the requests
-    # DEBUGGING: try to run this with v2 auth (figure out why get_v2_client isn't working) to make the requests similar to what boto2 is doing
-    # DEBUGGING: try to add other options to put_object to see if that makes the response better
+    client.put_bucket_policy(Bucket=bucket_name, Policy=policy_document)
+    key1_str ='testobj'
+
+    check_access_denied(client.put_object, Bucket=bucket_name, Key=key1_str, Body=key1_str, ServerSideEncryption='AES192')
 
-    # first validate that writing a sse-s3 object works
     response = client.put_object(Bucket=bucket_name, Key=key1_str, ServerSideEncryption='AES256')
-    response['ResponseMetadata']['HTTPHeaders']['x-amz-server-side-encryption']
     assert response['ResponseMetadata']['HTTPHeaders']['x-amz-server-side-encryption'] == 'AES256'
 
-    # then validate that a non-encrypted object fails.
-    # (this also breaks the connection--non-sse bug, probably because the server
-    #  errors out before it consumes the data...)
-    check_access_denied(client.put_object, Bucket=bucket_name, Key=key1_str, Body=key1_str)
 
 @pytest.mark.encryption
 @pytest.mark.bucket_policy