diff --git a/CHANGELOG.md b/CHANGELOG.md
index c0f652716c..f9807d64ec 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -10,6 +10,7 @@
 The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).
 ### Changed
 - Mode is now available for use with categorical metrics when running joined spatial aggregates via api. [#2021](https://github.com/Flowminder/FlowKit/issues/2021)
+- Flowmachine now includes the version number in query ids, which means cache entries are per-version. [#4489](https://github.com/Flowminder/FlowKit/issues/4489)
 
 ### Fixed
 - Fixed dangling async tasks not being properly cancelled during server shutdown [#6833](https://github.com/Flowminder/FlowKit/issues/6833)
diff --git a/flowmachine/flowmachine/core/cache.py b/flowmachine/flowmachine/core/cache.py
index 12962c9010..7188a81dc4 100644
--- a/flowmachine/flowmachine/core/cache.py
+++ b/flowmachine/flowmachine/core/cache.py
@@ -29,6 +29,7 @@
 from flowmachine.core.query_state import QueryStateMachine, QueryEvent
 from flowmachine import __version__
+
 
 if TYPE_CHECKING:
     from .query import Query
     from .connection import Connection
@@ -58,20 +59,7 @@ def get_obj_or_stub(connection: "Connection", query_id: str):
         the dependencies it had if not.
     """
-    from flowmachine.core.query import Query
-
-    class QStub(Query):
-        def __init__(self, deps, qid):
-            self.deps = deps
-            self._md5 = qid
-            super().__init__()
-
-        def _make_query(self):
-            pass
-
-        @property
-        def column_names(self):
-            pass
+    from .query_stub import QStub
 
     try:
         return get_query_object_by_id(connection, query_id)
@@ -81,12 +69,16 @@ def column_names(self):
         AttributeError,
         pickle.UnpicklingError,
         IndexError,
+        ValueError,
     ) as exc:
         logger.debug("Can't unpickle, creating stub.", query_id=query_id, exception=exc)
         qry = f"SELECT depends_on FROM cache.dependencies WHERE query_id='{query_id}'"
         return QStub(
             deps=[get_obj_or_stub(connection, res[0]) for res in connection.fetch(qry)],
             qid=query_id,
+            flowmachine_version=connection.fetch(
+                f"SELECT version FROM cache.cached WHERE query_id='{query_id}'"
+            )[0][0],
         )
@@ -306,6 +298,12 @@ def write_cache_metadata(
             )
 
         if not in_cache:
+            logger.debug(
+                "Writing dependencies",
+                query_id=query.query_id,
+                dependencies=query.dependencies,
+                stored_dependencies=query._get_stored_dependencies(exclude_self=True),
+            )
             for dep in query._get_stored_dependencies(exclude_self=True):
                 connection.exec_driver_sql(
                     "INSERT INTO cache.dependencies values (%(query_id)s, %(dep_id)s) ON CONFLICT DO NOTHING",
@@ -473,9 +471,13 @@ def get_query_object_by_id(connection: "Connection", query_id: str) -> "Query":
         The original query object.
 
     """
-    qry = f"SELECT obj FROM cache.cached WHERE query_id='{query_id}'"
+    qry = f"SELECT obj, version FROM cache.cached WHERE query_id='{query_id}'"
     try:
-        obj = connection.fetch(qry)[0][0]
+        obj, version = connection.fetch(qry)[0]
+        if version != __version__:
+            raise ValueError(
+                f"Query id '{query_id}' belongs to a different flowmachine version (query version {version}, our version {__version__})."
+            )
         return pickle.loads(obj)
     except IndexError:
         raise ValueError(f"Query id '{query_id}' is not in cache on this connection.")
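Note (not part of the patch): the two cache entry points in `cache.py` now divide the work as follows: `get_query_object_by_id` refuses to unpickle a cache entry written by a different flowmachine version, and `get_obj_or_stub` catches the resulting `ValueError` and falls back to a `QStub` that records which version wrote the entry. A minimal sketch of the combined behaviour, assuming a connected flowmachine session; the `load_cached` helper is hypothetical:

```python
# Sketch only, not part of the patch: how the version check and the stub
# fallback combine. `load_cached` is a hypothetical helper.
from flowmachine.core.cache import get_obj_or_stub, get_query_object_by_id
from flowmachine.core.context import get_db


def load_cached(query_id: str):
    try:
        # Raises ValueError if the entry is missing or was cached by a
        # different flowmachine version.
        return get_query_object_by_id(get_db(), query_id)
    except ValueError:
        # Falls back to a QStub, which still exposes the dependency graph
        # (stub.deps) and the version that wrote the entry
        # (stub.flowmachine_version), but cannot generate SQL.
        return get_obj_or_stub(get_db(), query_id)
```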
diff --git a/flowmachine/flowmachine/core/query.py b/flowmachine/flowmachine/core/query.py
index a1959014b0..96b5b9ed80 100644
--- a/flowmachine/flowmachine/core/query.py
+++ b/flowmachine/flowmachine/core/query.py
@@ -113,8 +113,9 @@ def query_id(self):
                 pass
             hashes.append(str(item))
-        hashes.append(self.__class__.__name__)
         hashes.sort()
+        hashes.append(self.__class__.__name__)
+        hashes.append(flowmachine.__version__)
         self._md5 = md5(str(hashes).encode()).hexdigest()
         return self._md5
diff --git a/flowmachine/flowmachine/core/query_stub.py b/flowmachine/flowmachine/core/query_stub.py
new file mode 100644
index 0000000000..aa40b8d012
--- /dev/null
+++ b/flowmachine/flowmachine/core/query_stub.py
@@ -0,0 +1,60 @@
+# This Source Code Form is subject to the terms of the Mozilla Public
+# License, v. 2.0. If a copy of the MPL was not distributed with this
+# file, You can obtain one at http://mozilla.org/MPL/2.0/.
+
+from flowmachine.core.query import Query
+
+
+class QStub(Query):
+    """
+    A stub representation of a Query object used for version-aware caching.
+
+    This class serves as a lightweight placeholder for cached queries,
+    particularly when handling queries created by a different flowmachine version.
+
+    Parameters
+    ----------
+    deps : list[Query]
+        The dependencies of the original query
+    qid : str
+        The query ID hash of the original query
+    flowmachine_version : str
+        The flowmachine version used to create this cache record
+    """
+
+    def __init__(self, deps: list["Query"], qid: str, flowmachine_version: str) -> None:
+        """
+        Parameters
+        ----------
+        deps : list[Query]
+            The dependencies of the original query
+        qid : str
+            The query ID hash of the original query
+        flowmachine_version : str
+            The flowmachine version used to create this cache record
+        """
+        self.deps = deps
+        self._md5 = qid
+        self.flowmachine_version = flowmachine_version
+        super().__init__()
+
+    def _make_query(self) -> str:
+        """
+        Not implemented for stub queries.
+
+        Raises
+        ------
+        NotImplementedError
+            Always, as stub queries cannot generate SQL.
+        """
+        raise NotImplementedError("Stub queries cannot generate SQL")
+
+    @property
+    def column_names(self) -> list[str]:
+        """
+        Not implemented for stub queries.
+
+        Raises
+        ------
+        NotImplementedError
+            Always, as stub queries cannot provide column names.
+        """
+        raise NotImplementedError("Stub queries cannot provide column names.")
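Note (not part of the patch): because `query_id` now folds `flowmachine.__version__` into the md5 hash, the same query specification gets a different id, and therefore a separate cache entry, under each flowmachine version; `QStub` is how older entries are surfaced. A rough sketch, assuming a connected flowmachine environment; reassigning `flowmachine.__version__` directly is for illustration only (the tests below do it via `monkeypatch`):

```python
# Sketch only, not part of the patch: identical parameters hash to different
# query ids under different flowmachine versions.
import flowmachine
from flowmachine.features import daily_location

id_current = daily_location("2016-01-01").query_id

flowmachine.__version__ = "OLD_VERSION"  # illustration only; tests use monkeypatch
id_old = daily_location("2016-01-01").query_id

assert id_current != id_old
```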
diff --git a/flowmachine/tests/test_cache.py b/flowmachine/tests/test_cache.py
index 68efd8ad92..ac2b1dc54e 100644
--- a/flowmachine/tests/test_cache.py
+++ b/flowmachine/tests/test_cache.py
@@ -18,6 +18,7 @@
 from flowmachine.core.errors.flowmachine_errors import QueryCancelledException
 from flowmachine.core.query import Query
 from flowmachine.core.query_state import QueryState
+from flowmachine.core.query_stub import QStub
 from flowmachine.features import daily_location, ModalLocation, Flows
 
 
@@ -308,7 +309,7 @@ def column_names(self):
             return ["value"]
 
         def _make_query(self):
-            return "select 1 as value"
+            return self.nested.get_query()
 
     q = NestTestQuery()
     q_id = q.query_id
@@ -333,6 +334,21 @@ def test_retrieve_novel_query_with_dependency(
     assert not from_cache.is_stored
 
 
+def test_retrieve_query_other_version(flowmachine_connect, monkeypatch):
+    """
+    Test that retrieving a cached query created by a different flowmachine version gives a QStub.
+    """
+    with monkeypatch.context() as ctxt:
+        ctxt.setattr("flowmachine.__version__", "OLD_VERSION")
+        ctxt.setattr("flowmachine.core.cache.__version__", "OLD_VERSION")
+        dl = daily_location("2016-01-01").store().result()
+    query_id = dl.query_id
+    del Query._QueryPool[query_id]
+    other_version = get_obj_or_stub(get_db(), query_id)
+    assert isinstance(other_version, QStub)
+    assert other_version.flowmachine_version == "OLD_VERSION"
+
+
 def test_df_not_pickled():
     """
     Test that a pickled query does not contain a dataframe.
diff --git a/flowmachine/tests/test_cache_utils.py b/flowmachine/tests/test_cache_utils.py
index 04eed46190..b3020c5da2 100644
--- a/flowmachine/tests/test_cache_utils.py
+++ b/flowmachine/tests/test_cache_utils.py
@@ -5,6 +5,8 @@
 """
 Tests for cache management utilities.
 """
+import re
+
 from cachey import Scorer
 from unittest.mock import Mock, MagicMock
 
@@ -38,6 +40,7 @@
 from flowmachine.core.context import get_db, get_redis, get_executor
 from flowmachine.core.query_state import QueryState, QueryStateMachine
 from flowmachine.features import daily_location
+from flowmachine import __version__
 
 
 class TestException(Exception):
@@ -386,6 +389,23 @@ def test_get_query_object_by_id(flowmachine_connect):
     assert dl.get_query() == retrieved_query.get_query()
 
 
+def test_get_query_object_by_id_other_version(flowmachine_connect, monkeypatch):
+    """
+    Test that a ValueError is raised when the cached query was created by a different flowmachine version.
+    """
+    with monkeypatch.context() as ctxt:
+        ctxt.setattr("flowmachine.__version__", "OLD_VERSION")
+        ctxt.setattr("flowmachine.core.cache.__version__", "OLD_VERSION")
+        dl = daily_location("2016-01-01").store().result()
+    with pytest.raises(
+        ValueError,
+        match=re.escape(
+            f"Query id '5525f9a94aeef6c5387287f2e1dfb2a7' belongs to a different flowmachine version (query version OLD_VERSION, our version {__version__})."
+        ),
+    ):
+        get_query_object_by_id(get_db(), dl.query_id)
+
+
 def test_delete_query_by_id(flowmachine_connect):
     """
     Test that we can remove a query from cache by the query's id
diff --git a/flowmachine/tests/test_dependency_graph.py b/flowmachine/tests/test_dependency_graph.py
index 5b665c79c7..41807e1b07 100644
--- a/flowmachine/tests/test_dependency_graph.py
+++ b/flowmachine/tests/test_dependency_graph.py
@@ -32,10 +32,11 @@
 )
 
 
-def test_print_dependency_tree():
+def test_print_dependency_tree(monkeypatch):
     """
     Test that the expected dependency tree is printed for a daily location query (with an explicit subset).
     """
+    monkeypatch.setattr("flowmachine.__version__", "TEST_VERSION")
     subscriber_subsetter = make_subscriber_subsetter(
         CustomQuery(
             "SELECT duration, msisdn as subscriber FROM events.calls WHERE duration < 10",
@@ -50,20 +51,20 @@
     """\
- + - + - - - - - - - - - - - - - - - - - - - - - + - + - + - + - - -
     """
""" @@ -456,6 +456,7 @@ def test_construct_query(diff_reporter): def get_query_id_for_query_spec(query_spec): return FlowmachineQuerySchema().load(query_spec).query_id + monkeypatch.setattr("flowmachine.__version__", "TEST_VERSION") query_ids_and_specs_as_json_string = json.dumps( {get_query_id_for_query_spec(spec): spec for spec in query_specs_to_test}, indent=2, diff --git a/flowmachine/tests/test_query_object_construction.test_construct_query.approved.txt b/flowmachine/tests/test_query_object_construction.test_construct_query.approved.txt index d4649bde87..66f9917bbb 100644 --- a/flowmachine/tests/test_query_object_construction.test_construct_query.approved.txt +++ b/flowmachine/tests/test_query_object_construction.test_construct_query.approved.txt @@ -1,5 +1,5 @@ { - "261b85fc9c64253889174ea8151f1d69": { + "75a22e4ebd0bc8818e058c6cc712cdf3": { "query_kind": "spatial_aggregate", "locations": { "query_kind": "daily_location", @@ -20,7 +20,7 @@ } } }, - "e142ffb3174add433422c2724a08c02b": { + "e395e2585d85a32f763b13a4772ee7f8": { "query_kind": "spatial_aggregate", "locations": { "query_kind": "daily_location", @@ -32,7 +32,7 @@ "sampling": null } }, - "93580818b4cf2b30071c3c46c09e9de1": { + "e926609096b5df6284f922ad7acf1d8a": { "query_kind": "location_event_counts", "start_date": "2016-01-01", "end_date": "2016-01-02", @@ -42,7 +42,7 @@ "event_types": null, "subscriber_subset": null }, - "b48d02838163766771eeed8cd8aafd98": { + "6dcc59b33263cb81eb5055f472da432f": { "query_kind": "spatial_aggregate", "locations": { "query_kind": "modal_location", @@ -64,11 +64,11 @@ ] } }, - "6521353e7563ed700dfd2cf90721934b": { + "71044ad1c2135c73e497c96aaf2fbcbb": { "query_kind": "geography", "aggregation_unit": "admin3" }, - "75b0532a747b473f69b45b78fdc29865": { + "02536d8c735132b83c2b1664eaa6a781": { "query_kind": "meaningful_locations_aggregate", "aggregation_unit": "admin1", "start_date": "2016-01-01", @@ -161,7 +161,7 @@ "tower_cluster_call_threshold": 0, "subscriber_subset": null }, - "5df56eee4dc96ed961f8eb76583cc6de": { + "2f30ce9f395263f08ad3e718f2aca782": { "query_kind": "meaningful_locations_between_label_od_matrix", "aggregation_unit": "admin1", "start_date": "2016-01-01", @@ -256,7 +256,7 @@ "event_types": null, "subscriber_subset": null }, - "00bee92b22b1ceb98244c5700a079656": { + "208871abc6d79eea5eb0110fdef2c0f8": { "query_kind": "meaningful_locations_between_dates_od_matrix", "aggregation_unit": "admin1", "start_date_a": "2016-01-01", @@ -355,7 +355,7 @@ ], "subscriber_subset": null }, - "c3dc95da89aeb0908b9398918dab6f26": { + "2fac0598bc3b873b66d5f53e3574f031": { "query_kind": "flows", "from_location": { "query_kind": "daily_location", @@ -371,7 +371,7 @@ }, "join_type": "left outer" }, - "95fbc18554e15733df47bfd5cbaa3f87": { + "02f6857b03d82eacbdfe14391b2e97b6": { "query_kind": "flows", "from_location": { "query_kind": "majority_location", @@ -418,7 +418,7 @@ } } }, - "920dfdf5568d75921c4173da8bccc6ef": { + "7f97219a0468e969dd171f16f33a5f6c": { "query_kind": "labelled_spatial_aggregate", "locations": { "query_kind": "coalesced_location", @@ -640,7 +640,7 @@ "stay_length_threshold": 2 } }, - "c915d87b66df83904634990e2f78da9e": { + "475e01b47e232e52f34c18e0ef868a6c": { "query_kind": "labelled_flows", "from_location": { "query_kind": "coalesced_location", diff --git a/flowmachine/tests/test_table.py b/flowmachine/tests/test_table.py index 51955abf26..f73ad811e6 100644 --- a/flowmachine/tests/test_table.py +++ b/flowmachine/tests/test_table.py @@ -71,18 +71,19 @@ def 
diff --git a/flowmachine/tests/test_table.py b/flowmachine/tests/test_table.py
index 51955abf26..f73ad811e6 100644
--- a/flowmachine/tests/test_table.py
+++ b/flowmachine/tests/test_table.py
@@ -71,18 +71,19 @@ def test_get_table_is_self():
     assert t.get_table() is t
 
 
-def test_dependencies():
+def test_dependencies(monkeypatch):
     """
     Check that a table without explicit columns has no other queries as a dependency,
     and a table with explicit columns has its parent table as a dependency.
     """
+    monkeypatch.setattr("flowmachine.__version__", "TEST_VERSION")
     t1 = Table("events.calls")
     assert t1.dependencies == set()
 
     t2 = Table("events.calls", columns=["id"])
     assert len(t2.dependencies) == 1
     t2_parent = t2.dependencies.pop()
-    assert "057addedac04dbeb1dcbbb6b524b43f0" == t2_parent.query_id
+    assert t2_parent.query_id == "397c8cee22d63ac718ccce8c20a2eae9"
 
 
 def test_subset():
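Note (not part of the patch): cache entries written by earlier flowmachine versions stay in `cache.cached` under their old query ids and are no longer reused after an upgrade. A hypothetical maintenance sketch for listing them, assuming a connected session and using only the `cache.cached` columns referenced in the diff above (`query_id`, `version`):

```python
# Sketch only, not part of the patch: list cache entries written by a
# different flowmachine version.
from flowmachine import __version__
from flowmachine.core.context import get_db

stale = get_db().fetch(
    f"SELECT query_id, version FROM cache.cached WHERE version != '{__version__}'"
)
for query_id, version in stale:
    print(f"{query_id} was cached by flowmachine {version}")
```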