Skip to content

Commit

Permalink
feat: implement editable dataframe to manage dataframe editor component
Browse files Browse the repository at this point in the history
* feat: implement record_remove on EditableDataframe
  • Loading branch information
FabienArcellier committed Jul 4, 2024
1 parent e980afa commit eae79c0
Show file tree
Hide file tree
Showing 2 changed files with 240 additions and 18 deletions.
166 changes: 148 additions & 18 deletions src/writer/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -394,8 +394,13 @@ def carry_mutation_flag(base_key, child_key):
for child_key, child_mutation in child_mutations.items():
nested_key = carry_mutation_flag(escaped_key, child_key)
serialised_mutations[nested_key] = child_mutation
elif f"+{key}" in self.mutated or \
(isinstance(value, MutableValue) is True and value.mutated()):
elif f"+{key}" in self.mutated:
try:
serialised_value = state_serialiser.serialise(value)
except BaseException:
raise ValueError(f"""Couldn't serialise value of type "{ type(value) }" for key "{ key }".""")
serialised_mutations[f"+{escaped_key}"] = serialised_value
elif isinstance(value, MutableValue) is True and value.mutated():
try:
serialised_value = state_serialiser.serialise(value)
value.reset_mutation()
Expand Down Expand Up @@ -1582,17 +1587,17 @@ def record_update(df: Any, payload: DataframeRecordUpdated) -> Any:
signature of the methods to be implemented to process wf-dfeditor-update event
>>> edf = EditableDataframe(df)
>>> edf.record_update({"record_id": 12, "record": {"a": 1, "b": 2}})
>>> edf.record_update({"record_index": 12, "record": {"a": 1, "b": 2}})
"""
raise NotImplementedError

@staticmethod
def record_delete(df: Any, payload: DataframeRecordUpdated) -> Any:
def record_remove(df: Any, payload: DataframeRecordRemoved) -> Any:
"""
signature of the methods to be implemented to process wf-dfeditor-remove event
>>> edf = EditableDataframe(df)
>>> edf.record_delete({"record_id": 12})
>>> edf.record_remove({"record_index": 12})
"""
raise NotImplementedError

Expand Down Expand Up @@ -1623,25 +1628,49 @@ def record_add(df: pandas.DataFrame, payload: DataframeRecordAdded) -> pandas.Da
>>> edf = EditableDataframe(df)
>>> edf.record_add({"record": {"a": 1, "b": 2}})
"""
_assert_record_match_pandas_df(df, payload['record'])

record, index = _split_record_as_pandas_record_and_index(payload['record'], df.index.names)

new_df = pandas.DataFrame([record], index=[index])
return pandas.concat([df, new_df])

@staticmethod
def record_update(df: pandas.DataFrame, payload: DataframeRecordUpdated):
raise NotImplementedError
"""
>>> edf = EditableDataframe(df)
>>> edf.record_update({"record_index": 12, "record": {"a": 1, "b": 2}})
"""
_assert_record_match_pandas_df(df, payload['record'])

record, index = _split_record_as_pandas_record_and_index(payload['record'], df.index.names)

record_index = payload['record_index']
df.iloc[record_index] = record

index_list = df.index.tolist()
index_list[record_index] = index
df.index = index_list

return df

@staticmethod
def record_delete(df: pandas.DataFrame, payload: DataframeRecordUpdated):
raise NotImplementedError
def record_remove(df: pandas.DataFrame, payload: DataframeRecordRemoved):
"""
>>> edf = EditableDataframe(df)
>>> edf.record_remove({"record_index": 12})
"""
record_index: int = payload['record_index']
idx = df.index[record_index]
df = df.drop(idx)

return df

@staticmethod
def pyarrow_table(df: pandas.DataFrame) -> pyarrow.Table:
"""
Serializes the dataframe into a pyarrow table
"""
df['__record_id'] = range(1, len(df) + 1)
table = pyarrow.Table.from_pandas(df=df)
return table

Expand All @@ -1662,17 +1691,34 @@ def match(df: Any) -> bool:

@staticmethod
def record_add(df: 'polars.DataFrame', payload: DataframeRecordAdded) -> 'polars.DataFrame':
_assert_record_match_polar_df(df, payload['record'])

import polars
new_df = polars.DataFrame([payload['record']])
return polars.concat([df, new_df])

@staticmethod
def record_update(df: 'polars.DataFrame', payload: DataframeRecordUpdated) -> 'polars.DataFrame':
raise NotImplementedError
# This implementation works but is not optimal.
# I didn't find a better way to update a record in polars
#
# https://github.com/pola-rs/polars/issues/5973
_assert_record_match_polar_df(df, payload['record'])

