Skip to content

Commit

Permalink
Feature/notification settings
Browse files Browse the repository at this point in the history
Co-authored-by: ek-hystax <[email protected]>
  • Loading branch information
nk-hystax and ek-hystax authored Dec 3, 2024
1 parent 092a9bc commit f29e46b
Show file tree
Hide file tree
Showing 42 changed files with 1,633 additions and 109 deletions.
127 changes: 77 additions & 50 deletions docker_images/herald_executor/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,11 @@
from kombu.utils.debug import setup_logging
from kombu import Exchange, Queue, binding
import urllib3

from currency_symbols.currency_symbols import CURRENCY_SYMBOLS_MAP
from optscale_client.config_client.client import Client as ConfigClient
from optscale_client.rest_api_client.client_v2 import Client as RestClient
from optscale_client.herald_client.client_v2 import Client as HeraldClient
from optscale_client.auth_client.client_v2 import Client as AuthClient
from currency_symbols.currency_symbols import CURRENCY_SYMBOLS_MAP
from tools.optscale_time import utcnow_timestamp, utcfromtimestamp

LOG = get_logger(__name__)
Expand Down Expand Up @@ -76,6 +75,16 @@ class HeraldTemplates(Enum):
FIRST_RUN_STARTED = 'first_run_started'


CONSTRAINT_TYPE_TEMPLATE_MAP = {
'expense_anomaly': HeraldTemplates.ANOMALY_DETECTION.value,
'resource_count_anomaly': HeraldTemplates.ANOMALY_DETECTION.value,
'expiring_budget': HeraldTemplates.EXPIRING_BUDGET.value,
'recurring_budget': HeraldTemplates.RECURRING_BUDGET.value,
'resource_quota': HeraldTemplates.RESOURCE_QUOTA.value,
'tagging_policy': HeraldTemplates.TAGGING_POLICY.value
}


class HeraldExecutorWorker(ConsumerMixin):
def __init__(self, connection, config_cl):
self.connection = connection
Expand Down Expand Up @@ -118,32 +127,39 @@ def get_consumers(self, consumer, channel):
return [consumer(queues=[TASK_QUEUE], accept=['json'],
callbacks=[self.process_task], prefetch_count=10)]

def get_auth_users(self, user_ids):
_, response = self.auth_cl.user_list(user_ids)
return response

def get_owner_manager_infos(self, organization_id,
tenant_auth_user_ids=None):
auth_users = []
if tenant_auth_user_ids:
auth_users = self.get_auth_users(tenant_auth_user_ids)
all_user_info = {auth_user['id']: {
'display_name': auth_user.get('display_name'),
'email': auth_user.get('email')
} for auth_user in auth_users}

_, org_managers = self.auth_cl.user_roles_get(
scope_ids=[organization_id],
role_purposes=[MANAGER_ROLE])
for manager in org_managers:
user_id = manager['user_id']
if not tenant_auth_user_ids or user_id not in tenant_auth_user_ids:
def get_owner_manager_infos(
self, organization_id, tenant_auth_user_ids=None,
email_template=None):
_, employees = self.rest_cl.employee_list(organization_id)
_, user_roles = self.auth_cl.user_roles_get(
scope_ids=[organization_id],
user_ids=[x['auth_user_id'] for x in employees['employees']]
)
all_user_info = {}
for user_role in user_roles:
user_id = user_role['user_id']
if (user_role['role_purpose'] == MANAGER_ROLE or
tenant_auth_user_ids and user_id in tenant_auth_user_ids):
all_user_info[user_id] = {
'display_name': manager.get('user_display_name'),
'email': manager.get('user_email')
'display_name': user_role.get('user_display_name'),
'email': user_role.get('user_email')
}
if email_template:
for employee in employees['employees']:
auth_user_id = employee['auth_user_id']
if (auth_user_id in all_user_info and
not self.is_email_enabled(employee['id'],
email_template)):
all_user_info.pop(auth_user_id, None)
return all_user_info

