Skip to content

Commit

Permalink
stats: Ensure that events are aggregated only once
Browse files: browse the repository at this point in the history
  • Loading branch information
psaiz committed Nov 8, 2023
1 parent d569f38 commit dcc08aa
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 16 deletions.
11 changes: 3 additions & 8 deletions invenio_stats/aggregations.py
Original file line number Diff line number Diff line change
Expand Up @@ -223,12 +223,6 @@ def agg_iter(self, dt, previous_bookmark):
num_partitions = max(
int(math.ceil(float(total_buckets) / self.max_bucket_size)), 1
)
if previous_bookmark:
# Let's make sure that both objects are of the same type. This is important for the test
previous = float(previous_bookmark.strftime("%s"))
else:
previous = 0

for p in range(num_partitions):
terms = agg_query.aggs.bucket(
"terms",
Expand All @@ -255,8 +249,9 @@ def agg_iter(self, dt, previous_bookmark):
interval_date = datetime.strptime(
doc["timestamp"], "%Y-%m-%dT%H:%M:%S"
).replace(**dict.fromkeys(INTERVAL_ROUNDING[self.interval], 0))
if aggregation["last_update"]["value"] and previous_bookmark:
if aggregation["last_update"]["value"] < previous:
if aggregation["last_update"]["value_as_string"] and previous_bookmark:
last_date = datetime.fromisoformat(aggregation["last_update"]["value_as_string"].rstrip("Z"))
if last_date < previous_bookmark:
continue
aggregation_data = {}
aggregation_data["timestamp"] = interval_date.isoformat()
Expand Down
58 changes: 50 additions & 8 deletions tests/test_aggregations.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,14 +72,17 @@ def test_overwriting_aggregations(app, search_clear, mock_event_queue):
results within the interval of the previous events
overwrite the aggregation,
by checking that the document version has increased.
4. Run one more time, without any new events, and ensure that the
aggregations have not been overwritten
"""
# Send some events
mock_event_queue.consume.return_value = [
_create_file_download_event(date) for date in [(2017, 6, 1), (2017, 6, 2, 10)]
]

indexer = EventsIndexer(mock_event_queue)
indexer.run()
# Note that the events use the current time. Let's mock that as well
with patch("invenio_stats.processors.datetime", mock_date(2017, 6, 2, 11)):
indexer = EventsIndexer(mock_event_queue)
indexer.run()
current_search.flush_and_refresh(index="*")

# Aggregate events
Expand All @@ -98,13 +101,47 @@ def test_overwriting_aggregations(app, search_clear, mock_event_queue):
_create_file_download_event(date)
for date in [(2017, 6, 2, 15), (2017, 7, 1)] # second event on the same date
]
indexer = EventsIndexer(mock_event_queue)
indexer.run()
with patch("invenio_stats.processors.datetime", mock_date(2017, 7, 1, 5)):
indexer = EventsIndexer(mock_event_queue)
indexer.run()
current_search.flush_and_refresh(index="*")

# Aggregate again. The aggregation should start from the last bookmark.
with patch("invenio_stats.aggregations.datetime", mock_date(2017, 7, 2)):
aggregate_events(["file-download-agg"])
with patch("invenio_stats.aggregations.datetime", mock_date(2017, 7, 1, 6)):
d = aggregate_events(["file-download-agg"])
assert d == [
[
(1, 0),
(0, 0),
(0, 0),
(0, 0),
(0, 0),
(0, 0),
(0, 0),
(0, 0),
(0, 0),
(0, 0),
(0, 0),
(0, 0),
(0, 0),
(0, 0),
(0, 0),
(0, 0),
(0, 0),
(0, 0),
(0, 0),
(0, 0),
(0, 0),
(0, 0),
(0, 0),
(0, 0),
(0, 0),
(0, 0),
(0, 0),
(0, 0),
(0, 0),
(1, 0),
]
]
current_search.flush_and_refresh(index="*")
res = search_clear.search(index="stats-file-download", version=True)
for hit in res["hits"]["hits"]:
Expand All @@ -114,6 +151,11 @@ def test_overwriting_aggregations(app, search_clear, mock_event_queue):
else:
assert hit["_version"] == 1

# Run one more time, one hour later, and ensure that the aggregation does not modify anything
with patch("invenio_stats.aggregations.datetime", mock_date(2017, 7, 1, 7)):
d = aggregate_events(["file-download-agg"])
assert d == [[(0, 0)]]


def test_aggregation_without_events(app, search_clear):
"""Check that the aggregation doesn't crash if there are no events.
Expand Down

0 comments on commit dcc08aa

Please sign in to comment.