Skip to content

Commit

Permalink
Merge pull request #513 from NajmudheenCT/telemetry_delete
Browse files Browse the repository at this point in the history
Update Device delete operation by adding Telemetry task delete
  • Loading branch information
skdwriting authored Mar 24, 2021
2 parents f8bfee3 + a087e99 commit f3ec06c
Show file tree
Hide file tree
Showing 13 changed files with 144 additions and 2 deletions.
8 changes: 8 additions & 0 deletions delfin/api/v1/storages.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
from delfin.i18n import _
from delfin.task_manager import rpcapi as task_rpcapi
from delfin.task_manager.tasks import resources
from delfin.task_manager.tasks import telemetry as task_telemetry

LOG = log.getLogger(__name__)
CONF = cfg.CONF
Expand Down Expand Up @@ -141,6 +142,13 @@ def delete(self, req, id):
ctxt,
storage['id'],
subclass.__module__ + '.' + subclass.__name__)

for subclass in task_telemetry.TelemetryTask.__subclasses__():
self.task_rpcapi.remove_telemetry_instances(ctxt,
storage['id'],
subclass.__module__ +
'.'
+ subclass.__name__)
self.task_rpcapi.remove_storage_in_cache(ctxt, storage['id'])

@wsgi.response(202)
Expand Down
7 changes: 7 additions & 0 deletions delfin/db/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -834,3 +834,10 @@ def failed_task_delete(context, failed_task_id):
exist.
"""
return IMPL.failed_task_delete(context, failed_task_id)


def failed_task_delete_by_storage(context, storage_id):
"""Delete all failed tasks of given storage or raise an exception if it
does not exist.
"""
return IMPL.failed_task_delete_by_storage(context, storage_id)
11 changes: 10 additions & 1 deletion delfin/db/sqlalchemy/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -1723,7 +1723,9 @@ def task_get(context, tasks_id):

def task_delete_by_storage(context, storage_id):
"""Delete all the tasks of a storage device"""
_task_get_query(context).filter_by(storage_id=storage_id).delete()
delete_info = {'deleted': True, 'deleted_at': timeutils.utcnow()}
_task_get_query(context).filter_by(
storage_id=storage_id).update(delete_info)


def task_delete(context, tasks_id):
Expand Down Expand Up @@ -1810,6 +1812,13 @@ def failed_task_delete_by_task_id(context, task_id):
task_id=task_id).delete()


def failed_task_delete_by_storage(context, storage_id):
"""Delete all the failed tasks of a storage device"""
delete_info = {'deleted': True, 'deleted_at': timeutils.utcnow()}
_failed_tasks_get_query(context).filter_by(
storage_id=storage_id).update(delete_info)


def failed_task_delete(context, failed_task_id):
"""Delete a given failed task"""
_failed_tasks_get_query(context).filter_by(id=failed_task_id).delete()
Expand Down
7 changes: 6 additions & 1 deletion delfin/db/sqlalchemy/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -267,17 +267,22 @@ class Task(BASE, DelfinBase):
args = Column(JsonEncodedDict)
last_run_time = Column(Integer)
job_id = Column(String(36))
deleted_at = Column(DateTime)
deleted = Column(Boolean, default=False)


class FailedTask(BASE, DelfinBase):
"""Represents a failed task attributes."""
__tablename__ = 'failed_tasks'
id = Column(Integer, primary_key=True, autoincrement=True)
storage_id = Column(String(36))
task_id = Column(Integer)
interval = Column(Integer)
start_time = Column(Integer)
end_time = Column(Integer)
retry_count = Column(Integer)
method = Column(String(255))
job_id = Column(String(36))
result = Column(String(255))
job_id = Column(String(36))
deleted_at = Column(DateTime)
deleted = Column(Boolean, default=False)
8 changes: 8 additions & 0 deletions delfin/task_manager/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,14 @@ def remove_storage_in_cache(self, context, storage_id):
drivers = driver_manager.DriverManager()
drivers.remove_driver(storage_id)

def remove_telemetry_instances(self, context, storage_id, telemetry_task):
LOG.info('Remove telemetry instances for storage id:{0}')
cls = importutils.import_class(telemetry_task)
device_obj = cls()
return device_obj.remove_telemetry(context,
storage_id,
)

def sync_storage_alerts(self, context, storage_id, query_para):
LOG.info('Alert sync called for storage id:{0}'
.format(storage_id))
Expand Down
7 changes: 7 additions & 0 deletions delfin/task_manager/rpcapi.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,13 @@ def remove_storage_in_cache(self, context, storage_id):
'remove_storage_in_cache',
storage_id=storage_id)

def remove_telemetry_instances(self, context, storage_id, telemetry_task):
call_context = self.client.prepare(version='1.0', fanout=True)
return call_context.cast(context,
'remove_telemetry_instances',
storage_id=storage_id,
telemetry_task=telemetry_task)

def sync_storage_alerts(self, context, storage_id, query_para):
call_context = self.client.prepare(version='1.0')
return call_context.cast(context,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,21 @@ def __call__(self):
:return:
"""
try:
# Remove jobs from scheduler when marked for delete
filters = {'deleted': True}
failed_tasks = db.failed_task_get_all(self.ctx, filters=filters)
LOG.debug("Total failed_tasks found deleted "
"in this cycle:%s" % len(failed_tasks))
for failed_task in failed_tasks:
job_id = failed_task['job_id']
if job_id and self.scheduler.get_job(job_id):
self.scheduler.remove_job(job_id)
db.task_delete(self.ctx, failed_task['id'])
except Exception as e:
LOG.error("Failed to remove periodic scheduling job , reason: %s.",
six.text_type(e))
try:
# Create the object of periodic scheduler
failed_tasks = db.failed_task_get_all(self.ctx)

