From 29484faac9cd75d758bfa082ab31560d10017790 Mon Sep 17 00:00:00 2001 From: Yuval Lifshitz Date: Thu, 8 Aug 2024 18:00:01 +0000 Subject: [PATCH] rgw/logging: basic set of tests for bucket logging Signed-off-by: Yuval Lifshitz --- README.rst | 14 + pytest.ini | 1 + s3tests_boto3/functional/test_s3.py | 759 ++++++++++++++++++++++++++++ 3 files changed, 774 insertions(+) diff --git a/README.rst b/README.rst index e34c7a11..283b7e87 100644 --- a/README.rst +++ b/README.rst @@ -100,3 +100,17 @@ You can filter tests based on their attributes:: S3TEST_CONF=your.conf tox -- s3tests_boto3/functional/test_iam.py -m 'not fails_on_rgw' +======================== + Bucket logging tests +======================== + +Ceph has extensions for the bucket logging S3 API. For the tests to cover these extensions, the following file: `examples/rgw/boto3/service-2.sdk-extras.json` from the Ceph repo, +should be copied to the: `~/.aws/models/s3/2006-03-01/` directory on the machine where the tests are run. +If the file is not present, the tests will still run, but the extension tests will be skipped. In this case, the bucket logging object roll time must be decreased manually from its default of +300 seconds to 5 seconds:: + + vstart.sh -o rgw_bucket_log_object_roll_time=5 + +Then the tests can be run with:: + + S3TEST_CONF=your.conf tox -- -m 'bucket_logging' diff --git a/pytest.ini b/pytest.ini index 1a7d9a83..4aafd658 100644 --- a/pytest.ini +++ b/pytest.ini @@ -7,6 +7,7 @@ markers = auth_common bucket_policy bucket_encryption + bucket_logging checksum cloud_transition encryption diff --git a/s3tests_boto3/functional/test_s3.py b/s3tests_boto3/functional/test_s3.py index 803279f3..65ec35de 100644 --- a/s3tests_boto3/functional/test_s3.py +++ b/s3tests_boto3/functional/test_s3.py @@ -13759,3 +13759,762 @@ def test_post_object_upload_checksum(): r = requests.post(url, files=payload, verify=get_config_ssl_verify()) assert r.status_code == 400 + + +def _has_bucket_logging_extension(): + src_bucket_name = get_new_bucket_name() + src_bucket = get_new_bucket_resource(name=src_bucket_name) + log_bucket_name = get_new_bucket_name() + log_bucket = get_new_bucket_resource(name=log_bucket_name) + client = get_client() + logging_enabled = {'TargetBucket': log_bucket_name, 'TargetPrefix': 'log/', "RecordType": "Standard"} + try: + response = client.put_bucket_logging(Bucket=src_bucket_name, BucketLoggingStatus={ + 'LoggingEnabled': logging_enabled, + }) + except ParamValidationError as e: + return False + return True + + +@pytest.mark.bucket_logging +def test_put_bucket_logging(): + src_bucket_name = get_new_bucket_name() + src_bucket = get_new_bucket_resource(name=src_bucket_name) + log_bucket_name = get_new_bucket_name() + log_bucket = get_new_bucket_resource(name=log_bucket_name) + client = get_client() + has_extensions = _has_bucket_logging_extension() + + # minimal configuration + logging_enabled = { + 'TargetBucket': log_bucket_name, + 'TargetPrefix': 'log/' + } + + if has_extensions: + logging_enabled['ObjectRollTime'] = 5 + response = client.put_bucket_logging(Bucket=src_bucket_name, BucketLoggingStatus={ + 'LoggingEnabled': logging_enabled, + }) + assert response['ResponseMetadata']['HTTPStatusCode'] == 200 + response = client.get_bucket_logging(Bucket=src_bucket_name) + assert response['ResponseMetadata']['HTTPStatusCode'] == 200 + # default value for key prefix is returned + logging_enabled['TargetObjectKeyFormat'] = {'SimplePrefix': {}} + if has_extensions: + logging_enabled['RecordType'] = 'Standard' + logging_enabled['EventType'] = 'ReadWrite' + logging_enabled['RecordsBatchSize'] = 0 + assert response['LoggingEnabled'] == logging_enabled + + # with simple target object prefix + logging_enabled = { + 'TargetBucket': log_bucket_name, + 'TargetPrefix': 'log/', + 'TargetObjectKeyFormat': { + 'SimplePrefix': {} + } + } + if has_extensions: + logging_enabled['ObjectRollTime'] = 5 + response = client.put_bucket_logging(Bucket=src_bucket_name, BucketLoggingStatus={ + 'LoggingEnabled': logging_enabled, + }) + assert response['ResponseMetadata']['HTTPStatusCode'] == 200 + response = client.get_bucket_logging(Bucket=src_bucket_name) + assert response['ResponseMetadata']['HTTPStatusCode'] == 200 + if has_extensions: + logging_enabled['RecordType'] = 'Standard' + logging_enabled['EventType'] = 'ReadWrite' + logging_enabled['RecordsBatchSize'] = 0 + assert response['LoggingEnabled'] == logging_enabled + + # with partitioned target object prefix + logging_enabled = { + 'TargetBucket': log_bucket_name, + 'TargetPrefix': 'log/', + 'TargetObjectKeyFormat': { + 'PartitionedPrefix': { + 'PartitionDateSource': 'DeliveryTime' + } + } + } + if has_extensions: + logging_enabled['ObjectRollTime'] = 5 + response = client.put_bucket_logging(Bucket=src_bucket_name, BucketLoggingStatus={ + 'LoggingEnabled': logging_enabled, + }) + assert response['ResponseMetadata']['HTTPStatusCode'] == 200 + response = client.get_bucket_logging(Bucket=src_bucket_name) + assert response['ResponseMetadata']['HTTPStatusCode'] == 200 + if has_extensions: + logging_enabled['RecordType'] = 'Standard' + logging_enabled['EventType'] = 'ReadWrite' + logging_enabled['RecordsBatchSize'] = 0 + assert response['LoggingEnabled'] == logging_enabled + + +@pytest.mark.bucket_logging +def test_put_bucket_logging_errors(): + src_bucket_name = get_new_bucket_name() + src_bucket = get_new_bucket_resource(name=src_bucket_name) + log_bucket_name1 = get_new_bucket_name() + log_bucket1 = get_new_bucket_resource(name=log_bucket_name1) + client = get_client() + + # invalid source bucket + try: + response = client.put_bucket_logging(Bucket=src_bucket_name+'kaboom', BucketLoggingStatus={ + 'LoggingEnabled': {'TargetBucket': log_bucket_name1, 'TargetPrefix': 'log/'}, + }) + assert False, 'expected failure' + except ClientError as e: + assert e.response['Error']['Code'] == 'NoSuchBucket' + + # invalid log bucket + try: + response = client.put_bucket_logging(Bucket=src_bucket_name, BucketLoggingStatus={ + 'LoggingEnabled': {'TargetBucket': log_bucket_name1+'kaboom', 'TargetPrefix': 'log/'}, + }) + assert False, 'expected failure' + except ClientError as e: + assert e.response['Error']['Code'] == 'NoSuchKey' + + # log bucket has bucket logging + log_bucket_name2 = get_new_bucket_name() + log_bucket2 = get_new_bucket_resource(name=log_bucket_name2) + response = client.put_bucket_logging(Bucket=log_bucket_name2, BucketLoggingStatus={ + 'LoggingEnabled': {'TargetBucket': log_bucket_name1, 'TargetPrefix': 'log/'}, + }) + assert response['ResponseMetadata']['HTTPStatusCode'] == 200 + try: + response = client.put_bucket_logging(Bucket=src_bucket_name, BucketLoggingStatus={ + 'LoggingEnabled': {'TargetBucket': log_bucket_name2, 'TargetPrefix': 'log/'}, + }) + assert False, 'expected failure' + except ClientError as e: + assert e.response['Error']['Code'] == 'InvalidArgument' + + # invalid partition prefix + logging_enabled = { + 'TargetBucket': log_bucket_name1, + 'TargetPrefix': 'log/', + 'TargetObjectKeyFormat': { + 'PartitionedPrefix': { + 'PartitionDateSource': 'kaboom' + } + } + } + try: + response = client.put_bucket_logging(Bucket=src_bucket_name, BucketLoggingStatus={ + 'LoggingEnabled': logging_enabled, + }) + assert False, 'expected failure' + except ClientError as e: + assert e.response['Error']['Code'] == 'MalformedXML' + + # TODO: log bucket is encrypted + #_put_bucket_encryption_s3(client, log_bucket_name) + #try: + # response = client.put_bucket_logging(Bucket=src_bucket_name, BucketLoggingStatus={ + # 'LoggingEnabled': {'TargetBucket': log_bucket_name, 'TargetPrefix': 'log/'}, + # }) + # assert False, 'expected failure' + #except ClientError as e: + # assert e.response['Error']['Code'] == 'InvalidArgument' + + +@pytest.mark.bucket_logging +def test_rm_bucket_logging(): + src_bucket_name = get_new_bucket_name() + src_bucket = get_new_bucket_resource(name=src_bucket_name) + log_bucket_name = get_new_bucket_name() + log_bucket = get_new_bucket_resource(name=log_bucket_name) + client = get_client() + logging_enabled = {'TargetBucket': log_bucket_name, 'TargetPrefix': 'log/'} + response = client.put_bucket_logging(Bucket=src_bucket_name, BucketLoggingStatus={ + 'LoggingEnabled': logging_enabled, + }) + assert response['ResponseMetadata']['HTTPStatusCode'] == 200 + + response = client.put_bucket_logging(Bucket=src_bucket_name, BucketLoggingStatus={}) + assert response['ResponseMetadata']['HTTPStatusCode'] == 200 + + response = client.get_bucket_logging(Bucket=src_bucket_name) + assert response['ResponseMetadata']['HTTPStatusCode'] == 200 + assert not 'LoggingEnabled' in response + + +@pytest.mark.bucket_logging +@pytest.mark.fails_on_aws +def test_put_bucket_logging_extensions(): + if not _has_bucket_logging_extension(): + pytest.skip('ceph extension to bucket logging not supported at client') + src_bucket_name = get_new_bucket_name() + src_bucket = get_new_bucket_resource(name=src_bucket_name) + log_bucket_name = get_new_bucket_name() + log_bucket = get_new_bucket_resource(name=log_bucket_name) + client = get_client() + logging_enabled = {'TargetBucket': log_bucket_name, + 'TargetPrefix': 'log/', + 'EventType': 'Write', + 'RecordType': 'Standard', + 'ObjectRollTime': 5, + 'RecordsBatchSize': 0 + } + response = client.put_bucket_logging(Bucket=src_bucket_name, BucketLoggingStatus={ + 'LoggingEnabled': logging_enabled, + }) + assert response['ResponseMetadata']['HTTPStatusCode'] == 200 + + response = client.get_bucket_logging(Bucket=src_bucket_name) + assert response['ResponseMetadata']['HTTPStatusCode'] == 200 + + logging_enabled['TargetObjectKeyFormat'] = {'SimplePrefix': {}} + assert response['LoggingEnabled'] == logging_enabled + + +def _verify_records(records, bucket_name, event_type, src_keys): + keys_found = [] + for record in iter(records.splitlines()): + print('bucket log record:', record) + if bucket_name in record and event_type in record: + for key in src_keys: + if key in record: + keys_found.append(key) + break + print('keys found in bucket log:', keys_found) + print('keys from the source bucket:', src_keys) + return len(keys_found) == len(src_keys) + + +@pytest.mark.bucket_logging +def test_bucket_logging_put_objects(): + src_bucket_name = get_new_bucket_name() + src_bucket = get_new_bucket_resource(name=src_bucket_name) + log_bucket_name = get_new_bucket_name() + log_bucket = get_new_bucket_resource(name=log_bucket_name) + client = get_client() + has_extensions = _has_bucket_logging_extension() + + # minimal configuration + logging_enabled = {'TargetBucket': log_bucket_name, 'TargetPrefix': 'log/'} + if has_extensions: + logging_enabled['ObjectRollTime'] = 5 + response = client.put_bucket_logging(Bucket=src_bucket_name, BucketLoggingStatus={ + 'LoggingEnabled': logging_enabled, + }) + assert response['ResponseMetadata']['HTTPStatusCode'] == 200 + + num_keys = 5 + for j in range(num_keys): + name = 'myobject'+str(j) + client.put_object(Bucket=src_bucket_name, Key=name, Body=name) + + response = client.list_objects_v2(Bucket=src_bucket_name) + src_keys = _get_keys(response) + + time.sleep(5) + client.put_object(Bucket=src_bucket_name, Key='dummy', Body='dummy') + + response = client.list_objects_v2(Bucket=log_bucket_name) + keys = _get_keys(response) + assert len(keys) == 1 + + for key in keys: + assert key.startswith('log/') + response = client.get_object(Bucket=log_bucket_name, Key=key) + body = _get_body(response) + assert _verify_records(body, src_bucket_name, 'REST.PUT.put_obj', src_keys) + + +@pytest.mark.bucket_logging +def test_bucket_logging_delete_objects(): + src_bucket_name = get_new_bucket_name() + src_bucket = get_new_bucket_resource(name=src_bucket_name) + log_bucket_name = get_new_bucket_name() + log_bucket = get_new_bucket_resource(name=log_bucket_name) + client = get_client() + has_extensions = _has_bucket_logging_extension() + + num_keys = 5 + for j in range(num_keys): + name = 'myobject'+str(j) + client.put_object(Bucket=src_bucket_name, Key=name, Body=name) + + # minimal configuration + logging_enabled = {'TargetBucket': log_bucket_name, 'TargetPrefix': 'log/'} + if has_extensions: + logging_enabled['ObjectRollTime'] = 5 + response = client.put_bucket_logging(Bucket=src_bucket_name, BucketLoggingStatus={ + 'LoggingEnabled': logging_enabled, + }) + + assert response['ResponseMetadata']['HTTPStatusCode'] == 200 + response = client.list_objects_v2(Bucket=src_bucket_name) + src_keys = _get_keys(response) + for key in src_keys: + client.delete_object(Bucket=src_bucket_name, Key=key) + + time.sleep(5) + client.put_object(Bucket=src_bucket_name, Key='dummy', Body='dummy') + + response = client.list_objects_v2(Bucket=log_bucket_name) + keys = _get_keys(response) + assert len(keys) == 1 + + key = keys[0] + assert key.startswith('log/') + response = client.get_object(Bucket=log_bucket_name, Key=key) + body = _get_body(response) + assert _verify_records(body, src_bucket_name, 'REST.DELETE.delete_obj', src_keys) + + +@pytest.mark.bucket_logging +def test_bucket_logging_get_objects(): + src_bucket_name = get_new_bucket_name() + src_bucket = get_new_bucket_resource(name=src_bucket_name) + log_bucket_name = get_new_bucket_name() + log_bucket = get_new_bucket_resource(name=log_bucket_name) + client = get_client() + has_extensions = _has_bucket_logging_extension() + + num_keys = 5 + for j in range(num_keys): + name = 'myobject'+str(j) + client.put_object(Bucket=src_bucket_name, Key=name, Body=name) + + # minimal configuration + logging_enabled = {'TargetBucket': log_bucket_name, 'TargetPrefix': 'log/'} + if has_extensions: + logging_enabled['ObjectRollTime'] = 5 + response = client.put_bucket_logging(Bucket=src_bucket_name, BucketLoggingStatus={ + 'LoggingEnabled': logging_enabled, + }) + + assert response['ResponseMetadata']['HTTPStatusCode'] == 200 + response = client.list_objects_v2(Bucket=src_bucket_name) + src_keys = _get_keys(response) + for key in src_keys: + client.get_object(Bucket=src_bucket_name, Key=key) + + time.sleep(5) + client.put_object(Bucket=src_bucket_name, Key='dummy', Body='dummy') + + response = client.list_objects_v2(Bucket=log_bucket_name) + keys = _get_keys(response) + assert len(keys) == 1 + + key = keys[0] + assert key.startswith('log/') + response = client.get_object(Bucket=log_bucket_name, Key=key) + body = _get_body(response) + assert _verify_records(body, src_bucket_name, 'REST.GET.get_obj', src_keys) + + +@pytest.mark.bucket_logging +def test_bucket_logging_copy_objects(): + src_bucket_name = get_new_bucket_name() + src_bucket = get_new_bucket_resource(name=src_bucket_name) + log_bucket_name = get_new_bucket_name() + log_bucket = get_new_bucket_resource(name=log_bucket_name) + client = get_client() + has_extensions = _has_bucket_logging_extension() + + num_keys = 5 + for j in range(num_keys): + name = 'myobject'+str(j) + client.put_object(Bucket=src_bucket_name, Key=name, Body=name) + + # minimal configuration + logging_enabled = {'TargetBucket': log_bucket_name, 'TargetPrefix': 'log/'} + if has_extensions: + logging_enabled['ObjectRollTime'] = 5 + response = client.put_bucket_logging(Bucket=src_bucket_name, BucketLoggingStatus={ + 'LoggingEnabled': logging_enabled, + }) + + assert response['ResponseMetadata']['HTTPStatusCode'] == 200 + response = client.list_objects_v2(Bucket=src_bucket_name) + src_keys = _get_keys(response) + for key in src_keys: + client.copy_object(Bucket=src_bucket_name, Key='copy_of_'+key, CopySource={'Bucket': src_bucket_name, 'Key': key}) + + time.sleep(5) + client.put_object(Bucket=src_bucket_name, Key='dummy', Body='dummy') + + response = client.list_objects_v2(Bucket=log_bucket_name) + keys = _get_keys(response) + assert len(keys) == 1 + + key= keys[0] + assert key.startswith('log/') + response = client.get_object(Bucket=log_bucket_name, Key=key) + body = _get_body(response) + assert _verify_records(body, src_bucket_name, 'REST.PUT.copy_obj', src_keys) + + +@pytest.mark.bucket_logging +def test_bucket_logging_head_objects(): + src_bucket_name = get_new_bucket_name() + src_bucket = get_new_bucket_resource(name=src_bucket_name) + log_bucket_name = get_new_bucket_name() + log_bucket = get_new_bucket_resource(name=log_bucket_name) + client = get_client() + has_extensions = _has_bucket_logging_extension() + + num_keys = 5 + for j in range(num_keys): + name = 'myobject'+str(j) + client.put_object(Bucket=src_bucket_name, Key=name, Body=name) + + logging_enabled = {'TargetBucket': log_bucket_name, 'TargetPrefix': 'log/'} + if has_extensions: + logging_enabled['ObjectRollTime'] = 5 + response = client.put_bucket_logging(Bucket=src_bucket_name, BucketLoggingStatus={ + 'LoggingEnabled': logging_enabled, + }) + + assert response['ResponseMetadata']['HTTPStatusCode'] == 200 + response = client.list_objects_v2(Bucket=src_bucket_name) + src_keys = _get_keys(response) + for key in src_keys: + client.head_object(Bucket=src_bucket_name, Key=key) + + time.sleep(5) + client.put_object(Bucket=src_bucket_name, Key='dummy', Body='dummy') + + response = client.list_objects_v2(Bucket=log_bucket_name) + keys = _get_keys(response) + assert len(keys) == 1 + + key = keys[0] + assert key.startswith('log/') + response = client.get_object(Bucket=log_bucket_name, Key=key) + body = _get_body(response) + assert _verify_records(body, src_bucket_name, 'REST.HEAD.get_obj', src_keys) + + +@pytest.mark.bucket_logging +def test_bucket_logging_mpu(): + src_bucket_name = get_new_bucket_name() + src_bucket = get_new_bucket_resource(name=src_bucket_name) + log_bucket_name = get_new_bucket_name() + log_bucket = get_new_bucket_resource(name=log_bucket_name) + client = get_client() + has_extensions = _has_bucket_logging_extension() + + # minimal configuration + logging_enabled = {'TargetBucket': log_bucket_name, 'TargetPrefix': 'log/'} + if has_extensions: + logging_enabled['ObjectRollTime'] = 5 + response = client.put_bucket_logging(Bucket=src_bucket_name, BucketLoggingStatus={ + 'LoggingEnabled': logging_enabled, + }) + assert response['ResponseMetadata']['HTTPStatusCode'] == 200 + + src_key = "myobject" + objlen = 30 * 1024 * 1024 + (upload_id, data, parts) = _multipart_upload(bucket_name=src_bucket_name, key=src_key, size=objlen) + client.complete_multipart_upload(Bucket=src_bucket_name, Key=src_key, UploadId=upload_id, MultipartUpload={'Parts': parts}) + + time.sleep(5) + client.put_object(Bucket=src_bucket_name, Key='dummy', Body='dummy') + + response = client.list_objects_v2(Bucket=log_bucket_name) + keys = _get_keys(response) + assert len(keys) == 1 + + key = keys[0] + assert key.startswith('log/') + response = client.get_object(Bucket=log_bucket_name, Key=key) + body = _get_body(response) + assert _verify_records(body, src_bucket_name, 'REST.POST.complete_multipart', [src_key]) + + +def _bucket_logging_event_type(event_type): + src_bucket_name = get_new_bucket_name() + src_bucket = get_new_bucket_resource(name=src_bucket_name) + log_bucket_name = get_new_bucket_name() + log_bucket = get_new_bucket_resource(name=log_bucket_name) + client = get_client() + logging_enabled = { + 'TargetBucket': log_bucket_name, + 'TargetPrefix': 'log/', + 'ObjectRollTime': 5, + 'EventType': event_type + } + response = client.put_bucket_logging(Bucket=src_bucket_name, BucketLoggingStatus={ + 'LoggingEnabled': logging_enabled, + }) + num_keys = 5 + for j in range(num_keys): + name = 'myobject'+str(j) + client.put_object(Bucket=src_bucket_name, Key=name, Body=name) + client.head_object(Bucket=src_bucket_name, Key=name) + + response = client.list_objects_v2(Bucket=src_bucket_name) + src_keys = _get_keys(response) + + time.sleep(5) + client.put_object(Bucket=src_bucket_name, Key='dummy', Body='dummy') + client.head_object(Bucket=src_bucket_name, Key='dummy') + + response = client.list_objects_v2(Bucket=log_bucket_name) + keys = _get_keys(response) + assert len(keys) == 1 + + key = keys[0] + assert key.startswith('log/') + response = client.get_object(Bucket=log_bucket_name, Key=key) + body = _get_body(response) + if event_type == 'Write': + assert _verify_records(body, src_bucket_name, 'REST.PUT.put_obj', src_keys) + assert _verify_records(body, src_bucket_name, 'REST.HEAD.get_obj', src_keys) == False + elif event_type == 'Read': + assert _verify_records(body, src_bucket_name, 'REST.HEAD.get_obj', src_keys) + assert _verify_records(body, src_bucket_name, 'REST.PUT.put_obj', src_keys) == False + elif event_type == 'ReadWrite': + assert _verify_records(body, src_bucket_name, 'REST.HEAD.get_obj', src_keys) + assert _verify_records(body, src_bucket_name, 'REST.PUT.put_obj', src_keys) + else: + assert False, 'invalid event type:'+event_type + + +@pytest.mark.bucket_logging +@pytest.mark.fails_on_aws +def test_bucket_logging_event_type_r(): + if not _has_bucket_logging_extension(): + pytest.skip('ceph extension to bucket logging not supported at client') + _bucket_logging_event_type('Read') + + +@pytest.mark.bucket_logging +@pytest.mark.fails_on_aws +def test_bucket_logging_event_type_w(): + if not _has_bucket_logging_extension(): + pytest.skip('ceph extension to bucket logging not supported at client') + _bucket_logging_event_type('Write') + + +@pytest.mark.bucket_logging +@pytest.mark.fails_on_aws +def test_bucket_logging_event_type_rw(): + if not _has_bucket_logging_extension(): + pytest.skip('ceph extension to bucket logging not supported at client') + _bucket_logging_event_type('ReadWrite') + + +@pytest.mark.bucket_logging +@pytest.mark.fails_on_aws +def test_bucket_logging_record_type(): + if not _has_bucket_logging_extension(): + pytest.skip('ceph extension to bucket logging not supported at client') + src_bucket_name = get_new_bucket_name() + src_bucket = get_new_bucket_resource(name=src_bucket_name) + log_bucket_name = get_new_bucket_name() + log_bucket = get_new_bucket_resource(name=log_bucket_name) + client = get_client() + has_extensions = _has_bucket_logging_extension() + + logging_enabled = { + 'TargetBucket': log_bucket_name, + 'TargetPrefix': 'log/', + 'ObjectRollTime': 5, + 'RecordType': 'Short'} + response = client.put_bucket_logging(Bucket=src_bucket_name, BucketLoggingStatus={ + 'LoggingEnabled': logging_enabled, + }) + assert response['ResponseMetadata']['HTTPStatusCode'] == 200 + + num_keys = 5 + for j in range(num_keys): + name = 'myobject'+str(j) + client.put_object(Bucket=src_bucket_name, Key=name, Body=name) + + response = client.list_objects_v2(Bucket=src_bucket_name) + src_keys = _get_keys(response) + + time.sleep(5) + client.put_object(Bucket=src_bucket_name, Key='dummy', Body='dummy') + + response = client.list_objects_v2(Bucket=log_bucket_name) + keys = _get_keys(response) + assert len(keys) == 1 + + key = keys[0] + assert key.startswith('log/') + response = client.get_object(Bucket=log_bucket_name, Key=key) + body = _get_body(response) + assert _verify_records(body, src_bucket_name, 'REST.PUT.put_obj', src_keys) + # TODO: verify short record type + + +@pytest.mark.bucket_logging +@pytest.mark.fails_on_aws +def test_bucket_logging_roll_time(): + if not _has_bucket_logging_extension(): + pytest.skip('ceph extension to bucket logging not supported at client') + src_bucket_name = get_new_bucket_name() + src_bucket = get_new_bucket_resource(name=src_bucket_name) + log_bucket_name = get_new_bucket_name() + log_bucket = get_new_bucket_resource(name=log_bucket_name) + client = get_client() + + roll_time = 10 + logging_enabled = {'TargetBucket': log_bucket_name, 'TargetPrefix': 'log/', 'ObjectRollTime': roll_time} + response = client.put_bucket_logging(Bucket=src_bucket_name, BucketLoggingStatus={ + 'LoggingEnabled': logging_enabled, + }) + assert response['ResponseMetadata']['HTTPStatusCode'] == 200 + + num_keys = 5 + for j in range(num_keys): + name = 'myobject'+str(j) + client.put_object(Bucket=src_bucket_name, Key=name, Body=name) + + response = client.list_objects_v2(Bucket=src_bucket_name) + src_keys = _get_keys(response) + + time.sleep(roll_time/2) + client.put_object(Bucket=src_bucket_name, Key='myobject', Body='myobject') + + response = client.list_objects_v2(Bucket=log_bucket_name) + keys = _get_keys(response) + assert len(keys) == 0 + + time.sleep(roll_time/2) + client.put_object(Bucket=src_bucket_name, Key='myobject', Body='myobject') + + response = client.list_objects_v2(Bucket=log_bucket_name) + keys = _get_keys(response) + len(keys) == 1 + + key = keys[0] + assert key.startswith('log/') + response = client.get_object(Bucket=log_bucket_name, Key=key) + body = _get_body(response) + assert _verify_records(body, src_bucket_name, 'REST.PUT.put_obj', src_keys) + client.delete_object(Bucket=log_bucket_name, Key=key) + + num_keys = 25 + for j in range(num_keys): + name = 'myobject'+str(j) + client.put_object(Bucket=src_bucket_name, Key=name, Body=name) + time.sleep(1) + + response = client.list_objects_v2(Bucket=src_bucket_name) + src_keys = _get_keys(response) + + time.sleep(roll_time) + client.put_object(Bucket=src_bucket_name, Key='myobject', Body='myobject') + + response = client.list_objects_v2(Bucket=log_bucket_name) + keys = _get_keys(response) + assert len(keys) > 1 + + body = '' + for key in keys: + assert key.startswith('log/') + response = client.get_object(Bucket=log_bucket_name, Key=key) + body += _get_body(response) + assert _verify_records(body, src_bucket_name, 'REST.PUT.put_obj', src_keys) + + +@pytest.mark.bucket_logging +def test_bucket_logging_multiple_prefixes(): + log_bucket_name = get_new_bucket_name() + log_bucket = get_new_bucket_resource(name=log_bucket_name) + client = get_client() + has_extensions = _has_bucket_logging_extension() + + num_buckets = 5 + buckets = [] + bucket_name_prefix = get_new_bucket_name() + for j in range(num_buckets): + src_bucket_name = bucket_name_prefix+str(j) + src_bucket = get_new_bucket_resource(name=src_bucket_name) + logging_enabled = {'TargetBucket': log_bucket_name, 'TargetPrefix': src_bucket_name+'/'} + if has_extensions: + logging_enabled['ObjectRollTime'] = 5 + response = client.put_bucket_logging(Bucket=src_bucket_name, BucketLoggingStatus={ + 'LoggingEnabled': logging_enabled, + }) + assert response['ResponseMetadata']['HTTPStatusCode'] == 200 + buckets.append(src_bucket_name) + + num_keys = 5 + for src_bucket_name in buckets: + for j in range(num_keys): + name = 'myobject'+str(j) + client.put_object(Bucket=src_bucket_name, Key=name, Body=name) + + time.sleep(5) + for src_bucket_name in buckets: + client.head_object(Bucket=src_bucket_name, Key='myobject0') + + response = client.list_objects_v2(Bucket=log_bucket_name) + keys = _get_keys(response) + assert len(keys) >= num_buckets + + for key in keys: + response = client.get_object(Bucket=log_bucket_name, Key=key) + body = _get_body(response) + found = False + for src_bucket_name in buckets: + if key.startswith(src_bucket_name): + found = True + response = client.list_objects_v2(Bucket=src_bucket_name) + src_keys = _get_keys(response) + assert _verify_records(body, src_bucket_name, 'REST.PUT.put_obj', src_keys) + assert found + + +@pytest.mark.bucket_logging +def test_bucket_logging_single_prefix(): + log_bucket_name = get_new_bucket_name() + log_bucket = get_new_bucket_resource(name=log_bucket_name) + client = get_client() + has_extensions = _has_bucket_logging_extension() + + num_buckets = 5 + buckets = [] + bucket_name_prefix = get_new_bucket_name() + for j in range(num_buckets): + src_bucket_name = bucket_name_prefix+str(j) + src_bucket = get_new_bucket_resource(name=src_bucket_name) + # minimal configuration + logging_enabled = {'TargetBucket': log_bucket_name, 'TargetPrefix': 'log/'} + if has_extensions: + logging_enabled['ObjectRollTime'] = 5 + response = client.put_bucket_logging(Bucket=src_bucket_name, BucketLoggingStatus={ + 'LoggingEnabled': logging_enabled, + }) + assert response['ResponseMetadata']['HTTPStatusCode'] == 200 + buckets.append(src_bucket_name) + + num_keys = 5 + bucket_ind = 0 + for src_bucket_name in buckets: + bucket_ind += 1 + for j in range(num_keys): + name = 'myobject'+str(bucket_ind)+str(j) + client.put_object(Bucket=src_bucket_name, Key=name, Body=name) + + time.sleep(5) + client.put_object(Bucket=buckets[0], Key='dummy', Body='dummy') + + response = client.list_objects_v2(Bucket=log_bucket_name) + keys = _get_keys(response) + assert len(keys) == 1 + + key = keys[0] + response = client.get_object(Bucket=log_bucket_name, Key=key) + body = _get_body(response) + found = False + for src_bucket_name in buckets: + response = client.list_objects_v2(Bucket=src_bucket_name) + src_keys = _get_keys(response) + found = _verify_records(body, src_bucket_name, 'REST.PUT.put_obj', src_keys) + assert found