def is_email_enabled(self, employee_id, email_template):
_, employee_emails = self.rest_cl.employee_emails_get(
employee_id, email_template)
employee_email = employee_emails.get('employee_emails')
if employee_email:
return employee_email[0]['enabled']

def _get_service_emails(self):
return self.config_cl.optscale_email_recipient()

Expand Down Expand Up @@ -217,7 +233,7 @@ def format_remained_time(start_date, end_date):
shareable_booking_data = self._filter_bookings(
shareable_bookings.get('data', []), resource_id, now_ts)
for booking in shareable_booking_data:
acquired_by_id = booking.get('acquired_by_id')
acquired_by_id = booking.get('acquired_by', {}).get('id')
if acquired_by_id:
resource_tenant_ids.append(acquired_by_id)
_, employees = self.rest_cl.employee_list(org_id=organization_id)
Expand All @@ -227,11 +243,10 @@ def format_remained_time(start_date, end_date):
tenant_auth_user_ids = [
emp['auth_user_id'] for emp in list(employee_id_map.values())
]

for booking in shareable_booking_data:
acquired_since = booking['acquired_since']
released_at = booking['released_at']
acquired_by_id = booking.get('acquired_by_id')
acquired_by_id = booking.get('acquired_by', {}).get('id')
utc_acquired_since = int(
utcfromtimestamp(acquired_since).timestamp())
utc_released_at = int(
Expand Down Expand Up @@ -272,7 +287,8 @@ def format_remained_time(start_date, end_date):
}})

