Skip to content

Commit

Permalink
chore: improve package popularity
Browse files Browse the repository at this point in the history
  • Loading branch information
smotornyuk committed Nov 4, 2023
1 parent 31d1840 commit 7a0a225
Show file tree
Hide file tree
Showing 6 changed files with 156 additions and 34 deletions.
8 changes: 8 additions & 0 deletions ckanext/search_tweaks/query_popularity/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,11 @@ def throttle() -> int:

def max_age() -> int:
return tk.config["ckanext.search_tweaks.query_popularity.max_age"]


def obsoletion_period() -> int:
return tk.config["ckanext.search_tweaks.query_popularity.obsoletion_period"]


def tracked_endpoints() -> list[str]:
return tk.config["ckanext.search_tweaks.query_popularity.tracked_endpoints"]
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,18 @@ groups:
- key: ckanext.search_tweaks.query_popularity.ignored_terms
type: list

- key: ckanext.search_tweaks.query_popularity.tracked_endpoints
type: list
default: dataset.search

- key: ckanext.search_tweaks.query_popularity.query_throttle
type: int
default: 86_400 # 24 hours

- key: ckanext.search_tweaks.query_popularity.max_age
type: int
default: 2_592_000 # 30 days

- key: ckanext.search_tweaks.query_popularity.obsoletion_period
type: int
default: 86_400 # 24 hours
24 changes: 23 additions & 1 deletion ckanext/search_tweaks/query_popularity/logic/action.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,29 @@ def search_tweaks_query_popularity_list(
if tk.asbool(data_dict.get("refresh")):
score.refresh()


limit = tk.asint(data_dict.get("limit", 10))

return list(score.stats(limit))


@tk.side_effect_free
def search_tweaks_query_popularity_export(
context: types.Context, data_dict: dict[str, Any]
) -> dict[str, Any]:
score = Score()

results = score.export()
return {"results": results, "count": len(results)}


@tk.side_effect_free
def search_tweaks_query_popularity_ignore(
context: types.Context, data_dict: dict[str, Any]
):
q = tk.get_or_bust(data_dict, "q")
score = Score()
result = score.ignore(q)
if tk.asbool(data_dict.get("remove")):
score.drop(q)

return result
23 changes: 23 additions & 0 deletions ckanext/search_tweaks/query_popularity/logic/auth.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
from __future__ import annotations
from typing import Any
from ckan import types

from ckan.authz import is_authorized


def search_tweaks_query_popularity_list(
context: types.Context, data_dict: dict[str, Any]
) -> types.AuthResult:
return is_authorized("sysadmin", context, data_dict)


def search_tweaks_query_popularity_export(
context: types.Context, data_dict: dict[str, Any]
) -> types.AuthResult:
return is_authorized("sysadmin", context, data_dict)


def search_tweaks_query_popularity_ignore(
context: types.Context, data_dict: dict[str, Any]
) -> types.AuthResult:
return is_authorized("sysadmin", context, data_dict)
14 changes: 9 additions & 5 deletions ckanext/search_tweaks/query_popularity/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,23 @@
from ckanext.search_tweaks.interfaces import IQueryPopularity
from . import config, score


@tk.blanket.actions
@tk.blanket.auth_functions
@tk.blanket.config_declarations
class QueryPopularityPlugin(p.SingletonPlugin):
p.implements(p.IConfigurable)
p.implements(p.IPackageController, inherit=True)
p.implements(IQueryPopularity, inherit=True)

def after_dataset_search(self, results: dict[str, Any], params: dict[str, Any]):
if not any(
plugin.skip_query_popularity(params)
for plugin in p.PluginImplementations(IQueryPopularity)
):
self.score.save(params["q"])
bp, view = tk.get_endpoint()
if bp and view and f"{bp}.{view}" in config.tracked_endpoints():
if not any(
plugin.skip_query_popularity(params)
for plugin in p.PluginImplementations(IQueryPopularity)
):
self.score.save(params["q"])

return results

Expand Down
113 changes: 85 additions & 28 deletions ckanext/search_tweaks/query_popularity/score.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
from __future__ import annotations
from collections import defaultdict
from datetime import datetime, timedelta

import logging
from hashlib import md5
from typing import Any, Iterable, cast

from operator import itemgetter
from ckan.lib.redis import connect_to_redis
import ckan.plugins.toolkit as tk
from redis import Redis
from . import config

log = logging.getLogger(__name__)
connect_to_redis: Any


Expand All @@ -23,9 +24,28 @@ def __init__(self):
site = tk.config["ckan.site_id"]
self.prefix = f"{site}:search_tweaks:qp"

def export(self):
data: dict[bytes, dict[str, Any]] = {
hash: {"query": query, "records": []}
for hash, query in self.redis.hgetall(self.trans_key()).items()
}
for k, v in self.redis.hscan_iter(self.distribution_key()):
date_str, q_hash = k.split(b"/", 1)
try:
date = datetime.strptime(date_str.decode(), self.date_format)
except ValueError:
continue

data[q_hash]["records"].append({"date": date, "count": int(v)})

return list(data.values())

def save(self, q: str):
q = q.strip()
q_hash = md5(q.encode()).hexdigest()
q_hash = self.hash(q)

if self.is_ignored(q_hash):
return

if self.is_throttling(q_hash):
return
Expand All @@ -36,6 +56,18 @@ def save(self, q: str):

self.redis.hincrby(self.distribution_key(), f"{date_stem}/{q_hash}", 1)

def drop(self, q: str):
q_hash = self.hash(q)
dk = self.distribution_key()

series = self.redis.hscan_iter(dk, f"*/{q_hash}")
keys = list(map(itemgetter(0), series))
if keys:
self.redis.hdel(dk, *keys)

self.redis.hdel(self.trans_key(), q_hash)
self.redis.zrem(self.score_key(), q_hash)

def is_throttling(self, q_hash: str):
user = tk.current_user.name

Expand All @@ -46,27 +78,17 @@ def is_throttling(self, q_hash: str):
self.redis.set(throttle_key, 1, ex=config.throttle())
return False

def now(self):
return datetime.utcnow()

def score_key(self):
return f"{self.prefix}:score"

def trans_key(self):
return f"{self.prefix}:trans"

def distribution_key(self):
return f"{self.prefix}:distribution"

def format_date_stem(self, date: datetime):
return date.strftime(self.date_format)
def reset(self):
keys = self.redis.keys(f"{self.prefix}:*")
if keys:
self.redis.delete(*keys)

def refresh(self):
max_age = timedelta(seconds=config.max_age())
dk = self.distribution_key()
sk = self.score_key()

expired_keys: set[bytes] = set()
expired_dist: set[bytes] = set()
distribution = cast(
"Iterable[tuple[bytes, bytes]]",
self.redis.hscan_iter(dk),
Expand All @@ -76,27 +98,62 @@ def refresh(self):

for k, v in distribution:
date_str, q_hash = k.split(b"/", 1)
date = datetime.strptime(date_str.decode(), self.date_format)
try:
date = datetime.strptime(date_str.decode(), self.date_format)
except ValueError:
log.error("Remove invalid key %s", k)
expired_dist.add(k)
continue

age = self.now() - date

if age > max_age:
expired_keys.add(k)
expired_dist.add(k)
continue

scores[q_hash] += int(v) / (age.days + 1)
scores[q_hash] += int(v) / (age.seconds // config.obsoletion_period() + 1)

if expired_keys:
self.redis.hdel(dk, *expired_keys)
if expired_dist:
self.redis.hdel(dk, *expired_dist)

expired_keys: set[bytes] = set()
expired_scores: set[bytes] = set()
for k, v in self.redis.zscan_iter(sk):
if k not in scores:
expired_keys.add(k)
expired_scores.add(k)
continue
self.redis.zadd(sk, cast(Any, scores))
if scores:
self.redis.zadd(sk, cast(Any, scores))

if expired_scores:
self.redis.zrem(sk, *expired_scores)
self.redis.hdel(self.trans_key(), *expired_scores)

def hash(self, q: str):
return md5(q.encode()).hexdigest()

def is_ignored(self, q_hash: str):
return self.redis.sismember(self.ignore_key(), q_hash)

def ignore(self, q: str):
return self.redis.sadd(self.ignore_key(), self.hash(q))

def now(self):
return datetime.utcnow()

if expired_keys:
self.redis.zrem(sk, *expired_keys)
def score_key(self):
return f"{self.prefix}:score"

def trans_key(self):
return f"{self.prefix}:trans"

def ignore_key(self):
return f"{self.prefix}:ignore"

def distribution_key(self):
return f"{self.prefix}:distribution"

def format_date_stem(self, date: datetime):
return date.strftime(self.date_format)

def stats(self, num: int) -> Iterable[dict[str, Any]]:
scores: list[tuple[bytes, float]] = self.redis.zrange(
Expand Down

0 comments on commit 7a0a225

Please sign in to comment.