Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: clear data and sst files after manifest flush #457

Merged
merged 1 commit into from
Jan 21, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions deltacat/storage/rivulet/writer/memtable_dataset_writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,10 @@ def flush(self) -> str:

manifest_file = self.location_provider.new_manifest_file_uri()
self.__write_manifest_file(manifest_file)

self._sst_files.clear()
self._data_files.clear()

return manifest_file.location

def __enter__(self) -> Any:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
import pytest

from deltacat.storage.rivulet.fs.file_location_provider import FileLocationProvider
from deltacat.storage.rivulet.fs.file_store import FileStore
from deltacat.storage.rivulet.metastore.manifest import JsonManifestIO
from deltacat.storage.rivulet import Schema
from deltacat.storage.rivulet.schema.datatype import Datatype
from deltacat.storage.rivulet.writer.memtable_dataset_writer import MemtableDatasetWriter


@pytest.fixture
def test_schema():
return Schema(
fields=[
("id", Datatype.int32()),
("name", Datatype.string()),
],
merge_keys="id",
)


@pytest.fixture
def location_provider(tmp_path):
return FileLocationProvider(str(tmp_path))


@pytest.fixture
def file_store():
return FileStore()


@pytest.fixture
def writer(location_provider, test_schema):
return MemtableDatasetWriter(
location_provider=location_provider,
schema=test_schema
)

def test_write_after_flush(writer, file_store):
writer.write_dict({"id": 100, "name": "alpha"})
manifest_uri_1 = writer.flush()

manifest_io = JsonManifestIO()
manifest_1 = manifest_io.read(file_store.new_input_file(manifest_uri_1))
data_files_1 = manifest_1.data_files
sst_files_1 = manifest_1.sst_files

assert len(data_files_1) > 0, "First flush: no data files found."
assert len(sst_files_1) > 0, "First flush: no SST files found."
assert manifest_1.context.schema == writer.schema, "Schema mismatch in first flush."

writer.write_dict({"id": 200, "name": "gamma"})
manifest_uri_2 = writer.flush()

manifest_2 = manifest_io.read(file_store.new_input_file(manifest_uri_2))
data_files_2 = manifest_2.data_files
sst_files_2 = manifest_2.sst_files

assert len(data_files_2) > 0, "Second flush: no data files found."
assert len(sst_files_2) > 0, "Second flush: no SST files found."

# ensures data_files and sst_files from first write are not included in second write.
assert data_files_1.isdisjoint(data_files_2), \
"Expected no overlap of data files between first and second flush."
assert sst_files_1.isdisjoint(sst_files_2), \
"Expected no overlap of SST files between first and second flush."
assert manifest_2.context.schema == writer.schema, "Schema mismatch in second flush."
Loading