diff --git a/test/test_suit/base_iceberg_test.py b/test/test_suit/base_iceberg_test.py index 58c423197..5131e5993 100644 --- a/test/test_suit/base_iceberg_test.py +++ b/test/test_suit/base_iceberg_test.py @@ -45,6 +45,18 @@ def __init__(self, driver, name_salt: str, config_file_name: str): } } + self.test_message_for_schema_evolution_3 = { + "extra_null_long": None, + "null_long": None, + "null_array": None, + "null_object": None, + "empty_array": [], + "some_object": { + "null_key": None, + "string_key": "string_key" + } + } + self.test_message_from_docs_schema = """ { "type":"record", diff --git a/test/test_suit/iceberg_schema_evolution_json_aws.py b/test/test_suit/iceberg_schema_evolution_json_aws.py index a3d81c14f..0390dfede 100644 --- a/test/test_suit/iceberg_schema_evolution_json_aws.py +++ b/test/test_suit/iceberg_schema_evolution_json_aws.py @@ -16,23 +16,28 @@ def setup(self): def send(self): self._send_json_values(self.test_message_from_docs, 100) - # first 10 messages should be discarded due to lack of schema for null fields + # first 10 messages should be discarded due to lack of schema for null fields, but after schema evolution + # coming from the next messages, offset should be reset and the messages should once again be consumed and inserted self._send_json_values(self.test_message_for_schema_evolution_1, 10) + # these messages should never be inserted due to lack of schema for one extra null field + self._send_json_values(self.test_message_for_schema_evolution_3, 10) self._send_json_values(self.test_message_for_schema_evolution_2, 100) # now with the schema coming from test_message_for_schema_evolution_2 we should be able to insert null values self._send_json_values(self.test_message_for_schema_evolution_1, 10) + # these messages should never be inserted due to lack of schema for one extra null field + self._send_json_values(self.test_message_for_schema_evolution_3, 10) def verify(self, round): - self._assert_number_of_records_in_table(210) + 
self._assert_number_of_records_in_table(220) actual_record_from_docs_dict = self._select_schematized_record_with_offset(1) self._verify_iceberg_content_from_docs(actual_record_from_docs_dict) - actual_record_for_schema_evolution_2 = self._select_schematized_record_with_offset(100) + actual_record_for_schema_evolution_2 = self._select_schematized_record_with_offset(120) self._verify_iceberg_content_for_schema_evolution_2(actual_record_for_schema_evolution_2) - actual_record_for_schema_evolution_1 = self._select_schematized_record_with_offset(200) + actual_record_for_schema_evolution_1 = self._select_schematized_record_with_offset(220) self._verify_iceberg_content_for_schema_evolution_1(actual_record_for_schema_evolution_1)