Skip to content

Commit

Permalink
Merge branch 'master' into conn-with-update
Browse files Browse the repository at this point in the history
  • Loading branch information
thearifismail authored Jun 10, 2024
2 parents bffb81c + e6e35a6 commit 5e83f61
Show file tree
Hide file tree
Showing 10 changed files with 123 additions and 16 deletions.
27 changes: 26 additions & 1 deletion Pipfile.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

53 changes: 45 additions & 8 deletions api/cache.py
Original file line number Diff line number Diff line change
@@ -1,24 +1,61 @@
from flask import Flask
from flask_caching import Cache
from redis import Redis

from app.logging import get_logger

# Module-wide cache handle and its active configuration.  Both start as a
# no-op NullCache and are replaced by init_cache() at application startup.
CACHE_CONFIG = {"CACHE_TYPE": "NullCache"}
CACHE = Cache(config=CACHE_CONFIG)
logger = get_logger("cache")


def init_cache(app_config, flask_app):
    """Configure the module-level cache from app_config and bind it to flask_app.

    Selects SimpleCache when an API cache timeout is configured, upgrades to
    RedisCache when the config names it and provides host/port, and otherwise
    keeps the NullCache.  Mutates the module globals CACHE and CACHE_CONFIG.

    :param app_config: application config exposing api_cache_timeout,
        api_cache_type, _cache_host and _cache_port
    :param flask_app: the Flask application to register the cache with; any
        non-Flask value is logged and skipped
    """
    global CACHE
    global CACHE_CONFIG
    cache_type = "NullCache"

    logger.info("Initializing Cache")
    if app_config.api_cache_timeout:
        cache_type = "SimpleCache"

    CACHE_CONFIG = {"CACHE_TYPE": cache_type, "CACHE_DEFAULT_TIMEOUT": app_config.api_cache_timeout}
    if app_config.api_cache_type == "RedisCache" and app_config._cache_host and app_config._cache_port:
        CACHE_CONFIG["CACHE_TYPE"] = app_config.api_cache_type
        CACHE_CONFIG["CACHE_REDIS_HOST"] = app_config._cache_host
        CACHE_CONFIG["CACHE_REDIS_PORT"] = app_config._cache_port

    if not CACHE:
        logger.info(f"Cache is unset; using config={CACHE_CONFIG}")
        CACHE = Cache(config=CACHE_CONFIG)
    else:
        logger.info(f"Cache using config={CACHE_CONFIG}")

    if isinstance(flask_app, Flask):
        logger.info("Cache initialized with app.")
        # NOTE: Flask-Caching's Cache.init_app() returns None, so assigning its
        # result (as the original did) would clobber the module-level CACHE
        # handle and silently disable delete_keys(); call it for its side
        # effect only.
        CACHE.init_app(flask_app, config=CACHE_CONFIG)
    else:
        logger.info(f"Cache not initialized with app. Passed the following for the app={flask_app}.")


def _delete_keys_simple(prefix):
    """Drop every SimpleCache entry whose key begins with the given prefix.

    Works directly on the in-process backing dict of the SimpleCache backend;
    Flask-Caching prepends "flask_cache_" to every key it stores.
    """
    backend_store = CACHE.cache._cache
    target = f"flask_cache_{prefix}"
    stale_keys = [stored_key for stored_key in backend_store if stored_key.startswith(target)]
    for stale_key in stale_keys:
        backend_store.pop(stale_key)


def _delete_keys_redis(prefix):
    """Best-effort delete of Redis-backed cache entries matching the prefix.

    Connects to Redis using the host/port recorded in CACHE_CONFIG and removes
    every key starting with "flask_cache_<prefix>".  Failures are logged with
    a traceback and swallowed so cache invalidation never breaks the caller.
    """
    # CACHE_CONFIG is only read here, so no `global` statement is needed.
    try:
        redis_client = Redis(host=CACHE_CONFIG.get("CACHE_REDIS_HOST"), port=CACHE_CONFIG.get("CACHE_REDIS_PORT"))
        # Use SCAN to find keys to delete that start with the prefix; default prefix is flask_cache_
        for key in redis_client.scan_iter(f"flask_cache_{prefix}*"):
            redis_client.delete(key)
    except Exception:  # renamed from `exec`, which shadowed the builtin
        # logger.exception() already attaches the active exception's traceback;
        # the explicit exc_info= argument was redundant.
        logger.exception("Cache deletion failed")


