From 039852fd88d58c7175ca07ad7f928e6ea988418a Mon Sep 17 00:00:00 2001 From: benoit74 Date: Mon, 27 Nov 2023 09:56:38 +0100 Subject: [PATCH] Fix update of worker last_seen property --- .../routes/requested_tasks/requested_task.py | 17 +- .../integration/routes/workers/test_worker.py | 160 +++++++++++++----- 2 files changed, 134 insertions(+), 43 deletions(-) diff --git a/dispatcher/backend/src/routes/requested_tasks/requested_task.py b/dispatcher/backend/src/routes/requested_tasks/requested_task.py index 68805193..6efd6323 100644 --- a/dispatcher/backend/src/routes/requested_tasks/requested_task.py +++ b/dispatcher/backend/src/routes/requested_tasks/requested_task.py @@ -223,20 +223,27 @@ def get(self, session: so.Session, token: AccessToken.Payload): # record we've seen a worker, if applicable if token and worker_name: worker = dbm.Worker.get(session, worker_name, WorkerNotFound) + + # Update worker properties only if called as the worker itself, not as an + # admin if worker.user.username == token.username: worker.last_seen = getnow() # IP changed since last encounter - if str(worker.last_ip) != worker_ip: + ip_changed = str(worker.last_ip) != worker_ip + if ip_changed: logger.info( f"Worker IP changed detected for {worker_name}: " f"IP changed from {worker.last_ip} to {worker_ip}" ) worker.last_ip = worker_ip - # commit explicitely since we are not using an explicit transaction, - # and do it before calling Wasabi so that changes are propagated - # quickly and transaction is not blocking - session.commit() + + # commit explicitely since we are not using an explicit transaction, + # and do it before calling Wasabi so that changes are propagated + # quickly and transaction is not blocking + session.commit() + + if ip_changed: if constants.USES_WORKERS_IPS_WHITELIST: try: record_ip_change(session=session, worker_name=worker_name) diff --git a/dispatcher/backend/src/tests/integration/routes/workers/test_worker.py b/dispatcher/backend/src/tests/integration/routes/workers/test_worker.py index 4767bdc6..57a8c3c9 100644 --- a/dispatcher/backend/src/tests/integration/routes/workers/test_worker.py +++ b/dispatcher/backend/src/tests/integration/routes/workers/test_worker.py @@ -1,7 +1,8 @@ -from typing import List +from typing import Any, Dict, List import pytest +import db.models as dbm from common import constants from common.external import ExternalIpUpdater, build_workers_whitelist @@ -212,32 +213,130 @@ def test_checkin_another_user( class TestWorkerRequestedTasks: - def test_requested_task_worker_as_admin(self, client, access_token, worker): + @pytest.fixture() + def req_task_query_string(self, worker): + return { + "worker": worker["name"], + "avail_cpu": 4, + "avail_memory": 2048, + "avail_disk": 4096, + } + + @pytest.fixture + def make_headers(self): + def _make_headers(access_token: str, client_ip: str) -> Dict[str, Any]: + return { + "Authorization": access_token, + "X-Forwarded-For": client_ip, + } + + return _make_headers + + @pytest.fixture + def admin_headers(self, make_headers, access_token, default_ip): + def _admin_headers( + access_token: str = access_token, client_ip: str = default_ip + ) -> Dict[str, Any]: + return make_headers(access_token=access_token, client_ip=client_ip) + + return _admin_headers + + @pytest.fixture + def worker_headers(self, make_headers, make_access_token, worker, default_ip): + def _worker_headers( + access_token: str = make_access_token(worker["username"], "worker"), + client_ip: str = default_ip, + ) -> Dict[str, Any]: + return make_headers(access_token=access_token, client_ip=client_ip) + + return _worker_headers + + @pytest.fixture + def default_ip(self): + return "192.168.1.1" + + @pytest.fixture + def increase_ip(self): + def _increase_ip(prev_ip): + return f"{str(prev_ip)[:-1]}{int(str(prev_ip)[-1])+1}" + + return _increase_ip + + def test_requested_task_worker_as_admin( + self, + client, + worker, + req_task_query_string, + admin_headers, + dbsession, + increase_ip, + ): + # Retrieve current object from DB + db_worker = dbm.Worker.get(dbsession, worker["name"]) + last_seen = db_worker.last_seen + last_ip = db_worker.last_ip + + response = client.get( + "/requested-tasks/worker", + query_string=req_task_query_string, + headers=admin_headers(client_ip=increase_ip(last_ip)), + ) + assert response.status_code == 200 + + # Refresh current object from DB + dbsession.expire(db_worker) + db_worker = dbm.Worker.get(dbsession, worker["name"]) + # last_seen and last_ip are not updated when endpoint is called as admin + assert last_seen == db_worker.last_seen + assert last_ip == db_worker.last_ip + + def test_requested_task_worker_as_worker( + self, + client, + worker, + worker_headers, + req_task_query_string, + increase_ip, + dbsession, + ): + # Retrieve current object from DB + db_worker = dbm.Worker.get(dbsession, worker["name"]) + last_seen = db_worker.last_seen + last_ip = db_worker.last_ip + new_ip = increase_ip(last_ip) + # Worker checks for requested tasks response = client.get( "/requested-tasks/worker", - query_string={ - "worker": worker["name"], - "avail_cpu": 4, - "avail_memory": 2048, - "avail_disk": 4096, - }, - headers={"Authorization": access_token}, + query_string=req_task_query_string, + headers=worker_headers(client_ip=new_ip), ) assert response.status_code == 200 - def test_requested_task_worker_as_worker(self, client, make_access_token, worker): + # Refresh current object from DB + dbsession.expire(db_worker) + db_worker = dbm.Worker.get(dbsession, worker["name"]) + # last_seen and last_ip are updated in DB when endpoint is called as worker + assert last_seen != db_worker.last_seen + assert last_ip != db_worker.last_ip + assert str(db_worker.last_ip) == new_ip + + # second call will update only the last_seen attribute + last_seen = db_worker.last_seen + last_ip = db_worker.last_ip response = client.get( "/requested-tasks/worker", - query_string={ - "worker": worker["name"], - "avail_cpu": 4, - "avail_memory": 2048, - "avail_disk": 4096, - }, - headers={"Authorization": make_access_token(worker["username"], "worker")}, + query_string=req_task_query_string, + headers=worker_headers(client_ip=new_ip), ) assert response.status_code == 200 + # Refresh current object from DB again + dbsession.expire(db_worker) + db_worker = dbm.Worker.get(dbsession, worker["name"]) + # last_seen has been updated again but not last_ip which did not changed + assert last_seen != db_worker.last_seen + assert str(db_worker.last_ip) == new_ip + @pytest.mark.parametrize( "prev_ip, new_ip, external_update_enabled, external_update_fails," " external_update_called", @@ -251,10 +350,11 @@ def test_requested_task_worker_as_worker(self, client, make_access_token, worker def test_requested_task_worker_update_ip_whitelist( self, client, - make_access_token, worker, + req_task_query_string, prev_ip, new_ip, + worker_headers, external_update_enabled, external_update_fails, external_update_called, @@ -262,16 +362,8 @@ def test_requested_task_worker_update_ip_whitelist( # call it once to set prev_ip response = client.get( "/requested-tasks/worker", - query_string={ - "worker": worker["name"], - "avail_cpu": 4, - "avail_memory": 2048, - "avail_disk": 4096, - }, - headers={ - "Authorization": make_access_token(worker["username"], "worker"), - "X-Forwarded-For": prev_ip, - }, + query_string=req_task_query_string, + headers=worker_headers(client_ip=prev_ip), ) assert response.status_code == 200 @@ -293,16 +385,8 @@ def test_requested_task_worker_update_ip_whitelist( # call it once to set next_ip response = client.get( "/requested-tasks/worker", - query_string={ - "worker": worker["name"], - "avail_cpu": 4, - "avail_memory": 2048, - "avail_disk": 4096, - }, - headers={ - "Authorization": make_access_token(worker["username"], "worker"), - "X-Forwarded-For": new_ip, - }, + query_string=req_task_query_string, + headers=worker_headers(client_ip=new_ip), ) if external_update_fails: assert response.status_code == 503