import redis
import falcon
import redis
from falcon_cache.middleware import CacheMiddleware
from falcon_cache.cache import APICache
# Redis client handed to APICache below; the literal `...` is a placeholder
# for the real connection arguments.
redis_conn = redis.Redis(...)
# Cache object constructed around that Redis connection.
api_cache = APICache(redis=redis_conn)
# Falcon application with CacheMiddleware registered for api_cache.
# NOTE(review): falcon.API is the legacy application class name; newer Falcon
# releases expose falcon.App instead - confirm the targeted Falcon version.
app = falcon.API(middleware=[
CacheMiddleware(cache=api_cache),
])
class WebResourceController:
    """Example resource whose GET responder is wrapped by ``api_cache.cached``."""

    @classmethod
    @api_cache.cached(timeout=300)
    def on_get(cls, req, resp):
        """Handle GET requests; the example body intentionally does nothing.

        The responder is decorated with ``api_cache.cached(timeout=300)``
        (timeout presumably in seconds - confirm against the cache library).
        """
class TaskWebController:
    """Example resource whose cached GET responses carry a per-task tag."""

    @classmethod
    @api_cache.cached(
        timeout=300,
        # task_id must be in req.params for the tag template below.
        tags_templates=["task_id:{task_id}"],
    )
    def on_get(cls, req, resp):
        """Handle GET requests for a task.

        Reads ``task_id`` from the request parameters; the example stops
        there and returns nothing.
        """
        task_id = req.params.get("task_id")
# Now you can invalidate the cache by tag:
# The tag "task_id:1" follows the "task_id:{task_id}" template declared on
# TaskWebController.on_get, i.e. it targets entries tagged for task_id == 1.
api_cache.invalidate_by_tag(tag="task_id:1")
def make_cache_key(req: falcon.Request) -> str:
    """Build the cache key string for a request.

    The key combines, in order: the forwarded host, the ``origin`` and
    ``accept`` request headers, the ``current_user_id`` entry of
    ``req.context``, the relative URI and the request params.

    Args:
        req: The incoming Falcon request.

    Returns:
        The key, prefixed with ``API_CACHE:``.
    """
    current_user = req.context.get("current_user_id")
    host = req.forwarded_host
    origin = req.get_header("origin")
    accept = req.get_header("accept")
    # Assembled in two parts to keep lines short; concatenation yields the
    # same single string as one long f-string would.
    request_part = f"API_CACHE:{host}:ORIGIN:{origin}:ACCEPT:{accept}"
    user_part = f":USER:{current_user}:{req.relative_uri}:{req.params}"
    return request_part + user_part
# Create a cache instance stored on app_globals and register make_cache_key
# as its cache-key maker.
# NOTE(review): app_globals is not defined in the visible part of this file -
# confirm where it comes from.
app_globals.api_cache = APICache(redis=redis_conn)
app_globals.api_cache.set_cache_key_maker(make_cache_key)
# Rebind the module-level api_cache to a fresh instance and set its `enabled`
# flag to False (presumably turns caching off, e.g. for tests - confirm).
api_cache = APICache(redis=redis_conn)
api_cache.enabled = False