record = payload['record']
record_index = payload['record_index']
for r in record:
df[record_index, r] = record[r]

return df

@staticmethod
def record_delete(df: 'polars.DataFrame', payload: DataframeRecordUpdated) -> 'polars.DataFrame':
raise NotImplementedError
def record_remove(df: 'polars.DataFrame', payload: DataframeRecordRemoved) -> 'polars.DataFrame':
import polars

record_index: int = payload['record_index']
df_filtered = polars.concat([df[:record_index], df[record_index + 1:]])
return df_filtered

@staticmethod
def pyarrow_table(df: 'polars.DataFrame') -> pyarrow.Table:
Expand All @@ -1698,16 +1744,24 @@ def match(df: Any) -> bool:

@staticmethod
def record_add(df: List[Dict[str, Any]], payload: DataframeRecordAdded) -> List[Dict[str, Any]]:
_assert_record_match_list_of_records(df, payload['record'])
df.append(payload['record'])
return df

@staticmethod
def record_update(df: List[Dict[str, Any]], payload: DataframeRecordUpdated) -> List[Dict[str, Any]]:
raise NotImplementedError
_assert_record_match_list_of_records(df, payload['record'])

record_index = payload['record_index']
record = payload['record']

df[record_index] = record
return df

@staticmethod
def record_delete(df: List[Dict[str, Any]], payload: DataframeRecordUpdated) -> List[Dict[str, Any]]:
raise NotImplementedError
def record_remove(df: List[Dict[str, Any]], payload: DataframeRecordRemoved) -> List[Dict[str, Any]]:
del(df[payload['record_index']])
return df

