diff --git a/test/test_suit/base_iceberg_test.py b/test/test_suit/base_iceberg_test.py index 713c6b4f9..b9ab7dafc 100644 --- a/test/test_suit/base_iceberg_test.py +++ b/test/test_suit/base_iceberg_test.py @@ -2,6 +2,7 @@ from test_suit.assertions import * from test_suit.test_utils import RetryableError, NonRetryableError import json +from confluent_kafka import avro class BaseIcebergTest(BaseE2eTest): @@ -49,33 +50,186 @@ def __init__(self, driver, name_salt: str, config_file_name: str): "type":"record", "name":"value_schema", "fields": [ + { + "name": "id", + "type": [ + "null", + "int" + ] + }, + { + "name": "body_temperature", + "type": [ + "null", + "float" + ] + }, + { + "name": "name", + "type": [ + "null", + "string" + ] + }, + { + "name": "approved_coffee_types", + "type": [ + "null", { - "name": "id", - "type": "int" - }, + "type": "array", + "items": "string" + } + ] + }, + { + "name": "animals_possessed", + "type": [ + "null", + { + "type": "map", + "values": "boolean" + } + ] + } + ] + } + """ + + self.test_message_for_schema_evolution_schema = """ + { + "type": "record", + "name": "value_schema", + "fields": [ + { + "name": "id", + "type": [ + "null", + "int" + ] + }, + { + "name": "body_temperature", + "type": [ + "null", + "float" + ] + }, + { + "name": "name", + "type": [ + "null", + "string" + ] + }, + { + "name": "approved_coffee_types", + "type": [ + "null", + { + "type": "array", + "items": "string" + } + ] + }, + { + "name": "animals_possessed", + "type": [ + "null", { - "name": "body_temperature", - "type": "float" - }, + "type": "map", + "values": "boolean" + } + ] + }, + { + "name": "null_long", + "default": null, + "type": [ + "null", + "long" + ] + }, + { + "name": "null_array", + "default": null, + "type": [ + "null", { - "name": "name", - "type": "string" - }, + "type": "array", + "items": "int" + } + ] + }, + { + "name": "null_object", + "default": null, + "type": [ + "null", + { + "type": "record", + "name": "null_object_record", + "fields": [ + { + "name": "key", + "type": "string" + } + ] + } + ] + }, + { + "name": "empty_array", + "default": null, + "type": [ + "null", { - "name": "approved_coffee_types", - "type": { - "type": "array", - "items": "string" - } - }, + "type": "array", + "items": "int" + } + ] + }, + { + "name": "some_object", + "default": null, + "type": [ + "null", { - "name": "animals_possessed", - "type": { - "type": "map", - "values": "boolean" - } + "type": "record", + "name": "some_object_record", + "fields": [ + { + "name": "null_key", + "type": [ + "null", + "string" + ] + }, + { + "name": "string_key", + "type": "string" + }, + { + "name": "another_string_key", + "type": [ + "null", + "string" + ] + }, + { + "name": "inner_object", + "type": [ + "null", + { + "type": "map", + "values": "int" + } + ] + } + ] } - ] + ] + } + ] } """ @@ -128,6 +282,39 @@ def _verify_iceberg_content_from_docs(self, content: dict): assert_equals(False, content['animals_possessed']['cats']) + def _verify_iceberg_content_for_schema_evolution_1(self, content: dict): + assert_equals(None, content['id']) + assert_equals(None, content['body_temperature']) + assert_equals(None, content['name']) + assert_equals(None, content['approved_coffee_types']) + assert_equals(None, content['animals_possessed']) + + assert_equals(None, content['null_long']) + assert_equals(None, content['null_array']) + assert_equals(None, content['null_object']) + assert_equals([], content['empty_array']) + assert_equals(None, content['some_object']['null_key']) + assert_equals('string_key', content['some_object']['string_key']) + + + def _verify_iceberg_content_for_schema_evolution_2(self, content: dict): + assert_equals(None, content['id']) + assert_equals(None, content['body_temperature']) + assert_equals(None, content['name']) + assert_equals(None, content['approved_coffee_types']) + assert_equals(None, content['animals_possessed']) + + assert_equals(2137, content['null_long']) + assert_equals([1, 2, 3], content['null_array']) + assert_equals('value', content['null_object']['key']) + assert_equals([1, 2, 3], content['empty_array']) + assert_equals(None, content['some_object']['null_key']) + assert_equals('string_key', content['some_object']['string_key']) + assert_equals('another_string_key', content['some_object']['another_string_key']) + assert_equals(456, content['some_object']['inner_object']['inner_object_key']) + + + def _verify_iceberg_metadata(self, metadata: dict): assert_equals(0, metadata['offset']) assert_equals(0, metadata['partition']) @@ -161,3 +348,12 @@ def _select_schematized_record_with_offset(self, offset: int) -> dict: def __none_or_json_loads(self, value: str) -> dict: return None if value is None else json.loads(value) + + def _send_avro_messages(self, message: str, schema: str): + self.driver.sendAvroSRData( + topic=self.topic, + value=[message for _ in range(100)], + value_schema=avro.loads(schema), + headers=self.test_headers, + ) + diff --git a/test/test_suit/iceberg_avro_aws.py b/test/test_suit/iceberg_avro_aws.py index f6cf70625..2952616af 100644 --- a/test/test_suit/iceberg_avro_aws.py +++ b/test/test_suit/iceberg_avro_aws.py @@ -17,27 +17,11 @@ def setup(self): def send(self): - value = [] - - for e in range(100): - value.append(self.test_message_from_docs) - - self.driver.sendAvroSRData( - topic=self.topic, - value=value, - value_schema=avro.loads(self.test_message_from_docs_schema), - headers=self.test_headers, - ) + self._send_avro_messages(self.test_message_from_docs, self.test_message_from_docs_schema) def verify(self, round): - number_of_records = self.driver.select_number_of_records(self.topic) - if number_of_records == 0: - raise RetryableError() - elif number_of_records != 100: - raise NonRetryableError( - "Number of record in table is different from number of record sent" - ) + self._assert_number_of_records_in_table(100) first_record = ( self.driver.snowflake_conn.cursor() diff --git a/test/test_suit/iceberg_schema_evolution_avro_aws.py b/test/test_suit/iceberg_schema_evolution_avro_aws.py index 74a80c27d..13348f9da 100644 --- a/test/test_suit/iceberg_schema_evolution_avro_aws.py +++ b/test/test_suit/iceberg_schema_evolution_avro_aws.py @@ -15,8 +15,19 @@ def setup(self): def send(self): - pass + self._send_avro_messages(self.test_message_from_docs, self.test_message_from_docs_schema) + self._send_avro_messages(self.test_message_for_schema_evolution_1, self.test_message_for_schema_evolution_schema) + self._send_avro_messages(self.test_message_for_schema_evolution_2, self.test_message_for_schema_evolution_schema) def verify(self, round): - pass + self._assert_number_of_records_in_table(300) + + actual_record_from_docs_dict = self._select_schematized_record_with_offset(1) + self._verify_iceberg_content_from_docs(actual_record_from_docs_dict) + + actual_record_for_schema_evolution_1 = self._select_schematized_record_with_offset(100) + self._verify_iceberg_content_for_schema_evolution_1(actual_record_for_schema_evolution_1) + + actual_record_for_schema_evolution_2 = self._select_schematized_record_with_offset(200) + self._verify_iceberg_content_for_schema_evolution_2(actual_record_for_schema_evolution_2) \ No newline at end of file diff --git a/test/test_suit/iceberg_schema_evolution_json_aws.py b/test/test_suit/iceberg_schema_evolution_json_aws.py index 2c99ea23c..82f46e18e 100644 --- a/test/test_suit/iceberg_schema_evolution_json_aws.py +++ b/test/test_suit/iceberg_schema_evolution_json_aws.py @@ -26,9 +26,12 @@ def verify(self, round): self._assert_number_of_records_in_table(200) actual_record_from_docs_dict = self._select_schematized_record_with_offset(1) + self._verify_iceberg_content_from_docs(actual_record_from_docs_dict) + actual_record_for_schema_evolution_1 = self._select_schematized_record_with_offset(100) + self._verify_iceberg_content_for_schema_evolution_1(actual_record_for_schema_evolution_1) + # TODO SNOW-1731264 # actual_record_for_schema_evolution_2 = self._select_schematized_record_with_offset(200) - print(actual_record_from_docs_dict) - self._verify_iceberg_content_from_docs(actual_record_from_docs_dict) +