def delete_keys(prefix):
    """Invalidate cached entries for *prefix* via the configured backend.

    Dispatches to the SimpleCache or RedisCache deleter based on the active
    cache type; a falsy CACHE (e.g. cache never initialized) is a no-op.
    """
    if not CACHE:
        return
    cache_type = CACHE.config["CACHE_TYPE"]
    if cache_type == "SimpleCache":
        _delete_keys_simple(prefix)
    if cache_type == "RedisCache":
        _delete_keys_redis(prefix)
10 changes: 10 additions & 0 deletions api/group.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,14 @@
from api import flask_json_response
from api import json_error_response
from api import metrics
from api.cache import delete_keys
from api.group_query import build_group_response
from api.group_query import build_paginated_group_list_response
from api.group_query import get_filtered_group_list_db
from api.group_query import get_group_list_by_id_list_db
from app import RbacPermission
from app import RbacResourceType
from app.auth import get_current_identity
from app.exceptions import InventoryException
from app.instrumentation import log_create_group_failed
from app.instrumentation import log_create_group_not_allowed
Expand Down Expand Up @@ -142,6 +144,8 @@ def patch_group_by_id(group_id, body, rbac_filter=None):
)

updated_group = get_group_by_id_from_db(group_id)
current_identity = get_current_identity()
delete_keys(current_identity.org_id)
log_patch_group_success(logger, group_id)
return flask_json_response(build_group_response(updated_group), HTTPStatus.OK)

Expand All @@ -158,6 +162,8 @@ def delete_groups(group_id_list, rbac_filter=None):
log_get_group_list_failed(logger)
abort(HTTPStatus.NOT_FOUND, "No groups found for deletion.")

current_identity = get_current_identity()
delete_keys(current_identity.org_id)
return Response(None, HTTPStatus.NO_CONTENT)


Expand Down Expand Up @@ -199,6 +205,8 @@ def delete_hosts_from_group(group_id, host_id_list, rbac_filter=None):
log_delete_hosts_from_group_failed(logger)
abort(HTTPStatus.NOT_FOUND, "Group or hosts not found.")

current_identity = get_current_identity()
delete_keys(current_identity.org_id)
return Response(None, HTTPStatus.NO_CONTENT)


Expand Down Expand Up @@ -231,4 +239,6 @@ def delete_hosts_from_different_groups(host_id_list, rbac_filter=None):
log_delete_hosts_from_group_failed(logger)
abort(HTTPStatus.NOT_FOUND, "The provided hosts were not found.")

current_identity = get_current_identity()
delete_keys(current_identity.org_id)
return Response(None, HTTPStatus.NO_CONTENT)
14 changes: 12 additions & 2 deletions api/host.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@
from api import build_collection_response
from api import flask_json_response
from api import metrics
from api.cache import CACHE
from api.cache import delete_keys
from api.cache_key import make_key
from api.host_query import build_paginated_host_list_response
from api.host_query import staleness_timestamps
from api.host_query_db import get_all_hosts
Expand Down Expand Up @@ -72,6 +75,7 @@


@api_operation
@CACHE.cached(key_prefix=make_key)
@rbac(RbacResourceType.HOSTS, RbacPermission.READ)
@metrics.api_request_time.time()
def get_host_list(
Expand Down Expand Up @@ -282,6 +286,7 @@ def _delete_host_list(host_id_list, rbac_filter):
) as payload_tracker_processing_ctx:
payload_tracker_processing_ctx.inventory_id = host_id

delete_keys(current_identity.org_id)
return deletion_count