if not len(failed_tasks):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,20 @@ def __init__(self, ctxt):

def __call__(self, ctx):
""" Schedule the collection tasks based on interval """
try:
# Remove jobs from scheduler when marked for delete
filters = {'deleted': True}
tasks = db.task_get_all(ctx, filters=filters)
LOG.debug("Total tasks found deleted "
"in this cycle:%s" % len(tasks))
for task in tasks:
job_id = task['job_id']
if job_id and self.scheduler.get_job(job_id):
self.scheduler.remove_job(job_id)
db.task_delete(ctx, task['id'])
except Exception as e:
LOG.error("Failed to remove periodic scheduling job , reason: %s.",
six.text_type(e))
try:

filters = {'last_run_time': None}
Expand Down
13 changes: 13 additions & 0 deletions delfin/task_manager/tasks/telemetry.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ class TelemetryTask(object):
def collect(self, ctx, storage_id, args, start_time, end_time):
pass

@abc.abstractmethod
def remove_telemetry(self, ctx, storage_id):
pass


class PerformanceCollectionTask(TelemetryTask):
def __init__(self):
Expand Down Expand Up @@ -66,3 +70,12 @@ def collect(self, ctx, storage_id, args, start_time, end_time):
"storage id :{0}, reason:{1}".format(storage_id,
six.text_type(e)))
return TelemetryTaskStatus.TASK_EXEC_STATUS_FAILURE

def remove_telemetry(self, ctx, storage_id):
try:
db.task_delete_by_storage(ctx, storage_id)
db.failed_task_delete_by_storage(ctx, storage_id)
except Exception as e:
LOG.error("Failed to remove task entries from DB for "
"storage id :{0}, reason:{1}".format(storage_id,
six.text_type(e)))
2 changes: 2 additions & 0 deletions delfin/tests/unit/api/v1/test_storages.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ def test_delete(self):
db.storage_get.assert_called_once_with(ctxt, 'fake_id')
self.task_rpcapi.remove_storage_resource.assert_called_with(
ctxt, 'fake_id', mock.ANY)
self.task_rpcapi.remove_telemetry_instances.assert_called_once_with(
ctxt, 'fake_id', mock.ANY)
self.task_rpcapi.remove_storage_in_cache.assert_called_once_with(
ctxt, 'fake_id')

