Include version in query hash
Where we rely on knowing the query id in tests, I have monkeypatched the `__version__` attribute to a fixed value so that the expected ids stay stable across releases.
greenape committed Oct 22, 2024
1 parent 36f0d66 commit 395e3ac
Showing 10 changed files with 92 additions and 40 deletions.
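
For reference, the monkeypatching pattern mentioned in the commit message looks roughly like the sketch below (illustrative only; not part of the diff). It assumes pytest's `monkeypatch` fixture, and pins both `flowmachine.__version__` and the copy imported into `flowmachine.core.cache`, because `cache.py` imports `__version__` by name.

# Sketch of the test pattern used in this commit: pin __version__ to a fixed
# value so that hard-coded query ids in tests do not change on every release.
import flowmachine


def test_ids_are_stable_across_releases(monkeypatch):
    # query.py reads flowmachine.__version__ at hash time; cache.py holds its
    # own imported copy, so both attributes are patched.
    monkeypatch.setattr("flowmachine.__version__", "TEST_VERSION")
    monkeypatch.setattr("flowmachine.core.cache.__version__", "TEST_VERSION")
    assert flowmachine.__version__ == "TEST_VERSION"
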
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -10,6 +10,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).

### Changed
- Mode is now available for use with categorical metrics when running joined spatial aggregates via api. [#2021](https://github.com/Flowminder/FlowKit/issues/2021)
- Flowmachine now includes the version number in query ids, which means cache entries are per-version. [#4489](https://github.com/Flowminder/FlowKit/issues/4489)

### Fixed
- Fixed dangling async tasks not being properly cancelled during server shutdown [#6833](https://github.com/Flowminder/FlowKit/issues/6833)
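To see why this makes cache entries per-version, here is a toy illustration (not flowmachine code) of the hashing change made in `core/query.py` below: the same parameters produce a different id once the version string is appended. The class name and parameter values are made up for the example.

# Toy sketch mirroring Query.query_id: sort the parameter hashes, then append
# the class name and the package version before taking the md5 digest.
from hashlib import md5

def toy_query_id(params, class_name, version):
    hashes = sorted(str(p) for p in params)
    hashes.append(class_name)
    hashes.append(version)  # the new ingredient added by this commit
    return md5(str(hashes).encode()).hexdigest()

id_v1 = toy_query_id(["2016-01-01", "admin3"], "DailyLocation", "1.0.0")
id_v2 = toy_query_id(["2016-01-01", "admin3"], "DailyLocation", "2.0.0")
assert id_v1 != id_v2  # a new release never matches an old cache entry
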
27 changes: 11 additions & 16 deletions flowmachine/flowmachine/core/cache.py
@@ -29,6 +29,7 @@
from flowmachine.core.query_state import QueryStateMachine, QueryEvent
from flowmachine import __version__


if TYPE_CHECKING:
from .query import Query
from .connection import Connection
@@ -58,20 +59,7 @@ def get_obj_or_stub(connection: "Connection", query_id: str):
the dependencies it had if not.
"""
from flowmachine.core.query import Query

class QStub(Query):
def __init__(self, deps, qid):
self.deps = deps
self._md5 = qid
super().__init__()

def _make_query(self):
pass

@property
def column_names(self):
pass
from .query_stub import QStub

try:
return get_query_object_by_id(connection, query_id)
Expand All @@ -81,6 +69,7 @@ def column_names(self):
AttributeError,
pickle.UnpicklingError,
IndexError,
ValueError,
) as exc:
logger.debug("Can't unpickle, creating stub.", query_id=query_id, exception=exc)
qry = f"SELECT depends_on FROM cache.dependencies WHERE query_id='{query_id}'"
@@ -306,6 +295,8 @@ def write_cache_metadata(
)

if not in_cache:
print(f"Deps for {query.query_id} are {query.dependencies}.")
print(f"Stored is {query._get_stored_dependencies(exclude_self=True)}")
for dep in query._get_stored_dependencies(exclude_self=True):
connection.exec_driver_sql(
"INSERT INTO cache.dependencies values (%(query_id)s, %(dep_id)s) ON CONFLICT DO NOTHING",
@@ -473,9 +464,13 @@ def get_query_object_by_id(connection: "Connection", query_id: str) -> "Query":
The original query object.
"""
qry = f"SELECT obj FROM cache.cached WHERE query_id='{query_id}'"
qry = f"SELECT obj, version FROM cache.cached WHERE query_id='{query_id}'"
try:
obj = connection.fetch(qry)[0][0]
obj, version = connection.fetch(qry)[0]
if version != __version__:
raise ValueError(
f"Query id '{query_id}' belongs to a different flowmachine version (query version {version}, our version {__version__})."
)
return pickle.loads(obj)
except IndexError:
raise ValueError(f"Query id '{query_id}' is not in cache on this connection.")
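A hedged usage sketch of the new behaviour in `cache.py` above: `get_query_object_by_id` now raises `ValueError` when a cached entry was written by a different flowmachine version, and `get_obj_or_stub` catches it and falls back to a `QStub` carrying only the query id and its dependency ids. The id below is a placeholder, and a configured FlowDB connection is assumed.

# Sketch, not part of the diff: retrieving a cache entry written by another
# flowmachine version now yields a QStub rather than the unpickled Query.
from flowmachine.core.cache import get_obj_or_stub
from flowmachine.core.context import get_db

cached_query_id = "0123456789abcdef0123456789abcdef"  # placeholder id
obj = get_obj_or_stub(get_db(), cached_query_id)
print(type(obj).__name__)  # "QStub" when the entry belongs to another version
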
3 changes: 2 additions & 1 deletion flowmachine/flowmachine/core/query.py
@@ -113,8 +113,9 @@ def query_id(self):
pass

hashes.append(str(item))
hashes.append(self.__class__.__name__)
hashes.sort()
hashes.append(self.__class__.__name__)
hashes.append(flowmachine.__version__)
self._md5 = md5(str(hashes).encode()).hexdigest()
return self._md5

19 changes: 19 additions & 0 deletions flowmachine/flowmachine/core/query_stub.py
@@ -0,0 +1,19 @@
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.

from flowmachine.core.query import Query


class QStub(Query):
def __init__(self, deps, qid):
self.deps = deps
self._md5 = qid
super().__init__()

def _make_query(self):
pass

@property
def column_names(self):
pass
15 changes: 14 additions & 1 deletion flowmachine/tests/test_cache.py
@@ -18,6 +18,7 @@
from flowmachine.core.errors.flowmachine_errors import QueryCancelledException
from flowmachine.core.query import Query
from flowmachine.core.query_state import QueryState
from flowmachine.core.query_stub import QStub
from flowmachine.features import daily_location, ModalLocation, Flows


@@ -308,7 +309,7 @@ def column_names(self):
return ["value"]

def _make_query(self):
return "select 1 as value"
return self.nested.get_query()

q = NestTestQuery()
q_id = q.query_id
@@ -333,6 +334,18 @@ def test_retrieve_novel_query_with_dependency(
assert not from_cache.is_stored


def test_retrieve_query_other_version(flowmachine_connect, monkeypatch):
"""
Test that we can get a query object which is a different version. Should give a QStub
"""
with monkeypatch.context() as ctxt:
ctxt.setattr("flowmachine.__version__", "OLD_VERSION")
ctxt.setattr("flowmachine.core.cache.__version__", "OLD_VERSION")
dl = daily_location("2016-01-01").store().result()
other_version = get_obj_or_stub(get_db(), dl.query_id)
assert isinstance(other_version, QStub)


def test_df_not_pickled():
"""
Test that a pickled query does not contain a dataframe.
20 changes: 20 additions & 0 deletions flowmachine/tests/test_cache_utils.py
@@ -5,6 +5,8 @@
"""
Tests for cache management utilities.
"""
import re

from cachey import Scorer
from unittest.mock import Mock, MagicMock

@@ -38,6 +40,7 @@
from flowmachine.core.context import get_db, get_redis, get_executor
from flowmachine.core.query_state import QueryState, QueryStateMachine
from flowmachine.features import daily_location
from flowmachine import __version__


class TestException(Exception):
@@ -386,6 +389,23 @@ def test_get_query_object_by_id(flowmachine_connect):
assert dl.get_query() == retrieved_query.get_query()


def test_get_query_object_by_id_other_version(flowmachine_connect, monkeypatch):
"""
Test that we raise a value error with differing version.
"""
with monkeypatch.context() as ctxt:
ctxt.setattr("flowmachine.__version__", "OLD_VERSION")
ctxt.setattr("flowmachine.core.cache.__version__", "OLD_VERSION")
dl = daily_location("2016-01-01").store().result()
with pytest.raises(
ValueError,
match=re.escape(
f"Query id '5525f9a94aeef6c5387287f2e1dfb2a7' belongs to a different flowmachine version (query version OLD_VERSION, our version {__version__})."
),
):
get_query_object_by_id(get_db(), dl.query_id)


def test_delete_query_by_id(flowmachine_connect):
"""
Test that we can remove a query from cache by the query's id
15 changes: 8 additions & 7 deletions flowmachine/tests/test_dependency_graph.py
@@ -32,10 +32,11 @@
)


def test_print_dependency_tree():
def test_print_dependency_tree(monkeypatch):
"""
Test that the expected dependency tree is printed for a daily location query (with an explicit subset).
"""
monkeypatch.setattr("flowmachine.__version__", "TEST_VERSION")
subscriber_subsetter = make_subscriber_subsetter(
CustomQuery(
"SELECT duration, msisdn as subscriber FROM events.calls WHERE duration < 10",
@@ -50,20 +51,20 @@ def test_print_dependency_tree():
"""\
<Query of type: MostFrequentLocation, query_id: 'xxxxx'>
- <Query of type: SubscriberLocations, query_id: 'xxxxx'>
- <Query of type: AdminSpatialUnit, query_id: 'xxxxx'>
- <Table: 'geography.admin3', query_id: 'xxxxx'>
- <Query of type: JoinToLocation, query_id: 'xxxxx'>
- <Query of type: AdminSpatialUnit, query_id: 'xxxxx'>
- <Table: 'geography.admin3', query_id: 'xxxxx'>
- <Query of type: EventsTablesUnion, query_id: 'xxxxx'>
- <Query of type: EventTableSubset, query_id: 'xxxxx'>
- <Query of type: CustomQuery, query_id: 'xxxxx'>
- <Table: 'events.calls', query_id: 'xxxxx'>
- <Table: 'events.calls', query_id: 'xxxxx'>
- <Query of type: EventTableSubset, query_id: 'xxxxx'>
- <Query of type: CustomQuery, query_id: 'xxxxx'>
- <Table: 'events.sms', query_id: 'xxxxx'>
- <Table: 'events.sms', query_id: 'xxxxx'>
- <Query of type: AdminSpatialUnit, query_id: 'xxxxx'>
- <Table: 'geography.admin3', query_id: 'xxxxx'>
- <Query of type: EventTableSubset, query_id: 'xxxxx'>
- <Query of type: CustomQuery, query_id: 'xxxxx'>
- <Table: 'events.calls', query_id: 'xxxxx'>
- <Table: 'events.calls', query_id: 'xxxxx'>
- <Query of type: AdminSpatialUnit, query_id: 'xxxxx'>
- <Table: 'geography.admin3', query_id: 'xxxxx'>
"""
3 changes: 2 additions & 1 deletion flowmachine/tests/test_query_object_construction.py
@@ -73,7 +73,7 @@ def _example_coalesced_location_spec(preferred_dates, fallback_dates, aggregatio
)


def test_construct_query(diff_reporter):
def test_construct_query(diff_reporter, monkeypatch):
"""
Test that query objects constructed by construct_query_object() have the expected query ids.
"""
@@ -456,6 +456,7 @@ def test_construct_query(diff_reporter):
def get_query_id_for_query_spec(query_spec):
return FlowmachineQuerySchema().load(query_spec).query_id

monkeypatch.setattr("flowmachine.__version__", "TEST_VERSION")
query_ids_and_specs_as_json_string = json.dumps(
{get_query_id_for_query_spec(spec): spec for spec in query_specs_to_test},
indent=2,
@@ -1,5 +1,5 @@
{
"261b85fc9c64253889174ea8151f1d69": {
"75a22e4ebd0bc8818e058c6cc712cdf3": {
"query_kind": "spatial_aggregate",
"locations": {
"query_kind": "daily_location",
@@ -20,7 +20,7 @@
}
}
},
"e142ffb3174add433422c2724a08c02b": {
"e395e2585d85a32f763b13a4772ee7f8": {
"query_kind": "spatial_aggregate",
"locations": {
"query_kind": "daily_location",
@@ -32,7 +32,7 @@
"sampling": null
}
},
"93580818b4cf2b30071c3c46c09e9de1": {
"e926609096b5df6284f922ad7acf1d8a": {
"query_kind": "location_event_counts",
"start_date": "2016-01-01",
"end_date": "2016-01-02",
@@ -42,7 +42,7 @@
"event_types": null,
"subscriber_subset": null
},
"b48d02838163766771eeed8cd8aafd98": {
"6dcc59b33263cb81eb5055f472da432f": {
"query_kind": "spatial_aggregate",
"locations": {
"query_kind": "modal_location",
@@ -64,11 +64,11 @@
]
}
},
"6521353e7563ed700dfd2cf90721934b": {
"71044ad1c2135c73e497c96aaf2fbcbb": {
"query_kind": "geography",
"aggregation_unit": "admin3"
},
"75b0532a747b473f69b45b78fdc29865": {
"02536d8c735132b83c2b1664eaa6a781": {
"query_kind": "meaningful_locations_aggregate",
"aggregation_unit": "admin1",
"start_date": "2016-01-01",
@@ -161,7 +161,7 @@
"tower_cluster_call_threshold": 0,
"subscriber_subset": null
},
"5df56eee4dc96ed961f8eb76583cc6de": {
"2f30ce9f395263f08ad3e718f2aca782": {
"query_kind": "meaningful_locations_between_label_od_matrix",
"aggregation_unit": "admin1",
"start_date": "2016-01-01",
@@ -256,7 +256,7 @@
"event_types": null,
"subscriber_subset": null
},
"00bee92b22b1ceb98244c5700a079656": {
"208871abc6d79eea5eb0110fdef2c0f8": {
"query_kind": "meaningful_locations_between_dates_od_matrix",
"aggregation_unit": "admin1",
"start_date_a": "2016-01-01",
@@ -355,7 +355,7 @@
],
"subscriber_subset": null
},
"c3dc95da89aeb0908b9398918dab6f26": {
"2fac0598bc3b873b66d5f53e3574f031": {
"query_kind": "flows",
"from_location": {
"query_kind": "daily_location",
@@ -371,7 +371,7 @@
},
"join_type": "left outer"
},
"95fbc18554e15733df47bfd5cbaa3f87": {
"02f6857b03d82eacbdfe14391b2e97b6": {
"query_kind": "flows",
"from_location": {
"query_kind": "majority_location",
@@ -418,7 +418,7 @@
}
}
},
"920dfdf5568d75921c4173da8bccc6ef": {
"7f97219a0468e969dd171f16f33a5f6c": {
"query_kind": "labelled_spatial_aggregate",
"locations": {
"query_kind": "coalesced_location",
@@ -640,7 +640,7 @@
"stay_length_threshold": 2
}
},
"c915d87b66df83904634990e2f78da9e": {
"475e01b47e232e52f34c18e0ef868a6c": {
"query_kind": "labelled_flows",
"from_location": {
"query_kind": "coalesced_location",
5 changes: 3 additions & 2 deletions flowmachine/tests/test_table.py
@@ -71,18 +71,19 @@ def test_get_table_is_self():
assert t.get_table() is t


def test_dependencies():
def test_dependencies(monkeypatch):
"""
Check that a table without explicit columns has no other queries as a dependency,
and a table with explicit columns has its parent table as a dependency.
"""
monkeypatch.setattr("flowmachine.__version__", "TEST_VERSION")
t1 = Table("events.calls")
assert t1.dependencies == set()

t2 = Table("events.calls", columns=["id"])
assert len(t2.dependencies) == 1
t2_parent = t2.dependencies.pop()
assert "057addedac04dbeb1dcbbb6b524b43f0" == t2_parent.query_id
assert t2_parent.query_id == "397c8cee22d63ac718ccce8c20a2eae9"


def test_subset():