Expand Down Expand Up @@ -355,6 +360,7 @@ def get_host_by_id(host_id_list, page=1, per_page=100, order_by=None, order_how=


@api_operation
@CACHE.cached(key_prefix=make_key)
@rbac(RbacResourceType.HOSTS, RbacPermission.READ)
@metrics.api_request_time.time()
def get_host_system_profile_by_id(
Expand Down Expand Up @@ -410,8 +416,8 @@ def patch_host_by_id(host_id_list, body, rbac_filter=None):
log_patch_host_failed(logger, host_id_list)
return flask.abort(HTTPStatus.NOT_FOUND, "Requested host not found.")

identity = get_current_identity()
staleness = get_staleness_obj(identity)
current_identity = get_current_identity()
staleness = get_staleness_obj(current_identity)

try:
for host in hosts_to_update:
Expand All @@ -433,6 +439,7 @@ def patch_host_by_id(host_id_list, body, rbac_filter=None):
serialized_host = serialize_host(host, staleness_timestamps(), staleness=staleness)
_emit_patch_event(serialized_host, host)

delete_keys(current_identity.org_id)
log_patch_host_success(logger, host_id_list)
return 200

Expand Down Expand Up @@ -508,6 +515,7 @@ def update_facts_by_namespace(operation, host_id_list, namespace, fact_dict, rba


@api_operation
@CACHE.cached(key_prefix=make_key)
@rbac(RbacResourceType.HOSTS, RbacPermission.READ)
@metrics.api_request_time.time()
def get_host_tag_count(host_id_list, page=1, per_page=100, order_by=None, order_how=None, rbac_filter=None):
Expand All @@ -528,6 +536,7 @@ def get_host_tag_count(host_id_list, page=1, per_page=100, order_by=None, order_


@api_operation
@CACHE.cached(key_prefix=make_key)
@rbac(RbacResourceType.HOSTS, RbacPermission.READ)
@metrics.api_request_time.time()
def get_host_tags(host_id_list, page=1, per_page=100, order_by=None, order_how=None, search=None, rbac_filter=None):
Expand Down Expand Up @@ -566,6 +575,7 @@ def host_checkin(body, rbac_filter=None):
db.session.commit()
serialized_host = serialize_host(existing_host, staleness_timestamps(), staleness=staleness)
_emit_patch_event(serialized_host, existing_host)
delete_keys(current_identity.org_id)
return flask_json_response(serialized_host, 201)
else:
flask.abort(404, "No hosts match the provided canonical facts.")
8 changes: 3 additions & 5 deletions app/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,11 +79,7 @@ def topic(t):
self.unleash_cache_directory = os.getenv("UNLEASH_CACHE_DIR", "/tmp/.unleashcache")
if unleash:
self.unleash_token = unleash.clientAccessToken
unleash_url = f"{unleash.hostname}:{unleash.port}/api"
if unleash.port in (80, 8080):
self.unleash_url = f"http://{unleash_url}"
else:
self.unleash_url = f"https://{unleash_url}"
self.unleash_url = f"{unleash.scheme.value}://{unleash.hostname}:{unleash.port}/api"
else:
self.unleash_url = os.getenv("UNLEASH_URL")
self.unleash_token = os.getenv("UNLEASH_TOKEN")
Expand Down Expand Up @@ -117,6 +113,8 @@ def non_clowder_config(self):

self.unleash_url = os.environ.get("UNLEASH_URL", "http://unleash:4242/api")
self.unleash_token = os.environ.get("UNLEASH_TOKEN", "")
self._cache_host = os.environ.get("CACHE_HOST", "localhost")
self._cache_port = os.environ.get("CACHE_PORT", "6379")

def days_to_seconds(self, n_days):
factor = 86400
Expand Down
3 changes: 3 additions & 0 deletions app/queue/queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from marshmallow import ValidationError
from sqlalchemy.exc import OperationalError

from api.cache import delete_keys
from api.staleness_query import get_staleness_obj
from app.auth.identity import create_mock_identity_with_org_id
from app.auth.identity import Identity
Expand Down Expand Up @@ -297,6 +298,7 @@ def handle_message(message, event_producer, notification_event_producer, message
):
try:
host = validated_operation_msg["data"]
org_id = host["org_id"]

output_host, host_id, insights_id, operation_result = message_operation(
host, platform_metadata, validated_operation_msg.get("operation_args", {})
Expand All @@ -312,6 +314,7 @@ def handle_message(message, event_producer, notification_event_producer, message
output_host.get("system_profile", {}).get("operating_system", {}).get("name"),
)
event_producer.write_event(event, str(host_id), headers, wait=True)
delete_keys(org_id)
except ValidationException as ve:
logger.error(
"Validation error while adding or updating host: %s",
Expand Down
16 changes: 16 additions & 0 deletions deploy/clowdapp.yml
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,10 @@ objects:
value: ${BYPASS_TENANT_TRANSLATION}
- name: CLOWDER_ENABLED
value: "true"
- name: INVENTORY_API_CACHE_TIMEOUT_SECONDS
value: "${INVENTORY_API_CACHE_TIMEOUT_SECONDS}"
- name: INVENTORY_API_CACHE_TYPE
value: "${INVENTORY_API_CACHE_TYPE}"
- name: UNLEASH_URL
value: ${UNLEASH_URL}
- name: UNLEASH_TOKEN
Expand Down Expand Up @@ -264,6 +268,10 @@ objects:
value: ${BYPASS_TENANT_TRANSLATION}
- name: CLOWDER_ENABLED
value: "true"
- name: INVENTORY_API_CACHE_TIMEOUT_SECONDS
value: "${INVENTORY_API_CACHE_TIMEOUT_SECONDS}"
- name: INVENTORY_API_CACHE_TYPE
value: "${INVENTORY_API_CACHE_TYPE}"
- name: UNLEASH_URL
value: ${UNLEASH_URL}
- name: UNLEASH_TOKEN
Expand Down Expand Up @@ -338,6 +346,10 @@ objects:
value: ${BYPASS_TENANT_TRANSLATION}
- name: CLOWDER_ENABLED
value: "true"
- name: INVENTORY_API_CACHE_TIMEOUT_SECONDS
value: "${INVENTORY_API_CACHE_TIMEOUT_SECONDS}"
- name: INVENTORY_API_CACHE_TYPE
value: "${INVENTORY_API_CACHE_TYPE}"
- name: UNLEASH_URL
value: ${UNLEASH_URL}
- name: UNLEASH_TOKEN
Expand Down Expand Up @@ -415,6 +427,10 @@ objects:
value: ${KAFKA_SASL_MECHANISM}
- name: CLOWDER_ENABLED
value: "true"
- name: INVENTORY_API_CACHE_TIMEOUT_SECONDS
value: "${INVENTORY_API_CACHE_TIMEOUT_SECONDS}"
- name: INVENTORY_API_CACHE_TYPE
value: "${INVENTORY_API_CACHE_TYPE}"
- name: UNLEASH_URL
value: ${UNLEASH_URL}
- name: UNLEASH_TOKEN
Expand Down
2 changes: 2 additions & 0 deletions host_reaper.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from sqlalchemy import or_
from sqlalchemy.orm import sessionmaker

from api.cache import init_cache
from app import create_app
from app.auth.identity import create_mock_identity_with_org_id
from app.config import Config
Expand Down Expand Up @@ -131,6 +132,7 @@ def run(config, logger, session, event_producer, shutdown_handler):

def main(logger):
config = _init_config()
init_cache(config, application)

registry = CollectorRegistry()
for metric in COLLECTED_METRICS:
Expand Down
2 changes: 2 additions & 0 deletions inv_mq_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from confluent_kafka import Consumer as KafkaConsumer
from prometheus_client import start_http_server

from api.cache import init_cache
from app import create_app
from app.environment import RuntimeEnvironment
from app.logging import get_logger
Expand All @@ -21,6 +22,7 @@
def main():
application = create_app(RuntimeEnvironment.SERVICE)
config = application.app.config["INVENTORY_CONFIG"]
init_cache(config, application)
start_http_server(config.metrics_port)

topic_to_handler = {config.host_ingress_topic: add_host, config.system_profile_topic: update_system_profile}
Expand Down
4 changes: 4 additions & 0 deletions lib/host_delete.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from confluent_kafka import KafkaException
from sqlalchemy.orm.base import instance_state

from api.cache import delete_keys
from app.auth.identity import to_auth_header
from app.logging import get_logger
from app.models import Host
Expand All @@ -18,6 +19,7 @@


def delete_hosts(select_query, event_producer, chunk_size, interrupt=lambda: False, identity=None):
cache_keys_to_invalidate = set()
with session_guard(select_query.session):
while select_query.count():
for host in select_query.limit(chunk_size):
Expand All @@ -29,6 +31,8 @@ def delete_hosts(select_query, event_producer, chunk_size, interrupt=lambda: Fal

if interrupt():
return
for org_id in cache_keys_to_invalidate:
delete_keys(org_id)


def _delete_host(session, event_producer, host, identity=None):
Expand Down

0 comments on commit 5e83f61

Please sign in to comment.