Skip to content

Commit

Permalink
Merge pull request #574 from clwluvw/sse-c-policy
Browse files Browse the repository at this point in the history
BucketPolicy: add test for sse-c in conditions
  • Loading branch information
cbodley authored Jul 26, 2024
2 parents 82fedef + c9aded4 commit 218f900
Showing 1 changed file with 90 additions and 29 deletions.
119 changes: 90 additions & 29 deletions s3tests_boto3/functional/test_s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -10077,6 +10077,67 @@ def test_encryption_sse_c_post_object_authenticated_request():
body = _get_body(response)
assert body == 'bar'


@pytest.mark.encryption
@pytest.mark.fails_on_dbstore
def test_encryption_sse_c_enforced_with_bucket_policy():
bucket_name = get_new_bucket()
client = get_client()

deny_unencrypted_obj = {
"Null" : {
"s3:x-amz-server-side-encryption-customer-algorithm": "true"
}
}

p = Policy()
resource = _make_arn_resource("{}/{}".format(bucket_name, "*"))

s = Statement("s3:PutObject", resource, effect="Deny", condition=deny_unencrypted_obj)
policy_document = p.add_statement(s).to_json()

client.put_bucket_policy(Bucket=bucket_name, Policy=policy_document)

check_access_denied(client.put_object, Bucket=bucket_name, Key='foo', Body='bar')

client.put_object(
Bucket=bucket_name, Key='foo', Body='bar',
SSECustomerAlgorithm='AES256',
SSECustomerKey='pO3upElrwuEXSoFwCfnZPdSsmt/xWeFa0N9KgDijwVs=',
SSECustomerKeyMD5='DWygnHRtgiJ77HCm+1rvHw=='
)


@pytest.mark.encryption
@pytest.mark.fails_on_dbstore
def test_encryption_sse_c_deny_algo_with_bucket_policy():
bucket_name = get_new_bucket()
client = get_client()

deny_incorrect_algo = {
"StringNotEquals": {
"s3:x-amz-server-side-encryption-customer-algorithm": "AES256"
}
}

p = Policy()
resource = _make_arn_resource("{}/{}".format(bucket_name, "*"))

s = Statement("s3:PutObject", resource, effect="Deny", condition=deny_incorrect_algo)
policy_document = p.add_statement(s).to_json()

client.put_bucket_policy(Bucket=bucket_name, Policy=policy_document)

check_access_denied(client.put_object, Bucket=bucket_name, Key='foo', Body='bar', SSECustomerAlgorithm='AES192')

client.put_object(
Bucket=bucket_name, Key='foo', Body='bar',
SSECustomerAlgorithm='AES256',
SSECustomerKey='pO3upElrwuEXSoFwCfnZPdSsmt/xWeFa0N9KgDijwVs=',
SSECustomerKeyMD5='DWygnHRtgiJ77HCm+1rvHw=='
)


@pytest.mark.encryption
@pytest.mark.fails_on_dbstore
def _test_sse_kms_customer_write(file_size, key_id = 'testkey-1'):
Expand All @@ -10102,10 +10163,6 @@ def _test_sse_kms_customer_write(file_size, key_id = 'testkey-1'):
assert body == data






@pytest.mark.encryption
@pytest.mark.fails_on_dbstore
def test_sse_kms_method_head():
Expand Down Expand Up @@ -11667,12 +11724,6 @@ def test_bucket_policy_put_obj_s3_noenc():
bucket_name = get_new_bucket()
client = get_v2_client()

deny_incorrect_algo = {
"StringNotEquals": {
"s3:x-amz-server-side-encryption": "AES256"
}
}

deny_unencrypted_obj = {
"Null" : {
"s3:x-amz-server-side-encryption": "true"
Expand All @@ -11682,36 +11733,46 @@ def test_bucket_policy_put_obj_s3_noenc():
p = Policy()
resource = _make_arn_resource("{}/{}".format(bucket_name, "*"))

s1 = Statement("s3:PutObject", resource, effect="Deny", condition=deny_incorrect_algo)
s2 = Statement("s3:PutObject", resource, effect="Deny", condition=deny_unencrypted_obj)
policy_document = p.add_statement(s1).add_statement(s2).to_json()

# boto3.set_stream_logger(name='botocore')
s = Statement("s3:PutObject", resource, effect="Deny", condition=deny_unencrypted_obj)
policy_document = p.add_statement(s).to_json()

client.put_bucket_policy(Bucket=bucket_name, Policy=policy_document)
key1_str ='testobj'

#response = client.get_bucket_policy(Bucket=bucket_name)
#print response
check_access_denied(client.put_object, Bucket=bucket_name, Key=key1_str, Body=key1_str)

response = client.put_object(Bucket=bucket_name, Key=key1_str, ServerSideEncryption='AES256')
assert response['ResponseMetadata']['HTTPHeaders']['x-amz-server-side-encryption'] == 'AES256'

# doing this here breaks the next request w/ 400 (non-sse bug). Do it last.
#check_access_denied(client.put_object, Bucket=bucket_name, Key=key1_str, Body=key1_str)

#TODO: why is this a 400 and not passing, it appears boto3 is not parsing the 200 response the rgw sends back properly
# DEBUGGING: run the boto2 and compare the requests
# DEBUGGING: try to run this with v2 auth (figure out why get_v2_client isn't working) to make the requests similar to what boto2 is doing
# DEBUGGING: try to add other options to put_object to see if that makes the response better
@pytest.mark.encryption
@pytest.mark.bucket_policy
@pytest.mark.sse_s3
@pytest.mark.fails_on_dbstore
def test_bucket_policy_put_obj_s3_incorrect_algo_sse_s3():
bucket_name = get_new_bucket()
client = get_v2_client()

deny_incorrect_algo = {
"StringNotEquals": {
"s3:x-amz-server-side-encryption": "AES256"
}
}

p = Policy()
resource = _make_arn_resource("{}/{}".format(bucket_name, "*"))

s = Statement("s3:PutObject", resource, effect="Deny", condition=deny_incorrect_algo)
policy_document = p.add_statement(s).to_json()

client.put_bucket_policy(Bucket=bucket_name, Policy=policy_document)
key1_str ='testobj'

check_access_denied(client.put_object, Bucket=bucket_name, Key=key1_str, Body=key1_str, ServerSideEncryption='AES192')

# first validate that writing a sse-s3 object works
response = client.put_object(Bucket=bucket_name, Key=key1_str, ServerSideEncryption='AES256')
response['ResponseMetadata']['HTTPHeaders']['x-amz-server-side-encryption']
assert response['ResponseMetadata']['HTTPHeaders']['x-amz-server-side-encryption'] == 'AES256'

# then validate that a non-encrypted object fails.
# (this also breaks the connection--non-sse bug, probably because the server
# errors out before it consumes the data...)
check_access_denied(client.put_object, Bucket=bucket_name, Key=key1_str, Body=key1_str)

@pytest.mark.encryption
@pytest.mark.bucket_policy
Expand Down

0 comments on commit 218f900

Please sign in to comment.