diff --git a/src/writer/core.py b/src/writer/core.py index 06b92a990..a1d0fb0e7 100644 --- a/src/writer/core.py +++ b/src/writer/core.py @@ -394,8 +394,13 @@ def carry_mutation_flag(base_key, child_key): for child_key, child_mutation in child_mutations.items(): nested_key = carry_mutation_flag(escaped_key, child_key) serialised_mutations[nested_key] = child_mutation - elif f"+{key}" in self.mutated or \ - (isinstance(value, MutableValue) is True and value.mutated()): + elif f"+{key}" in self.mutated: + try: + serialised_value = state_serialiser.serialise(value) + except BaseException: + raise ValueError(f"""Couldn't serialise value of type "{ type(value) }" for key "{ key }".""") + serialised_mutations[f"+{escaped_key}"] = serialised_value + elif isinstance(value, MutableValue) is True and value.mutated(): try: serialised_value = state_serialiser.serialise(value) value.reset_mutation() @@ -1582,17 +1587,17 @@ def record_update(df: Any, payload: DataframeRecordUpdated) -> Any: signature of the methods to be implemented to process wf-dfeditor-update event >>> edf = EditableDataframe(df) - >>> edf.record_update({"record_id": 12, "record": {"a": 1, "b": 2}}) + >>> edf.record_update({"record_index": 12, "record": {"a": 1, "b": 2}}) """ raise NotImplementedError @staticmethod - def record_delete(df: Any, payload: DataframeRecordUpdated) -> Any: + def record_remove(df: Any, payload: DataframeRecordRemoved) -> Any: """ signature of the methods to be implemented to process wf-dfeditor-remove event >>> edf = EditableDataframe(df) - >>> edf.record_delete({"record_id": 12}) + >>> edf.record_remove({"record_index": 12}) """ raise NotImplementedError @@ -1623,6 +1628,8 @@ def record_add(df: pandas.DataFrame, payload: DataframeRecordAdded) -> pandas.Da >>> edf = EditableDataframe(df) >>> edf.record_add({"record": {"a": 1, "b": 2}}) """ + _assert_record_match_pandas_df(df, payload['record']) + record, index = _split_record_as_pandas_record_and_index(payload['record'], 
df.index.names) new_df = pandas.DataFrame([record], index=[index]) @@ -1630,18 +1637,40 @@ def record_add(df: pandas.DataFrame, payload: DataframeRecordAdded) -> pandas.Da @staticmethod def record_update(df: pandas.DataFrame, payload: DataframeRecordUpdated): - raise NotImplementedError + """ + >>> edf = EditableDataframe(df) + >>> edf.record_update({"record_index": 12, "record": {"a": 1, "b": 2}}) + """ + _assert_record_match_pandas_df(df, payload['record']) + + record, index = _split_record_as_pandas_record_and_index(payload['record'], df.index.names) + + record_index = payload['record_index'] + df.iloc[record_index] = record + + index_list = df.index.tolist() + index_list[record_index] = index + df.index = index_list + + return df @staticmethod - def record_delete(df: pandas.DataFrame, payload: DataframeRecordUpdated): - raise NotImplementedError + def record_remove(df: pandas.DataFrame, payload: DataframeRecordRemoved): + """ + >>> edf = EditableDataframe(df) + >>> edf.record_remove({"record_index": 12}) + """ + record_index: int = payload['record_index'] + idx = df.index[record_index] + df = df.drop(idx) + + return df @staticmethod def pyarrow_table(df: pandas.DataFrame) -> pyarrow.Table: """ Serializes the dataframe into a pyarrow table """ - df['__record_id'] = range(1, len(df) + 1) table = pyarrow.Table.from_pandas(df=df) return table @@ -1662,17 +1691,34 @@ def match(df: Any) -> bool: @staticmethod def record_add(df: 'polars.DataFrame', payload: DataframeRecordAdded) -> 'polars.DataFrame': + _assert_record_match_polar_df(df, payload['record']) + import polars new_df = polars.DataFrame([payload['record']]) return polars.concat([df, new_df]) @staticmethod def record_update(df: 'polars.DataFrame', payload: DataframeRecordUpdated) -> 'polars.DataFrame': - raise NotImplementedError + # This implementation works but is not optimal. 
+ # I didn't find a better way to update a record in polars + # + # https://github.com/pola-rs/polars/issues/5973 + _assert_record_match_polar_df(df, payload['record']) + + record = payload['record'] + record_index = payload['record_index'] + for r in record: + df[record_index, r] = record[r] + + return df @staticmethod - def record_delete(df: 'polars.DataFrame', payload: DataframeRecordUpdated) -> 'polars.DataFrame': - raise NotImplementedError + def record_remove(df: 'polars.DataFrame', payload: DataframeRecordRemoved) -> 'polars.DataFrame': + import polars + + record_index: int = payload['record_index'] + df_filtered = polars.concat([df[:record_index], df[record_index + 1:]]) + return df_filtered @staticmethod def pyarrow_table(df: 'polars.DataFrame') -> pyarrow.Table: @@ -1698,16 +1744,24 @@ def match(df: Any) -> bool: @staticmethod def record_add(df: List[Dict[str, Any]], payload: DataframeRecordAdded) -> List[Dict[str, Any]]: + _assert_record_match_list_of_records(df, payload['record']) df.append(payload['record']) return df @staticmethod def record_update(df: List[Dict[str, Any]], payload: DataframeRecordUpdated) -> List[Dict[str, Any]]: - raise NotImplementedError + _assert_record_match_list_of_records(df, payload['record']) + + record_index = payload['record_index'] + record = payload['record'] + + df[record_index] = record + return df @staticmethod - def record_delete(df: List[Dict[str, Any]], payload: DataframeRecordUpdated) -> List[Dict[str, Any]]: - raise NotImplementedError + def record_remove(df: List[Dict[str, Any]], payload: DataframeRecordRemoved) -> List[Dict[str, Any]]: + del(df[payload['record_index']]) + return df @staticmethod def pyarrow_table(df: List[Dict[str, Any]]) -> pyarrow.Table: @@ -1761,18 +1815,57 @@ def df(self, value: Union[pandas.DataFrame, 'polars.DataFrame', List[dict], List self.mutate() def record_add(self, payload: DataframeRecordAdded) -> None: + """ + Adds a record to the dataframe + + >>> df = pandas.DataFrame({"a": [1, 
2], "b": [3, 4]}) + >>> edf = EditableDataframe(df) + >>> edf.record_add({"record": {"a": 1, "b": 2}}) + """ assert self.processor is not None self._df = self.processor.record_add(self.df, payload) self.mutate() def record_update(self, payload: DataframeRecordUpdated) -> None: - pass + """ + Updates a record in the dataframe - def record_delete(self, payload: DataframeRecordRemoved) -> None: - pass + The record must be complete otherwise an error is raised (ValueError). + It must have a value for each index / column. + + >>> df = pandas.DataFrame({"a": [1, 2], "b": [3, 4]}) + >>> edf = EditableDataframe(df) + >>> edf.record_update({"record_index": 0, "record": {"a": 2, "b": 2}}) + """ + assert self.processor is not None + + self._df = self.processor.record_update(self.df, payload) + self.mutate() + + def record_remove(self, payload: DataframeRecordRemoved) -> None: + """ + Removes a record from the dataframe + + >>> df = pandas.DataFrame({"a": [1, 2], "b": [3, 4]}) + >>> edf = EditableDataframe(df) + >>> edf.record_remove({"record_index": 0}) + """ + assert self.processor is not None + + self._df = self.processor.record_remove(self.df, payload) + self.mutate() def pyarrow_table(self) -> pyarrow.Table: + """ + Serializes the dataframe into a pyarrow table + + This mechanism is used for serializing data for transmission to the frontend. 
+ + >>> df = pandas.DataFrame({"a": [1, 2], "b": [3, 4]}) + >>> edf = EditableDataframe(df) + >>> pa_table = edf.pyarrow_table() + """ assert self.processor is not None pa_table = self.processor.pyarrow_table(self.df) @@ -1895,6 +1988,43 @@ async def _async_wrapper_internal(callable_handler: Callable, arg_values: List[A result = await callable_handler(*arg_values) return result +def _assert_record_match_pandas_df(df: pandas.DataFrame, record: Dict[str, Any]) -> None: + """ + Asserts that the record matches the dataframe columns & index + + >>> _assert_record_match_pandas_df(pandas.DataFrame({"a": [1, 2], "b": [3, 4]}), {"a": 1, "b": 2}) + """ + columns = set(list(df.columns.values) + df.index.names) if isinstance(df.index, pandas.RangeIndex) is False else set(df.columns.values) + columns_record = set(record.keys()) + if columns != columns_record: + raise ValueError(f"Columns mismatch. Expected {columns}, got {columns_record}") + +def _assert_record_match_polar_df(df: 'polars.DataFrame', record: Dict[str, Any]) -> None: + """ + Asserts that the record matches the columns of a polars dataframe + + >>> _assert_record_match_polar_df(polars.DataFrame({"a": [1, 2], "b": [3, 4]}), {"a": 1, "b": 2}) + """ + columns = set(df.columns) + columns_record = set(record.keys()) + if columns != columns_record: + raise ValueError(f"Columns mismatch. Expected {columns}, got {columns_record}") + +def _assert_record_match_list_of_records(df: List[Dict[str, Any]], record: Dict[str, Any]) -> None: + """ + Asserts that the record matches the keys in the record list (it uses the first record to check) + + >>> _assert_record_match_list_of_records([{"a": 1, "b": 2}, {"a": 3, "b": 4}], {"a": 1, "b": 2}) + """ + if len(df) == 0: + return + + columns = set(list(df[0].keys())) + columns_record = set(record.keys()) + if columns != columns_record: + raise ValueError(f"Columns mismatch. 
Expected {columns}, got {columns_record}") + + def _split_record_as_pandas_record_and_index(param: dict, index_columns: list) -> Tuple[dict, tuple]: """ Separates a record into the record part and the index part to be able to diff --git a/tests/backend/test_core.py b/tests/backend/test_core.py index e1868b050..ba2537285 100644 --- a/tests/backend/test_core.py +++ b/tests/backend/test_core.py @@ -1115,6 +1115,40 @@ def test_editable_dataframe_should_process_new_record_into_dataframe_with_multii # Then assert len(edf.df) == 4 + def test_editable_dataframe_should_update_existing_record_as_dateframe_with_multiindex(self) -> None: + df = pandas.DataFrame({ + "name": ["Alice", "Bob", "Charlie"], + "age": [25, 30, 35], + "city": ["Paris", "London", "New York"] + }) + + df = df.set_index(['name', 'city']) + + edf = wf.EditableDataframe(df) + + # When + edf.record_update({"record_index": 0, "record": {"name": "Alicia", "age": 25, "city": "Paris"}}) + + # Then + assert edf.df.iloc[0]['age'] == 25 + + def test_editable_dataframe_should_remove_existing_record_as_dateframe_with_multiindex(self) -> None: + df = pandas.DataFrame({ + "name": ["Alice", "Bob", "Charlie"], + "age": [25, 30, 35], + "city": ["Paris", "London", "New York"] + }) + + df = df.set_index(['name', 'city']) + + edf = wf.EditableDataframe(df) + + # When + edf.record_remove({"record_index": 0}) + + # Then + assert len(edf.df) == 2 + def test_editable_dataframe_should_serialize_pandas_dataframe_with_multiindex(self) -> None: df = pandas.DataFrame({ "name": ["Alice", "Bob", "Charlie"], @@ -1155,6 +1189,33 @@ def test_editable_dataframe_should_process_new_record_into_polar_dataframe(self) # Then assert len(edf.df) == 4 + def test_editable_dataframe_should_update_existing_record_into_polar_dataframe(self) -> None: + df = polars.DataFrame({ + "name": ["Alice", "Bob", "Charlie"], + "age": [25, 30, 35] + }) + + edf = wf.EditableDataframe(df) + + # When + edf.record_update({"record_index": 0, "record": {"name": 
"Alicia", "age": 25}}) + + # Then + assert edf.df[0, "name"] == "Alicia" + + def test_editable_dataframe_should_remove_existing_record_into_polar_dataframe(self) -> None: + df = polars.DataFrame({ + "name": ["Alice", "Bob", "Charlie"], + "age": [25, 30, 35] + }) + + edf = wf.EditableDataframe(df) + + # When + edf.record_remove({"record_index": 0}) + + # Then + assert len(edf.df) == 2 def test_editable_dataframe_should_serialize_polar_dataframe(self) -> None: df = polars.DataFrame({ @@ -1201,6 +1262,37 @@ def test_editable_dataframe_should_process_new_record_into_list_of_records(self) assert len(edf.df) == 4 + def test_editable_dataframe_should_update_existing_record_into_list_of_record(self) -> None: + records = [ + {"name": "Alice", "age": 25}, + {"name": "Bob", "age": 30}, + {"name": "Charlie", "age": 35} + ] + + edf = wf.EditableDataframe(records) + + # When + edf.record_update({"record_index": 0, "record": {"name": "Alicia", "age": 25}}) + + # Then + assert edf.df[0]['name'] == "Alicia" + + def test_editable_dataframe_should_remove_existing_record_into_list_of_record(self) -> None: + records = [ + {"name": "Alice", "age": 25}, + {"name": "Bob", "age": 30}, + {"name": "Charlie", "age": 35} + ] + + edf = wf.EditableDataframe(records) + + # When + edf.record_remove({"record_index": 0}) + + # Then + assert len(edf.df) == 2 + + def test_editable_dataframe_should_serialized_list_of_records_into_pyarrow_table(self) -> None: records = [ {"name": "Alice", "age": 25},