Include version in query hash #6841

Open · wants to merge 5 commits into base: master

Changes from 1 commit
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -10,6 +10,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).

### Changed
- Mode is now available for use with categorical metrics when running joined spatial aggregates via api. [#2021](https://github.com/Flowminder/FlowKit/issues/2021)
- Flowmachine now includes the version number in query ids, which means cache entries are per-version. [#4489](https://github.com/Flowminder/FlowKit/issues/4489)

### Fixed
- Fixed dangling async tasks not being properly cancelled during server shutdown [#6833](https://github.com/Flowminder/FlowKit/issues/6833)
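The changelog line above is the heart of this PR: because the running flowmachine version is folded into every query id, cache entries written by one version are never matched by another. A minimal sketch of that effect (toy code, not FlowKit's implementation; the real hashing lives in the `query.py` diff below):

```python
# Toy sketch: folding a version string into the hashed components yields
# version-scoped identifiers, so identical query parameters hash to
# different ids under different flowmachine versions.
from hashlib import md5


def versioned_id(components, version):
    hashes = sorted(str(c) for c in components)
    hashes.append(version)  # version appended after sorting, as in the PR
    return md5(str(hashes).encode()).hexdigest()


assert versioned_id(["2016-01-01", "admin3"], "1.0.0") != versioned_id(
    ["2016-01-01", "admin3"], "2.0.0"
)
```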
27 changes: 11 additions & 16 deletions flowmachine/flowmachine/core/cache.py
@@ -29,6 +29,7 @@
from flowmachine.core.query_state import QueryStateMachine, QueryEvent
from flowmachine import __version__


if TYPE_CHECKING:
from .query import Query
from .connection import Connection
@@ -58,20 +59,7 @@ def get_obj_or_stub(connection: "Connection", query_id: str):
the dependencies it had if not.
"""
from flowmachine.core.query import Query

class QStub(Query):
def __init__(self, deps, qid):
self.deps = deps
self._md5 = qid
super().__init__()

def _make_query(self):
pass

@property
def column_names(self):
pass
from .query_stub import QStub

try:
return get_query_object_by_id(connection, query_id)
@@ -81,6 +69,7 @@
AttributeError,
pickle.UnpicklingError,
IndexError,
ValueError,
) as exc:
logger.debug("Can't unpickle, creating stub.", query_id=query_id, exception=exc)
qry = f"SELECT depends_on FROM cache.dependencies WHERE query_id='{query_id}'"
@@ -306,6 +295,8 @@ def write_cache_metadata(
)

if not in_cache:
print(f"Deps for {query.query_id} are {query.dependencies}.")
print(f"Stored is {query._get_stored_dependencies(exclude_self=True)}")
for dep in query._get_stored_dependencies(exclude_self=True):
connection.exec_driver_sql(
"INSERT INTO cache.dependencies values (%(query_id)s, %(dep_id)s) ON CONFLICT DO NOTHING",
@@ -473,9 +464,13 @@ def get_query_object_by_id(connection: "Connection", query_id: str) -> "Query":
The original query object.
"""
qry = f"SELECT obj FROM cache.cached WHERE query_id='{query_id}'"
qry = f"SELECT obj, version FROM cache.cached WHERE query_id='{query_id}'"
try:
obj = connection.fetch(qry)[0][0]
obj, version = connection.fetch(qry)[0]
if version != __version__:
raise ValueError(
f"Query id '{query_id}' belongs to a different flowmachine version (query version {version}, our version {__version__})."
)
return pickle.loads(obj)
except IndexError:
raise ValueError(f"Query id '{query_id}' is not in cache on this connection.")
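The retrieval path now selects the stored version alongside the pickled object and refuses to load entries written by a different flowmachine version; `get_obj_or_stub` catches the resulting `ValueError` (newly added to its exception tuple above) and falls back to a stub. A condensed sketch of the guard, assuming `Connection.fetch` returns rows of `(obj, version)` as in the diff:

```python
# Sketch of the version guard in get_query_object_by_id (not verbatim FlowKit
# code): a cache hit for another version raises rather than unpickling.
import pickle


def load_cached_query(connection, query_id, current_version):
    rows = connection.fetch(
        f"SELECT obj, version FROM cache.cached WHERE query_id='{query_id}'"
    )
    try:
        obj, version = rows[0]
    except IndexError:
        raise ValueError(f"Query id '{query_id}' is not in cache on this connection.")
    if version != current_version:
        raise ValueError(
            f"Query id '{query_id}' belongs to a different flowmachine version "
            f"(query version {version}, our version {current_version})."
        )
    return pickle.loads(obj)
```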
3 changes: 2 additions & 1 deletion flowmachine/flowmachine/core/query.py
@@ -113,8 +113,9 @@ def query_id(self):
pass

hashes.append(str(item))
hashes.append(self.__class__.__name__)
hashes.sort()
hashes.append(self.__class__.__name__)
hashes.append(flowmachine.__version__)
self._md5 = md5(str(hashes).encode()).hexdigest()
return self._md5

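Per the file header (2 additions and 1 deletion), this hunk does two things: the class-name component now goes into the hash list after the sort rather than being sorted in with the component hashes, and the flowmachine version is appended as a new component. Either change on its own alters existing ids, which is why the approved query ids and the hard-coded id in `test_table.py` are regenerated later in this diff. A tiny illustration with toy values:

```python
# Toy values (not FlowKit code) showing that the hashed list differs under the
# new ordering even before the version component is considered.
parts = ["ffff", "0000"]
old_style = sorted(parts + ["DailyLocation"])            # ['0000', 'DailyLocation', 'ffff']
new_style = sorted(parts) + ["DailyLocation", "1.2.3"]   # ['0000', 'ffff', 'DailyLocation', '1.2.3']
assert str(old_style) != str(new_style)
```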
19 changes: 19 additions & 0 deletions flowmachine/flowmachine/core/query_stub.py
@@ -0,0 +1,19 @@
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.

from flowmachine.core.query import Query


class QStub(Query):
def __init__(self, deps, qid):
self.deps = deps
self._md5 = qid
super().__init__()

def _make_query(self):
pass

@property
def column_names(self):
pass

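`QStub` is the placeholder used when a cached entry cannot be reconstructed, which now includes the cross-version case: the stub carries only the query id and its recorded dependencies, which is enough for cache-management operations such as eviction. A sketch of the fallback path (assumed to mirror `get_obj_or_stub` in the cache.py diff above, not verbatim FlowKit code):

```python
# Sketch of the fallback: if unpickling fails (AttributeError, UnpicklingError,
# IndexError, or the new cross-version ValueError), represent the entry by a
# QStub built from the recorded dependency edges.
import pickle

from flowmachine.core.cache import get_query_object_by_id
from flowmachine.core.query_stub import QStub


def obj_or_stub_sketch(connection, query_id):
    try:
        return get_query_object_by_id(connection, query_id)
    except (AttributeError, pickle.UnpicklingError, IndexError, ValueError):
        qry = f"SELECT depends_on FROM cache.dependencies WHERE query_id='{query_id}'"
        deps = [obj_or_stub_sketch(connection, row[0]) for row in connection.fetch(qry)]
        return QStub(deps, query_id)
```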
15 changes: 14 additions & 1 deletion flowmachine/tests/test_cache.py
@@ -18,6 +18,7 @@
from flowmachine.core.errors.flowmachine_errors import QueryCancelledException
from flowmachine.core.query import Query
from flowmachine.core.query_state import QueryState
from flowmachine.core.query_stub import QStub
from flowmachine.features import daily_location, ModalLocation, Flows


@@ -308,7 +309,7 @@ def column_names(self):
return ["value"]

def _make_query(self):
return "select 1 as value"
return self.nested.get_query()

q = NestTestQuery()
q_id = q.query_id
@@ -333,6 +334,18 @@ def test_retrieve_novel_query_with_dependency(
assert not from_cache.is_stored


def test_retrieve_query_other_version(flowmachine_connect, monkeypatch):
"""
Test that we can get a query object which is a different version. Should give a QStub
"""
with monkeypatch.context() as ctxt:
ctxt.setattr("flowmachine.__version__", "OLD_VERSION")
ctxt.setattr("flowmachine.core.cache.__version__", "OLD_VERSION")
dl = daily_location("2016-01-01").store().result()
other_version = get_obj_or_stub(get_db(), dl.query_id)
assert isinstance(other_version, QStub)


def test_df_not_pickled():
"""
Test that a pickled query does not contain a dataframe.
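This test (and the matching one in `test_cache_utils.py` below) patches the version in two places because `cache.py` imports it with `from flowmachine import __version__` (see the import near the top of the cache.py diff), which binds a separate module-level name at import time; patching `flowmachine.__version__` alone would leave that copy untouched. A generic illustration with toy modules (not FlowKit code):

```python
# Why both attributes are monkeypatched: `from pkg import __version__` copies
# the value into the importing module's namespace, so later changes to
# pkg.__version__ are not seen through the imported name.
import types

pkg = types.ModuleType("pkg")
pkg.__version__ = "1.0.0"

consumer = types.ModuleType("consumer")
consumer.__version__ = pkg.__version__   # effect of `from pkg import __version__`

pkg.__version__ = "OLD_VERSION"          # patching the package alone...
assert consumer.__version__ == "1.0.0"   # ...does not change the imported copy
```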
20 changes: 20 additions & 0 deletions flowmachine/tests/test_cache_utils.py
@@ -5,6 +5,8 @@
"""
Tests for cache management utilities.
"""
import re

from cachey import Scorer
from unittest.mock import Mock, MagicMock

@@ -38,6 +40,7 @@
from flowmachine.core.context import get_db, get_redis, get_executor
from flowmachine.core.query_state import QueryState, QueryStateMachine
from flowmachine.features import daily_location
from flowmachine import __version__


class TestException(Exception):
@@ -386,6 +389,23 @@ def test_get_query_object_by_id(flowmachine_connect):
assert dl.get_query() == retrieved_query.get_query()


def test_get_query_object_by_id_other_version(flowmachine_connect, monkeypatch):
"""
Test that we raise a value error with differing version.
"""
with monkeypatch.context() as ctxt:
ctxt.setattr("flowmachine.__version__", "OLD_VERSION")
ctxt.setattr("flowmachine.core.cache.__version__", "OLD_VERSION")
dl = daily_location("2016-01-01").store().result()
with pytest.raises(
ValueError,
match=re.escape(
f"Query id '5525f9a94aeef6c5387287f2e1dfb2a7' belongs to a different flowmachine version (query version OLD_VERSION, our version {__version__})."
),
):
get_query_object_by_id(get_db(), dl.query_id)


def test_delete_query_by_id(flowmachine_connect):
"""
Test that we can remove a query from cache by the query's id
15 changes: 8 additions & 7 deletions flowmachine/tests/test_dependency_graph.py
@@ -32,10 +32,11 @@
)


def test_print_dependency_tree():
def test_print_dependency_tree(monkeypatch):
"""
Test that the expected dependency tree is printed for a daily location query (with an explicit subset).
"""
monkeypatch.setattr("flowmachine.__version__", "TEST_VERSION")
subscriber_subsetter = make_subscriber_subsetter(
CustomQuery(
"SELECT duration, msisdn as subscriber FROM events.calls WHERE duration < 10",
@@ -50,20 +51,20 @@ def test_print_dependency_tree():
"""\
<Query of type: MostFrequentLocation, query_id: 'xxxxx'>
- <Query of type: SubscriberLocations, query_id: 'xxxxx'>
- <Query of type: AdminSpatialUnit, query_id: 'xxxxx'>
- <Table: 'geography.admin3', query_id: 'xxxxx'>
- <Query of type: JoinToLocation, query_id: 'xxxxx'>
- <Query of type: AdminSpatialUnit, query_id: 'xxxxx'>
- <Table: 'geography.admin3', query_id: 'xxxxx'>
- <Query of type: EventsTablesUnion, query_id: 'xxxxx'>
- <Query of type: EventTableSubset, query_id: 'xxxxx'>
- <Query of type: CustomQuery, query_id: 'xxxxx'>
- <Table: 'events.calls', query_id: 'xxxxx'>
- <Table: 'events.calls', query_id: 'xxxxx'>
- <Query of type: EventTableSubset, query_id: 'xxxxx'>
- <Query of type: CustomQuery, query_id: 'xxxxx'>
- <Table: 'events.sms', query_id: 'xxxxx'>
- <Table: 'events.sms', query_id: 'xxxxx'>
- <Query of type: AdminSpatialUnit, query_id: 'xxxxx'>
- <Table: 'geography.admin3', query_id: 'xxxxx'>
- <Query of type: EventTableSubset, query_id: 'xxxxx'>
- <Query of type: CustomQuery, query_id: 'xxxxx'>
- <Table: 'events.calls', query_id: 'xxxxx'>
- <Table: 'events.calls', query_id: 'xxxxx'>
- <Query of type: AdminSpatialUnit, query_id: 'xxxxx'>
- <Table: 'geography.admin3', query_id: 'xxxxx'>
"""
3 changes: 2 additions & 1 deletion flowmachine/tests/test_query_object_construction.py
@@ -73,7 +73,7 @@ def _example_coalesced_location_spec(preferred_dates, fallback_dates, aggregatio
)


def test_construct_query(diff_reporter):
def test_construct_query(diff_reporter, monkeypatch):
"""
Test that query objects constructed by construct_query_object() have the expected query ids.
"""
@@ -456,6 +456,7 @@ def test_construct_query(diff_reporter):
def get_query_id_for_query_spec(query_spec):
return FlowmachineQuerySchema().load(query_spec).query_id

monkeypatch.setattr("flowmachine.__version__", "TEST_VERSION")
query_ids_and_specs_as_json_string = json.dumps(
{get_query_id_for_query_spec(spec): spec for spec in query_specs_to_test},
indent=2,
Changes to the approved query ids fixture for test_construct_query:
@@ -1,5 +1,5 @@
{
"261b85fc9c64253889174ea8151f1d69": {
"75a22e4ebd0bc8818e058c6cc712cdf3": {
"query_kind": "spatial_aggregate",
"locations": {
"query_kind": "daily_location",
@@ -20,7 +20,7 @@
}
}
},
"e142ffb3174add433422c2724a08c02b": {
"e395e2585d85a32f763b13a4772ee7f8": {
"query_kind": "spatial_aggregate",
"locations": {
"query_kind": "daily_location",
@@ -32,7 +32,7 @@
"sampling": null
}
},
"93580818b4cf2b30071c3c46c09e9de1": {
"e926609096b5df6284f922ad7acf1d8a": {
"query_kind": "location_event_counts",
"start_date": "2016-01-01",
"end_date": "2016-01-02",
@@ -42,7 +42,7 @@
"event_types": null,
"subscriber_subset": null
},
"b48d02838163766771eeed8cd8aafd98": {
"6dcc59b33263cb81eb5055f472da432f": {
"query_kind": "spatial_aggregate",
"locations": {
"query_kind": "modal_location",
@@ -64,11 +64,11 @@
]
}
},
"6521353e7563ed700dfd2cf90721934b": {
"71044ad1c2135c73e497c96aaf2fbcbb": {
"query_kind": "geography",
"aggregation_unit": "admin3"
},
"75b0532a747b473f69b45b78fdc29865": {
"02536d8c735132b83c2b1664eaa6a781": {
"query_kind": "meaningful_locations_aggregate",
"aggregation_unit": "admin1",
"start_date": "2016-01-01",
@@ -161,7 +161,7 @@
"tower_cluster_call_threshold": 0,
"subscriber_subset": null
},
"5df56eee4dc96ed961f8eb76583cc6de": {
"2f30ce9f395263f08ad3e718f2aca782": {
"query_kind": "meaningful_locations_between_label_od_matrix",
"aggregation_unit": "admin1",
"start_date": "2016-01-01",
@@ -256,7 +256,7 @@
"event_types": null,
"subscriber_subset": null
},
"00bee92b22b1ceb98244c5700a079656": {
"208871abc6d79eea5eb0110fdef2c0f8": {
"query_kind": "meaningful_locations_between_dates_od_matrix",
"aggregation_unit": "admin1",
"start_date_a": "2016-01-01",
@@ -355,7 +355,7 @@
],
"subscriber_subset": null
},
"c3dc95da89aeb0908b9398918dab6f26": {
"2fac0598bc3b873b66d5f53e3574f031": {
"query_kind": "flows",
"from_location": {
"query_kind": "daily_location",
@@ -371,7 +371,7 @@
},
"join_type": "left outer"
},
"95fbc18554e15733df47bfd5cbaa3f87": {
"02f6857b03d82eacbdfe14391b2e97b6": {
"query_kind": "flows",
"from_location": {
"query_kind": "majority_location",
@@ -418,7 +418,7 @@
}
}
},
"920dfdf5568d75921c4173da8bccc6ef": {
"7f97219a0468e969dd171f16f33a5f6c": {
"query_kind": "labelled_spatial_aggregate",
"locations": {
"query_kind": "coalesced_location",
@@ -640,7 +640,7 @@
"stay_length_threshold": 2
}
},
"c915d87b66df83904634990e2f78da9e": {
"475e01b47e232e52f34c18e0ef868a6c": {
"query_kind": "labelled_flows",
"from_location": {
"query_kind": "coalesced_location",
5 changes: 3 additions & 2 deletions flowmachine/tests/test_table.py
@@ -71,18 +71,19 @@ def test_get_table_is_self():
assert t.get_table() is t


def test_dependencies():
def test_dependencies(monkeypatch):
"""
Check that a table without explicit columns has no other queries as a dependency,
and a table with explicit columns has its parent table as a dependency.
"""
monkeypatch.setattr("flowmachine.__version__", "TEST_VERSION")
t1 = Table("events.calls")
assert t1.dependencies == set()

t2 = Table("events.calls", columns=["id"])
assert len(t2.dependencies) == 1
t2_parent = t2.dependencies.pop()
assert "057addedac04dbeb1dcbbb6b524b43f0" == t2_parent.query_id
assert t2_parent.query_id == "397c8cee22d63ac718ccce8c20a2eae9"


def test_subset():