all_user_info = self.get_owner_manager_infos(
cloud_account['organization_id'], tenant_auth_user_ids)
cloud_account['organization_id'], tenant_auth_user_ids,
HeraldTemplates.ENVIRONMENT_CHANGES.value)
env_properties_list = [
{'env_key': env_prop_key, 'env_value': env_prop_value}
for env_prop_key, env_prop_value in env_properties.items()
Expand Down Expand Up @@ -409,9 +425,11 @@ def execute_expense_alert(self, pool_id, organization_id, meta):
employee_id = contact.get('employee_id')
if employee_id:
_, employee = self.rest_cl.employee_get(employee_id)
_, user = self.auth_cl.user_get(employee['auth_user_id'])
self.send_expenses_alert(
user['email'], alert, pool['name'], organization)
if self.is_email_enabled(employee['id'],
HeraldTemplates.POOL_ALERT.value):
_, user = self.auth_cl.user_get(employee['auth_user_id'])
self.send_expenses_alert(
user['email'], alert, pool['name'], organization)

def execute_constraint_violated(self, object_id, organization_id, meta,
object_type):
Expand All @@ -423,6 +441,14 @@ def execute_constraint_violated(self, object_id, organization_id, meta,
if user.get('slack_connected'):
return

_, employees = self.rest_cl.employee_list(organization_id)
employee = next((x for x in employees['employees']
if x['auth_user_id'] == object_id), None)
if employee and not self.is_email_enabled(
employee['id'],
HeraldTemplates.RESOURCE_OWNER_VIOLATION_ALERT.value):
return

hit_list = meta.get('violations')
resource_type_map = {
'ttl': 'TTL',
Expand Down Expand Up @@ -536,14 +562,6 @@ def _get_org_constraint_link(self, constraint, created_at, filters):
def _get_org_constraint_template_params(self, organization, constraint,
constraint_data, hit_date,
latest_hit, link, user_info):
constraint_template_map = {
'expense_anomaly': HeraldTemplates.ANOMALY_DETECTION.value,
'resource_count_anomaly': HeraldTemplates.ANOMALY_DETECTION.value,
'expiring_budget': HeraldTemplates.EXPIRING_BUDGET.value,
'recurring_budget': HeraldTemplates.RECURRING_BUDGET.value,
'resource_quota': HeraldTemplates.RESOURCE_QUOTA.value,
'tagging_policy': HeraldTemplates.TAGGING_POLICY.value
}
if 'anomaly' in constraint['type']:
title = 'Anomaly detection alert'
else:
Expand Down Expand Up @@ -578,7 +596,7 @@ def _get_org_constraint_template_params(self, organization, constraint,
if without_tag:
conditions.append(f'without tag "{without_tag}"')
params['texts']['conditions'] = ', '.join(conditions)
return params, title, constraint_template_map[constraint['type']]
return params, title

def execute_organization_constraint_violated(self, constraint_id,
organization_id):
Expand All @@ -587,7 +605,8 @@ def execute_organization_constraint_violated(self, constraint_id,
LOG.warning('Organization %s was not found, error code: %s' % (
organization_id, code))
return
code, constraint = self.rest_cl.organization_constraint_get(constraint_id)
code, constraint = self.rest_cl.organization_constraint_get(
constraint_id)
if not constraint:
LOG.warning(
'Organization constraint %s was not found, error code: %s' % (
Expand Down Expand Up @@ -616,9 +635,11 @@ def execute_organization_constraint_violated(self, constraint_id,
constraint_data['definition']['start_date'] = utcfromtimestamp(
int(constraint_data['definition']['start_date'])).strftime(
'%m/%d/%Y %I:%M %p UTC')
managers = self.get_owner_manager_infos(organization_id)
template = CONSTRAINT_TYPE_TEMPLATE_MAP[constraint['type']]
managers = self.get_owner_manager_infos(
organization_id, email_template=template)
for user_id, user_info in managers.items():
params, subject, template = self._get_org_constraint_template_params(
params, subject = self._get_org_constraint_template_params(
organization, constraint, constraint_data, hit_date,
latest_hit, link, user_info)
self.herald_cl.email_send(
Expand All @@ -634,7 +655,9 @@ def execute_new_security_recommendation(self, organization_id,
return
for i, data_dict in enumerate(module_count_list):
module_count_list[i] = data_dict
managers = self.get_owner_manager_infos(organization_id)
managers = self.get_owner_manager_infos(
organization_id,
email_template=HeraldTemplates.NEW_SECURITY_RECOMMENDATION.value)
for user_id, user_info in managers.items():
template_params = {
'texts': {
Expand Down Expand Up @@ -662,7 +685,8 @@ def execute_saving_spike(self, organization_id, meta):
opt['saving'] = round(opt['saving'], 2)
top3[i] = opt

managers = self.get_owner_manager_infos(organization_id)
managers = self.get_owner_manager_infos(
organization_id, email_template=HeraldTemplates.SAVING_SPIKE.value)
for user_id, user_info in managers.items():
template_params = {
'texts': {
Expand All @@ -683,18 +707,21 @@ def execute_saving_spike(self, organization_id, meta):

def execute_report_imports_passed_for_org(self, organization_id):
_, organization = self.rest_cl.organization_get(organization_id)
managers = self.get_owner_manager_infos(organization_id)
managers = self.get_owner_manager_infos(
organization_id,
email_template=HeraldTemplates.REPORT_IMPORT_PASSED.value)
emails = [x['email'] for x in managers.values()]
subject = 'Expenses initial processing completed'
template_params = {
'texts': {
'organization': self._get_organization_params(organization),
}
}
self.herald_cl.email_send(
emails, subject,
template_type=HeraldTemplates.REPORT_IMPORT_PASSED.value,
template_params=template_params)
if emails:
self.herald_cl.email_send(
emails, subject,
template_type=HeraldTemplates.REPORT_IMPORT_PASSED.value,
template_params=template_params)

def execute_insider_prices(self):
self._send_service_email('Insider faced Azure SSLError',
Expand Down
2 changes: 1 addition & 1 deletion docker_images/webhook_executor/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ def get_environment_meta(self, webhook, meta_info):
ssh_key_map = json.loads(ssh_key_map_json)
ssh_key = ssh_key_map.get('key')
booking['ssh_key'] = ssh_key
owner_id = booking.get('acquired_by_id')
owner_id = booking.get('acquired_by', {}).get('id')
owner = {}
if owner_id:
_, owner = self.rest_cl.employee_get(owner_id)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
""""checking_email_settings_task_state"
Revision ID: f15bef09d604
Revises: 66dbed1e88e6
Create Date: 2024-08-30 12:02:40.374500
"""
import sqlalchemy as sa
from alembic import op
from sqlalchemy.orm import Session
from sqlalchemy.sql import table, column
from sqlalchemy import update, String

# revision identifiers, used by Alembic.
revision = "f15bef09d604"
down_revision = "66dbed1e88e6"
branch_labels = None
depends_on = None


old_states = sa.Enum(
"created",
"started",
"getting_scopes",
"got_scopes",
"getting_recipients",
"got_recipients",
"generating_data",
"generated_data",
"putting_to_object_storage",
"put_to_object_storage",
"putting_to_herald",
"completed",
"error",
)
new_states = sa.Enum(
"created",
"started",
"getting_scopes",
"got_scopes",
"getting_recipients",
"got_recipients",
"checking_email_settings",
"generating_data",
"generated_data",
"putting_to_object_storage",
"put_to_object_storage",
"putting_to_herald",
"completed",
"error",
)


def upgrade():
op.alter_column("task", "state", existing_type=new_states, nullable=False)


def downgrade():
task_table = table(
"task",
column("state", String(128)),
)
bind = op.get_bind()
session = Session(bind=bind)
try:
update_task_stmt = (
update(task_table)
.values(state="started")
.where(task_table.c.state == "checking_email_settings")
)
session.execute(update_task_stmt)
session.commit()
finally:
session.close()

op.alter_column("task", "state", existing_type=old_states, nullable=False)
2 changes: 1 addition & 1 deletion katara/katara_service/migrate.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ def save(self, host, username, password, db, file_name='alembic.ini'):
config.write(fh)


def execute(cmd, path='..'):
def execute(cmd, path='../..'):
LOG.debug('Executing command %s', ''.join(cmd))
myenv = os.environ.copy()
myenv['PYTHONPATH'] = path
Expand Down
1 change: 1 addition & 0 deletions katara/katara_service/models/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ class TaskState(enum.Enum):
got_scopes = 'got_scopes'
getting_recipients = 'getting_recipients'
got_recipients = 'got_recipients'
checking_email_settings = 'checking_email_settings'
generating_data = 'generating_data'
generated_data = 'generated_data'
putting_to_herald = 'putting_to_herald'
Expand Down
1 change: 1 addition & 0 deletions katara/katara_worker/consts.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ class TaskState(object):
GOT_SCOPES = 'got_scopes'
GETTING_RECIPIENTS = 'getting_recipients'
GOT_RECIPIENTS = 'got_recipients'
CHECKING_EMAIL_SETTINGS = 'checking_email_settings'
GENERATING_DATA = 'generating_data'
GENERATED_DATA = 'generated_data'
PUTTING_TO_HERALD = 'putting_to_herald'
Expand Down
15 changes: 15 additions & 0 deletions katara/katara_worker/reports_generators/base.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,18 @@
import os
from currency_symbols.currency_symbols import CURRENCY_SYMBOLS_MAP
from optscale_client.auth_client.client_v2 import Client as AuthClient
from optscale_client.rest_api_client.client_v2 import Client as RestClient


MODULE_NAME_EMAIL_TEMPLATE = {
'organization_expenses': 'weekly_expense_report',
'pool_limit_exceed': 'pool_exceed_report',
'pool_limit_exceed_resources': 'pool_exceed_resources_report',
'violated_constraints': 'resource_owner_violation_report',
'violated_constraints_diff': 'pool_owner_violation_report'
}


class Base(object):
def __init__(self, organization_id, report_data, config_client):
self.organization_id = organization_id
Expand Down Expand Up @@ -30,3 +40,8 @@ def auth_cl(self):
@staticmethod
def get_currency_code(currency):
return CURRENCY_SYMBOLS_MAP.get(currency, '')

@staticmethod
def get_template_type(path):
return MODULE_NAME_EMAIL_TEMPLATE[(os.path.splitext(
os.path.basename(path)))[0]]
Loading

0 comments on commit f29e46b

Please sign in to comment.