diff --git a/python/delta_sharing/tests/test_rest_client.py b/python/delta_sharing/tests/test_rest_client.py
index 8d9ee61a4..4cd8b1324 100644
--- a/python/delta_sharing/tests/test_rest_client.py
+++ b/python/delta_sharing/tests/test_rest_client.py
@@ -633,6 +633,146 @@ def test_list_files_in_table_timestamp(
     assert isinstance(e, HTTPError)
     assert "is after the latest available version" in str(e)
+
+
+@pytest.mark.skipif(not ENABLE_INTEGRATION, reason=SKIP_MESSAGE)
+def test_list_files_in_table_with_embedded_newlines(rest_client: DataSharingRestClient):
+    # Raw newlines embedded in JSON string values split the response body into
+    # extra physical lines; the client must reassemble them. Quotes nested in
+    # the "stats" JSON are escaped (\\" in this source -> \" on the wire) so
+    # the outer JSON object remains parseable.
+    response_content = (
+        '{"protocol": {"minReaderVersion": 1}}\n'
+        '{"metaData": {"id": "12345", "format": {"provider": "parquet", "options": {}}, "schemaString": "{}", "partitionColumns": []}}\n'
+        '{"file": {"url": "file1.parquet", "id": "abc123", "partitionValues": {}, "size": 1000, "stats": "{\\"numRecords\\": 100, \\"value\\": \\"This is a multiline\nstring with embedded newlines\\"}"}}\n'
+        '{"file": {"url": "file2.parquet", "id": "def456", "partitionValues": {}, "size": 1500, "stats": "{\\"numRecords\\": 150, \\"value\\": \\"Another\nmultiline\nstring\\"}"}}'
+    )
+    lines = iter(response_content.split('\n'))
+
+    rest_client._post_internal = lambda *args, **kwargs: ({
+        DataSharingRestClient.DELTA_TABLE_VERSION_HEADER: '2'
+    }, lines)
+
+    response = rest_client.list_files_in_table(
+        Table(name="table_with_newlines", share="share_with_newlines", schema="default")
+    )
+
+    assert response.delta_table_version == 2
+    assert response.protocol == Protocol(min_reader_version=1)
+    assert response.metadata == Metadata(
+        id="12345",
+        format={"provider": "parquet", "options": {}},
+        schema_string="{}",
+        partition_columns=[]
+    )
+    assert response.add_files == [
+        AddFile(
+            url="file1.parquet",
+            id="abc123",
+            partition_values={},
+            size=1000,
+            stats=(
+                '{"numRecords": 100, "value": "This is a multiline\nstring with embedded newlines"}'
+            ),
+        ),
+        AddFile(
+            url="file2.parquet",
+            id="def456",
+            partition_values={},
+            size=1500,
+            stats=(
+                '{"numRecords": 150, "value": "Another\nmultiline\nstring"}'
+            ),
+        )
+    ]
+
+
+@pytest.mark.skipif(not ENABLE_INTEGRATION, reason=SKIP_MESSAGE)
+def test_list_files_in_table_partial_json_objects(rest_client: DataSharingRestClient):
+    # A truncated trailing JSON object must be dropped rather than crash parsing.
+    response_content = (
+        '{"protocol": {"minReaderVersion": 1}}\n'
+        '{"metaData": {"id": "67890", "format": {"provider": "parquet", "options": {}}, "schemaString": "{}", "partitionColumns": []}}\n'
+        '{"file": {"url": "file3.parquet", "id": "ghi789", "partitionValues": {}, "size": 1200, "stats": "{\\"numRecords\\": 120, \\"value\\": \\"Partial\nJSON\nobject\\"}"}}\n'
+        '{"file": {"url": "file4.parquet", "id": "jkl012", "partitionValues": {}, "size": 1700, "stats": "{\\"numRecords\\": 170}'  # Incomplete JSON object
+    )
+    lines = iter(response_content.split('\n'))
+
+    rest_client._post_internal = lambda *args, **kwargs: ({
+        DataSharingRestClient.DELTA_TABLE_VERSION_HEADER: '3'
+    }, lines)
+
+    response = rest_client.list_files_in_table(
+        Table(name="table_with_partial_json", share="share_with_partial_json", schema="default")
+    )
+
+    assert response.delta_table_version == 3
+    assert response.protocol == Protocol(min_reader_version=1)
+    assert response.metadata == Metadata(
+        id="67890",
+        format={"provider": "parquet", "options": {}},
+        schema_string="{}",
+        partition_columns=[]
+    )
+    assert len(response.add_files) == 1  # only the complete JSON object is kept
+    assert response.add_files[0] == AddFile(
+        url="file3.parquet",
+        id="ghi789",
+        partition_values={},
+        size=1200,
+        stats=(
+            '{"numRecords": 120, "value": "Partial\nJSON\nobject"}'
+        ),
+    )
+
+
+@pytest.mark.skipif(not ENABLE_INTEGRATION, reason=SKIP_MESSAGE)
+def test_list_files_in_table_json_decode_error(rest_client: DataSharingRestClient):
+    # Lines that previously triggered JSONDecodeError must now parse cleanly.
+    response_content = (
+        '{"protocol": {"minReaderVersion": 1}}\n'
+        '{"metaData": {"id": "98765", "format": {"provider": "parquet", "options": {}}, "schemaString": "{}", "partitionColumns": []}}\n'
+        '{"file": {"url": "file5.parquet", "id": "mno345", "partitionValues": {}, "size": 2000, "stats": "{\\"numRecords\\": 1074361, \\"value\\": \\"Develop2.1 Pipeline, Veracode \\"}"}}\n'
+        '{"file": {"url": "file6.parquet", "id": "pqr678", "partitionValues": {}, "size": 2500, "stats": "{\\"numRecords\\": 200, \\"value\\": \\"Another value\\"}"}}'
+    )
+    lines = iter(response_content.split('\n'))
+
+    rest_client._post_internal = lambda *args, **kwargs: ({
+        DataSharingRestClient.DELTA_TABLE_VERSION_HEADER: '4'
+    }, lines)
+
+    response = rest_client.list_files_in_table(
+        Table(name="table_with_json_decode_error", share="share_with_json_decode_error", schema="default")
+    )
+
+    assert response.delta_table_version == 4
+    assert response.protocol == Protocol(min_reader_version=1)
+    assert response.metadata == Metadata(
+        id="98765",
+        format={"provider": "parquet", "options": {}},
+        schema_string="{}",
+        partition_columns=[]
+    )
+    assert response.add_files == [
+        AddFile(
+            url="file5.parquet",
+            id="mno345",
+            partition_values={},
+            size=2000,
+            stats=(
+                '{"numRecords": 1074361, "value": "Develop2.1 Pipeline, Veracode "}'
+            ),
+        ),
+        AddFile(
+            url="file6.parquet",
+            id="pqr678",
+            partition_values={},
+            size=2500,
+            stats=(
+                '{"numRecords": 200, "value": "Another value"}'
+            ),
+        )
+    ]
 
 
 @pytest.mark.skipif(not ENABLE_INTEGRATION, reason=SKIP_MESSAGE)
 def test_list_table_changes(