diff --git a/arcee/Dockerfile b/arcee/Dockerfile index f633035e4..75ccbc322 100644 --- a/arcee/Dockerfile +++ b/arcee/Dockerfile @@ -6,6 +6,7 @@ ENV PYTHONPATH /usr/src/app/ COPY arcee/arcee_receiver/requirements.txt arcee/arcee_receiver/requirements.txt COPY optscale_client/aconfig_cl optscale_client/aconfig_cl +COPY optscale_client/config_client optscale_client/config_client COPY tools/optscale_time tools/optscale_time RUN pip install --no-cache-dir -r /usr/src/app/arcee/arcee_receiver/requirements.txt diff --git a/arcee/arcee_receiver/models.py b/arcee/arcee_receiver/models.py index 8be4b0761..7eb03421c 100644 --- a/arcee/arcee_receiver/models.py +++ b/arcee/arcee_receiver/models.py @@ -8,7 +8,7 @@ from typing_extensions import Annotated -class ArceeState: +class ArceeState(int, Enum): STARTED = 1 FINISHED = 2 ERROR = 3 @@ -115,6 +115,17 @@ class TaskPatchIn(BaseClass): owner_id: Optional[str] = None +class TaskPostIn(TaskPatchIn): + key: str + metrics: List[str] = [] + + +class Task(TaskPostIn): + id: str = id_ + token: str + deleted_at: int = 0 + + class LeaderboardFilter(BaseClass): id: str = Field(description='metric id to filter by') min: Optional[float] = None diff --git a/arcee/arcee_receiver/producer.py b/arcee/arcee_receiver/producer.py new file mode 100644 index 000000000..70455fadb --- /dev/null +++ b/arcee/arcee_receiver/producer.py @@ -0,0 +1,59 @@ +import os +import asyncio +from functools import partial +import concurrent.futures + +from kombu import Connection as QConnection, Exchange +from kombu.pools import producers + + +from optscale_client.config_client.client import Client as ConfigClient + + +class ActivitiesTaskProducer: + EXCHANGE_NAME = 'activities-tasks' + RETRY_POLICY = {'max_retries': 15, 'interval_start': 0, + 'interval_step': 1, 'interval_max': 3} + RESCHEDULE_TIMEOUT = 60 * 60 * 12 + + executor = concurrent.futures.ThreadPoolExecutor(max_workers=10) + + def __init__(self): + self._config_cl = None + + @classmethod + async def run_async(cls, func, *args, loop=None, executor=None, **kwargs): + if loop is None: + loop = asyncio.get_event_loop() + if executor is None: + executor = cls.executor + pfunc = partial(func, *args, **kwargs) + return await loop.run_in_executor(executor, pfunc) + + @property + def config_cl(self): + if self._config_cl is None: + config_cl = ConfigClient( + host=os.environ.get('HX_ETCD_HOST'), + port=int(os.environ.get('HX_ETCD_PORT')), + ) + self._config_cl = config_cl + return self._config_cl + + def create_task(self, task, routing_key): + params = self.config_cl.read_branch('/rabbit') + conn_str = f'amqp://{params["user"]}:{params["pass"]}@' \ + f'{params["host"]}:{params["port"]}' + queue_conn = QConnection(conn_str, transport_options=self.RETRY_POLICY) + + task_exchange = Exchange(self.EXCHANGE_NAME, type='topic') + with producers[queue_conn].acquire(block=True) as producer: + producer.publish( + task, + serializer='json', + exchange=task_exchange, + declare=[task_exchange], + routing_key=routing_key, + retry=True, + retry_policy=self.RETRY_POLICY + ) diff --git a/arcee/arcee_receiver/requirements.txt b/arcee/arcee_receiver/requirements.txt index 7523af5c3..2af97bba5 100644 --- a/arcee/arcee_receiver/requirements.txt +++ b/arcee/arcee_receiver/requirements.txt @@ -2,10 +2,12 @@ aiohttp==3.10.2 sanic==23.12.1 sanic-ext==23.12.0 motor==3.6.0 +kombu==5.3.4 pymongo==4.9.1 python-etcd==0.4.5 mongodb-migrations==1.2.1 pydantic==2.4.2 # OptScale packages +-e optscale_client/config_client -e optscale_client/aconfig_cl -e tools/optscale_time diff --git a/arcee/arcee_receiver/server.py b/arcee/arcee_receiver/server.py index a86cf2605..485cc8efd 100644 --- a/arcee/arcee_receiver/server.py +++ b/arcee/arcee_receiver/server.py @@ -19,18 +19,20 @@ from sanic_ext import validate from arcee.arcee_receiver.models import ( - TaskPatchIn, Console, ConsolePostIn, Dataset, DatasetPatchIn, + ArtifactPostIn, ArtifactPatchIn, Artifact, ArtifactSearchParams, + ArceeState, Console, ConsolePostIn, Dataset, DatasetPatchIn, DatasetPostIn, Run, RunPatchIn, RunPostIn, Leaderboard, LeaderboardPatchIn, LeaderboardPostIn, LeaderboardTemplate, LeaderboardTemplatePostIn, LeaderboardTemplatePatchIn, Log, Platform, StatsPostIn, ModelPatchIn, ModelPostIn, Model, ModelVersionIn, ModelVersion, Metric, MetricPostIn, MetricPatchIn, - ArtifactPostIn, ArtifactPatchIn, Artifact, ArtifactSearchParams, + Task, TaskPatchIn, TaskPostIn ) from arcee.arcee_receiver.modules.leader_board import ( get_calculated_leaderboard, Tendencies) from arcee.arcee_receiver.modules.leader_board import ( get_metrics as _get_task_metrics) +from arcee.arcee_receiver.producer import ActivitiesTaskProducer from optscale_client.aconfig_cl.aconfig_cl import AConfigCl import tools.optscale_time as opttime @@ -76,6 +78,23 @@ async def get_cluster_secret() -> str: db = client[db_name] +async def publish_activities_task(profiling_token, object_id, object_name, + object_type, action, **kwargs): + task = { + "profiling_token": profiling_token, + "object_id": object_id, + "object_name": object_name, + "object_type": object_type, + "action": action, + **kwargs + } + routing_key = ".".join([object_type, action]) + producer = ActivitiesTaskProducer() + logger.info("Creating activities task:%s, routing_key:%s", task, + routing_key) + await producer.run_async(producer.create_task, task, routing_key) + + async def extract_token(request, raise_on=True): token = request.headers.get('x-api-key') if not token and raise_on: @@ -185,9 +204,10 @@ async def check_metrics(metrics): if not metrics: return existing_metrics = [ - doc["_id"] async for doc in db.metric.find({"_id": {"$in": metrics}}) + doc async for doc in db.metric.find({"_id": {"$in": metrics}}) ] - missing = list(filter(lambda x: x not in existing_metrics, metrics)) + existing_metrics_ids = [x["_id"] for x in existing_metrics] + missing = list(filter(lambda x: x not in existing_metrics_ids, metrics)) if missing: msg = "some metrics not exists in db: %s" % ",".join(missing) raise SanicException(msg, status_code=400) @@ -205,29 +225,45 @@ async def check_leaderboard_filters(leaderboard, updates): raise SanicException('Invalid filters', status_code=400) +async def _create_task(**kwargs) -> dict: + task = Task(**kwargs).model_dump(by_alias=True) + await db.task.insert_one(task) + return task + + @app.route('/arcee/v2/tasks', methods=["POST", ], ctx_label='token') -async def create_task(request): +@validate(json=TaskPostIn) +async def create_task(request, body: TaskPostIn): token = request.ctx.token - doc = request.json - # TODO: validators - key = doc.get("key") - if not key or not isinstance(key, str): - raise SanicException("Key should be str", status_code=400) - metrics = (doc.get("metrics") or list()) - await check_metrics(metrics) - display_name = doc.get("name", key) - description = doc.get("description") - doc.update({"token": token}) + await check_metrics(body.metrics) o = await db.task.find_one( - {"token": token, "key": key, "deleted_at": 0}) + {"token": token, "key": body.key, "deleted_at": 0}) if o: raise SanicException("Project exists", status_code=409) - doc["_id"] = str(uuid.uuid4()) - doc["name"] = display_name - doc["deleted_at"] = 0 - doc["description"] = description - await db.task.insert_one(doc) - return json(doc) + task = await _create_task( + token=token, **body.model_dump(exclude_unset=True)) + await publish_activities_task( + token, task["_id"], task.get("name"), "task", "task_created") + return json(task) + + +async def _send_task_metric_updated( + metrics_to_add: set, metrics_to_remove: set, token: str, + object_id: str, object_name: str): + all_metrics = metrics_to_add.union(metrics_to_remove) + existing_metrics = { + doc["_id"]: doc.get('name') + async for doc in db.metric.find({"_id": {"$in": list(all_metrics)}}) + } + for metric_id in all_metrics: + state = 'activated' + if metric_id in metrics_to_remove: + state = 'deactivated' + await publish_activities_task( + token, object_id, object_name, "task", "task_metric_updated", + metric_id=metric_id, metric_name=existing_metrics.get(metric_id), + state=state + ) @app.route('/arcee/v2/tasks/', methods=["PATCH", ], @@ -247,18 +283,28 @@ async def update_task(request, body: TaskPatchIn, id_: str): if not o: raise SanicException("Not found", status_code=404) d = body.model_dump(exclude_unset=True) + metrics_to_add = [] + metrics_to_remove = [] if d: metrics = d.get('metrics') if metrics is not None: await check_metrics(metrics) metrics_to_remove = set(o['metrics']) - set(metrics) + metrics_to_add = set(metrics) - set(o['metrics']) for metric_id in metrics_to_remove: if await _metric_used_in_lb(db, metric_id, task_id=id_): raise SanicException( f"Metric is used in task leaderboard(s)", status_code=409) - await db.task.update_one( - {"_id": id_}, {'$set': d}) + await db.task.update_one({"_id": id_}, {"$set": d}) + task_name = d.get("name") or o.get("name") + if metrics is None: + await publish_activities_task( + token, id_, task_name, "task", "task_updated" + ) + else: + await _send_task_metric_updated( + metrics_to_add, metrics_to_remove, token, id_, task_name) return json({"updated": bool(d), "id": id_}) @@ -364,8 +410,7 @@ async def delete_task(request, id_: str): deleted_consoles = 0 deleted_artifacts = 0 token = request.ctx.token - o = await db.task.find_one( - {"token": token, "_id": id_, "deleted_at": 0}) + o = await db.task.find_one({"token": token, "_id": id_, "deleted_at": 0}) if not o: raise SanicException("Not found", status_code=404) runs = [doc["_id"] async for doc in db.run.find({"task_id": id_})] @@ -406,6 +451,9 @@ async def delete_task(request, id_: str): {"$set": {"deleted_at": now}}) await db.task.update_one( {"_id": id_}, {"$set": {"deleted_at": now}}) + await publish_activities_task( + token, id_, o.get("name"), "task", "task_deleted" + ) return json({ "deleted": True, "_id": id_, @@ -432,18 +480,18 @@ async def create_task_run(request, body: RunPostIn, name: str): """ token = request.ctx.token - o = await db.task.find_one( - {"token": token, "key": name, "deleted_at": 0}) + o = await db.task.find_one({"token": token, "key": name, "deleted_at": 0}) if not o: raise SanicException("Not found", status_code=404) task_id = o["_id"] run_cnt = await db.run.count_documents({"task_id": task_id}) - r = Run( - task_id=task_id, number=run_cnt + 1, **body.model_dump() - ) + r = Run(task_id=task_id, number=run_cnt + 1, **body.model_dump()) await db.run.insert_one(r.model_dump(by_alias=True)) + await publish_activities_task( + token, r.id, r.name, "run", "run_started" + ) return json({"id": r.id}) @@ -645,6 +693,10 @@ async def update_run(request, body: RunPatchIn, run_id: str): await check_run_state(r) # check task await check_task(token, r) + else: + task = await db.task.find_one({"_id": r["task_id"]}) + if task: + token = task.get('token') d = body.model_dump(exclude_unset=True, exclude={'finish'}) # TODO: remove "finish" from PATCH payload. Set ts based on "state" @@ -656,8 +708,17 @@ async def update_run(request, body: RunPatchIn, run_id: str): existing_hyperparams.update(hyperparameters) d.update({"hyperparameters": existing_hyperparams}) - await db.run.update_one( - {"_id": run_id}, {'$set': d}) + await db.run.update_one({"_id": run_id}, {"$set": d}) + + if body.state and body.state == ArceeState.ERROR: + await publish_activities_task( + token, run_id, r.get("name"), "run", "run_failed" + ) + elif body.state: + await publish_activities_task( + token, run_id, r.get("name"), "run", "run_state_updated", + state=ArceeState(body.state).name + ) return json({"updated": True, "id": run_id}) @@ -718,14 +779,20 @@ async def collect(request, body: StatsPostIn): platform = body.platform instance_id = platform.instance_id + run_id = body.run + run = await db.run.find_one({"_id": run_id}) if instance_id: o = await db.platform.find_one({"instance_id": instance_id}) if not o: await db.platform.insert_one( Platform(**platform.model_dump()).model_dump(by_alias=True)) - - run_id = body.run - run = await db.run.find_one({"_id": run_id}) + run_name = None + if run: + run_name = run.get("name") + await publish_activities_task( + token, instance_id, instance_id, "platform", + "platform_created", run_id=run_id, run_name=run_name + ) if not run: raise SanicException("Not found", status_code=404) await check_run_state(run) @@ -926,6 +993,9 @@ async def delete_run(request, run_id: str): await db.model_version.update_many({'run_id': run['_id'], 'deleted_at': 0}, {'$set': {'deleted_at': now}}) await db.run.delete_one({'_id': run_id}) + await publish_activities_task( + token, run_id, run.get("name"), "run", "run_deleted" + ) return json({"deleted": True, "_id": run_id}) @@ -950,6 +1020,9 @@ async def create_metric(request, body: MetricPostIn): raise SanicException("Conflict", status_code=409) metric = await _create_metric( token=token, **body.model_dump(exclude_unset=True)) + await publish_activities_task( + token, metric["_id"], metric.get("name"), "metric", "metric_created" + ) return json(metric) @@ -1069,6 +1142,9 @@ async def delete_metric(request, metric_id: str): raise SanicException("Metric used in leaderboard template", status_code=409) await db.metric.delete_one({'_id': metric_id}) + await publish_activities_task( + token, metric_id, o.get("name"), "metric", "metric_deleted" + ) return json({"deleted": True, "_id": metric_id}) @@ -1101,13 +1177,16 @@ async def change_metric(request, body: MetricPatchIn, metric_id: str): :return: """ token = request.ctx.token - metric = await db.metric.find_one({'_id': metric_id, 'token': token}) - if not metric: + db_metric = await db.metric.find_one({"_id": metric_id, "token": token}) + if not db_metric: raise SanicException("Metric not found", status_code=404) metric = body.model_dump(exclude_unset=True) if metric: - await db.metric.update_one( - {"_id": metric_id}, {'$set': metric}) + await db.metric.update_one({"_id": metric_id}, {"$set": metric}) + await publish_activities_task( + token, metric_id, metric.get("name") or db_metric.get("name"), + "metric", "metric_updated" + ) return json({"updated": bool(metric), "id": metric_id}) @@ -1436,8 +1515,12 @@ async def create_leaderboard_template(request, body: LeaderboardTemplatePostIn, raise SanicException("Conflict", status_code=409) await check_metrics(body.metrics) leaderboard_template = await _create_leaderboard_template( - token=token, task_id=task_id, - **body.model_dump(exclude_unset=True)) + token=token, task_id=task_id, **body.model_dump(exclude_unset=True)) + await publish_activities_task( + token, leaderboard_template["_id"], None, "leaderboard_template", + "leaderboard_template_created", task_id=task_id, + task_name=task.get("name") + ) return json(leaderboard_template, status=201) @@ -1463,6 +1546,9 @@ async def create_leaderboard(request, body: LeaderboardPostIn, d = await _create_leaderboard( token=token, leaderboard_template_id=leaderboard_template_id, **body.model_dump()) + await publish_activities_task( + token, d["_id"], d.get("name"), "leaderboard", "leaderboard_created" + ) return json(d, status=201) @@ -1594,20 +1680,19 @@ async def get_leaderboards(request, leaderboard_template_id: str): async def update_leaderboard(request, body: LeaderboardPatchIn, id_: str): token = request.ctx.token o = await db.leaderboard.find_one( - {"$and": [ - {"token": token}, - {"_id": id_}, - {"deleted_at": 0} - ]}) + {"$and": [{"token": token}, {"_id": id_}, {"deleted_at": 0}]} + ) if not o: raise SanicException("Leaderboard not found", status_code=404) d = body.model_dump(exclude_unset=True) if d: await check_leaderboard_filters(o, d) LeaderboardPatchIn.remove_dup_ds_ids(d) - await db.leaderboard.update_one( - {"_id": id_}, {'$set': d}) + await db.leaderboard.update_one({"_id": id_}, {"$set": d}) o = await db.leaderboard.find_one({"_id": id_}) + await publish_activities_task( + token, o["_id"], o.get("name"), "leaderboard", "leaderboard_updated" + ) return json(o) @@ -1621,7 +1706,10 @@ async def delete_leaderboard(request, id_: str): await db.leaderboard.update_one({"_id": id_}, {'$set': { "deleted_at": opttime.utcnow_timestamp() }}) - return json('', status=204) + await publish_activities_task( + token, id_, o.get("name"), "leaderboard", "leaderboard_deleted" + ) + return json("", status=204) @app.route('/arcee/v2/leaderboards//details', methods=["GET", ], @@ -1746,6 +1834,11 @@ async def change_leaderboard_template( await db.leaderboard_template.update_one( {"_id": lb_id}, {'$set': lb}) o = await db.leaderboard_template.find_one({"_id": lb_id}) + await publish_activities_task( + token, lb_id, None, "leaderboard_template", + "leaderboard_template_updated", task_id=task_id, + task_name=task.get("name") + ) return json(LeaderboardTemplate(**o).model_dump(by_alias=True)) @@ -1759,6 +1852,10 @@ async def delete_leaderboard_template(request, task_id: str): :return: """ token = request.ctx.token + task = await db.task.find_one({ + '_id': task_id, 'token': token, 'deleted_at': 0}) + if not task: + raise SanicException("Task not found", status_code=404) leaderboard_template = await db.leaderboard_template.find_one( {"token": token, "task_id": task_id, "deleted_at": 0}) if not leaderboard_template: @@ -1772,7 +1869,12 @@ async def delete_leaderboard_template(request, task_id: str): {"_id": leaderboard_template['_id']}, {'$set': {'deleted_at': deleted_at}} ) - return json({"deleted": True, "_id": leaderboard_template['_id']}) + await publish_activities_task( + token, leaderboard_template["_id"], None, "leaderboard_template", + "leaderboard_template_deleted", task_id=task_id, + task_name=task.get("name") + ) + return json({"deleted": True, "_id": leaderboard_template["_id"]}) @app.route('/arcee/v2/leaderboards//generate', @@ -1809,6 +1911,9 @@ async def create_dataset(request, body: DatasetPostIn): raise SanicException("Dataset exists", status_code=409) d = await _create_dataset( token=token, **body.model_dump(exclude_unset=True)) + await publish_activities_task( + token, d["_id"], d.get("name"), "dataset", "dataset_created", + ) return json(d, status=201) @@ -1831,6 +1936,9 @@ async def register_dataset(request, body: DatasetPostIn, run_id: str): token=token, **body.model_dump(exclude_unset=True)) await db.run.update_one( {"_id": run_id}, {"$set": {"dataset_id": d["_id"]}}) + await publish_activities_task( + token, d["_id"], d.get("name"), "dataset", "dataset_created" + ) return json({"id": d["_id"]}) @@ -1873,18 +1981,18 @@ async def get_dataset(request, id_: str): async def update_dataset(request, body: DatasetPatchIn, id_: str): token = request.ctx.token o = await db.dataset.find_one( - {"$and": [ - {"token": token}, - {"_id": id_}, - {"deleted_at": 0} - ]}) + {"$and": [{"token": token}, {"_id": id_}, {"deleted_at": 0}]} + ) if not o: raise SanicException("Dataset not found", status_code=404) d = body.model_dump(exclude_unset=True) if d: - await db.dataset.update_one( - {"_id": id_}, {'$set': d}) + await db.dataset.update_one({"_id": id_}, {"$set": d}) o = await db.dataset.find_one({"_id": id_}) + await publish_activities_task( + token, o["_id"], d.get("name") or o.get("name"), "dataset", + "dataset_updated" + ) return json(Dataset(**o).model_dump(by_alias=True)) @@ -1952,7 +2060,10 @@ async def delete_dataset(request, id_: str): await db.dataset.update_one({"_id": id_}, {'$set': { "deleted_at": opttime.utcnow_timestamp() }}) - return json('', status=204) + await publish_activities_task( + token, id_, o.get("name"), "dataset", "dataset_deleted" + ) + return json("", status=204) @app.route('/arcee/v2/labels', methods=["GET", ], ctx_label='token') @@ -2054,6 +2165,9 @@ async def create_model(request, body: ModelPostIn): if not model: model = await _create_model( token=token, **body.model_dump(exclude_unset=True)) + await publish_activities_task( + token, model["_id"], model.get("name"), "model", "model_created" + ) return json(model, status=201) @@ -2201,17 +2315,22 @@ async def update_model(request, body: ModelPatchIn, id_: str): await _get_model(token, id_) model = body.model_dump(exclude_unset=True) if model: - await db.model.update_one( - {"_id": id_}, {'$set': model}) + await db.model.update_one({"_id": id_}, {"$set": model}) obj = await db.model.find_one({"_id": id_}) + await publish_activities_task( + token, obj["_id"], obj.get("name"), "model", "model_updated" + ) return json(Model(**obj).model_dump(by_alias=True)) @app.route('/arcee/v2/models/', methods=["DELETE", ], ctx_label='token') async def delete_model(request, id_: str): - await _get_model(request.ctx.token, id_) + model = await _get_model(request.ctx.token, id_) await db.model_version.delete_many({"model_id": id_}) await db.model.delete_one({"_id": id_}) + await publish_activities_task( + request.ctx.token, id_, model.get("name"), "model", "model_deleted" + ) return json({'deleted': True, '_id': id_}, status=204) @@ -2258,26 +2377,27 @@ async def _remove_used_aliases(aliases, model_id): async def create_model_version(request, body: ModelVersionIn, run_id: str, model_id: str): token = request.ctx.token - await _get_model(token, model_id) - run = await db.run.find_one({ - '_id': run_id - }) + model = await _get_model(token, model_id) + run = await db.run.find_one({"_id": run_id}) if not run: raise SanicException('Run not found', status_code=404) model_version = await db.model_version.find_one( - {"$and": [ - {"model_id": model_id}, - {"run_id": run_id} - ]}) + {"$and": [{"model_id": model_id}, {"run_id": run_id}]} + ) if model_version: raise SanicException("Model version already exists", status_code=409) body.version = await _get_next_version(model_id, body.version) if body.aliases: await _remove_used_aliases(body.aliases, model_id) - model = await _create_model_version( - run_id=run_id, model_id=model_id, - **body.model_dump(exclude_unset=True)) - return json(model, status=201) + model_version = await _create_model_version( + run_id=run_id, model_id=model_id, **body.model_dump(exclude_unset=True) + ) + await publish_activities_task( + token, model_version["_id"], model_version["version"], "model_version", + "model_version_created", model_id=model_id, + model_name=model.get("name") + ) + return json(model_version, status=201) @app.route('/arcee/v2/runs//models//version', @@ -2304,6 +2424,10 @@ async def update_model_version(request, body: ModelVersionIn, await db.model_version.update_one( {"_id": model_version_id}, {'$set': updates}) obj = await db.model_version.find_one({"_id": model_version_id}) + await publish_activities_task( + request.ctx.token, obj["_id"], obj["version"], "model_version", + "model_version_updated" + ) return json(ModelVersion(**obj).model_dump(by_alias=True)) @@ -2325,7 +2449,11 @@ async def delete_model_version(request, run_id: str, model_id: str): {'$set': { "deleted_at": int(datetime.now(tz=timezone.utc).timestamp())}} ) - return json('', status=204) + await publish_activities_task( + request.ctx.token, model_version_id, model_version["version"], + "model_version", "model_version_deleted" + ) + return json("", status=204) @app.route('/arcee/v2/tasks//model_versions', methods=["GET", ], @@ -2390,12 +2518,17 @@ async def _create_artifact(**kwargs) -> dict: @validate(json=ArtifactPostIn) async def create_artifact(request, body: ArtifactPostIn): token = request.ctx.token - run = await db.run.find_one({"_id": body.run_id, 'deleted_at': 0}) + run = await db.run.find_one({"_id": body.run_id, "deleted_at": 0}) if not run: raise SanicException("Run not found", status_code=404) artifact = await _create_artifact( - token=token, **body.model_dump(exclude_unset=True)) + token=token, **body.model_dump(exclude_unset=True) + ) artifact = _format_artifact(artifact, run) + await publish_activities_task( + token, artifact["_id"], artifact.get("name"), "artifact", + "artifact_created" + ) return json(artifact, status=201) @@ -2513,26 +2646,32 @@ async def get_artifact(request, id_: str): async def update_artifact(request, body: ArtifactPatchIn, id_: str): token = request.ctx.token artifact = await _get_artifact(token, id_) - run = await db.run.find_one( - {"_id": artifact['run_id'], 'deleted_at': 0}) + run = await db.run.find_one({"_id": artifact["run_id"], "deleted_at": 0}) if not run: raise SanicException("Run not found", status_code=404) updates = body.model_dump(exclude_unset=True) if updates: - await db.artifact.update_one( - {"_id": id_}, {'$set': updates}) + await db.artifact.update_one({"_id": id_}, {"$set": updates}) obj = await db.artifact.find_one({"_id": id_}) artifact = Artifact(**obj).model_dump(by_alias=True) artifact = _format_artifact(artifact, run) + await publish_activities_task( + token, artifact["_id"], artifact.get("name"), "artifact", + "artifact_updated" + ) return json(artifact) @app.route('/arcee/v2/artifacts/', methods=["DELETE", ], ctx_label='token') async def delete_artifact(request, id_: str): - await _get_artifact(request.ctx.token, id_) + artifact = await _get_artifact(request.ctx.token, id_) await db.artifact.delete_one({"_id": id_}) - return json({'deleted': True, '_id': id_}, status=204) + await publish_activities_task( + request.ctx.token, id_, artifact.get("name"), "artifact", + "artifact_deleted" + ) + return json({"deleted": True, "_id": id_}, status=204) if __name__ == '__main__': diff --git a/arcee/arcee_receiver/tests/conftest.py b/arcee/arcee_receiver/tests/conftest.py index ab874ef68..a9e2d8837 100644 --- a/arcee/arcee_receiver/tests/conftest.py +++ b/arcee/arcee_receiver/tests/conftest.py @@ -4,11 +4,17 @@ from arcee.arcee_receiver.tests.base import AConfigClMock, DB_MOCK +async def return_none(*_args, **_kwargs): + return None + + @pytest.fixture def mock_base(mocker): mocker.patch('optscale_client.aconfig_cl.aconfig_cl.AConfigCl', AConfigClMock) mocker.patch('arcee.arcee_receiver.server.db', DB_MOCK) + mocker.patch('arcee.arcee_receiver.server.publish_activities_task', + return_none) async def return_false(*_args): diff --git a/arcee/arcee_receiver/tests/test_leaderboard_template.py b/arcee/arcee_receiver/tests/test_leaderboard_template.py index 2c230d03a..1b86126ac 100644 --- a/arcee/arcee_receiver/tests/test_leaderboard_template.py +++ b/arcee/arcee_receiver/tests/test_leaderboard_template.py @@ -512,13 +512,25 @@ async def test_patch_invalid_metric(app): @pytest.mark.asyncio -async def test_delete_missing(app): +async def test_delete_missing_task(app): client = app.asgi_client await prepare_token() _, response = await client.delete( Urls.leaderboard_templates.format('fake'), headers={"x-api-key": TOKEN1}) assert response.status == 404 + assert "Task not found" in response.text + + +@pytest.mark.asyncio +async def test_delete_missing_lb(app): + client = app.asgi_client + await prepare_token() + task = await prepare_tasks() + _, response = await client.delete( + Urls.leaderboard_templates.format(task[0]['_id']), + headers={"x-api-key": TOKEN1}) + assert response.status == 404 assert "Not found" in response.text diff --git a/arcee/arcee_receiver/tests/test_task.py b/arcee/arcee_receiver/tests/test_task.py index 6000ca47b..6a89ee8b9 100644 --- a/arcee/arcee_receiver/tests/test_task.py +++ b/arcee/arcee_receiver/tests/test_task.py @@ -3,7 +3,7 @@ import pytest from arcee.arcee_receiver.tests.base import ( DB_MOCK, TOKEN1, Urls, prepare_tasks, prepare_metrics, prepare_token, - prepare_run, prepare_model_version, prepare_artifact) + prepare_run, prepare_model_version) @pytest.mark.asyncio @@ -53,7 +53,7 @@ async def test_create_task(app): @pytest.mark.asyncio -async def test_create_task(app): +async def test_create_task_invalid_metric(app): client = app.asgi_client await prepare_token() task = { @@ -82,7 +82,59 @@ async def test_create_task_missing_key(app): data=json.dumps(task), headers={"x-api-key": TOKEN1}) assert response.status == 400 - assert "Key should be str" in response.text + assert 'Field required' in response.text + + +@pytest.mark.asyncio +async def test_create_invalid_params_types(app): + client = app.asgi_client + await prepare_token() + task = { + 'key': 'key', + 'description': 'description', + 'name': 'name', + 'owner_id': 'owner_id', + 'metrics': ['test'] + } + for param in ["description", "name", "owner_id", "key"]: + for value in [1, {"test": 1}, ['test']]: + params = task.copy() + params[param] = value + _, response = await client.post( + Urls.tasks, data=json.dumps(params), + headers={"x-api-key": TOKEN1}) + assert response.status == 400 + assert "Input should be a valid string" in response.text + + for value in [1, "test", {"test": "test"}]: + params = task.copy() + params['metrics'] = value + _, response = await client.post( + Urls.tasks, data=json.dumps(params), + headers={"x-api-key": TOKEN1}) + assert response.status == 400 + assert "Input should be a valid list" in response.text + + +@pytest.mark.asyncio +async def test_create_unexpected(app): + client = app.asgi_client + await prepare_token() + task = { + 'key': 'key', + 'description': 'description', + 'name': 'name', + 'metrics': ['test'] + } + + for param in ['_id', 'deleted_at', 'token', 'test']: + data = task.copy() + data[param] = 'test' + _, response = await client.post(Urls.tasks, + data=json.dumps(data), + headers={"x-api-key": TOKEN1}) + assert response.status == 400 + assert "Extra inputs are not permitted" in response.text @pytest.mark.asyncio diff --git a/bulldozer/bulldozer_api/producer.py b/bulldozer/bulldozer_api/producer.py index 33e9c5361..7518040ed 100644 --- a/bulldozer/bulldozer_api/producer.py +++ b/bulldozer/bulldozer_api/producer.py @@ -9,10 +9,12 @@ from optscale_client.config_client.client import Client as ConfigClient +BULLDOZER_ROUTING_KEY = 'bulldozer-task' + class TaskProducer: - ROUTING_KEY = 'bulldozer-task' EXCHANGE_NAME = 'bulldozer-tasks' + EXCHANGE_TYPE = 'direct' RETRY_POLICY = {'max_retries': 15, 'interval_start': 0, 'interval_step': 1, 'interval_max': 3} RESCHEDULE_TIMEOUT = 60 * 60 * 12 @@ -41,20 +43,25 @@ def config_cl(self): self._config_cl = config_cl return self._config_cl - def create_task(self, task): + def create_task(self, task, routing_key=BULLDOZER_ROUTING_KEY): params = self.config_cl.read_branch('/rabbit') conn_str = f'amqp://{params["user"]}:{params["pass"]}@' \ f'{params["host"]}:{params["port"]}' queue_conn = QConnection(conn_str, transport_options=self.RETRY_POLICY) - task_exchange = Exchange(self.EXCHANGE_NAME, type='direct') + task_exchange = Exchange(self.EXCHANGE_NAME, type=self.EXCHANGE_TYPE) with producers[queue_conn].acquire(block=True) as producer: producer.publish( task, serializer='json', exchange=task_exchange, declare=[task_exchange], - routing_key=self.ROUTING_KEY, + routing_key=routing_key, retry=True, retry_policy=self.RETRY_POLICY ) + + +class ActivitiesTaskProducer(TaskProducer): + EXCHANGE_NAME = 'activities-tasks' + EXCHANGE_TYPE = 'topic' diff --git a/bulldozer/bulldozer_api/server.py b/bulldozer/bulldozer_api/server.py index 3a14ad994..d69ae1a4f 100644 --- a/bulldozer/bulldozer_api/server.py +++ b/bulldozer/bulldozer_api/server.py @@ -1,6 +1,7 @@ import datetime import asyncio from typing import Tuple +from enum import Enum import os import uuid from sanic import Sanic @@ -13,7 +14,9 @@ from mongodb_migrations.config import Configuration from bulldozer.bulldozer_api.cost_calc import CostCalc -from bulldozer.bulldozer_api.producer import TaskProducer +from bulldozer.bulldozer_api.producer import ( + ActivitiesTaskProducer, TaskProducer +) from bulldozer.bulldozer_api.name_generator import NameGenerator from bulldozer.bulldozer_api.utils import permutation @@ -28,8 +31,7 @@ config_client = AConfigCl(host=etcd_host, port=etcd_port) -class TaskState: - +class TaskState(int, Enum): STARTING_PREPARING = 1 STARTING = 2 STARTED = 3 @@ -41,8 +43,7 @@ class TaskState: WAITING_ARCEE = 10 -class RunsetState: - +class RunsetState(int, Enum): CREATED = 1 RUNNING = 2 STOPPING = 3 @@ -72,6 +73,23 @@ async def get_cluster_secret() -> str: cost_calc = CostCalc() +async def publish_activities_task(infrastructure_token, object_id, object_name, + object_type, action, **kwargs): + task = { + "infrastructure_token": infrastructure_token, + "object_id": object_id, + "object_name": object_name, + "object_type": object_type, + "action": action, + **kwargs + } + routing_key = ".".join([object_type, action]) + producer = ActivitiesTaskProducer() + logger.info("Creating activities task:%s, routing_key:%s", task, + routing_key) + await producer.run_async(producer.create_task, task, routing_key) + + async def extract_token(request): # TODO: middleware token = request.headers.get('x-api-key') @@ -233,8 +251,9 @@ async def create_template(request): status_code=400) tags = (doc.get("tags") or dict()) hyperparameters = doc.get("hyperparameters") + runset_template_id = str(uuid.uuid4()) d = { - "_id": str(uuid.uuid4()), + "_id": runset_template_id, "name": template_name, "task_ids": task_ids, "cloud_account_ids": cloud_account_ids, @@ -249,6 +268,10 @@ async def create_template(request): "deleted_at": 0 } await db.template.insert_one(d) + await publish_activities_task( + token, runset_template_id, template_name, "runset_template", + "runset_template_created" + ) return json(d, status=201) @@ -374,6 +397,9 @@ async def update_template(request, id_: str): await db.template.update_one( {"_id": id_}, {'$set': d}) o = await db.template.find_one({"_id": id_}) + await publish_activities_task( + token, id_, o.get("name"), "runset_template", "runset_template_updated" + ) return json(o) @@ -402,10 +428,10 @@ async def delete_template(request, id_: str): await db.template.update_one({"_id": id_}, {'$set': { "deleted_at": utcnow_timestamp() }}) - return json( - '', - status=204 + await publish_activities_task( + token, id_, o.get("name"), "runset_template", "runset_template_deleted" ) + return json("", status=204) async def _create_runner( @@ -424,6 +450,7 @@ async def _create_runner( spot_settings: dict, open_ingress: bool, ): + runner_name = f"{name_prefix}_{NameGenerator.get_random_name()}" runner = { "_id": str(uuid.uuid4()), "runset_id": runset_id, @@ -441,11 +468,14 @@ async def _create_runner( "created_at": created_at, "destroyed_at": 0, "started_at": 0, - "name": "", + "name": runner_name, "spot_settings": spot_settings, "open_ingress": open_ingress, } await db.runner.insert_one(runner) + await publish_activities_task( + token, runner["_id"], runner_name, "runner", "runner_created" + ) return runner["_id"] @@ -501,10 +531,11 @@ async def create_runset(request, template_id: str): open_ingress = doc.get("open_ingress", False) runset_id = str(uuid.uuid4()) runset_cnt = await db.runset.count_documents({"template_id": template_id}) + runset_name = NameGenerator.get_random_name() created_at = utcnow_timestamp() d = { "_id": runset_id, - "name": NameGenerator.get_random_name(), + "name": runset_name, "number": runset_cnt + 1, "template_id": template_id, "task_id": task_id, @@ -556,6 +587,9 @@ async def create_runset(request, template_id: str): ) await db.runset.insert_one(d) + await publish_activities_task( + token, runset_id, runset_name, "runset", "runset_created" + ) return json(d, status=201) @@ -628,6 +662,9 @@ async def set_runset_state(request, id_: str): if not o: raise SanicException("runset not found", status_code=404) + if not token: + token = o.get("token") + doc = request.json # TODO: validators? state = doc.get("state") @@ -658,6 +695,10 @@ async def set_runset_state(request, id_: str): "state": state}}) o = await db.runset.find_one({"_id": id_}) + await publish_activities_task( + token, id_, o.get("name"), "runset", "runset_state_updated", + state=state + ) return json(o) @@ -790,16 +831,25 @@ async def update_runner(request, id_: str): if started_at is not None: sd.update({"started_at": started_at}) + runset_id = o["runset_id"] + runset = await db.runset.find_one({"_id": runset_id}) + token = runset["token"] if sd: - await db.runner.update_one( - {"_id": id_}, {'$set': sd}) + await db.runner.update_one({"_id": id_}, {"$set": sd}) + if state: + await publish_activities_task( + token, id_, o.get("name"), "runner", + "runner_state_updated", state=TaskState(sd["state"]).name + ) + if destroyed_at: + await publish_activities_task( + token, id_, o.get("name"), "runner", "runner_destroyed" + ) o = await db.runner.find_one({"_id": id_}) - runset_id = o["runset_id"] sd = dict() runners = [doc async for doc in db.runner.find({"runset_id": runset_id})] - runset = await db.runset.find_one({"_id": runset_id}) if not runset.get("started_at", 0): started = sorted( list(filter(lambda x: x.get("started_at", 0) != 0, runners)), @@ -820,15 +870,17 @@ async def update_runner(request, id_: str): # TODO: check usage if all(map(lambda x: x["state"] == TaskState.ERROR, runners)): - sd.update({"state": RunsetState.ERROR}) - elif (any(map(lambda x: x["state"] == TaskState.STARTED, runners)) and not - all(map(lambda x: x["state"] in [ - TaskState.STARTING_PREPARING, - TaskState.STARTING, - TaskState.WAITING_ARCEE, - TaskState.DESTROYING_SCHEDULED, - TaskState.DESTROYING], runners))): + elif any(map(lambda x: x["state"] == TaskState.STARTED, runners) + ) and not all(map( + lambda x: x["state"] in [ + TaskState.STARTING_PREPARING, + TaskState.STARTING, + TaskState.WAITING_ARCEE, + TaskState.DESTROYING_SCHEDULED, + TaskState.DESTROYING], + runners, + )): sd.update({"state": RunsetState.STARTED}) elif any(map(lambda x: x["state"] in [ TaskState.STARTING_PREPARING, @@ -842,13 +894,14 @@ async def update_runner(request, id_: str): TaskState.DESTROY_PREPARING ], runners)): sd.update({"state": RunsetState.STOPPING}) - elif (any(map(lambda x: x["state"] == TaskState.DESTROYED, runners)) and - not all(map(lambda x: x["state"] in [ - TaskState.STARTING_PREPARING, - TaskState.STARTING, - TaskState.WAITING_ARCEE, - TaskState.DESTROYING_SCHEDULED, - TaskState.DESTROYING], runners))): + elif any(map(lambda x: x["state"] == TaskState.DESTROYED, runners) + ) and not all(map(lambda x: x["state"] in [ + TaskState.STARTING_PREPARING, + TaskState.STARTING, + TaskState.WAITING_ARCEE, + TaskState.DESTROYING_SCHEDULED, + TaskState.DESTROYING], + runners)): sd.update({"state": RunsetState.STOPPED}) # log update state map logger.info( @@ -859,8 +912,13 @@ async def update_runner(request, id_: str): ) if sd: - await db.runset.update_one( - {"_id": runset_id}, {'$set': sd}) + await db.runset.update_one({"_id": runset_id}, {"$set": sd}) + if "state" in sd and sd["state"] != runset["state"]: + await publish_activities_task( + token, runset["_id"], runset.get("name"), + "runset", "runset_state_updated", + state=RunsetState(sd["state"]).name + ) return json(o) diff --git a/bulldozer/bulldozer_worker/tasks.py b/bulldozer/bulldozer_worker/tasks.py index f56de76d2..d82a563cd 100644 --- a/bulldozer/bulldozer_worker/tasks.py +++ b/bulldozer/bulldozer_worker/tasks.py @@ -2,7 +2,6 @@ import logging from bulldozer.bulldozer_worker.infra import Infra, InfraException -from bulldozer.bulldozer_worker.name_generator import NameGenerator from tools.optscale_time import utcnow_timestamp LOG = logging.getLogger(__name__) @@ -394,7 +393,7 @@ def _exec(self): LOG.info("processing starting runner %s", runner_id) _, runner = self.bulldozer_cl.get_runner(runner_id) cloud_account_id = runner["cloud_account_id"] - prefix = runner.get("name_prefix", "") + name = runner["name"] user_data = "" hp = runner.get("hyperparameters") commands = runner.get("commands") @@ -409,11 +408,9 @@ def _exec(self): user_data += f"export {k}={v}\n" if commands is not None: user_data += commands - name = f"{prefix}_{NameGenerator.get_random_name()}" self.bulldozer_cl.update_runner( runner_id, - state=TaskState.STARTING, - name=name) + state=TaskState.STARTING) _, cloud_account = self.rest_cl.cloud_account_get( cloud_account_id, True) # TODO: get cloud type form cloud account to support multi-cloud diff --git a/docker_images/keeper_executor/Dockerfile b/docker_images/keeper_executor/Dockerfile index 2a786d9d2..91da2a260 100644 --- a/docker_images/keeper_executor/Dockerfile +++ b/docker_images/keeper_executor/Dockerfile @@ -12,6 +12,7 @@ COPY optscale_client/rest_api_client optscale_client/rest_api_client COPY docker_images/keeper_executor/requirements.txt docker_images/keeper_executor/ RUN pip install --no-cache-dir -r docker_images/keeper_executor/requirements.txt -COPY docker_images/keeper_executor/worker.py docker_images/keeper_executor/ +COPY docker_images/keeper_executor/*.py docker_images/keeper_executor/ +COPY docker_images/keeper_executor/executors/*.py docker_images/keeper_executor/executors/ CMD ["python3", "docker_images/keeper_executor/worker.py"] diff --git a/docker_images/keeper_executor/__init__.py b/docker_images/keeper_executor/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/docker_images/keeper_executor/events.py b/docker_images/keeper_executor/events.py new file mode 100644 index 000000000..f89a601bc --- /dev/null +++ b/docker_images/keeper_executor/events.py @@ -0,0 +1,542 @@ +import enum + + +class Events(enum.Enum): + NXXXX = [ + "Mock description", # description + [], # parameters + "INFO" # event level + ] + N0027 = [ + 'Organization {object_name} ({object_id}) created', + ['object_name', 'object_id'], + "INFO" + ] + N0028 = [ + 'Organization {object_name} ({object_id}) deleted', + ['object_name', 'object_id'], + "INFO" + ] + N0029 = [ + 'Organization {object_name} ({object_id}) updated', + ['object_name', 'object_id'], + "INFO" + ] + N0065 = [ + '{contact_type} alert for pool {pool_name}{with_subpools} for ' + '{warn_type} {threshold_string} deleted', + ['contact_type', 'pool_name', 'with_subpools', 'warn_type', + 'threshold_string'], + "INFO" + ] + N0066 = [ + 'Cloud account {object_name} ({object_id}) created', + ['object_name', 'object_id'], + "INFO" + ] + N0067 = [ + 'Cloud account {object_name} ({object_id}) updated', + ['object_name', 'object_id'], + "INFO" + ] + N0068 = [ + 'Cloud account {object_name} ({object_id}) deleted', + ['object_name', 'object_id'], + "INFO" + ] + N0069 = [ + 'Billing data import for cloud account {object_name} ' + '({cloud_account_id}) finished successfully', + ['object_name', 'cloud_account_id'], + "INFO" + ] + N0070 = [ + 'Billing data import for cloud account {object_name} ' + '({cloud_account_id}) failed: {error_reason}', + ['object_name', 'cloud_account_id', 'error_reason'], + "ERROR" + ] + N0071 = [ + 'Assignment request from {object_name} ({object_id}) to ' + '{approver_name} ({approver_id}) for resource {resource_name} ' + '({resource_cloud_res_id}) was accepted', + ['object_name', 'object_id', 'approver_name', 'approver_id', + 'resource_name', 'resource_cloud_res_id'], + "INFO" + ] + N0072 = [ + 'Assignment request from {object_name} ({object_id}) to ' + '{approver_name} ({approver_id}) for resource ' + '{resource_name} ({resource_cloud_res_id}) was declined', + ['object_name', 'object_id', 'approver_name', 'approver_id', + 'resource_name', 'resource_cloud_res_id'], + "INFO" + ] + N0076 = [ + 'Invalid assignment tag detected. {resource_type} {res_name} ' + '({cloud_resource_id}) moved to organization pool', + ['resource_type', 'res_name', 'cloud_resource_id'], + "INFO" + ] + N0079 = [ + 'Assignment Rules processing for {target} completed. {total} ' + 'resources have been processed', + ['target', 'total'], + "INFO" + ] + N0080 = [ + "{total} new resources discovered for cloud account " + "{object_name} ({object_id})", + ['total', 'object_name', 'object_id'], + "INFO" + ] + N0081 = [ + 'Rule applied: {rule_count} resources have been automatically ' + 'assigned to pool {pool_name} ({pool_id}) according to rule ' + '{object_name} ({object_id})', + ['rule_count', 'pool_name', 'pool_id', 'object_name', 'object_id'], + "INFO" + ] + N0082 = [ + 'Rule is disabled: {object_name} ({object_id}) points to the invalid ' + 'pair of pool {pool_name} ({pool_id}) and owner {owner_name} ' + '({owner_id}). Rule has been disabled, please fix and reenable it', + ['object_name', 'object_id', 'pool_id', 'pool_name', 'owner_name', + 'owner_id'], + "INFO" + ] + N0083 = [ + 'Assignment Rules have been forced to run by {user_display_name} ' + '({user_email}). Target is {target}', + ['user_display_name', 'user_email', 'target'], + "INFO" + ] + N0084 = [ + 'Recommendation {recommendation} dismissed for resource ' + '{object_name} ({object_id}) by {user_display_name} ({user_email})', + ['recommendation', 'object_name', 'object_id', 'user_display_name', + 'user_email'], + "INFO" + ] + N0085 = [ + 'Recommendation {recommendation} reactivated for resource ' + '{object_name} ({object_id}) by {user_display_name} ({user_email})', + ['recommendation', 'object_name', 'object_id', 'user_display_name', + 'user_email'], + "INFO" + ] + N0086 = [ + 'Rule {rule_name} ({rule_id}) created for pool {pool_name} ' + '({pool_id}) by {user_display_name} ({user_email})', + ['rule_name', 'rule_id', 'pool_name', 'pool_id', 'user_display_name', + 'user_email'], + "INFO" + ] + N0087 = [ + 'Rule {rule_name} ({rule_id}) deleted by {user_display_name} ' + '({user_email})', + ['rule_name', 'rule_id', 'user_display_name', 'user_email'], + "INFO" + ] + N0088 = [ + 'Rule {rule_name} ({rule_id}) updated by {user_display_name} ' + '({user_email})', + ['rule_name', 'rule_id', 'user_display_name', 'user_email'], + "INFO" + ] + N0089 = [ + 'Pool {object_name} has been deleted by {user_display_name} ' + '({user_email}). {res_count} resources have been moved to pool ' + '{new_pool_name}. {rules_cnt} rules have been redirected', + ['object_name', 'user_display_name', 'user_email', 'res_count', + 'new_pool_name', 'rules_cnt'], + "INFO" + ] + N0090 = [ + 'Pool {object_name} ({object_id}) created', + ['object_name', 'object_id'], + "INFO" + ] + N0091 = [ + 'Pool {object_name} ({object_id}) updated with parameters: {params} ' + 'by {user_display_name} ({user_email})', + ['object_name', 'object_id', 'params', 'user_display_name', + 'user_email'], + "INFO" + ] + N0092 = [ + '{policy_type} policy for pool {pool_name} ({pool_id}) ' + 'enabled by {user_name} ({user_email})', + ['policy_type', 'pool_name', 'pool_id', 'user_name', 'user_email'], + "INFO" + ] + N0093 = [ + '{policy_type} policy for pool {pool_name} ({pool_id}) ' + 'disabled by {user_name} ({user_email})', + ['policy_type', 'pool_name', 'pool_id', 'user_name', 'user_email'], + "INFO" + ] + N0094 = [ + '{policy_type} policy for pool {pool_name} ({pool_id}) ' + 'created by {user_name} ({user_email})', + ['policy_type', 'pool_name', 'pool_id', 'user_name', 'user_email'], + "INFO" + ] + N0095 = [ + '{policy_type} policy for pool {pool_name} ({pool_id}) ' + 'deleted by {user_name} ({user_email})', + ['policy_type', 'pool_name', 'pool_id', 'user_name', 'user_email'], + "INFO" + ] + N0096 = [ + '{policy_type} policy for pool {object_name} ({object_id}) updated ' + 'with parameters: {params} by {user_display_name} ({user_email})', + ['policy_type', 'pool_name', 'pool_id', 'params', + 'user_display_name', 'user_email'], + "INFO" + ] + N0097 = [ + 'Employee {email} invited by {user_display_name} ({user_email}) with ' + 'roles: {scope_purposes}', + ['email', 'user_display_name', 'user_email', 'scope_purposes'], + "INFO" + ] + N0098 = [ + '{total_count} resources assigned to pool {object_name} ({object_id}) ' + 'to {employee_name} ({employee_id}) by {user_display_name} ' + '({user_email})', + ['total_count', 'object_name', 'object_id', 'employee_name', + 'employee_id', 'user_display_name', 'user_email'], + "INFO" + ] + N0099 = [ + '{constraint_type} constraint for resource {object_name} ' + '({object_id}) created by {user_display_name} ({user_email})', + ['constraint_type', 'object_name', 'object_id', 'user_display_name', + 'user_email'], + "INFO" + ] + N0100 = [ + '{constraint_type} constraint for resource {object_name} ' + '({object_id}) deleted by {user_display_name} ({user_email})', + ['constraint_type', 'object_name', 'object_id', 'user_display_name', + 'user_email'], + "INFO" + ] + N0101 = [ + '{constraint_type} constraint for resource {object_name} ' + '({object_id}) updated with parameters: {params} by ' + '{user_display_name} ({user_email})', + ['constraint_type', 'object_name', 'object_id', 'params', + 'user_display_name', 'user_email'], + "INFO" + ] + N0102 = [ + 'Cloud account {object_name} ({object_id}) capabilities may be ' + 'degraded: {reason}', + ['object_name', 'object_id', 'reason'], + "INFO" + ] + N0103 = [ + '{contact_type} alert for pool {pool_name}{with_subpools} for ' + '{warn_type} {threshold_string} created', + ['contact_type', 'pool_name', 'with_subpools', 'warn_type', + 'threshold_string'], + "INFO" + ] + N0104 = [ + 'Cluster types have been forced to run by {user_display_name} ' + '({user_email})', + ['user_display_name', 'user_email'], + "INFO" + ] + N0105 = [ + 'Cluster type applied: {clustered_resources_count} resources have ' + 'been automatically grouped to {clusters_count} clusters according ' + 'to cluster type {object_name} ({object_id})', + ['clustered_resources_count', 'clusters_count', 'object_name', + 'object_id'], + "INFO" + ] + N0106 = [ + 'Cluster types reassignment completed. {total} resources have been ' + 'processed', + ['total'], + "INFO" + ] + N0107 = [ + 'Cluster type {cluster_type_name} ({cluster_type_id}) deleted, ' + '{modified_count} resources has been automatically ungrouped', + ['cluster_type_name', 'cluster_type_id', 'modified_count'], + "INFO" + ] + N0108 = [ + "{total} new resources discovered for cloud account " + "{object_name} ({object_id}). {clustered} of them were " + "assembled into {clusters} clusters", + ['total', 'object_name', 'object_id', 'clustered', 'clusters'], + "INFO" + ] + N0109 = [ + 'Cost model changed. Expense recalculation for cloud account ' + '{object_name} ({cloud_account_id}) started', + ['object_name', 'cloud_account_id'], + "INFO" + ] + N0110 = [ + 'Expense recalculation for cloud account {object_name} ' + '({cloud_account_id}) completed successfully', + ['object_name', 'cloud_account_id'], + "INFO" + ] + N0111 = [ + 'Expense recalculation for cloud account {object_name} ' + '({cloud_account_id}) failed: {error_reason}', + ['object_name', 'cloud_account_id', 'error_reason'], + "INFO" + ] + N0113 = [ + 'Booking of the resource {object_name} ({object_id}) was changed by ' + '{user_display_name}', + ['object_name', 'object_id', 'user_display_name'], + "INFO" + ] + N0114 = [ + 'Booking of the resource {object_name} ({object_id}) was deleted by ' + '{user_display_name}', + ['object_name', 'object_id', 'user_display_name'], + "INFO" + ] + N0115 = [ + 'Resource {object_name} ({object_id}) has been released', + ['object_name', 'object_id'], + "INFO" + ] + N0116 = [ + "Unable to clean up calendar {calendar_id} events during " + "disconnection", + ['calendar_id'], + "WARNING" + ] + N0117 = [ + 'Calendar {calendar_id} connected', + ['calendar_id'], + "INFO" + ] + N0118 = [ + 'Calendar {calendar_id} disconnected', + ['calendar_id'], + "INFO" + ] + N0119 = [ + 'Calendar {calendar_id} synchronization warning: {reason}', + ['calendar_id', 'reason'], + "WARNING" + ] + N0120 = [ + 'Organization {object_name} ({object_id}) has been submitted for ' + 'technical audit by employee {employee_name} ({employee_id})', + ['object_name', 'object_id', 'employee_name', 'employee_id'], + "INFO" + ] + N0122 = [ + 'Shared environment {object_name} ({object_id}) has been {state}', + ['object_name', 'object_id', 'state'], + "INFO" + ] + N0123 = [ + 'Task {object_name} ({object_id}) created', + ['object_name', 'object_id'], + "INFO" + ] + N0124 = [ + 'Task {object_name} ({object_id}) deleted', + ['object_name', 'object_id'], + "INFO" + ] + N0125 = [ + 'Task {object_name} ({object_id}) updated', + ['object_name', 'object_id'], + "INFO" + ] + N0126 = [ + 'Metric {metric_name} ({metric_id}) {state} for task {object_name} ' + '({object_id})', + ['object_name', 'object_id', 'metric_name', 'metric_id', 'state'], + "INFO" + ] + N0127 = [ + 'Metric {object_name} ({object_id}) created', + ['object_name', 'object_id'], + "INFO" + ] + N0128 = [ + 'Metric {object_name} ({object_id}) deleted', + ['object_name', 'object_id'], + "INFO" + ] + N0129 = [ + 'Metric {object_name} ({object_id}) updated', + ['object_name', 'object_id'], + "INFO" + ] + N0130 = [ + 'Run {object_name} ({object_id}) started', + ['object_name', 'object_id'], + "INFO" + ] + N0131 = [ + 'Run {object_name} ({object_id}) failed', + ['object_name', 'object_id'], + "ERROR" + ] + N0132 = [ + 'Run {object_name} ({object_id}) deleted', + ['object_name', 'object_id'], + "INFO" + ] + N0133 = [ + 'Run {object_name} ({object_id}) state changed to {state}', + ['object_name', 'object_id', 'state'], + "INFO" + ] + N0134 = [ + 'New executor on instance {object_id} for run {run_name} ({run_id}) ' + 'detected', + ['object_id', 'run_name', 'run_id'], + "INFO" + ] + N0135 = [ + 'Leaderboard template with id {object_id} for task {task_name} ' + '({task_id}) created', + ['object_id', 'task_name', 'task_id'], + "INFO" + ] + N0136 = [ + 'Leaderboard template with id {object_id} for task {task_name} ' + '({task_id}) deleted', + ['object_id', 'task_name', 'task_id'], + "INFO" + ] + N0137 = [ + 'Leaderboard template with id {object_id} for task {task_name} ' + '({task_id}) updated', + ['object_id', 'task_name', 'task_id'], + "INFO" + ] + N0138 = [ + 'Leaderboard {object_name} ({object_id}) created', + ['object_name', 'object_id'], + "INFO" + ] + N0139 = [ + 'Leaderboard {object_name} ({object_id}) deleted', + ['object_name', 'object_id'], + "INFO" + ] + N0140 = [ + 'Leaderboard {object_name} ({object_id}) updated', + ['object_name', 'object_id'], + "INFO" + ] + N0141 = [ + 'Dataset {object_name} ({object_id}) created', + ['object_name', 'object_id'], + "INFO" + ] + N0142 = [ + 'Dataset {object_name} ({object_id}) deleted', + ['object_name', 'object_id'], + "INFO" + ] + N0143 = [ + 'Dataset {object_name} ({object_id}) updated', + ['object_name', 'object_id'], + "INFO" + ] + N0144 = [ + 'Model {object_name} ({object_id}) created', + ['object_name', 'object_id'], + "INFO" + ] + N0145 = [ + 'Model {object_name} ({object_id}) deleted', + ['object_name', 'object_id'], + "INFO" + ] + N0146 = [ + 'Model {object_name} ({object_id}) updated', + ['object_name', 'object_id'], + "INFO" + ] + N0147 = [ + 'Model version {object_name} ({object_id}) detected for model ' + '{model_name} ({model_id})', + ['object_name', 'object_id', 'model_name', 'model_id'], + "INFO" + ] + N0148 = [ + 'Model version {object_name} ({object_id}) deleted', + ['object_name', 'object_id'], + "INFO" + ] + N0149 = [ + 'Model version {object_name} ({object_id}) updated', + ['object_name', 'object_id'], + "INFO" + ] + N0150 = [ + 'Artifact {object_name} ({object_id}) created', + ['object_name', 'object_id'], + "INFO" + ] + N0151 = [ + 'Artifact {object_name} ({object_id}) deleted', + ['object_name', 'object_id'], + "INFO" + ] + N0152 = [ + 'Artifact {object_name} ({object_id}) updated', + ['object_name', 'object_id'], + "INFO" + ] + N0153 = [ + 'Runset template {object_name} ({object_id}) created', + ['object_name', 'object_id'], + "INFO" + ] + N0154 = [ + 'Runset template {object_name} ({object_id}) deleted', + ['object_name', 'object_id'], + "INFO" + ] + N0155 = [ + 'Runset template {object_name} ({object_id}) updated', + ['object_name', 'object_id'], + "INFO" + ] + N0156 = [ + 'Runset {object_name} ({object_id}) created', + ['object_name', 'object_id'], + "INFO" + ] + N0157 = [ + 'Runset {object_name} ({object_id}) changed state to {state}', + ['object_name', 'object_id', 'state'], + "DEBUG" + ] + N0158 = [ + 'Executor {object_name} ({object_id}) created', + ['object_name', 'object_id'], + "INFO" + ] + N0159 = [ + 'Executor {object_name} ({object_id}) changed state to {state}', + ['object_name', 'object_id', 'state'], + "DEBUG" + ] + N0160 = [ + 'Executor {object_name} ({object_id}) destroyed', + ['object_name', 'object_id'], + "INFO" + ] diff --git a/docker_images/keeper_executor/executors/__init__.py b/docker_images/keeper_executor/executors/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/docker_images/keeper_executor/executors/base.py b/docker_images/keeper_executor/executors/base.py new file mode 100644 index 000000000..736161c92 --- /dev/null +++ b/docker_images/keeper_executor/executors/base.py @@ -0,0 +1,118 @@ +import hashlib +import logging +import requests +from optscale_client.auth_client.client_v2 import Client as AuthClient +from optscale_client.report_client.client_v2 import Client as ReportClient +from optscale_client.rest_api_client.client_v2 import Client as RestClient + +LOG = logging.getLogger(__name__) + + +class BaseEventExecutor: + def __init__(self, config_client): + self.config_cl = config_client + self._auth_cl = None + self._report_cl = None + self._rest_cl = None + + @property + def action_event_map(self): + return {} + + def routing_keys(self): + return list(self.action_event_map.keys()) + + @property + def auth_cl(self): + if not self._auth_cl: + self._auth_cl = AuthClient( + url=self.config_cl.auth_url(), + secret=self.config_cl.cluster_secret() + ) + return self._auth_cl + + @property + def rest_cl(self): + if self._rest_cl is None: + self._rest_cl = RestClient( + url=self.config_cl.restapi_url(), + secret=self.config_cl.cluster_secret(), + verify=False) + return self._rest_cl + + @property + def report_cl(self): + if not self._report_cl: + self._report_cl = ReportClient( + url=self.config_cl.keeper_url(), + secret=self.config_cl.cluster_secret() + ) + return self._report_cl + + def get_user_id(self, token): + user_digest = hashlib.md5(token.encode('utf-8')).hexdigest() + _, token_meta = self.auth_cl.token_meta_get([user_digest]) + return token_meta.get(user_digest, {}).get('user_id') + + def _get_user(self, token): + user_id = self.get_user_id(token) + if user_id is None: + return {} + self.auth_cl.token = token + _, user = self.auth_cl.user_get(user_id) + return user + + def _get_user_info(self, token): + if not token: + return None, None, None + user_info = {} + try: + user_info = self._get_user(token) + except requests.exceptions.HTTPError as ex: + LOG.exception("Service call error: %s", str(ex)) + user_id = user_info.get('id', None) + user_display_name = user_info.get('display_name', None) + user_email = user_info.get('email', None) + return user_id, user_display_name, user_email + + @staticmethod + def get_localized(localized_code: str, param_list: list, params: dict): + localized_tmp = localized_code + '(' + ','.join( + ['{' + param + '}' for param in param_list]) + ')' + return localized_tmp.format(**params) + + def push_event( + self, evt_class, organization_id, level='INFO', object_id=None, + object_type=None, object_name=None, description=None, + localized=None, ack=False, user_display_name=None, + user_id=None): + event = { + 'level': level, + 'evt_class': evt_class, + 'object_id': object_id, + 'object_type': object_type, + 'object_name': object_name, + 'organization_id': organization_id, + 'description': description, + 'localized': localized, + 'ack': ack, + 'initiator_id': user_id, + 'initiator_name': user_display_name, + } + self.report_cl.event_submit(**event) + + def _execute(self, event, body): + raise NotImplementedError + + def _custom_format_params(self, task): + return task + + def execute(self, task): + action = task.get('action') + event = self.action_event_map.get(action) + if not event: + LOG.error('Invalid action: %s', action) + return + task = self._custom_format_params(task) + event_params = self._execute(event, task) + self.push_event(**event_params) diff --git a/docker_images/keeper_executor/executors/main_events.py b/docker_images/keeper_executor/executors/main_events.py new file mode 100644 index 000000000..48f77a16c --- /dev/null +++ b/docker_images/keeper_executor/executors/main_events.py @@ -0,0 +1,105 @@ +import logging +from docker_images.keeper_executor.executors.base import BaseEventExecutor +from docker_images.keeper_executor.events import Events + +LOG = logging.getLogger(__name__) + + +class MainEventExecutor(BaseEventExecutor): + @property + def action_event_map(self): + return { + 'organization_created': Events.N0027, + 'organization_deleted': Events.N0028, + 'organization_updated': Events.N0029, + 'alert_removed': Events.N0065, + 'cloud_account_created': Events.N0066, + 'cloud_account_updated': Events.N0067, + 'cloud_account_deleted': Events.N0068, + 'report_import_completed': Events.N0069, + 'report_import_failed': Events.N0070, + 'assignment_request_accepted': Events.N0071, + 'assignment_request_declined': Events.N0072, + 'root_assigned_resource': Events.N0076, + 'rules_processing_completed': Events.N0079, + 'resources_discovered': Events.N0080, + 'rule_applied': Events.N0081, + 'rule_deactivated': Events.N0082, + 'rules_processing_started': Events.N0083, + 'recommendations_dismissed': Events.N0084, + 'recommendations_reactivated': Events.N0085, + 'rule_created': Events.N0086, + 'rule_deleted': Events.N0087, + 'rule_updated': Events.N0088, + 'pool_deleted': Events.N0089, + 'pool_created': Events.N0090, + 'pool_updated': Events.N0091, + 'policy_enabled': Events.N0092, + 'policy_disabled': Events.N0093, + 'policy_created': Events.N0094, + 'policy_deleted': Events.N0095, + 'policy_updated': Events.N0096, + 'employee_invited': Events.N0097, + 'resource_assigned': Events.N0098, + 'constraint_created': Events.N0099, + 'constraint_deleted': Events.N0100, + 'constraint_updated': Events.N0101, + 'cloud_account_warning': Events.N0102, + 'alert_added': Events.N0103, + 'cluster_types_processing_started': Events.N0104, + 'cluster_type_applied': Events.N0105, + 'cluster_types_processing_done': Events.N0106, + 'cluster_resources_deleted': Events.N0107, + 'resources_clustered_discovered': Events.N0108, + 'recalculation_started': Events.N0109, + 'recalculation_completed': Events.N0110, + 'recalculation_failed': Events.N0111, + 'shareable_resource_changed': Events.N0113, + 'shareable_resource_deleted': Events.N0114, + 'shareable_booking_released': Events.N0115, + 'calendar_warning': Events.N0116, + 'calendar_connected': Events.N0117, + 'calendar_disconnected': Events.N0118, + 'calendar_observer_warning': Events.N0119, + 'technical_audit_submit': Events.N0120, + 'env_power_mngmt': Events.N0122, + } + + def _execute(self, event, task): + action = task.get('action') + organization_id = task.get('organization_id') + object_id = task.get('object_id') + object_type = task.get('object_type') + required_params = [organization_id, object_id, object_type, action] + if any(map(lambda x: x is None, required_params)): + raise Exception('Invalid task received: {}'.format(task)) + LOG.info('Started processing for %s object: %s, object type: %s, ' + 'organization %s' % (action, object_id, object_type, + organization_id)) + meta = task.get('meta') or {} + token = meta.get('token') + user_id, user_display_name, user_email = self._get_user_info(token) + description_tmp, param_list, level = event.value + params = { + param: task.get(param) or meta.get(param) for param in param_list + } + params.update({ + 'user_display_name': user_display_name, + 'user_id': user_id, + 'user_email': user_email + }) + description = description_tmp.format(**params) + localized = self.get_localized(event.name, param_list, params) + return { + 'evt_class': action.upper(), + 'organization_id': organization_id, + 'object_id': object_id, + 'object_name': meta.get('object_name') or task.get('object_name'), + 'object_type': object_type, + 'description': description, + 'localized': localized, + 'level': level, + 'user_id': user_id, + 'user_display_name': user_display_name, + 'ack': meta.get('ack', False) + } diff --git a/docker_images/keeper_executor/executors/ml_events.py b/docker_images/keeper_executor/executors/ml_events.py new file mode 100644 index 000000000..79141f36d --- /dev/null +++ b/docker_images/keeper_executor/executors/ml_events.py @@ -0,0 +1,74 @@ +import logging +from docker_images.keeper_executor.events import Events +from docker_images.keeper_executor.executors.main_events import ( + MainEventExecutor +) + +LOG = logging.getLogger(__name__) + + +class MlEventsExecutor(MainEventExecutor): + @property + def action_event_map(self): + return { + 'task_created': Events.N0123, + 'task_deleted': Events.N0124, + 'task_updated': Events.N0125, + 'task_metric_updated': Events.N0126, + 'metric_created': Events.N0127, + 'metric_deleted': Events.N0128, + 'metric_updated': Events.N0129, + 'run_started': Events.N0130, + 'run_failed': Events.N0131, + 'run_deleted': Events.N0132, + 'run_state_updated': Events.N0133, + 'platform_created': Events.N0134, + 'leaderboard_template_created': Events.N0135, + 'leaderboard_template_deleted': Events.N0136, + 'leaderboard_template_updated': Events.N0137, + 'leaderboard_created': Events.N0138, + 'leaderboard_deleted': Events.N0139, + 'leaderboard_updated': Events.N0140, + 'dataset_created': Events.N0141, + 'dataset_deleted': Events.N0142, + 'dataset_updated': Events.N0143, + 'model_created': Events.N0144, + 'model_deleted': Events.N0145, + 'model_updated': Events.N0146, + 'model_version_created': Events.N0147, + 'model_version_deleted': Events.N0148, + 'model_version_updated': Events.N0149, + 'artifact_created': Events.N0150, + 'artifact_deleted': Events.N0151, + 'artifact_updated': Events.N0152, + 'runset_template_created': Events.N0153, + 'runset_template_deleted': Events.N0154, + 'runset_template_updated': Events.N0155, + 'runset_created': Events.N0156, + 'runset_state_updated': Events.N0157, + 'runner_created': Events.N0158, + 'runner_state_updated': Events.N0159, + 'runner_destroyed': Events.N0160 + } + + def _execute(self, event, task): + action = task.get('action') + object_id = task.get('object_id') + object_type = task.get('object_type') + required_params = [object_id, object_type, action] + if any(map(lambda x: x is None, required_params)): + raise Exception('Invalid task received: {}'.format(task)) + profiling_token = task.get('profiling_token') + infrastructure_token = task.get('infrastructure_token') + if infrastructure_token: + _, data = self.rest_cl.profiling_token_by_infrastructure_token_get( + infrastructure_token + ) + elif profiling_token: + _, data = self.rest_cl.profiling_token_info_get( + profiling_token + ) + else: + raise Exception('Invalid task received: {}'.format(task)) + task['organization_id'] = data['organization_id'] + return super()._execute(event, task) diff --git a/docker_images/keeper_executor/worker.py b/docker_images/keeper_executor/worker.py index c06858bca..7a7c6e4c1 100644 --- a/docker_images/keeper_executor/worker.py +++ b/docker_images/keeper_executor/worker.py @@ -1,9 +1,7 @@ #!/usr/bin/env python -import hashlib import os import time from threading import Thread -import requests from kombu.mixins import ConsumerMixin from kombu.log import get_logger from kombu.utils.debug import setup_logging @@ -11,9 +9,11 @@ import urllib3 from optscale_client.config_client.client import Client as ConfigClient -from optscale_client.rest_api_client.client_v2 import Client as RestClient -from optscale_client.report_client.client_v2 import Client as ReportClient -from optscale_client.auth_client.client_v2 import Client as AuthClient + +from docker_images.keeper_executor.executors.main_events import ( + MainEventExecutor +) +from docker_images.keeper_executor.executors.ml_events import MlEventsExecutor LOG = get_logger(__name__) @@ -24,7 +24,11 @@ ROUTING_KEYS = [ 'pool.#', 'employee.#', 'calendar_synchronization.#', 'cloud_account.#', 'organization.*', 'cluster_type.#', 'report_import.#', 'resource.#', - 'alert.action.added', 'alert.action.removed', 'rule.#', 'environment.#'] + 'alert.action.added', 'alert.action.removed', 'rule.#', 'environment.#', + 'task.*', 'metric.*', 'run.*', 'platform.*', 'leaderboard_template.*', + 'leaderboard.*', 'dataset.*', 'model.*', 'model_version.*', 'artifact.*', + 'runset_template.*', 'runset.*', 'runner.*' +] TASK_QUEUE = Queue(QUEUE_NAME, TASK_EXCHANGE, bindings=[ binding(TASK_EXCHANGE, routing_key=routing_key) for routing_key in ROUTING_KEYS]) @@ -34,592 +38,26 @@ class KeeperExecutorWorker(ConsumerMixin): def __init__(self, connection, config_client): self.connection = connection self.config_cl = config_client - self._auth_cl = None - self._rest_cl = None - self._report_cl = None - self._action_func_details_map = {} self.running = True self.thread = Thread(target=self.heartbeat) self.thread.start() - @property - def rest_cl(self): - if self._rest_cl is None: - self._rest_cl = RestClient( - url=self.config_cl.restapi_url(), - secret=self.config_cl.cluster_secret(), - verify=False) - return self._rest_cl - - @property - def report_cl(self): - if not self._report_cl: - self._report_cl = ReportClient( - url=self.config_cl.keeper_url(), - secret=self.config_cl.cluster_secret() - ) - return self._report_cl - - @property - def auth_cl(self): - if not self._auth_cl: - self._auth_cl = AuthClient( - url=self.config_cl.auth_url(), - secret=self.config_cl.cluster_secret() - ) - return self._auth_cl - - @property - def action_func_details_map(self): - if not self._action_func_details_map: - self._action_func_details_map = { - 'resource_assigned': ( - self.execute_event_base, - ('{total_count} resources assigned to pool ' - '{object_name} ({object_id}) to {employee_name} ' - '({employee_id}) by {user_display_name} ' - '({user_email})', 'N0089', - ['total_count', 'object_name', 'object_id', - 'employee_name', - 'employee_id', 'user_display_name', 'user_email'])), - 'assignment_request_accepted': ( - self.execute_event_base, - ('Assignment request from {object_name} ({object_id}) to ' - '{approver_name} ({approver_id}) for resource ' - '{resource_name} ({resource_cloud_res_id}) was accepted', - 'N0071', ['object_name', 'object_id', 'approver_name', - 'approver_id', 'resource_name', - 'resource_cloud_res_id'])), - 'assignment_request_declined': ( - self.execute_event_base, - ('Assignment request from {object_name} ({object_id}) to ' - '{approver_name} ({approver_id}) for resource ' - '{resource_name} ({resource_cloud_res_id}) was declined', - 'N0072', ['object_name', 'object_id', 'approver_name', - 'approver_id', 'resource_name', - 'resource_cloud_res_id'])), - 'calendar_warning': (self.execute_calendar_warning, None), - 'calendar_disconnected': ( - self.execute_event_base, - ('Calendar {calendar_id} disconnected', 'N0118', - ['calendar_id'])), - 'calendar_connected': ( - self.execute_event_base, - ('Calendar {calendar_id} connected', 'N0117', - ['calendar_id'])), - 'resources_discovered': (self.execute_resources_discovered, - None), - 'pool_created': ( - self.execute_event_base, - ('Pool {object_name} ({object_id}) created by ' - '{user_display_name} ({user_email})', 'N0090', - ['object_name', 'object_id', 'user_display_name', - 'user_email'])), - 'pool_deleted': ( - self.execute_event_base, - ('Pool {object_name} has been deleted by ' - '{user_display_name} ({user_email}). {res_count} ' - 'resources have been moved to pool {new_pool_name}. ' - '{rules_cnt} rules have been redirected.', 'N0089', - ['object_name', 'user_display_name', 'user_email', - 'res_count', 'new_pool_name', 'rules_cnt'])), - 'pool_updated': ( - self.execute_event_base, - ('Pool {object_name} ({object_id}) updated with ' - 'parameters: {params} by {user_display_name} ' - '({user_email})', 'N0091', - ['object_name', 'object_id', 'params', 'user_display_name', - 'user_email'])), - 'rules_processing_started': ( - self.execute_event_base, - ('Assignment Rules have been forced to run by ' - '{user_display_name} ({user_email}). Target is {target}.', - 'N0082', ['user_display_name', 'user_email', 'target'])), - 'rules_processing_completed': ( - self.execute_event_base, - ('Assignment Rules processing for {target} completed. ' - '{total} resources have been processed.', - 'N0082', ['target', 'total'])), - 'root_assigned_resource': ( - self.execute_event_base, - ('Invalid assignment tag detected. {resource_type} ' - '{res_name} ({cloud_resource_id}) moved to organization ' - 'pool', 'N0076', - ['resource_type', 'res_name', 'cloud_resource_id'])), - 'rule_created': ( - self.execute_event_base, - ('Rule {rule_name} ({rule_id}) created for pool ' - '{pool_name} ({pool_id}) by {user_display_name} ' - '({user_email}).', 'N0086', - ['rule_name', 'rule_id', 'pool_name', 'pool_id', - 'user_display_name', 'user_email'])), - 'rule_updated': ( - self.execute_event_base, - ('Rule {rule_name} ({rule_id}) updated by ' - '{user_display_name} ({user_email}).', - 'N0088', ['rule_name', 'rule_id', 'user_display_name', - 'user_email'])), - 'rule_deleted': ( - self.execute_event_base, - ('Rule {rule_name} ({rule_id}) deleted by ' - '{user_display_name} ({user_email}).', - 'N0087', ['rule_name', 'rule_id', 'user_display_name', - 'user_email'])), - 'rule_applied': ( - self.execute_event_base, - ('Rule applied: {rule_count} resources have been ' - 'automatically assigned to pool {pool_name} ({pool_id}) ' - 'according to rule {object_name} ({object_id})', 'N0081', - ['rule_count', 'pool_name', 'pool_id', 'object_name', - 'object_id'])), - 'rule_deactivated': ( - self.execute_event_base, - ('Rule is disabled: {object_name} ({object_id}) points to ' - 'the invalid pair of pool {pool_name} ({pool_id}) and ' - 'owner {owner_name} ({owner_id}). Rule has been disabled, ' - 'please fix and reenable it.', 'N0082', - ['object_name', 'object_id', 'pool_id', 'pool_name', - 'owner_name', 'owner_id'])), - 'cluster_types_processing_started': ( - self.execute_event_base, - ('Cluster types have been forced to run by ' - '{user_display_name} ({user_email}).', 'N0104', - ['user_display_name', 'user_email'])), - 'cluster_types_processing_done': ( - self.execute_event_base, - ('Cluster types reassignment completed. {total} resources ' - 'have been processed.', 'N0106', - ['total'])), - 'cluster_resources_deleted': ( - self.execute_event_base, - ('Cluster Type {cluster_type_name} ({cluster_type_id}) ' - 'deleted, {modified_count} resources has been ' - 'automatically ungrouped.', 'N0107', - ['cluster_type_name', 'cluster_type_id', - 'modified_count'])), - 'cluster_type_applied': ( - self.execute_event_base, - ('Cluster type applied: {clustered_resources_count} ' - 'resources have been automatically grouped to ' - '{clusters_count} clusters according to cluster type ' - '{object_name} ({object_id})', 'N0105', - ['clustered_resources_count', 'clusters_count', - 'object_name', 'object_id'])), - 'shareable_resource_deleted': ( - self.execute_event_base, - ('Booking of the resource {object_name} ({object_id}) was ' - 'deleted by {user_display_name}', - 'N0114', - ['object_name', 'object_id', 'user_display_name'])), - 'shareable_resource_changed': ( - self.execute_event_base, - ('Booking of the resource {object_name} ({object_id}) was ' - 'changed by {user_display_name}', - 'N0113', - ['object_name', 'object_id', 'user_display_name'])), - 'shareable_booking_released': ( - self.execute_event_base, - ('Resource {object_name} ({object_id}) has been released', - 'N0115', - ['object_name', 'object_id'])), - 'constraint_updated': ( - self.execute_event_base, - ('{constraint_type} constraint for resource {object_name} ' - '({object_id}) updated with parameters: {params} by ' - '{user_display_name} ({user_email})', - 'N0101', - ['constraint_type', 'object_name', 'object_id', 'params', - 'user_display_name', 'user_email'])), - 'constraint_deleted': ( - self.execute_event_base, - ('{constraint_type} constraint for resource {object_name} ' - '({object_id}) deleted by {user_display_name} ' - '({user_email})', 'N0100', - ['constraint_type', 'object_name', 'object_id', - 'user_display_name', 'user_email'])), - 'constraint_created': ( - self.execute_event_base, - ('{constraint_type} constraint for resource {object_name} ' - '({object_id}) created by {user_display_name} ' - '({user_email})', 'N0099', - ['constraint_type', 'object_name', 'object_id', - 'user_display_name', 'user_email'])), - 'policy_updated': ( - self.execute_event_base, - ('{policy_type} policy for pool {object_name} ' - '({object_id}) updated with parameters: {params} by ' - '{user_display_name} ({user_email})', 'N0096', - ['policy_type', 'object_name', 'object_id', 'params', - 'user_display_name', 'user_email'])), - 'policy_enabled': (self.execute_policy_action, None), - 'policy_disabled': (self.execute_policy_action, None), - 'policy_created': (self.execute_policy_action, None), - 'policy_deleted': (self.execute_policy_action, None), - 'recommendations_dismissed': ( - self.execute_event_base, - ('Recommendation {recommendation} dismissed for resource ' - '{object_name} ({object_id}) by {user_display_name} ' - '({user_email})', 'N0084', - ['recommendation', 'object_name', 'object_id', - 'user_display_name', 'user_email'])), - 'recommendations_reactivated': ( - self.execute_event_base, - ('Recommendation {recommendation} reactivated for resource ' - '{object_name} ({object_id}) by {user_display_name} ' - '({user_email})', 'N0085', - ['recommendation', 'object_name', 'object_id', - 'user_display_name', 'user_email'])), - 'recalculation_started': ( - self.execute_event_base, - ('Cost model changed. Expense recalculation for cloud ' - 'account {object_name} ({cloud_account_id}) started.', - 'N0109', ['object_name', 'cloud_account_id'])), - 'recalculation_completed': ( - self.execute_event_base, - ('Expense recalculation for cloud account {object_name} ' - '({cloud_account_id}) completed successfully.', 'N0110', - ['object_name', 'cloud_account_id'])), - 'recalculation_failed': ( - self.execute_event_base, - ('Expense recalculation for cloud account {object_name} ' - '({cloud_account_id}) failed: {error_reason}', 'N0111', - ['object_name', 'cloud_account_id', 'error_reason'])), - 'report_import_completed': ( - self.execute_event_base, - ('Billing data import for cloud account {object_name} ' - '({cloud_account_id}) finished successfully', 'N0069', - ['object_name', 'cloud_account_id'])), - 'report_import_failed': ( - self.execute_event_base, - ('Billing data import for cloud account {object_name} ' - '({cloud_account_id}) failed: {error_reason}', 'N0070', - ['object_name', 'cloud_account_id', 'error_reason'])), - 'cloud_account_warning': ( - self.execute_event_base, - ('Cloud account {object_name} ({object_id}) ' - 'capabilities may be degraded: {reason}', 'N0102', - ['object_name', 'object_id', 'reason'])), - 'cloud_account_created': ( - self.execute_event_base, - ('Cloud account {object_name} ({object_id}) created', - 'N0066', ['object_name', 'object_id'])), - 'cloud_account_updated': ( - self.execute_event_base, - ('Cloud account {object_name} ({object_id}) updated', - 'N0067', ['object_name', 'object_id'])), - 'cloud_account_deleted': ( - self.execute_event_base, - ('Cloud account {object_name} ({object_id}) deleted', - 'N0068', ['object_name', 'object_id'])), - 'employee_invited': ( - self.execute_event_base, - ('Employee {email} invited by {user_display_name} ' - '({user_email}) with roles: {scope_purposes}', 'N0097', - ['email', 'user_display_name', 'user_email', - 'scope_purposes'])), - 'organization_updated': ( - self.execute_event_base, - ('Organization {object_name} ({object_id}) updated', - 'N0029', ['object_name', 'object_id'])), - 'organization_created': ( - self.execute_event_base, - ('Organization {object_name} ({object_id}) created', - 'N0027', ['object_name', 'object_id'])), - 'organization_deleted': ( - self.execute_event_base, - ('Organization {object_name} ({object_id}) deleted', - 'N0028', ['object_name', 'object_id'])), - 'alert_added': (self.execute_alert_added_removed, None), - 'alert_removed': (self.execute_alert_added_removed, None), - 'technical_audit_submit': ( - self.execute_event_base, - ('Organization {object_name} ({object_id}) has been ' - 'submitted for technical audit by employee ' - '{employee_name} ({employee_id})', 'N0120', - ['object_name', 'object_id', 'employee_name', - 'employee_id'])), - 'env_power_mngmt': ( - self.execute_event_base, - ('Shared environment {object_name} ({object_id}) has been ' - '{state}', 'N0122', - ['object_name', 'object_id', 'state'])), - } - return self._action_func_details_map - def get_consumers(self, consumer, channel): return [consumer(queues=[TASK_QUEUE], accept=['json'], callbacks=[self.process_task], prefetch_count=10)] - def get_user_id(self, token): - user_digest = hashlib.md5(token.encode('utf-8')).hexdigest() - _, token_meta = self.auth_cl.token_meta_get([user_digest]) - return token_meta.get(user_digest, {}).get('user_id') - - def _get_user(self, token): - user_id = self.get_user_id(token) - if user_id is None: - return {} - self.auth_cl.token = token - _, user = self.auth_cl.user_get(user_id) - return user - - def _get_user_info(self, token): - if not token: - return None, None, None - user_info = {} - try: - user_info = self._get_user(token) - except requests.exceptions.HTTPError as ex: - LOG.exception("Service call error: %s", str(ex)) - user_id = user_info.get('id', None) - user_display_name = user_info.get('display_name', None) - user_email = user_info.get('email', None) - return user_id, user_display_name, user_email - - def _get_action_func_details_map(self, action): - return self.action_func_details_map.get(action) - - def execute_event( - self, evt_class, organization_id, level='INFO', object_id=None, - object_type=None, object_name=None, description=None, - localized=None, ack=False, token=None, user_display_name=None, - user_id=None): - if token and not user_id and not user_display_name: - user_id, user_display_name, user_email = self._get_user_info(token) - event = { - 'level': level, - 'evt_class': evt_class, - 'object_id': object_id, - 'object_type': object_type, - 'object_name': object_name, - 'organization_id': organization_id, - 'description': description, - 'localized': localized, - 'ack': ack, - 'initiator_id': user_id, - 'initiator_name': user_display_name, - } - self.report_cl.event_submit(**event) - - def execute_event_base( - self, action, organization_id, level='INFO', object_id=None, - object_type=None, object_name=None, ack=False, token=None, - meta=None): - action_value = self._get_action_func_details_map(action) - if not action_value: - LOG.error('Invalid action: %s', action) - return - _, (description_tmp, localized_code, param_list) = action_value - user_id, user_display_name, user_email = self._get_user_info(token) - if not meta: - meta = {} - meta['organization_id'] = organization_id - meta['object_id'] = object_id - meta['object_type'] = object_type - meta['user_id'] = user_id - meta['user_display_name'] = user_display_name - meta['user_email'] = user_email - params = {param: meta.get(param) for param in param_list} - description = description_tmp.format(**params) - localized_tmp = localized_code + '(' + ','.join( - ['{' + param + '}' for param in param_list]) + ')' - localized = localized_tmp.format(**params) - self.execute_event( - action.upper(), organization_id, object_id=object_id, - object_type=object_type, object_name=object_name, - description=description, localized=localized, level=level, - token=token, user_id=user_id, user_display_name=user_display_name, - ack=ack) - - def execute_resources_discovered( - self, action, organization_id, level='INFO', object_id=None, - object_type=None, object_name=None, ack=False, token=None, - meta=None): - stat = meta.get('stat') - fmt_args = { - 'discovered': stat['total'], - 'cloud_acc_id': object_id, - 'cloud_acc_name': object_name - } - desc = ("{discovered} new resources discovered for cloud account " - "{cloud_acc_name} ({cloud_acc_id})") - localized = 'N0080({discovered},{cloud_acc_name},{cloud_acc_id})' - if stat['clusters']: - fmt_args.update({ - 'clusters': len(stat['clusters']), - 'clustered': stat['clustered'] - }) - desc += ('. {clustered} of them were assembled into {clusters} ' - 'clusters') - localized = ('N0080({discovered},{cloud_acc_name},{cloud_acc_id}' - ',{clustered},{clusters})') - - self.execute_event( - action.upper(), organization_id, object_id=object_id, - object_type=object_type, object_name=object_name, - description=desc.format(**fmt_args), - localized=localized.format(**fmt_args), ack=ack, token=token, - level=level) - - def execute_calendar_warning( - self, action, organization_id, level='WARNING', object_id=None, - object_type=None, object_name=None, ack=False, token=None, - meta=None): - is_observer_message = meta.pop('is_observer', False) - calendar_id = meta.pop('calendar_id', None) - reason = meta.pop('reason', None) - if is_observer_message: - description = ('Calendar {calendar_id} synchronization warning: ' - '{reason}'.format(calendar_id=calendar_id, - reason=reason)) - localized = 'N0119({calendar_id}, {reason})'.format( - calendar_id=calendar_id, reason=reason) + @staticmethod + def get_executor_class(task): + if 'profiling_token' in task or 'infrastructure_token' in task: + executor_class = MlEventsExecutor else: - description = ("Unable to clean up calendar %s " - "events during disconnection" % calendar_id) - if reason is not None: - description = '%s - %s' % (description, reason) - localized = 'N0116({calendar_id})'.format(calendar_id=calendar_id) - self.execute_event( - action.upper(), organization_id, level=level, object_id=object_id, - object_type=object_type, description=description, - localized=localized, token=token, ack=ack) - - def execute_policy_action( - self, action, organization_id, level='INFO', object_id=None, - object_type=None, object_name=None, ack=False, token=None, - meta=None): - localized_code = { - 'policy_enabled': 'N0092', - 'policy_disabled': 'N0093', - 'policy_created': 'N0094', - 'policy_deleted': 'N0095' - }.get(action) - if not localized_code: - LOG.error('Invalid action: %s', action) - return - action_type = action.split('_', 1)[1] - policy_type = meta.pop('policy_type', None) - user_id, user_display_name, user_email = self._get_user_info(token) - description = ('{policy_type} policy for pool {pool_name} ({pool_id}) ' - '{action_type} by {user_name} ({user_email})').format( - policy_type=policy_type, pool_name=object_name, pool_id=object_id, - action_type=action_type, user_name=user_display_name, - user_email=user_email) - localized = ('{localized_code}({policy_type},{pool_name},{pool_id},' - '{action_type},{user_name},{user_email})').format( - localized_code=localized_code, policy_type=policy_type, - pool_name=object_name, pool_id=object_id, - action_type=action_type, user_name=user_display_name, - user_email=user_email) - self.execute_event( - action.upper(), organization_id, level=level, object_id=object_id, - object_type=object_type, object_name=object_name, - description=description, localized=localized, token=token, - user_id=user_id, user_display_name=user_display_name, ack=ack) - - def execute_alert_added_removed( - self, action, organization_id, level='INFO', object_id=None, - object_type=None, object_name=None, ack=False, token=None, - meta=None): - action_map = { - 'alert_added': 'created', - 'alert_removed': 'deleted' - } - alert = meta.pop('alert', None) - pool_name = meta.pop('pool_name', None) - with_subpools = meta.pop('with_subpools', None) - warn_type = meta.pop('warn_type', None) - threshold = str(alert['threshold']) - if alert['threshold_type'] == 'percentage': - threshold += '%' - action_value = action_map.get(action) - if not action_value: - LOG.error('Invalid action: %s', action) - return - contact_type = [] - if any(bool( - contact['slack_channel_id']) for contact in alert['contacts']): - contact_type.append('slack') - if any(bool( - contact['employee_id']) for contact in alert['contacts']): - contact_type.append('email') - contact_type = "/".join(contact_type) - user_id, user_display_name, user_email = self._get_user_info(token) - params = { - 'username': user_display_name, - 'email': user_email, - 'action': action_value, - 'contact_type': contact_type, - 'pool_name': pool_name, - 'with_subpools': with_subpools, - 'warn_type': warn_type - } - if warn_type in ['constraint violation', 'environment changes']: - description = ( - '{username}({email}) has {action} {contact_type} alert ' - 'with {warn_type} type for pool {pool_name}{with_subpools}' - ).format(**params) - localized = ( - 'N0121({username}, {email}, {action}, {contact_type},' - '{warn_type}, {pool_name}, {with_subpools})').format(**params) - elif warn_type in ['expenses', 'forecast']: - params.update({'threshold': threshold}) - description = ( - '{username}({email}) has {action} {contact_type} alert ' - 'for pool {pool_name}{with_subpools}, with {warn_type} ' - 'threshold {threshold} of pool limit').format(**params) - localized = ( - 'N0103({username}, {email}, {action}, {contact_type},' - '{pool_name}, {with_subpools}, {warn_type}, {threshold})' - ).format(**params) - else: - LOG.error('Invalid warn_type: %s', warn_type) - return - self.execute_event( - ('ALERT_' + action_value).upper(), organization_id, level=level, - object_id=object_id, object_type=object_type, - object_name=object_name, description=description, - localized=localized, token=token, user_id=user_id, - user_display_name=user_display_name, ack=ack) - - def execute(self, task): - organization_id = task.get('organization_id') - action = task.get('action') - object_id = task.get('object_id') - object_type = task.get('object_type') - meta = task.get('meta', {}) - object_name = meta.get('object_name', None) - level = meta.get('level', 'INFO') - ack = meta.get('ack', False) - token = meta.get('token', None) - task_params = { - 'action': action, - 'organization_id': organization_id, - 'level': level, - 'object_id': object_id, - 'object_type': object_type, - 'object_name': object_name, - 'ack': ack, - 'token': token, - 'meta': meta - } - required_params = ['organization_id', 'object_id', 'object_type', - 'action', 'level', 'ack'] - if any(map(lambda x: task_params.get(x) is None, required_params)): - raise Exception('Invalid task received: {}'.format(task)) - action_func, _ = (self._get_action_func_details_map(action) or - (None, None)) - LOG.info('Started processing for object %s task type for %s ' - 'for organization %s' % (object_id, action, organization_id)) - if not action_func: - LOG.warning('Unknown action type: %s. Skipping' % action) - return - action_func(**task_params) + executor_class = MainEventExecutor + return executor_class def process_task(self, body, message): try: - self.execute(body) + executor = self.get_executor_class(body) + executor(self.config_cl).execute(body) except Exception as exc: LOG.exception('Processing task failed: %s', str(exc)) message.ack() @@ -631,7 +69,9 @@ def heartbeat(self): if __name__ == '__main__': - urllib3.disable_warnings(category=urllib3.exceptions.InsecureRequestWarning) + urllib3.disable_warnings( + category=urllib3.exceptions.InsecureRequestWarning + ) debug = os.environ.get('DEBUG', False) log_level = 'DEBUG' if debug else 'INFO' setup_logging(loglevel=log_level, loggers=['']) diff --git a/docker_images/slacker_executor/worker.py b/docker_images/slacker_executor/worker.py index 9cb234b5f..ffa1de95b 100644 --- a/docker_images/slacker_executor/worker.py +++ b/docker_images/slacker_executor/worker.py @@ -403,8 +403,7 @@ def execute_constraint_violated(self, organization_id, object_id, def execute_alert_added_removed(self, organization_id, alert_id, action, object_type, meta): _, organization = self.rest_cl.organization_get(organization_id) - alert = meta.get('alert', {}) - pool_id = meta.get('alert', {}).get('pool_id') + pool_id = meta.get('pool_id') _, pool = self.rest_cl.pool_get(pool_id) params = { 'pool_name': pool['name'], @@ -417,12 +416,12 @@ def execute_alert_added_removed(self, organization_id, alert_id, action, 'currency': organization['currency'] } for p in ['based', 'threshold', 'threshold_type', 'include_children']: - params[p] = alert.get(p) + params[p] = meta.get(p) - for contact in alert['contacts']: + for contact in meta['contacts']: if contact.get('slack_channel_id'): warning_params = self.get_warning_params( - alert, pool, organization, contact['slack_channel_id']) + meta, pool, organization, contact['slack_channel_id']) self.send( ACTION_MSG_MAP.get(action), params, contact['slack_channel_id'], contact['slack_team_id'], diff --git a/ngui/server/api/keeper/client.ts b/ngui/server/api/keeper/client.ts index 021d020ab..8c89f4e38 100644 --- a/ngui/server/api/keeper/client.ts +++ b/ngui/server/api/keeper/client.ts @@ -18,17 +18,29 @@ class KeeperClient extends BaseClient { descriptionLike: "description_like", }; - // This is temporary. All URL parameters must be strings. - // Mapping will be done elsewhere, not clear how at this point. - Object.entries(requestParams).forEach(([key, value]) => { - const stringValue = value.toString(); + const appendParameter = (key, value) => { + const parameterName = paramsMapping[key]; - const mappedParam = paramsMapping[key]; + if (parameterName) { + params.append(parameterName, value); + } else { + params.append(key, value); + } + }; - if (mappedParam) { - params.append(mappedParam, stringValue); + /** + * This is temporary + * Mapping will be done elsewhere, not clear how at this point + */ + Object.entries(requestParams).forEach(([key, value]) => { + if (Array.isArray(value) && value.length > 0) { + value.forEach((datum) => { + const stringValue = datum.toString(); + appendParameter(key, stringValue); + }); } else { - params.append(key, stringValue); + const stringValue = value.toString(); + appendParameter(key, stringValue); } }); diff --git a/ngui/server/graphql/resolvers/keeper.generated.ts b/ngui/server/graphql/resolvers/keeper.generated.ts index 51f0cdbd0..09a552c46 100644 --- a/ngui/server/graphql/resolvers/keeper.generated.ts +++ b/ngui/server/graphql/resolvers/keeper.generated.ts @@ -33,11 +33,18 @@ export type Event = { time?: Maybe; }; +export enum EventLevel { + Debug = 'DEBUG', + Error = 'ERROR', + Info = 'INFO', + Warning = 'WARNING' +} + export type EventsRequestParams = { descriptionLike?: InputMaybe; includeRead?: InputMaybe; lastId?: InputMaybe; - level?: InputMaybe; + level?: InputMaybe>; limit?: InputMaybe; readOnGet?: InputMaybe; timeEnd?: InputMaybe; @@ -128,6 +135,7 @@ export type DirectiveResolverFn; Event: ResolverTypeWrapper; + EventLevel: EventLevel; EventsRequestParams: EventsRequestParams; ID: ResolverTypeWrapper; Int: ResolverTypeWrapper; diff --git a/ngui/server/graphql/schemas/keeper.graphql b/ngui/server/graphql/schemas/keeper.graphql index 1a47fda72..6b1a10074 100644 --- a/ngui/server/graphql/schemas/keeper.graphql +++ b/ngui/server/graphql/schemas/keeper.graphql @@ -14,10 +14,17 @@ type Event { acknowledged_user: String } +enum EventLevel { + INFO + WARNING + ERROR + DEBUG +} + # TODO: circle back on types - required types must be exclamated (see organizationId below and generated resolversTypes) input EventsRequestParams { limit: Int = 80 - level: String + level: [EventLevel!] timeStart: Int timeEnd: Int lastId: String diff --git a/ngui/ui/src/components/Events/Events.tsx b/ngui/ui/src/components/Events/Events.tsx index 1b8ea75ea..51823df9c 100644 --- a/ngui/ui/src/components/Events/Events.tsx +++ b/ngui/ui/src/components/Events/Events.tsx @@ -1,13 +1,15 @@ import { useEffect, useState, useMemo } from "react"; import ErrorIcon from "@mui/icons-material/Error"; import InfoIcon from "@mui/icons-material/Info"; -import { Stack } from "@mui/material"; +import PestControlIcon from "@mui/icons-material/PestControl"; +import { FormControlLabel, Stack } from "@mui/material"; import Box from "@mui/material/Box"; import CircularProgress from "@mui/material/CircularProgress"; import Typography from "@mui/material/Typography"; import { FormattedMessage } from "react-intl"; import Accordion from "components/Accordion"; import ActionBar from "components/ActionBar"; +import Checkbox from "components/Checkbox"; import { getBasicRangesSet } from "components/DateRangePicker/defaults"; import LinearSelector from "components/LinearSelector"; import PageContentWrapper from "components/PageContentWrapper"; @@ -120,10 +122,11 @@ const EventIcon = ({ eventLevel }) => ({ [EVENT_LEVEL.INFO]: , [EVENT_LEVEL.WARNING]: , - [EVENT_LEVEL.ERROR]: + [EVENT_LEVEL.ERROR]: , + [EVENT_LEVEL.DEBUG]: })[eventLevel]; -const Events = ({ eventLevel, descriptionLike, onScroll, applyFilter, events, isLoading = false }) => { +const Events = ({ eventLevel, includeDebugEvents, descriptionLike, onScroll, applyFilter, events, isLoading = false }) => { const [expanded, setExpanded] = useState(""); const queryParams = getQueryParams(); @@ -234,7 +237,7 @@ const Events = ({ eventLevel, descriptionLike, onScroll, applyFilter, events, is - {event.description} + {`${formatUTC(event.time, EN_FULL_FORMAT)} | ${event.description}`} {getAccordionContent(event)} @@ -269,8 +272,22 @@ const Events = ({ eventLevel, descriptionLike, onScroll, applyFilter, events, is - - + + + { + applyFilter({ + includeDebugEvents: !includeDebugEvents + }); + }} + /> + } + label={} + /> { timeStart, timeEnd, lastId, - descriptionLike + descriptionLike, + includeDebugEvents = false } = getQueryParams() as Partial<{ level: keyof typeof EVENT_LEVEL; timeStart: string; timeEnd: string; lastId: string; descriptionLike: string; + includeDebugEvents: string | boolean; }>; // Undefined query parameters are ignored in the API calls @@ -43,19 +46,31 @@ const EventsContainer = () => { timeStart: timeStart === undefined ? timeStart : Number(timeStart), timeEnd: timeEnd === undefined ? timeEnd : Number(timeEnd), lastId, - descriptionLike + descriptionLike, + includeDebugEvents: Boolean(includeDebugEvents) }); const [events, setEvents] = useState([]); + const getLevelParameter = () => { + const levels = + requestParams.level === EVENT_LEVEL.ALL + ? [EVENT_LEVEL.INFO, EVENT_LEVEL.WARNING, EVENT_LEVEL.ERROR] + : [requestParams.level]; + + return includeDebugEvents ? [...levels, EVENT_LEVEL.DEBUG] : levels; + }; + const { loading } = useQuery(GET_EVENTS, { variables: { organizationId, requestParams: { - ...requestParams, + timeStart: requestParams.timeStart, + timeEnd: requestParams.timeEnd, + lastId: requestParams.lastId, + descriptionLike: requestParams.descriptionLike, limit: EVENTS_LIMIT, - // The events API doesn't support the "ALL" level string, so we need to use the "undefined" in order to get a list of all events - level: requestParams.level === EVENT_LEVEL.ALL ? undefined : requestParams.level + level: getLevelParameter() } }, onCompleted: (data) => { @@ -68,7 +83,8 @@ const EventsContainer = () => { level: newFilterParams.level ?? requestParams.level, timeStart: newFilterParams.timeStart ?? requestParams.timeStart, timeEnd: newFilterParams.timeEnd ?? requestParams.timeEnd, - descriptionLike: newFilterParams.descriptionLike ?? requestParams.descriptionLike + descriptionLike: newFilterParams.descriptionLike ?? requestParams.descriptionLike, + includeDebugEvents: newFilterParams.includeDebugEvents ?? requestParams.includeDebugEvents }; const areParamsDifferent = (Object.keys(newRequestParams) as FilterNames[]).some( @@ -104,6 +120,7 @@ const EventsContainer = () => { return ( [^/]+)", 'tags_collection': r"%s/organizations/(?P[^/]+)/" r"tasks/(?P[^/]+)/tags", - 'restore_password': r"%s/restore_password" + 'restore_password': r"%s/restore_password", + 'profiling_token_info': r"%s/profiling_tokens/(?P[^/]+)", }) diff --git a/rest_api/rest_api_server/controllers/base.py b/rest_api/rest_api_server/controllers/base.py index 91f01c883..c76a75f5e 100644 --- a/rest_api/rest_api_server/controllers/base.py +++ b/rest_api/rest_api_server/controllers/base.py @@ -363,7 +363,12 @@ def publish_activities_task(self, organization_id, object_id, object_type, if add_token and self.token: if not meta: meta = {} - meta.update({'token': self.token}) + user_info = self.get_user_info() + meta.update({ + 'user_display_name': user_info.get('display_name'), + 'user_email': user_info.get('email'), + 'user_id': user_info.get('id') + }) task = { 'organization_id': organization_id, 'object_id': object_id, diff --git a/rest_api/rest_api_server/controllers/calendar_observer.py b/rest_api/rest_api_server/controllers/calendar_observer.py index fb2473013..1cf61934b 100644 --- a/rest_api/rest_api_server/controllers/calendar_observer.py +++ b/rest_api/rest_api_server/controllers/calendar_observer.py @@ -92,8 +92,8 @@ def observe(self, organization_id): } self.publish_activities_task( calendar_sync.organization_id, calendar_sync.id, - 'calendar_synchronization', 'calendar_warning', - meta, 'calendar_synchronization.calendar_warning', + 'calendar_synchronization', 'calendar_observer_warning', + meta, 'calendar_synchronization.calendar_observer_warning', add_token=True) def is_deleted_event(self, event): diff --git a/rest_api/rest_api_server/controllers/cloud_resource.py b/rest_api/rest_api_server/controllers/cloud_resource.py index e2fb19782..0ea27b208 100644 --- a/rest_api/rest_api_server/controllers/cloud_resource.py +++ b/rest_api/rest_api_server/controllers/cloud_resource.py @@ -963,12 +963,15 @@ def save_bulk(self, cloud_account_id, resources, behavior, return_resources, cloud_account = cloud_account_map[acc_id] meta = { 'object_name': cloud_account.name, - 'stat': stat + **stat } + action = 'resources_discovered' + if meta.get('clusters'): + action = 'resources_clustered_discovered' self.publish_activities_task( cloud_account.organization_id, cloud_account.id, - 'cloud_account', 'resources_discovered', meta, - 'cloud_account.resources_discovered', add_token=True) + 'cloud_account', action, meta, 'cloud_account.' + action, + add_token=True) if return_resources: resources = self.get_resources_by_hash_or_id( diff --git a/rest_api/rest_api_server/controllers/invite.py b/rest_api/rest_api_server/controllers/invite.py index ca0d50de1..ff196ad8f 100644 --- a/rest_api/rest_api_server/controllers/invite.py +++ b/rest_api/rest_api_server/controllers/invite.py @@ -136,12 +136,14 @@ def get_highest_role(current, new): if show_link: invite_dict['url'] = invite_url self.send_notification( - email, invite_url, organization.name, organization.id, organization.currency) + email, invite_url, organization.name, organization.id, + organization.currency) meta = { 'object_name': organization.name, 'email': email, - 'scope_purposes': ', '.join('%s: %s' % (k, v) - for k, v in scope_name_role_map.items()) + 'scope_purposes': ', '.join( + '%s: %s' % (k, v.split('_')[-1]) + for k, v in scope_name_role_map.items()) } self.publish_activities_task( diff --git a/rest_api/rest_api_server/controllers/pool.py b/rest_api/rest_api_server/controllers/pool.py index 55c9713d8..bd42d9846 100644 --- a/rest_api/rest_api_server/controllers/pool.py +++ b/rest_api/rest_api_server/controllers/pool.py @@ -152,7 +152,7 @@ def create(self, auto_extension=False, **kwargs): } self.publish_activities_task( pool.organization_id, pool.id, 'pool', - 'pool_created', meta, 'pool.pool_created', add_token=True) + 'pool_created', meta, 'pool.pool_created') return pool def edit(self, item_id, auto_extension=False, **kwargs): diff --git a/rest_api/rest_api_server/controllers/pool_alert.py b/rest_api/rest_api_server/controllers/pool_alert.py index 9813af6f1..42bf1e193 100644 --- a/rest_api/rest_api_server/controllers/pool_alert.py +++ b/rest_api/rest_api_server/controllers/pool_alert.py @@ -8,13 +8,18 @@ WrongArgumentsException) from rest_api.rest_api_server.controllers.pool import PoolController from rest_api.rest_api_server.exceptions import Err -from rest_api.rest_api_server.models.enums import ThresholdTypes, ThresholdBasedTypes -from rest_api.rest_api_server.models.models import (PoolAlert, Pool, AlertContact, - Organization) -from rest_api.rest_api_server.controllers.base import (BaseController, - BaseHierarchicalController) +from rest_api.rest_api_server.models.enums import (ThresholdTypes, + ThresholdBasedTypes) +from rest_api.rest_api_server.models.models import ( + PoolAlert, Pool, AlertContact, Organization +) +from rest_api.rest_api_server.controllers.base import ( + BaseController, BaseHierarchicalController +) from rest_api.rest_api_server.controllers.employee import EmployeeController -from rest_api.rest_api_server.controllers.base_async import BaseAsyncControllerWrapper +from rest_api.rest_api_server.controllers.base_async import ( + BaseAsyncControllerWrapper +) LOG = logging.getLogger(__name__) NOTIFICATION_INTERVAL = 43200 # 12 hours @@ -51,7 +56,9 @@ def process_alerts(self, organization_id): ).get_pool_hierarchy_costs(organization.pool_id) def get_pools_children_ids(pool_id): - children = {b_id for b_id in pool_limit_costs.get(pool_id)['children']} + children = { + b_id for b_id in pool_limit_costs.get(pool_id)['children'] + } for child in children.copy(): children.update(get_pools_children_ids(child)) return children @@ -151,14 +158,30 @@ def get_alert_task_meta(self, alert, user_info): with_subpools = ' with sub-pools' alert['based'] = alert_based_value alert['threshold_type'] = alert['threshold_type'].value + if alert['threshold_type'] == 'percentage': + alert['threshold'] = str(alert['threshold']) + '%' + contact_type = [] + if any(bool( + contact['slack_channel_id']) for contact in + alert['contacts']): + contact_type.append('slack') + if any(bool( + contact['employee_id']) for contact in alert['contacts']): + contact_type.append('email') + contact_type = "/".join(contact_type) + alert['contact_type'] = contact_type[0].upper() + contact_type[1:] + threshold_string = "" + if warn_type in ['forecast', 'expenses']: + threshold_string = 'threshold %s of pool limit' % alert['threshold'] + alert['threshold_string'] = threshold_string return { 'initiator_name': user_info['display_name'], 'initiator_email': user_info['email'], - 'alert': alert, 'pool_name': pool_name, 'with_subpools': with_subpools, 'warn_type': warn_type, - 'object_name': 'alert(%s)' % alert['id'] + 'object_name': 'alert(%s)' % alert['id'], + **alert } def create(self, **kwargs): @@ -171,7 +194,7 @@ def create(self, **kwargs): meta = self.get_alert_task_meta(alert.to_dict(), user_info) self.publish_activities_task( alert.pool.organization_id, alert.to_dict()['id'], 'pool_alert', - 'alert_added', meta, 'alert.action.added', add_token=True) + 'alert_added', meta, 'alert.action.added') return alert def delete(self, item_id): @@ -198,7 +221,7 @@ def delete(self, item_id): meta = self.get_alert_task_meta(alert_dict, user_info) self.publish_activities_task( alert.pool.organization_id, alert_dict['id'], 'pool_alert', - 'alert_removed', meta, 'alert.action.removed', add_token=True) + 'alert_removed', meta, 'alert.action.removed') def edit(self, item_id, **kwargs): alert = self.get(item_id) diff --git a/rest_api/rest_api_server/controllers/pool_policy.py b/rest_api/rest_api_server/controllers/pool_policy.py index 6ed7168ed..81f9567c0 100644 --- a/rest_api/rest_api_server/controllers/pool_policy.py +++ b/rest_api/rest_api_server/controllers/pool_policy.py @@ -32,12 +32,13 @@ def create(self, **kwargs): policy = super().create(**kwargs) pool = policy.pool meta = { - 'object_name': pool.name, + 'pool_name': pool.name, + 'pool_id': pool.id, 'policy_type': policy.type.value } action = 'policy_created' self.publish_activities_task( - pool.organization_id, pool.id, 'pool', action, meta, + pool.organization_id, pool.id, 'pool_policy', action, meta, 'pool.{action}'.format(action=action), add_token=True) return policy @@ -45,37 +46,41 @@ def delete(self, item_id): policy = super().delete(item_id) pool = policy.pool meta = { - 'object_name': pool.name, + 'pool_name': pool.name, + 'pool_id': pool.id, 'policy_type': policy.type.value } action = 'policy_deleted' self.publish_activities_task( - pool.organization_id, pool.id, 'pool', action, meta, + pool.organization_id, pool.id, 'pool_policy', action, meta, 'pool.{action}'.format(action=action), add_token=True) return policy def edit(self, item_id, **kwargs): policy = self.get(item_id) policy_dict = policy.to_dict() - updates = {k: v for k, v in kwargs.items() if policy_dict.get(k) != kwargs[k]} + updates = {k: v for k, v in kwargs.items() + if policy_dict.get(k) != kwargs[k]} upd_policy = super().edit(item_id, **kwargs) pool = policy.pool meta = { 'object_name': pool.name, - 'policy_type': policy.type.value + 'policy_type': policy.type.value, + 'pool_id': pool.id, + 'pool_name': pool.name } if upd_policy and upd_policy.active != policy_dict['active']: updates.pop('active', None) action_type = 'enabled' if upd_policy.active else 'disabled' action = 'policy_{action_type}'.format(action_type=action_type) self.publish_activities_task( - pool.organization_id, pool.id, 'pool', action, meta, + pool.organization_id, pool.id, 'pool_policy', action, meta, 'pool.{action}'.format(action=action), add_token=True) if updates: meta.update({'params': ', '.join( ['%s: %s' % (k, v) for k, v in updates.items()])}) self.publish_activities_task( - pool.organization_id, pool.id, 'pool', + pool.organization_id, pool.id, 'pool_policy', 'policy_updated', meta, 'pool.policy_updated', add_token=True) return upd_policy diff --git a/rest_api/rest_api_server/controllers/profiling/profiling_token.py b/rest_api/rest_api_server/controllers/profiling/profiling_token.py index de028d201..5d5e32566 100644 --- a/rest_api/rest_api_server/controllers/profiling/profiling_token.py +++ b/rest_api/rest_api_server/controllers/profiling/profiling_token.py @@ -1,6 +1,12 @@ +from tools.optscale_exceptions.common_exc import NotFoundException +from rest_api.rest_api_server.exceptions import Err from rest_api.rest_api_server.models.models import ProfilingToken -from rest_api.rest_api_server.controllers.profiling.base import BaseProfilingController -from rest_api.rest_api_server.controllers.base_async import BaseAsyncControllerWrapper +from rest_api.rest_api_server.controllers.profiling.base import ( + BaseProfilingController +) +from rest_api.rest_api_server.controllers.base_async import ( + BaseAsyncControllerWrapper +) class ProfilingTokenController(BaseProfilingController): @@ -10,6 +16,16 @@ def _get_model_type(self): def get(self, organization_id, **kwargs): return super().get_or_create_profiling_token(organization_id) + def get_profiling_token_info(self, profiling_token): + token = self.session.query(ProfilingToken).filter( + ProfilingToken.deleted.is_(False), + ProfilingToken.token == profiling_token + ).one_or_none() + if not token: + raise NotFoundException( + Err.OE0002, [ProfilingToken.__name__, profiling_token]) + return token + class ProfilingTokenAsyncController(BaseAsyncControllerWrapper): def _get_controller_class(self): diff --git a/rest_api/rest_api_server/controllers/resource_observer.py b/rest_api/rest_api_server/controllers/resource_observer.py index 4aa3cd41e..e516baf55 100644 --- a/rest_api/rest_api_server/controllers/resource_observer.py +++ b/rest_api/rest_api_server/controllers/resource_observer.py @@ -145,12 +145,14 @@ def observe(self, organization_id): cloud_account = cloud_accounts_map[acc_id] meta = { 'object_name': cloud_account.name, - 'stat': stat + **stat } + action = 'resources_discovered' + if meta.get('clusters'): + action = 'resources_clustered_discovered' self.publish_activities_task( organization_id, cloud_account.id, 'cloud_account', - 'resources_discovered', meta, - 'cloud_account.resources_discovered', add_token=True) + action, meta, 'cloud_account.' + action, add_token=True) pools_for_org = PoolController( self.session, self._config, self.token ).get_organization_pools(organization_id) diff --git a/rest_api/rest_api_server/controllers/rule.py b/rest_api/rest_api_server/controllers/rule.py index a426f6d76..d8da8f047 100644 --- a/rest_api/rest_api_server/controllers/rule.py +++ b/rest_api/rest_api_server/controllers/rule.py @@ -288,13 +288,15 @@ def extend_rule_output(rules, pool_details_map, return formatted_rules @retry(**RULE_PRIORITY_RETRIES) - def create_rule(self, user_id, organization_id, token, is_deprioritized=False, **kwargs): + def create_rule(self, user_id, organization_id, token, + is_deprioritized=False, **kwargs): # TODO implement permissions check OSB-412 self._validate_parameters(**kwargs) employee = self.employee_ctrl.get_employee_by_user_and_organization( user_id, organization_id=organization_id) try: - result = self._prepare_rule_data(employee, organization_id, is_deprioritized, **kwargs) + result = self._prepare_rule_data(employee, organization_id, + is_deprioritized, **kwargs) rule, pool, owner = result if owner and pool: if not self.assignment_ctrl.validate_owner(owner, pool, token): @@ -314,7 +316,7 @@ def create_rule(self, user_id, organization_id, token, is_deprioritized=False, * 'pool_id': pool.id } self.publish_activities_task( - rule.organization_id, rule.organization_id, 'organization', + rule.organization_id, rule.organization_id, 'rule', 'rule_created', meta, 'organization.rule_created', add_token=True) return self.get_rule_info(rule) @@ -450,7 +452,7 @@ def edit_rule(self, item_id, token, **kwargs): 'rule_id': rule.id, } self.publish_activities_task( - rule.organization_id, rule.organization_id, 'organization', + rule.organization_id, rule.organization_id, 'rule', 'rule_updated', meta, 'organization.rule_updated', add_token=True) return self.get_rule_info(rule) @@ -482,7 +484,7 @@ def delete(self, item_id, **kwargs): 'rule_id': rule.id, } self.publish_activities_task( - rule.organization_id, rule.organization_id, 'organization', + rule.organization_id, rule.organization_id, 'rule', 'rule_deleted', meta, 'organization.rule_deleted', add_token=True) except IntegrityError as exc: diff --git a/rest_api/rest_api_server/handlers/v2/profiling/profiling_tokens.py b/rest_api/rest_api_server/handlers/v2/profiling/profiling_tokens.py index e94e19590..d6dbc102a 100644 --- a/rest_api/rest_api_server/handlers/v2/profiling/profiling_tokens.py +++ b/rest_api/rest_api_server/handlers/v2/profiling/profiling_tokens.py @@ -1,6 +1,9 @@ +from tools.optscale_exceptions.common_exc import NotFoundException +from tools.optscale_exceptions.http_exc import OptHTTPError from rest_api.rest_api_server.controllers.profiling.profiling_token import ( ProfilingTokenAsyncController) -from rest_api.rest_api_server.handlers.v1.base_async import BaseAsyncCollectionHandler +from rest_api.rest_api_server.handlers.v1.base_async import ( + BaseAsyncCollectionHandler, BaseAsyncItemHandler) from rest_api.rest_api_server.handlers.v1.base import BaseAuthHandler from rest_api.rest_api_server.utils import run_task @@ -68,3 +71,66 @@ async def get(self, organization_id, **url_params): res = await run_task(self.controller.get, organization_id=organization_id) self.write(res.to_json()) + + +class ProfilingTokenInfoAsyncItemHandler(BaseAsyncItemHandler, + BaseAuthHandler): + def _get_controller_class(self): + return ProfilingTokenAsyncController + + async def patch(self, profiling_token, **kwargs): + self.raise405() + + async def delete(self, profiling_token, **kwargs): + self.raise405() + + async def get(self, profiling_token, **url_params): + """ + --- + description: | + Get profiling token info + Required permission: CLUSTER_SECRET + tags: [profiling_tokens] + summary: Get profiling token info by profiling token value + parameters: + - name: profiling_token + in: path + description: Profiling token value + required: true + type: string + responses: + 200: + description: Organization profiling token info + schema: + type: object + properties: + id: + type: string + description: Unique profiling token id + token: + type: string + description: Profiling token + organization_id: + type: string + description: Organization id + created_at: + type: integer + description: Organization id + deleted_at: + type: integer + description: Organization id + 401: + description: | + Unauthorized: + - OE0237: This resource requires authorization + 403: + description: | + Forbidden: + - OE0236: Bad secret + security: + - secret: [] + """ + self.check_cluster_secret(raises=True) + res = await run_task(self.controller.get_profiling_token_info, + profiling_token=profiling_token) + self.write(res.to_json()) diff --git a/rest_api/rest_api_server/server.py b/rest_api/rest_api_server/server.py index e9e9f5265..d7ea918ff 100644 --- a/rest_api/rest_api_server/server.py +++ b/rest_api/rest_api_server/server.py @@ -138,6 +138,10 @@ def get_handlers(handler_kwargs, version=None): (urls_v2.tags_collection, h_v2.profiling.tags.TagsAsyncCollectionHandler, handler_kwargs), + (urls_v2.profiling_token_info, + h_v2.profiling.profiling_tokens.ProfilingTokenInfoAsyncItemHandler, + handler_kwargs + ) ] infrastructure_urls = [ (urls_v2.infra_profiling_token, diff --git a/rest_api/rest_api_server/tests/unittests/test_api_base.py b/rest_api/rest_api_server/tests/unittests/test_api_base.py index 89f239019..7f6a747ee 100644 --- a/rest_api/rest_api_server/tests/unittests/test_api_base.py +++ b/rest_api/rest_api_server/tests/unittests/test_api_base.py @@ -173,8 +173,8 @@ def get_bulk_allowed_action_pools_map(self, user_ids, actions): return result @staticmethod - def get_publish_activity_tuple(org_id, object_id, object_type, action, meta, - routing_key=None): + def get_publish_activity_tuple(org_id, object_id, object_type, action, + meta, routing_key=None): if routing_key is None: routing_key = '{object_type}.{action}'.format( object_type=object_type, action=action) diff --git a/rest_api/rest_api_server/tests/unittests/test_api_pool_policies.py b/rest_api/rest_api_server/tests/unittests/test_api_pool_policies.py index e90217712..2721cff65 100644 --- a/rest_api/rest_api_server/tests/unittests/test_api_pool_policies.py +++ b/rest_api/rest_api_server/tests/unittests/test_api_pool_policies.py @@ -279,9 +279,13 @@ def test_policy_events(self): self.organization['pool_id'], valid_policy) self.assertEqual(code, 201) activity_param_tuples = self.get_publish_activity_tuple( - self.organization['id'], self.organization['pool_id'], 'pool', - 'policy_created', {'object_name': 'test organization', - 'policy_type': 'total_expense_limit'}) + self.organization['id'], self.organization['pool_id'], + 'pool_policy', 'policy_created', + {'pool_name': 'test organization', + 'pool_id': self.organization['pool_id'], + 'policy_type': 'total_expense_limit'}, + routing_key='pool.policy_created' + ) p_publish_activity.assert_called_once_with(*activity_param_tuples, add_token=True) @@ -294,9 +298,13 @@ def test_policy_events(self): self.assertEqual(code, 200) self.assertFalse(resp['active']) activity_param_tuples = self.get_publish_activity_tuple( - self.organization['id'], self.organization['pool_id'], 'pool', - 'policy_disabled', {'object_name': 'test organization', - 'policy_type': 'total_expense_limit'}) + self.organization['id'], self.organization['pool_id'], + 'pool_policy', 'policy_disabled', + {'object_name': 'test organization', + 'policy_type': 'total_expense_limit', + 'pool_id': self.organization['pool_id'], + 'pool_name': 'test organization'}, + routing_key='pool.policy_disabled') p_publish_activity.assert_called_once_with(*activity_param_tuples, add_token=True) @@ -308,10 +316,15 @@ def test_policy_events(self): policy['id'], {'active': False, 'limit': 50}) self.assertEqual(code, 200) activity_param_tuples = self.get_publish_activity_tuple( - self.organization['id'], self.organization['pool_id'], 'pool', - 'policy_updated', {'object_name': 'test organization', - 'policy_type': 'total_expense_limit', - 'params': 'limit: 50'}) + self.organization['id'], self.organization['pool_id'], + 'pool_policy', 'policy_updated', + {'object_name': 'test organization', + 'policy_type': 'total_expense_limit', + 'pool_id': self.organization['pool_id'], + 'pool_name': 'test organization', + 'params': 'limit: 50'}, + routing_key='pool.policy_updated' + ) p_publish_activity.assert_called_once_with(*activity_param_tuples, add_token=True) @@ -327,9 +340,14 @@ def test_policy_events(self): policy['id'], {'active': True}) self.assertEqual(code, 200) activity_param_tuples = self.get_publish_activity_tuple( - self.organization['id'], self.organization['pool_id'], 'pool', - 'policy_enabled', {'object_name': 'test organization', - 'policy_type': 'total_expense_limit'}) + self.organization['id'], self.organization['pool_id'], + 'pool_policy', 'policy_enabled', + {'object_name': 'test organization', + 'policy_type': 'total_expense_limit', + 'pool_id': self.organization['pool_id'], + 'pool_name': 'test organization'}, + routing_key='pool.policy_enabled' + ) p_publish_activity.assert_called_once_with(*activity_param_tuples, add_token=True) @@ -340,8 +358,12 @@ def test_policy_events(self): code, _ = self.client.pool_policy_delete(policy['id']) self.assertEqual(code, 204) activity_param_tuples = self.get_publish_activity_tuple( - self.organization['id'], self.organization['pool_id'], 'pool', - 'policy_deleted', {'object_name': 'test organization', - 'policy_type': 'total_expense_limit'}) + self.organization['id'], self.organization['pool_id'], + 'pool_policy', 'policy_deleted', + {'pool_name': 'test organization', + 'pool_id': self.organization['pool_id'], + 'policy_type': 'total_expense_limit'}, + routing_key='pool.policy_deleted' + ) p_publish_activity.assert_called_once_with(*activity_param_tuples, add_token=True) diff --git a/rest_api/rest_api_server/tests/unittests/test_budget_alerts.py b/rest_api/rest_api_server/tests/unittests/test_budget_alerts.py index e29eba619..4fa2feefe 100644 --- a/rest_api/rest_api_server/tests/unittests/test_budget_alerts.py +++ b/rest_api/rest_api_server/tests/unittests/test_budget_alerts.py @@ -4,7 +4,9 @@ from freezegun import freeze_time -from rest_api.rest_api_server.models.enums import ThresholdBasedTypes, ThresholdTypes +from rest_api.rest_api_server.models.enums import ( + ThresholdBasedTypes, ThresholdTypes +) from rest_api.rest_api_server.tests.unittests.test_api_base import TestApiBase @@ -969,15 +971,26 @@ def test_create_activities_task(self): meta = { 'initiator_name': ANY, 'initiator_email': ANY, - 'alert': ANY, 'pool_name': ANY, 'with_subpools': ANY, - 'warn_type': ANY, - 'object_name': 'alert(%s)' % response['id'] + 'warn_type': 'expenses', + 'object_name': 'alert(%s)' % response['id'], + 'deleted_at': 0, + 'id': ANY, + 'created_at': ANY, + 'pool_id': self.organization_1['pool_id'], + 'threshold': 80, + 'threshold_type': 'absolute', + 'based': 'cost', + 'last_shoot_at': 0, + 'include_children': False, + 'contacts': ANY, + 'contact_type': 'Email', + 'threshold_string': 'threshold 80 of pool limit' } self.p_send_message.assert_called_once_with( self.organization_1['id'], response['id'], 'pool_alert', - 'alert_added', meta, 'alert.action.added', add_token=True) + 'alert_added', meta, 'alert.action.added') def test_delete_activities_task(self): body = { @@ -998,12 +1011,23 @@ def test_delete_activities_task(self): meta = { 'initiator_name': ANY, 'initiator_email': ANY, - 'alert': ANY, 'pool_name': ANY, 'with_subpools': ANY, - 'warn_type': ANY, - 'object_name': ANY + 'warn_type': 'expenses', + 'object_name': ANY, + 'deleted_at': 0, + 'id': ANY, + 'created_at': ANY, + 'pool_id': self.organization_1['pool_id'], + 'threshold': 80, + 'threshold_type': 'absolute', + 'based': 'cost', + 'last_shoot_at': 0, + 'include_children': False, + 'contacts': ANY, + 'contact_type': 'Email', + 'threshold_string': 'threshold 80 of pool limit' } self.p_send_message.assert_called_once_with( self.organization_1['id'], alert['id'], 'pool_alert', - 'alert_removed', meta, 'alert.action.removed', add_token=True) + 'alert_removed', meta, 'alert.action.removed') diff --git a/rest_api/rest_api_server/tests/unittests/test_calendar_observer.py b/rest_api/rest_api_server/tests/unittests/test_calendar_observer.py index 2264e262b..9971301a5 100644 --- a/rest_api/rest_api_server/tests/unittests/test_calendar_observer.py +++ b/rest_api/rest_api_server/tests/unittests/test_calendar_observer.py @@ -212,28 +212,33 @@ def test_observe_event_changed(self, p_create_event, p_public_ip): 'rest_api.google_calendar_client.client.' 'GoogleCalendarClient.list_events', return_value=[event] ) as p_list_event, patch( - 'rest_api.google_calendar_client.client.GoogleCalendarClient.update_event' + 'rest_api.google_calendar_client.client.' + 'GoogleCalendarClient.update_event' ) as p_patch_event: code, _ = self.client.observe_calendar(self.org_id) self.assertEqual(code, 204) p_patch_event.assert_called_once_with( c_sync['calendar_id'], event['id'], - summary=f"{resource.get('name')} is acquired by {self.employee['name']}") + summary=f"{resource.get('name')} is acquired by " + f"{self.employee['name']}") p_list_event.assert_called_once_with( c_sync['calendar_id'], ANY, ANY, opttime.utcnow() - timedelta(days=28) ) with patch( - 'optscale_client.config_client.client.Client.google_calendar_service_key', + 'optscale_client.config_client.client.Client.' + 'google_calendar_service_key', return_value={'client_email': 'example@hystax.com'}): code, res = self.client.organization_calendar_get(self.org_id) self.assertEqual(code, 200) c_sync = res['calendar_synchronization'] with patch( - 'rest_api.google_calendar_client.client.GoogleCalendarClient.update_event' + 'rest_api.google_calendar_client.client.' + 'GoogleCalendarClient.update_event' ), patch( - 'rest_api.google_calendar_client.client.GoogleCalendarClient.list_events', + 'rest_api.google_calendar_client.client.' + 'GoogleCalendarClient.list_events', return_value=[event] ) as p_list_event: code, _ = self.client.observe_calendar(self.org_id) @@ -246,14 +251,15 @@ def test_observe_event_changed(self, p_create_event, p_public_ip): 'rest_api.rest_api_server.controllers.base.BaseController.' 'publish_activities_task' ).start() - with patch('rest_api.google_calendar_client.client.GoogleCalendarClient.list_events', + with patch('rest_api.google_calendar_client.client.' + 'GoogleCalendarClient.list_events', side_effect=FailedDependency(Err.OE0490, ['not found'])): code, _ = self.client.observe_calendar(self.org_id) self.assertEqual(code, 204) error = 'Unable to list calendar events: not found' activity_param_tuples = self.get_publish_activity_tuple( self.org_id, c_sync['id'], 'calendar_synchronization', - 'calendar_warning', { + 'calendar_observer_warning', { 'calendar_id': c_sync['calendar_id'], 'reason': error, 'is_observer': True, diff --git a/rest_api/rest_api_server/tests/unittests/test_cloud_resources_bulk.py b/rest_api/rest_api_server/tests/unittests/test_cloud_resources_bulk.py index 86e87ac52..100f052a3 100644 --- a/rest_api/rest_api_server/tests/unittests/test_cloud_resources_bulk.py +++ b/rest_api/rest_api_server/tests/unittests/test_cloud_resources_bulk.py @@ -326,7 +326,7 @@ def test_create_resource_with_system_tags_first(self): self.org1_id, self.cloud_acc1_id, 'cloud_account', 'resources_discovered', { 'object_name': self.cloud_acc1['name'], - 'stat': {'total': 1, 'clusters': [], 'clustered': 0} + 'total': 1, 'clusters': 0, 'clustered': 0 }) publish_discovered_cluster_resource_activity.assert_called_once_with( *activity_param_tuples, add_token=True) diff --git a/rest_api/rest_api_server/tests/unittests/test_invite_api.py b/rest_api/rest_api_server/tests/unittests/test_invite_api.py index 76a5db2f2..f8a4668f2 100644 --- a/rest_api/rest_api_server/tests/unittests/test_invite_api.py +++ b/rest_api/rest_api_server/tests/unittests/test_invite_api.py @@ -17,10 +17,12 @@ def setUp(self, version='v2'): self.user_id = self.gen_id() self._mock_auth_user(self.user_id) patch( - 'rest_api.rest_api_server.controllers.invite.InviteController.check_user_exists', + 'rest_api.rest_api_server.controllers.invite.' + 'InviteController.check_user_exists', return_value=(False, {})).start() patch( - 'rest_api.rest_api_server.controllers.invite.InviteController.get_invite_expiration_days', + 'rest_api.rest_api_server.controllers.invite.' + 'InviteController.get_invite_expiration_days', return_value=30).start() _, root_organization = self.client.organization_create( {'name': 'root org'}) @@ -78,15 +80,17 @@ def setUp(self, version='v2'): 'role_scope': None }] - patch('rest_api.rest_api_server.controllers.invite.InviteController.' - 'get_user_auth_assignments', return_value=user_assignments).start() - patch('rest_api.rest_api_server.controllers.employee.EmployeeController.' - 'notification_domain_blacklist').start() + patch('rest_api.rest_api_server.controllers.invite.' + 'InviteController.get_user_auth_assignments', + return_value=user_assignments).start() + patch('rest_api.rest_api_server.controllers.employee.' + 'EmployeeController.notification_domain_blacklist').start() self.mock_user_info(self.owner_email) def mock_user_info(self, email, name='default'): patch( - 'rest_api.rest_api_server.handlers.v1.base.BaseAuthHandler._get_user_info', + 'rest_api.rest_api_server.handlers.v1.base.' + 'BaseAuthHandler._get_user_info', return_value={ 'id': self.user_id, 'display_name': name, @@ -159,7 +163,8 @@ def test_employee_invited_event(self): 'initiator_email': 'user@ema.il', 'user_email': self.email_2, 'scope_purposes': '%s: %s' % ( - self.org_name, 'optscale_engineer') # org name is equal to root pool name + # org name is equal to root pool name + self.org_name, 'optscale_engineer') } self.p_get_user_info.return_value = { 'display_name': fmt_args['initiator_name'], 'id': self._user_id, @@ -174,7 +179,8 @@ def test_employee_invited_event(self): 'employee_invited', { 'object_name': self.org_name, 'email': self.email_2, - 'scope_purposes': '%s: %s' % (self.org_name, 'optscale_engineer') + 'scope_purposes': '%s: %s' % (self.org_name, + 'engineer') }) p_publish_activities.assert_called_once_with( *activity_param_tuples, add_token=True @@ -190,7 +196,8 @@ def test_employee_member_invited_event(self): 'initiator_email': 'user@ema.il', 'user_email': self.email_2, 'scope_purposes': '%s: %s' % ( - self.org_name, 'optscale_member') # org name is equal to root pool name + # org name is equal to root pool name + self.org_name, 'optscale_member') } self.p_get_user_info.return_value = { 'display_name': fmt_args['initiator_name'], 'id': self._user_id, @@ -205,7 +212,7 @@ def test_employee_member_invited_event(self): 'employee_invited', { 'object_name': self.org_name, 'email': self.email_2, - 'scope_purposes': '%s: %s' % (self.org_name, 'optscale_member') + 'scope_purposes': '%s: %s' % (self.org_name, 'member') }) p_publish_activities.assert_called_once_with( *activity_param_tuples, add_token=True diff --git a/rest_api/rest_api_server/tests/unittests/test_pools.py b/rest_api/rest_api_server/tests/unittests/test_pools.py index 61126f49f..f8de1ac7e 100644 --- a/rest_api/rest_api_server/tests/unittests/test_pools.py +++ b/rest_api/rest_api_server/tests/unittests/test_pools.py @@ -1113,9 +1113,8 @@ def test_pool_events(self): 'pool_created', { 'object_name': pool['name'] }) - p_publish_activities.assert_called_once_with( - *activity_param_tuples, add_token=True - ) + p_publish_activities.assert_called_once_with(*activity_param_tuples) + p_publish_activities = patch( 'rest_api.rest_api_server.controllers.base.BaseController.' 'publish_activities_task' diff --git a/rest_api/rest_api_server/tests/unittests/test_profiling_tokens.py b/rest_api/rest_api_server/tests/unittests/test_profiling_tokens.py index 5b541078b..1a1e6df4e 100644 --- a/rest_api/rest_api_server/tests/unittests/test_profiling_tokens.py +++ b/rest_api/rest_api_server/tests/unittests/test_profiling_tokens.py @@ -67,3 +67,15 @@ def test_error_on_create(self): code, resp = self.client.profiling_token_get(self.org['id']) self.assertEqual(code, 200) self.assertEqual(len(self._get_db_profiling_tokens(self.org['id'])), 1) + + def test_get_token_info(self): + code, token = self.client.profiling_token_get(self.org['id']) + self.assertEqual(code, 200) + code, data = self.client.profiling_token_info_get(token['token']) + self.assertEqual(code, 200) + self.assertEqual(data['organization_id'], self.org['id']) + + def test_get_invalid_token_info(self): + code, data = self.client.profiling_token_info_get('token') + self.assertEqual(code, 404) + self.assertEqual(data['error']['error_code'], 'OE0002') diff --git a/rest_api/rest_api_server/tests/unittests/test_resources_observer.py b/rest_api/rest_api_server/tests/unittests/test_resources_observer.py index 46054dad4..299c17c78 100644 --- a/rest_api/rest_api_server/tests/unittests/test_resources_observer.py +++ b/rest_api/rest_api_server/tests/unittests/test_resources_observer.py @@ -174,7 +174,7 @@ def test_observe_newly_discovered_resources(self): self.assertEqual(code, 204) activity_param_tuples = self.get_publish_activity_tuple( self.org_id, ANY, 'cloud_account', 'resources_discovered', { - 'stat': {'total': 3, 'clusters': [], 'clustered': 0}, + 'total': 3, 'clusters': 0, 'clustered': 0, 'object_name': ANY }) p_publish_activities.assert_has_calls([ @@ -214,11 +214,8 @@ def test_observe_newly_discovered_resources_clustered(self): code, _ = self.client.observe_resources(self.org_id) self.assertEqual(code, 204) activity_param_tuples = self.get_publish_activity_tuple( - self.org_id, ANY, 'cloud_account', 'resources_discovered', { - 'stat': {'clustered': 4, 'clusters': [cluster['_id']], - 'total': 4}, - 'object_name': ANY - }) + self.org_id, ANY, 'cloud_account', 'resources_clustered_discovered', + {'clustered': 4, 'clusters': 1, 'total': 4, 'object_name': ANY}) p_publish_activities.assert_has_calls([ call(*activity_param_tuples, add_token=True) ]) @@ -227,9 +224,9 @@ def test_observe_newly_discovered_resources_clustered(self): code, _ = self.client.observe_resources(self.org_id) self.assertEqual(code, 204) activity_param_tuples = self.get_publish_activity_tuple( - self.org_id, ANY, 'cloud_account', 'resources_discovered', { - 'stat': {'clustered': 4, 'clusters': [cluster['_id']], - 'total': 4}, + self.org_id, ANY, 'cloud_account', + 'resources_clustered_discovered', { + 'clustered': 4, 'clusters': 1, 'total': 4, 'object_name': ANY }) p_publish_activities.assert_has_calls([ diff --git a/rest_api/rest_api_server/utils.py b/rest_api/rest_api_server/utils.py index 0a4befd80..2f8fd0a1c 100644 --- a/rest_api/rest_api_server/utils.py +++ b/rest_api/rest_api_server/utils.py @@ -464,7 +464,7 @@ def generate_discovered_cluster_resources_stat( stat['clusters'].add(r.get('cluster_id')) for statistic in list(newly_discovered_stat.values()): if 'clusters' in statistic: - statistic['clusters'] = list(statistic['clusters']) + statistic['clusters'] = len(statistic['clusters']) return newly_discovered_stat diff --git a/slacker/slacker_server/message_templates/alerts.py b/slacker/slacker_server/message_templates/alerts.py index 87272892d..acd3e4e76 100644 --- a/slacker/slacker_server/message_templates/alerts.py +++ b/slacker/slacker_server/message_templates/alerts.py @@ -189,8 +189,11 @@ def get_alert_section(public_ip, pool_id, organization_id, pool_name, include_children, currency='USD'): c_sign = CURRENCY_MAP.get(currency, '') threshold_type_str = 'Expenses' if based == 'cost' else 'Forecast' + threshold = str(threshold) if threshold_type == 'absolute': exceed_str = f'*{c_sign}{threshold}*' + elif threshold.endswith('%'): + exceed_str = f'*{threshold}*' else: exceed_str = f'*{threshold}%*' with_subs = ' (with subs)' if include_children else ''