Skip to content

Commit

Permalink
DATASHADES-321 / caching system
Browse files Browse the repository at this point in the history
  • Loading branch information
mutantsan committed May 2, 2024
1 parent 7bf3c59 commit 299df57
Show file tree
Hide file tree
Showing 18 changed files with 587 additions and 165 deletions.
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ With ckanext-charts, users can easily generate interactive and visually appealin

## Requirements

Requires Redis 7+

Compatibility with core CKAN versions:

| CKAN version | Compatible? |
Expand Down
134 changes: 103 additions & 31 deletions ckanext/charts/cache.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from __future__ import annotations

import time
import hashlib
import logging
import os
Expand All @@ -14,8 +15,8 @@
from ckan.lib.redis import connect_to_redis

import ckanext.charts.exception as exception
from ckanext.charts.config import get_cache_strategy
import ckanext.charts.utils as utils
import ckanext.charts.config as config
import ckanext.charts.const as const


log = logging.getLogger(__name__)
Expand Down Expand Up @@ -44,6 +45,7 @@ def __init__(self):
self.client = connect_to_redis()

def get_data(self, key: str) -> pd.DataFrame | None:
"""Return data from cache if exists"""
raw_data = self.client.get(key)

if not raw_data:
Expand All @@ -52,46 +54,49 @@ def get_data(self, key: str) -> pd.DataFrame | None:
return pa.deserialize_pandas(raw_data)

def set_data(self, key: str, data: pd.DataFrame):
self.client.set(key, pa.serialize_pandas(data).to_pybytes())
"""Serialize data and save to redis"""
cache_ttl = config.get_redis_cache_ttl()

if cache_ttl:
self.client.setex(key, cache_ttl, pa.serialize_pandas(data).to_pybytes())
else:
self.client.set(key, value=pa.serialize_pandas(data).to_pybytes())

def invalidate(self, key: str):
self.client.delete(key)


class DiskCache(CacheStrategy):
"""Cache data to disk"""
class FileCache(CacheStrategy):
"""Cache data as file"""

def __init__(self):
self.directory = get_disk_cache_path()
self.directory = get_file_cache_path()

def get_data(self, key: str) -> pd.DataFrame | None:
"""Return data from cache if exists"""
file_path = os.path.join(
self.directory, f"{self.generate_unique_consistent_filename(key)}.orc"
)

if os.path.exists(file_path):
if self.is_file_cache_expired(file_path):
return None

with open(file_path, "rb") as f:
return orc.ORCFile(f).read().to_pandas()

return None

def path_to_key(self, path: str) -> str:
"""Convert a path to unique key"""
return path.replace("/", "_")

def set_data(self, key: str, data: pd.DataFrame):
def set_data(self, key: str, data: pd.DataFrame) -> None:
"""Save data to cache. The data will be stored as an ORC file."""
file_path = os.path.join(
self.directory, f"{self.generate_unique_consistent_filename(key)}.orc"
)

data.to_orc(file_path)

# table = pa.Table.from_pandas(data)

# with open(file_path, "wb") as f:
# orc.write_table(table, f)

def invalidate(self, key: str):
def invalidate(self, key: str) -> None:
"""Remove data from cache"""
file_path = os.path.join(
self.directory, f"{self.generate_unique_consistent_filename(key)}.orc"
)
Expand All @@ -100,45 +105,65 @@ def invalidate(self, key: str):
os.remove(file_path)

def generate_unique_consistent_filename(self, key: str) -> str:
"""Generate unique and consistent filename based on the key"""
hash_object = hashlib.sha256()
hash_object.update(key.encode("utf-8"))
return hash_object.hexdigest()

@staticmethod
def is_file_cache_expired(file_path: str) -> bool:
"""Check if file cache is expired"""
return os.path.getmtime(file_path) + config.get_file_cache_ttl() < time.time()


def get_cache_manager(cache_stragegy: str | None) -> CacheStrategy:
def get_cache_manager(cache_strategy: str | None) -> CacheStrategy:
"""Return cache manager based on the strategy"""
active_cache = cache_stragegy or get_cache_strategy()
active_cache = cache_strategy or config.get_cache_strategy()

if active_cache == "redis":
if active_cache == const.CACHE_REDIS:
return RedisCache()
elif active_cache == "disk":
return DiskCache()
elif active_cache == const.CACHE_FILE:
return FileCache()

raise exception.CacheStrategyNotImplemented(
f"Cache strategy {active_cache} is not implemented"
)


def invalidate_cache() -> None:
def invalidate_all_cache() -> None:
"""Invalidate all caches"""
drop_disk_cache()
drop_file_cache()
drop_redis_cache()

log.info("All chart caches have been invalidated")

def drop_redis_cache():

def invalidate_by_key(key: str) -> None:
"""Invalidate cache by key"""
RedisCache().invalidate(key)
FileCache().invalidate(key)

log.info(f"Chart cache for key {key} has been invalidated")


def drop_redis_cache() -> None:
"""Drop all ckanext-charts keys from Redis cache"""
conn = connect_to_redis()

