Skip to content

Commit

Permalink
SNOW-1831140 Complete Iceberg e2e tests (#1015)
Browse files Browse the repository at this point in the history
  • Loading branch information
sfc-gh-mbobowski authored Dec 3, 2024
1 parent 90670e4 commit 1a437d6
Show file tree
Hide file tree
Showing 4 changed files with 237 additions and 43 deletions.
238 changes: 217 additions & 21 deletions test/test_suit/base_iceberg_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from test_suit.assertions import *
from test_suit.test_utils import RetryableError, NonRetryableError
import json
from confluent_kafka import avro

class BaseIcebergTest(BaseE2eTest):

Expand Down Expand Up @@ -49,33 +50,186 @@ def __init__(self, driver, name_salt: str, config_file_name: str):
"type":"record",
"name":"value_schema",
"fields": [
{
"name": "id",
"type": [
"null",
"int"
]
},
{
"name": "body_temperature",
"type": [
"null",
"float"
]
},
{
"name": "name",
"type": [
"null",
"string"
]
},
{
"name": "approved_coffee_types",
"type": [
"null",
{
"name": "id",
"type": "int"
},
"type": "array",
"items": "string"
}
]
},
{
"name": "animals_possessed",
"type": [
"null",
{
"type": "map",
"values": "boolean"
}
]
}
]
}
"""

self.test_message_for_schema_evolution_schema = """
{
"type": "record",
"name": "value_schema",
"fields": [
{
"name": "id",
"type": [
"null",
"int"
]
},
{
"name": "body_temperature",
"type": [
"null",
"float"
]
},
{
"name": "name",
"type": [
"null",
"string"
]
},
{
"name": "approved_coffee_types",
"type": [
"null",
{
"type": "array",
"items": "string"
}
]
},
{
"name": "animals_possessed",
"type": [
"null",
{
"name": "body_temperature",
"type": "float"
},
"type": "map",
"values": "boolean"
}
]
},
{
"name": "null_long",
"default": null,
"type": [
"null",
"long"
]
},
{
"name": "null_array",
"default": null,
"type": [
"null",
{
"name": "name",
"type": "string"
},
"type": "array",
"items": "int"
}
]
},
{
"name": "null_object",
"default": null,
"type": [
"null",
{
"type": "record",
"name": "null_object_record",
"fields": [
{
"name": "key",
"type": "string"
}
]
}
]
},
{
"name": "empty_array",
"default": null,
"type": [
"null",
{
"name": "approved_coffee_types",
"type": {
"type": "array",
"items": "string"
}
},
"type": "array",
"items": "int"
}
]
},
{
"name": "some_object",
"default": null,
"type": [
"null",
{
"name": "animals_possessed",
"type": {
"type": "map",
"values": "boolean"
}
"type": "record",
"name": "some_object_record",
"fields": [
{
"name": "null_key",
"type": [
"null",
"string"
]
},
{
"name": "string_key",
"type": "string"
},
{
"name": "another_string_key",
"type": [
"null",
"string"
]
},
{
"name": "inner_object",
"type": [
"null",
{
"type": "map",
"values": "int"
}
]
}
]
}
]
]
}
]
}
"""

Expand Down Expand Up @@ -128,6 +282,39 @@ def _verify_iceberg_content_from_docs(self, content: dict):
assert_equals(False, content['animals_possessed']['cats'])


def _verify_iceberg_content_for_schema_evolution_1(self, content: dict):
    """Check one record produced by the first schema-evolution message.

    Args:
        content: Record content keyed by column name.

    Every comparison goes through ``assert_equals``, in the order listed
    below, so the first mismatch is the one that is reported.
    """
    # Fields of the documentation-sample schema are expected to be null here.
    for column in ('id', 'body_temperature', 'name', 'approved_coffee_types', 'animals_possessed'):
        assert_equals(None, content[column])

    # Evolved columns that are expected to be null in this record.
    for column in ('null_long', 'null_array', 'null_object'):
        assert_equals(None, content[column])

    # An empty array must come back as an empty list, not as null.
    assert_equals([], content['empty_array'])

    some_object = content['some_object']
    assert_equals(None, some_object['null_key'])
    assert_equals('string_key', some_object['string_key'])


def _verify_iceberg_content_for_schema_evolution_2(self, content: dict):
    """Check one record produced by the second schema-evolution message.

    Args:
        content: Record content keyed by column name.

    Every comparison goes through ``assert_equals``, in the order listed
    below, so the first mismatch is the one that is reported.
    """
    # Fields of the documentation-sample schema are expected to be null here.
    for column in ('id', 'body_temperature', 'name', 'approved_coffee_types', 'animals_possessed'):
        assert_equals(None, content[column])

    # Evolved top-level columns are expected to carry concrete values.
    assert_equals(2137, content['null_long'])
    assert_equals([1, 2, 3], content['null_array'])
    assert_equals('value', content['null_object']['key'])
    assert_equals([1, 2, 3], content['empty_array'])

    # Nested object, including the map stored under 'inner_object'.
    some_object = content['some_object']
    assert_equals(None, some_object['null_key'])
    assert_equals('string_key', some_object['string_key'])
    assert_equals('another_string_key', some_object['another_string_key'])
    assert_equals(456, some_object['inner_object']['inner_object_key'])



def _verify_iceberg_metadata(self, metadata: dict):
assert_equals(0, metadata['offset'])
assert_equals(0, metadata['partition'])
Expand Down Expand Up @@ -161,3 +348,12 @@ def _select_schematized_record_with_offset(self, offset: int) -> dict:
def __none_or_json_loads(self, value: str) -> dict:
    """Decode ``value`` as JSON, passing ``None`` through unchanged.

    Args:
        value: JSON text, or ``None``.

    Returns:
        ``None`` when ``value`` is ``None``; otherwise whatever
        ``json.loads`` produces for ``value``.
    """
    if value is None:
        return None
    return json.loads(value)


def _send_avro_messages(self, message: str, schema: str):
    """Publish 100 copies of ``message`` to ``self.topic`` as Avro records.

    Args:
        message: The record value to send; the same object is repeated
            for every record in the batch.
        schema: Avro value schema as text; it is parsed with
            ``avro.loads`` before being handed to the driver.

    The records are sent with ``self.test_headers`` as their headers.
    """
    batch = [message] * 100
    self.driver.sendAvroSRData(
        topic=self.topic,
        value=batch,
        value_schema=avro.loads(schema),
        headers=self.test_headers,
    )

20 changes: 2 additions & 18 deletions test/test_suit/iceberg_avro_aws.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,27 +17,11 @@ def setup(self):


def send(self):
    """Publish the documentation sample message as a single Avro batch.

    The batch is sent through the shared ``_send_avro_messages`` helper
    only. Building and sending the same batch inline in addition to the
    helper call would publish every record twice, which contradicts the
    record count of 100 that ``verify`` checks for.
    """
    self._send_avro_messages(self.test_message_from_docs, self.test_message_from_docs_schema)


def verify(self, round):
number_of_records = self.driver.select_number_of_records(self.topic)
if number_of_records == 0:
raise RetryableError()
elif number_of_records != 100:
raise NonRetryableError(
"Number of record in table is different from number of record sent"
)
self._assert_number_of_records_in_table(100)

first_record = (
self.driver.snowflake_conn.cursor()
Expand Down
15 changes: 13 additions & 2 deletions test/test_suit/iceberg_schema_evolution_avro_aws.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,19 @@ def setup(self):


def send(self):
    """Publish three Avro batches: the docs sample, then two evolved messages.

    The documentation sample is sent with its own schema; both
    schema-evolution messages share
    ``test_message_for_schema_evolution_schema``. Batches are sent in
    that order through ``_send_avro_messages``.
    """
    batches = (
        (self.test_message_from_docs, self.test_message_from_docs_schema),
        (self.test_message_for_schema_evolution_1, self.test_message_for_schema_evolution_schema),
        (self.test_message_for_schema_evolution_2, self.test_message_for_schema_evolution_schema),
    )
    for message, schema in batches:
        self._send_avro_messages(message, schema)


def verify(self, round):
    """Check the record count and one sample record per sent batch.

    Args:
        round: Not used by this check.

    Expects 300 records in the table, then fetches the records at
    offsets 1, 100 and 200 and runs the matching content verifier on
    each.
    """
    self._assert_number_of_records_in_table(300)

    # NOTE(review): offsets 1 / 100 / 200 presumably fall into the first,
    # second and third 100-message batch respectively - confirm against send().
    docs_record = self._select_schematized_record_with_offset(1)
    self._verify_iceberg_content_from_docs(docs_record)

    evolution_1_record = self._select_schematized_record_with_offset(100)
    self._verify_iceberg_content_for_schema_evolution_1(evolution_1_record)

    evolution_2_record = self._select_schematized_record_with_offset(200)
    self._verify_iceberg_content_for_schema_evolution_2(evolution_2_record)
7 changes: 5 additions & 2 deletions test/test_suit/iceberg_schema_evolution_json_aws.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,12 @@ def verify(self, round):
self._assert_number_of_records_in_table(200)

actual_record_from_docs_dict = self._select_schematized_record_with_offset(1)
self._verify_iceberg_content_from_docs(actual_record_from_docs_dict)

actual_record_for_schema_evolution_1 = self._select_schematized_record_with_offset(100)
self._verify_iceberg_content_for_schema_evolution_1(actual_record_for_schema_evolution_1)

# TODO SNOW-1731264
# actual_record_for_schema_evolution_2 = self._select_schematized_record_with_offset(200)

print(actual_record_from_docs_dict)
self._verify_iceberg_content_from_docs(actual_record_from_docs_dict)

0 comments on commit 1a437d6

Please sign in to comment.