@staticmethod
def pyarrow_table(df: List[Dict[str, Any]]) -> pyarrow.Table:
Expand Down Expand Up @@ -1761,18 +1815,57 @@ def df(self, value: Union[pandas.DataFrame, 'polars.DataFrame', List[dict], List
self.mutate()

def record_add(self, payload: DataframeRecordAdded) -> None:
"""
Adds a record to the dataframe
>>> df = pandas.DataFrame({"a": [1, 2], "b": [3, 4]})
>>> edf = EditableDataframe(df)
>>> edf.record_add({"record": {"a": 1, "b": 2}})
"""
assert self.processor is not None

self._df = self.processor.record_add(self.df, payload)
self.mutate()

def record_update(self, payload: DataframeRecordUpdated) -> None:
pass
"""
Updates a record in the dataframe
def record_delete(self, payload: DataframeRecordRemoved) -> None:
pass
The record must be complete otherwise an error is raised (ValueError).
It must a value for each index / column.
>>> df = pandas.DataFrame({"a": [1, 2], "b": [3, 4]})
>>> edf = EditableDataframe(df)
>>> edf.record_update({"record_index": 0, "record": {"a": 2, "b": 2}})
"""
assert self.processor is not None

self._df = self.processor.record_update(self.df, payload)
self.mutate()

def record_remove(self, payload: DataframeRecordRemoved) -> None:
"""
Removes a record from the dataframe
>>> df = pandas.DataFrame({"a": [1, 2], "b": [3, 4]})
>>> edf = EditableDataframe(df)
>>> edf.record_remove({"record_index": 0})
"""
assert self.processor is not None

self._df = self.processor.record_remove(self.df, payload)
self.mutate()

def pyarrow_table(self) -> pyarrow.Table:
"""
Serializes the dataframe into a pyarrow table
This mechanism is used for serializing data for transmission to the frontend.
>>> df = pandas.DataFrame({"a": [1, 2], "b": [3, 4]})
>>> edf = EditableDataframe(df)
>>> pa_table = edf.pyarrow_table()
"""
assert self.processor is not None

pa_table = self.processor.pyarrow_table(self.df)
Expand Down Expand Up @@ -1895,6 +1988,43 @@ async def _async_wrapper_internal(callable_handler: Callable, arg_values: List[A
result = await callable_handler(*arg_values)
return result

def _assert_record_match_pandas_df(df: pandas.DataFrame, record: Dict[str, Any]) -> None:
"""
Asserts that the record matches the dataframe columns & index
>>> _assert_record_match_pandas_df(pandas.DataFrame({"a": [1, 2], "b": [3, 4]}), {"a": 1, "b": 2})
"""
columns = set(list(df.columns.values) + df.index.names) if isinstance(df.index, pandas.RangeIndex) is False else set(df.columns.values)
columns_record = set(record.keys())
if columns != columns_record:
raise ValueError(f"Columns mismatch. Expected {columns}, got {columns_record}")

def _assert_record_match_polar_df(df: 'polars.DataFrame', record: Dict[str, Any]) -> None:
"""
Asserts that the record matches the columns of polar dataframe
>>> _assert_record_match_pandas_df(polars.DataFrame({"a": [1, 2], "b": [3, 4]}), {"a": 1, "b": 2})
"""
columns = set(df.columns)
columns_record = set(record.keys())
if columns != columns_record:
raise ValueError(f"Columns mismatch. Expected {columns}, got {columns_record}")

def _assert_record_match_list_of_records(df: List[Dict[str, Any]], record: Dict[str, Any]) -> None:
"""
Asserts that the record matches the key in the record list (it use the first record to check)
>>> _assert_record_match_list_of_records([{"a": 1, "b": 2}, {"a": 3, "b": 4}], {"a": 1, "b": 2})
"""
if len(df) == 0:
return

columns = set(list(df[0].keys()))
columns_record = set(record.keys())
if columns != columns_record:
raise ValueError(f"Columns mismatch. Expected {columns}, got {columns_record}")


def _split_record_as_pandas_record_and_index(param: dict, index_columns: list) -> Tuple[dict, tuple]:
"""
Separates a record into the record part and the index part to be able to
Expand Down
92 changes: 92 additions & 0 deletions tests/backend/test_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -1115,6 +1115,40 @@ def test_editable_dataframe_should_process_new_record_into_dataframe_with_multii
# Then
assert len(edf.df) == 4

def test_editable_dataframe_should_update_existing_record_as_dateframe_with_multiindex(self) -> None:
df = pandas.DataFrame({
"name": ["Alice", "Bob", "Charlie"],
"age": [25, 30, 35],
"city": ["Paris", "London", "New York"]
})

df = df.set_index(['name', 'city'])

edf = wf.EditableDataframe(df)

# When
edf.record_update({"record_index": 0, "record": {"name": "Alicia", "age": 25, "city": "Paris"}})

# Then
assert edf.df.iloc[0]['age'] == 25

def test_editable_dataframe_should_remove_existing_record_as_dateframe_with_multiindex(self) -> None:
df = pandas.DataFrame({
"name": ["Alice", "Bob", "Charlie"],
"age": [25, 30, 35],
"city": ["Paris", "London", "New York"]
})

df = df.set_index(['name', 'city'])

edf = wf.EditableDataframe(df)

# When
edf.record_remove({"record_index": 0})

# Then
assert len(edf.df) == 2

def test_editable_dataframe_should_serialize_pandas_dataframe_with_multiindex(self) -> None:
df = pandas.DataFrame({
"name": ["Alice", "Bob", "Charlie"],
Expand Down Expand Up @@ -1155,6 +1189,33 @@ def test_editable_dataframe_should_process_new_record_into_polar_dataframe(self)
# Then
assert len(edf.df) == 4

def test_editable_dataframe_should_update_existing_record_into_polar_dataframe(self) -> None:
df = polars.DataFrame({
"name": ["Alice", "Bob", "Charlie"],
"age": [25, 30, 35]
})

edf = wf.EditableDataframe(df)

# When
edf.record_update({"record_index": 0, "record": {"name": "Alicia", "age": 25}})

# Then
assert edf.df[0, "name"] == "Alicia"

def test_editable_dataframe_should_remove_existing_record_into_polar_dataframe(self) -> None:
df = polars.DataFrame({
"name": ["Alice", "Bob", "Charlie"],
"age": [25, 30, 35]
})

edf = wf.EditableDataframe(df)

# When
edf.record_remove({"record_index": 0})

# Then
assert len(edf.df) == 2

def test_editable_dataframe_should_serialize_polar_dataframe(self) -> None:
df = polars.DataFrame({
Expand Down Expand Up @@ -1201,6 +1262,37 @@ def test_editable_dataframe_should_process_new_record_into_list_of_records(self)
assert len(edf.df) == 4


def test_editable_dataframe_should_update_existing_record_into_list_of_record(self) -> None:
records = [
{"name": "Alice", "age": 25},
{"name": "Bob", "age": 30},
{"name": "Charlie", "age": 35}
]

edf = wf.EditableDataframe(records)

# When
edf.record_update({"record_index": 0, "record": {"name": "Alicia", "age": 25}})

# Then
assert edf.df[0]['name'] == "Alicia"

def test_editable_dataframe_should_remove_existing_record_into_list_of_record(self) -> None:
records = [
{"name": "Alice", "age": 25},
{"name": "Bob", "age": 30},
{"name": "Charlie", "age": 35}
]

edf = wf.EditableDataframe(records)

# When
edf.record_remove({"record_index": 0})

# Then
assert len(edf.df) == 2


def test_editable_dataframe_should_serialized_list_of_records_into_pyarrow_table(self) -> None:
records = [
{"name": "Alice", "age": 25},
Expand Down

0 comments on commit eae79c0

Please sign in to comment.