for key in conn.scan_iter("ckanext-charts:"):
log.info("Dropping all ckanext-charts keys from Redis cache")

for key in conn.scan_iter(const.REDIS_PREFIX):
conn.delete(key)


def drop_disk_cache():
def drop_file_cache() -> None:
"""Drop all cached files from storage"""

folder_path = get_disk_cache_path()
log.info("Dropping all charts cached files")

folder_path = get_file_cache_path()

for filename in os.listdir(get_disk_cache_path()):
for filename in os.listdir(get_file_cache_path()):
file_path = os.path.join(folder_path, filename)

try:
Expand All @@ -147,12 +172,59 @@ def drop_disk_cache():
log.error("Failed to delete %s. Reason: %s" % (file_path, e))


def get_disk_cache_path() -> str:
"""Return path to the disk cache folder"""
def get_file_cache_path() -> str:
"""Return path to the file cache folder"""
storage_path: str = tk.config["ckan.storage_path"] or tempfile.gettempdir()

cache_path = os.path.join(storage_path, "charts_cache")

os.makedirs(cache_path, exist_ok=True)

return cache_path


def update_redis_expiration(time: int) -> None:
"""Update TTL for existing Redis keys"""
if not time:
return

redis_conn = RedisCache().client

for key in redis_conn.scan_iter(const.REDIS_PREFIX):
redis_conn.expire(name=key, time=time, lt=True)


def count_redis_cache_size() -> int:
"""Return the size of the Redis cache"""
redis_conn = RedisCache().client

total_size = 0

for key in redis_conn.scan_iter(const.REDIS_PREFIX):
size = redis_conn.memory_usage(key)

if not size or not isinstance(size, int):
continue

total_size += size

return total_size


def count_file_cache_size() -> int:
"""Return the size of the file cache"""
return sum(
os.path.getsize(os.path.join(get_file_cache_path(), f))
for f in os.listdir(get_file_cache_path())
)


def remove_expired_file_cache() -> None:
"""Remove expired files from the file cache"""
for filename in os.listdir(get_file_cache_path()):
file_path = os.path.join(get_file_cache_path(), filename)

if FileCache.is_file_cache_expired(file_path):
os.unlink(file_path)

log.info("Expired files have been removed from the file cache")
20 changes: 20 additions & 0 deletions ckanext/charts/chart_builder.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
from __future__ import annotations

from typing import Any
from abc import ABC, abstractmethod

import pandas as pd


class BaseChartBuilder(ABC):
def __init__(self, dataframe: pd.DataFrame, user_settings: dict[str, Any]) -> None:
self.df = dataframe
self.user_settings = user_settings

@abstractmethod
def get_settings(self) -> dict[str, Any]:
pass


class PlotlyBuilder(BaseChartBuilder):
pass
12 changes: 12 additions & 0 deletions ckanext/charts/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,20 @@


CONF_CACHE_STRATEGY = "ckanext.charts.cache_strategy"
CONF_REDIS_CACHE_TTL = "ckanext.charts.redis_cache_ttl"
CONF_FILE_CACHE_TTL = "ckanext.charts.file_cache_ttl"


def get_cache_strategy():
"""Get an active cache strategy from the configuration."""
return tk.config[CONF_CACHE_STRATEGY]


def get_redis_cache_ttl():
"""Get the redis cache time-to-live from the configuration."""
return tk.asint(tk.config[CONF_REDIS_CACHE_TTL])


def get_file_cache_ttl():
"""Get the file cache time-to-live from the configuration."""
return tk.asint(tk.config[CONF_FILE_CACHE_TTL])
16 changes: 16 additions & 0 deletions ckanext/charts/config_declaration.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,19 @@ groups:
- key: ckanext.charts.cache_strategy
description: Charts cache strategy
default: redis
editable: true
validators: OneOf(["file","redis"])

- key: ckanext.charts.redis_cache_ttl
description: Redis cache TTL
default: 3600
editable: true
type: int
validators: ignore_empty int_validator

- key: ckanext.charts.file_cache_ttl
description: File cache TTL
default: 3600
editable: true
type: int
validators: ignore_empty int_validator
16 changes: 13 additions & 3 deletions ckanext/charts/config_schema.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,21 @@ about: An extension config form schema
fields:
- field_name: ckanext.charts.cache_strategy
label: Cache Strategy
validators: default(disk), one_of([disk, redis])
help_text: Choose the cache strategy for the charts extension.
preset: select
required: true
choices:
- value: disk
label: Disk
- value: file
label: File
- value: redis
label: Redis

- field_name: ckanext.charts.redis_cache_ttl
label: Redis Cache TTL
help_text: Time to live for the cache in seconds. Set 0 to disable cache.
input_type: number

- field_name: ckanext.charts.file_cache_ttl
label: File Cache TTL
help_text: Time to live for the cache in seconds. Set 0 to disable cache.
input_type: number
4 changes: 4 additions & 0 deletions ckanext/charts/const.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
CACHE_FILE = "file"
CACHE_REDIS = "redis"

REDIS_PREFIX = "ckanext-charts:*"
Loading

0 comments on commit 299df57

Please sign in to comment.