diff --git a/test/rest_request_template/iceberg_avro_aws.json b/test/rest_request_template/iceberg_avro_aws.json
new file mode 100644
index 000000000..150e3d151
--- /dev/null
+++ b/test/rest_request_template/iceberg_avro_aws.json
@@ -0,0 +1,28 @@
+{
+  "name": "SNOWFLAKE_CONNECTOR_NAME",
+  "config": {
+    "connector.class": "com.snowflake.kafka.connector.SnowflakeSinkConnector",
+    "topics": "SNOWFLAKE_TEST_TOPIC",
+    "tasks.max": "1",
+    "buffer.size.bytes": "5000000",
+    "snowflake.url.name": "SNOWFLAKE_HOST",
+    "snowflake.user.name": "SNOWFLAKE_USER",
+    "snowflake.private.key": "SNOWFLAKE_PRIVATE_KEY",
+    "snowflake.database.name": "SNOWFLAKE_DATABASE",
+    "snowflake.schema.name": "SNOWFLAKE_SCHEMA",
+    "snowflake.role.name": "SNOWFLAKE_ROLE",
+    "snowflake.ingestion.method": "SNOWPIPE_STREAMING",
+    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
+    "value.converter": "io.confluent.connect.avro.AvroConverter",
+    "value.converter.schema.registry.url": "CONFLUENT_SCHEMA_REGISTRY",
+    "value.converter.schemas.enable": "false",
+    "jmx": "true",
+    "errors.tolerance": "all",
+    "errors.log.enable": true,
+    "errors.deadletterqueue.topic.name": "DLQ_TOPIC",
+    "errors.deadletterqueue.topic.replication.factor": 1,
+    "snowflake.streaming.iceberg.enabled": true,
+    "snowflake.streaming.max.client.lag": "1",
+    "snowflake.streaming.enable.single.buffer": "true"
+  }
+}
\ No newline at end of file
diff --git a/test/rest_request_template/iceberg_json_aws.json b/test/rest_request_template/iceberg_json_aws.json
index d4b19c67f..873acbdf0 100644
--- a/test/rest_request_template/iceberg_json_aws.json
+++ b/test/rest_request_template/iceberg_json_aws.json
@@ -4,8 +4,6 @@
     "connector.class": "com.snowflake.kafka.connector.SnowflakeSinkConnector",
     "topics": "SNOWFLAKE_TEST_TOPIC",
     "tasks.max": "1",
-    "buffer.flush.time": "10",
-    "buffer.count.records": "100",
     "buffer.size.bytes": "5000000",
     "snowflake.url.name": "SNOWFLAKE_HOST",
     "snowflake.user.name": "SNOWFLAKE_USER",
@@ -14,9 +12,6 @@
     "snowflake.schema.name": "SNOWFLAKE_SCHEMA",
     "snowflake.role.name": "SNOWFLAKE_ROLE",
     "snowflake.ingestion.method": "SNOWPIPE_STREAMING",
-    "snowflake.streaming.enable.single.buffer": true,
-    "snowflake.streaming.closeChannelsInParallel.enabled": true,
-    "snowflake.enable.schematization": "false",
     "key.converter": "org.apache.kafka.connect.storage.StringConverter",
     "value.converter": "org.apache.kafka.connect.json.JsonConverter",
     "value.converter.schemas.enable": "false",
@@ -25,6 +20,8 @@
     "errors.log.enable": true,
     "errors.deadletterqueue.topic.name": "DLQ_TOPIC",
     "errors.deadletterqueue.topic.replication.factor": 1,
-    "snowflake.streaming.iceberg.enabled": true
+    "snowflake.streaming.iceberg.enabled": true,
+    "snowflake.streaming.max.client.lag": "1",
+    "snowflake.streaming.enable.single.buffer": "true"
   }
 }
\ No newline at end of file
diff --git a/test/test_suit/assertions.py b/test/test_suit/assertions.py
new file mode 100644
index 000000000..c5f212bac
--- /dev/null
+++ b/test/test_suit/assertions.py
@@ -0,0 +1,38 @@
+from test_suit.test_utils import NonRetryableError
+
+
+def assert_equals(expected, actual):
+    if expected != actual:
+        raise NonRetryableError(
+            "Actual {} does not equal expected {}".format(actual, expected)
+        )
+
+
+def assert_equals_with_precision(expected, actual, precision=0.1):
+    if not expected - precision < actual < expected + precision:
+        raise NonRetryableError(
+            "Actual {} does not equal expected {} with precision {}".format(
+                actual, expected, precision
+            )
+        )
+
+
+def assert_starts_with(expected_prefix, actual):
+    if not actual.startswith(expected_prefix):
+        raise NonRetryableError(
+            "Actual {} does not start with {}".format(actual, expected_prefix)
+        )
+
+
+def assert_not_null(actual):
+    if actual is None:
+        raise NonRetryableError("Actual {} is null".format(actual))
+
+
+def assert_dict_contains(expected_key, expected_value, actual_dict):
+    if actual_dict[expected_key] != expected_value:
+        raise NonRetryableError(
+            "Actual value from dict {} does not equal expected {}".format(
+                actual_dict[expected_key], expected_value
+            )
+        )
diff --git a/test/test_suit/base_iceberg_test.py b/test/test_suit/base_iceberg_test.py
new file mode 100644
index 000000000..0e9360c1d
--- /dev/null
+++ b/test/test_suit/base_iceberg_test.py
@@ -0,0 +1,38 @@
+from test_suit.base_e2e import BaseE2eTest
+from test_suit.assertions import *
+
+
+class BaseIcebergTest(BaseE2eTest):
+
+    def __init__(self, driver, nameSalt):
+        self.driver = driver
+        self.test_message = {
+            "id": 1,
+            "body_temperature": 36.6,
+            "name": "Steve",
+            "approved_coffee_types": ["Espresso", "Doppio", "Ristretto", "Lungo"],
+            "animals_possessed": {"dogs": True, "cats": False},
+        }
+        self.test_headers = [("header1", "value1")]
+
+    def verify_iceberg_content(self, content: dict):
+        assert_equals(1, content['id'])
+        assert_equals_with_precision(36.6, content['body_temperature'])
+        assert_equals('Steve', content['name'])
+
+        assert_equals('Espresso', content['approved_coffee_types'][0])
+        assert_equals('Doppio', content['approved_coffee_types'][1])
+        assert_equals('Ristretto', content['approved_coffee_types'][2])
+        assert_equals('Lungo', content['approved_coffee_types'][3])
+
+        assert_equals(True, content['animals_possessed']['dogs'])
+        assert_equals(False, content['animals_possessed']['cats'])
+
+    def verify_iceberg_metadata(self, metadata: dict):
+        assert_equals(0, metadata['offset'])
+        assert_equals(0, metadata['partition'])
+        assert_starts_with('iceberg_', metadata['topic'])
+        assert_not_null(metadata['SnowflakeConnectorPushTime'])
+
+        assert_dict_contains('header1', 'value1', metadata['headers'])
diff --git a/test/test_suit/iceberg_avro_aws.py b/test/test_suit/iceberg_avro_aws.py
new file mode 100644
index 000000000..add8c9502
--- /dev/null
+++ b/test/test_suit/iceberg_avro_aws.py
@@ -0,0 +1,90 @@
+from test_suit.test_utils import RetryableError, NonRetryableError
+import json
+from confluent_kafka import avro
+from test_suit.base_iceberg_test import BaseIcebergTest
+
+
+class TestIcebergAvroAws(BaseIcebergTest):
+    def __init__(self, driver, nameSalt: str):
+        BaseIcebergTest.__init__(self, driver, nameSalt)
+        self.fileName = "iceberg_avro_aws"
+        self.topic = self.fileName + nameSalt
+
+        valueSchemaStr = """
+        {
+            "type": "record",
+            "name": "value_schema",
+            "fields": [
+                {
+                    "name": "id",
+                    "type": "int"
+                },
+                {
+                    "name": "body_temperature",
+                    "type": "float"
+                },
+                {
+                    "name": "name",
+                    "type": "string"
+                },
+                {
+                    "name": "approved_coffee_types",
+                    "type": {
+                        "type": "array",
+                        "items": "string"
+                    }
+                },
+                {
+                    "name": "animals_possessed",
+                    "type": {
+                        "type": "map",
+                        "values": "boolean"
+                    }
+                }
+            ]
+        }
+        """
+        self.valueSchema = avro.loads(valueSchemaStr)
+
+    def getConfigFileName(self):
+        return self.fileName + ".json"
+
+    def setup(self):
+        self.driver.create_iceberg_table_with_content(
+            table_name=self.topic,
+            external_volume="kafka_push_e2e_volume_aws",  # volume created manually
+        )
+
+    def send(self):
+        value = []
+
+        for _ in range(100):
+            value.append(self.test_message)
+
+        self.driver.sendAvroSRData(
+            topic=self.topic,
+            value=value,
+            value_schema=self.valueSchema,
+            headers=self.test_headers,
+        )
+
+    def verify(self, round):
+        number_of_records = self.driver.select_number_of_records(self.topic)
+        if number_of_records == 0:
+            raise RetryableError()
+        elif number_of_records != 100:
+            raise NonRetryableError(
+                "Number of records in the table differs from the number of records sent"
+            )
+
+        first_record = (
+            self.driver.snowflake_conn.cursor()
+            .execute("SELECT * FROM {} LIMIT 1".format(self.topic))
+            .fetchone()
+        )
+
+        self.verify_iceberg_content(json.loads(first_record[0]))
+        self.verify_iceberg_metadata(json.loads(first_record[1]))
+
+    def clean(self):
+        self.driver.drop_iceberg_table(self.topic)
diff --git a/test/test_suit/iceberg_json_aws.py b/test/test_suit/iceberg_json_aws.py
index 004b2b941..58f7faf28 100644
--- a/test/test_suit/iceberg_json_aws.py
+++ b/test/test_suit/iceberg_json_aws.py
@@ -1,14 +1,11 @@
-import datetime
-
 from test_suit.test_utils import RetryableError, NonRetryableError
 import json
-from time import sleep
-from test_suit.base_e2e import BaseE2eTest
+from test_suit.base_iceberg_test import BaseIcebergTest
 
 
-class TestIcebergJsonAws(BaseE2eTest):
+class TestIcebergJsonAws(BaseIcebergTest):
     def __init__(self, driver, nameSalt: str):
-        self.driver = driver
+        BaseIcebergTest.__init__(self, driver, nameSalt)
         self.fileName = "iceberg_json_aws"
         self.topic = self.fileName + nameSalt
 
@@ -22,20 +19,39 @@ def setup(self):
         )
 
     def send(self):
-        msg = json.dumps(
-            {
-                "id": 1,
-                "body_temperature": 36.6,
-                "name": "Steve",
-                "approved_coffee_types": ["Espresso", "Doppio", "Ristretto", "Lungo"],
-                "animals_possessed": {"dogs": True, "cats": False},
-            }
+        msg = json.dumps(self.test_message)
+
+        key = []
+        value = []
+        for e in range(100):
+            key.append(json.dumps({"number": str(e)}).encode("utf-8"))
+            value.append(msg.encode("utf-8"))
+
+        self.driver.sendBytesData(
+            topic=self.topic,
+            value=value,
+            key=key,
+            partition=0,
+            headers=self.test_headers,
         )
 
     def verify(self, round):
-        res = self.driver.select_number_of_records(self.topic)
-        print("Count records in table {}={}".format(self.topic, str(res)))
+        number_of_records = self.driver.select_number_of_records(self.topic)
+        if number_of_records == 0:
+            raise RetryableError()
+        elif number_of_records != 100:
+            raise NonRetryableError(
+                "Number of records in the table differs from the number of records sent"
+            )
+
+        first_record = (
+            self.driver.snowflake_conn.cursor()
+            .execute("SELECT * FROM {} LIMIT 1".format(self.topic))
+            .fetchone()
+        )
+
+        self.verify_iceberg_content(json.loads(first_record[0]))
+        self.verify_iceberg_metadata(json.loads(first_record[1]))
 
     def clean(self):
         self.driver.drop_iceberg_table(self.topic)
-        return
diff --git a/test/test_suit/test_utils.py b/test/test_suit/test_utils.py
index b44b2d970..94e0a2036 100644
--- a/test/test_suit/test_utils.py
+++ b/test/test_suit/test_utils.py
@@ -1,7 +1,6 @@
 import re
 from cryptography.hazmat.backends import default_backend
 from cryptography.hazmat.primitives import serialization
-from test_suit.base_e2e import BaseE2eTest
 
 
 class Error(Exception):
diff --git a/test/test_suites.py b/test/test_suites.py
index 60e0e14bd..568fd18af 100644
--- a/test/test_suites.py
+++ b/test/test_suites.py
@@ -40,6 +40,7 @@
     TestSchemaEvolutionAvroSRLogicalTypes,
 )
 from test_suit.test_schema_evolution_drop_table import TestSchemaEvolutionDropTable
+from test_suit.iceberg_avro_aws import TestIcebergAvroAws
 from test_suit.iceberg_json_aws import TestIcebergJsonAws
 from test_suit.test_schema_evolution_json import TestSchemaEvolutionJson
 from test_suit.test_schema_evolution_json_ignore_tombstone import (
@@ -621,8 +622,17 @@ def create_end_to_end_test_suites(driver, nameSalt, schemaRegistryAddress, testS
             "TestIcebergJsonAws",
             EndToEndTestSuite(
                 test_instance=TestIcebergJsonAws(driver, nameSalt),
-                run_in_confluent=True,
-                run_in_apache=True,
+                run_in_confluent=False,  # TODO set to true after ingest-sdk 3.0.1 release
+                run_in_apache=False,  # TODO set to true after ingest-sdk 3.0.1 release
                 cloud_platform=CloudPlatform.AWS,
             ),
         ),
+        (
+            "TestIcebergAvroAws",
+            EndToEndTestSuite(
+                test_instance=TestIcebergAvroAws(driver, nameSalt),
+                run_in_confluent=False,  # TODO set to true after ingest-sdk 3.0.1 release
+                run_in_apache=False,
+                cloud_platform=CloudPlatform.AWS,
+            ),
+        ),
diff --git a/test/test_verify.py b/test/test_verify.py
index 63fe639ba..892c74ad2 100755
--- a/test/test_verify.py
+++ b/test/test_verify.py
@@ -205,17 +205,17 @@ def sendBytesData(self, topic, value, key=[], partition=0, headers=[]):
                 self.producer.flush()
         self.producer.flush()
 
-    def sendAvroSRData(self, topic, value, value_schema, key=[], key_schema="", partition=0):
+    def sendAvroSRData(self, topic, value, value_schema, key=[], key_schema="", partition=0, headers=[]):
         if len(key) == 0:
             for i, v in enumerate(value):
                 self.avroProducer.produce(
-                    topic=topic, value=v, value_schema=value_schema, partition=partition)
+                    topic=topic, value=v, value_schema=value_schema, partition=partition, headers=headers)
                 if (i + 1) % self.MAX_FLUSH_BUFFER_SIZE == 0:
                     self.producer.flush()
         else:
             for i, (k, v) in enumerate(zip(key, value)):
                 self.avroProducer.produce(
-                    topic=topic, value=v, value_schema=value_schema, key=k, key_schema=key_schema, partition=partition)
+                    topic=topic, value=v, value_schema=value_schema, key=k, key_schema=key_schema, partition=partition, headers=headers)
                 if (i + 1) % self.MAX_FLUSH_BUFFER_SIZE == 0:
                     self.producer.flush()
         self.avroProducer.flush()
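
For reviewers, a minimal usage sketch of the assertion helpers introduced in test/test_suit/assertions.py. The record_content dict below is illustrative sample data, not output from an actual test run; it only assumes the helper signatures added in this diff.

    from test_suit.assertions import (
        assert_dict_contains,
        assert_equals,
        assert_equals_with_precision,
        assert_starts_with,
    )

    # Illustrative payload shaped like the RECORD_CONTENT column verified above.
    record_content = {
        "id": 1,
        "body_temperature": 36.58,
        "name": "Steve",
        "approved_coffee_types": ["Espresso", "Doppio", "Ristretto", "Lungo"],
        "animals_possessed": {"dogs": True, "cats": False},
    }

    assert_equals(1, record_content["id"])
    # Passes: 36.58 falls inside the default +/-0.1 window around 36.6.
    assert_equals_with_precision(36.6, record_content["body_temperature"])
    assert_starts_with("Espr", record_content["approved_coffee_types"][0])
    # Raises NonRetryableError on mismatch, which the harness reports as a failure.
    assert_dict_contains("dogs", True, record_content["animals_possessed"])

Each helper raises NonRetryableError rather than AssertionError, so a mismatch fails the e2e round immediately instead of being retried.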