Expand Down
9 changes: 9 additions & 0 deletions delfin/tests/unit/db/test_db_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -829,3 +829,12 @@ def test_failed_task_delete_by_task_id(self, mock_session):
.failed_task_delete_by_task_id(context,
fake_failed_task_id)
assert result is None

@mock.patch('delfin.db.sqlalchemy.api.get_session')
def test_failed_task_delete_by_storage(self, mock_session):
fake_failed_task_storage_id = [models.FailedTask().storage_id]
mock_session.return_value.__enter__.return_value.query.return_value \
= fake_failed_task_storage_id
result = db_api \
.task_delete_by_storage(context, fake_failed_task_storage_id)
assert result is None
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,19 @@
fake_telemetry_job,
]

fake_telemetry_job_deleted = {
Task.id.name: 2,
Task.storage_id.name: uuidutils.generate_uuid(),
Task.args.name: {},
Task.interval.name: 10,
Task.method.name: constants.TelemetryCollection.PERFORMANCE_TASK_METHOD,
Task.last_run_time.name: None,
Task.deleted.name: True,
}

fake_telemetry_jobs_deleted = [
fake_telemetry_job_deleted,
]
# With method name as None
Incorrect_telemetry_job = {
Task.id.name: 2,
Expand Down Expand Up @@ -80,6 +93,25 @@ def test_telemetry_job_scheduling(self, mock_add_job):
mock.Mock())
@mock.patch('logging.LoggerAdapter.error')
def test_telemetry_job_scheduling_exception(self, mock_log_error):
ctx = context.get_admin_context()
telemetry_job = TelemetryJob(ctx)
# call telemetry job scheduling
telemetry_job(ctx)
self.assertEqual(mock_log_error.call_count, 2)

@mock.patch.object(db, 'task_delete',
mock.Mock())
@mock.patch.object(db, 'task_get_all',
mock.Mock(return_value=fake_telemetry_jobs_deleted))
@mock.patch.object(db, 'task_update',
mock.Mock(return_value=fake_telemetry_job))
@mock.patch.object(db, 'task_get',
mock.Mock(return_value=fake_telemetry_job))
@mock.patch(
'apscheduler.schedulers.background.BackgroundScheduler.add_job',
mock.Mock())
@mock.patch('logging.LoggerAdapter.error')
def test_telemetry_removal_success(self, mock_log_error):
ctx = context.get_admin_context()
telemetry_job = TelemetryJob(ctx)
# call telemetry job scheduling
Expand Down
13 changes: 13 additions & 0 deletions delfin/tests/unit/task_manager/test_telemetry.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,3 +70,16 @@ def test_performance_collection_failure(self, mock_collect_perf_metrics,
# when collect metric fails
self.assertEqual(mock_dispatch.call_count, 0)
self.assertEqual(mock_log_error.call_count, 1)

@mock.patch('delfin.db.failed_task_delete_by_storage')
@mock.patch('delfin.db.task_delete_by_storage')
def test_successful_remove(self, mock_task_del, mock_failed_task_del):
telemetry_obj = telemetry.PerformanceCollectionTask(
)
telemetry_obj.remove_telemetry(
context, 'c5c91c98-91aa-40e6-85ac-37a1d3b32bda')

mock_task_del.assert_called_with(
context, 'c5c91c98-91aa-40e6-85ac-37a1d3b32bda')
mock_failed_task_del.assert_called_with(
context, 'c5c91c98-91aa-40e6-85ac-37a1d3b32bda')

0 comments on commit f3ec06c

Please sign in to comment.