Skip to content

Commit

Permalink
feat: declare optional schema on streamsync state
Browse files Browse the repository at this point in the history
* fix: issue on dictionary assignment
  • Loading branch information
FabienArcellier committed Mar 5, 2024
1 parent fb34f10 commit f1f4138
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 5 deletions.
29 changes: 26 additions & 3 deletions src/streamsync/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -231,8 +231,28 @@ def remove(self, key) -> None:
def _apply_raw(self, key) -> None:
self.mutated.add(key)

def apply(self, key) -> None:
self._apply_raw(f"+{key}")
def apply_mutation_marker(self, key: Optional[str] = None, recursive: bool = False) -> None:
"""
Adds the mutation marker to a state. The mutation marker is used to track changes in the state.
>>> self.apply_mutation_marker()
Add the mutation marker on a specific field
>>> self.apply_mutation_marker("field")
Add the mutation marker to a state and all of its children
>>> self.apply_mutation_marker(recursive=True)
"""
keys = [key] if key is not None else self.state.keys()

for k in keys:
self._apply_raw(f"+{k}")
if recursive is True:
value = self.state[k]
if isinstance(value, StateProxy):
value.apply_mutation_marker(recursive=True)

@staticmethod
def escape_key(key):
Expand All @@ -253,7 +273,7 @@ def carry_mutation_flag(base_key, child_key):
serialised_value = None

if isinstance(value, StateProxy):
if value.initial_assignment:
if f"+{key}" in self.mutated:
serialised_mutations[f"+{escaped_key}"] = serialised_value
value.initial_assignment = False
child_mutations = value.get_mutations_as_dict()
Expand Down Expand Up @@ -430,6 +450,9 @@ def _set_state_item(self, key: str, value: Any):
state.ingest(value)
self._state_proxy[key] = state._state_proxy
else:
if isinstance(value, StateProxy):
value.apply_mutation_marker(recursive=True)

self._state_proxy[key] = value


Expand Down
19 changes: 17 additions & 2 deletions tests/test_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ def test_mutations(self) -> None:
assert d.get("features").get("height") == "short"
assert d.get("state.with.dots").get("photo.jpeg") == "Corrupted"

self.sp.apply("age")
self.sp.apply_mutation_marker("age")
m = self.sp.get_mutations_as_dict()

assert m.get("+age") == 2
Expand Down Expand Up @@ -169,16 +169,31 @@ def test_replace_dictionary_content_in_a_state_with_schema_should_transform_it_i
"""
Tests that replacing a dictionary content in a State without schema trigger mutations on all the children.
This processing must work after initialization and after recovering the mutations the first time.
>>> _state = State({'items': {}})
>>> _state["items"] = {k: v for k, v in _state["items"].items() if k != "Apple"}
"""

_state = State({"items": {
"Apple": {"name": "Apple", "type": "fruit"},
"Cucumber": {"name": "Cucumber", "type": "vegetable"},
"Lettuce": {"name": "Lettuce", "type": "vegetable"}
}})

m = _state._state_proxy.get_mutations_as_dict()
assert m == {
'+items': None,
'+items.Apple': None,
'+items.Apple.name': "Apple",
'+items.Apple.type': "fruit",
'+items.Cucumber': None,
'+items.Cucumber.name': 'Cucumber',
'+items.Cucumber.type': 'vegetable',
'+items.Lettuce': None,
'+items.Lettuce.name': 'Lettuce',
'+items.Lettuce.type': 'vegetable'
}

# When
items = _state['items']
items = {k: v for k, v in items.items() if k != "Apple"}
Expand Down

0 comments on commit f1f4138

Please sign in to comment.