Skip to content

Commit

Permalink
Fix update of worker last_seen property
Browse files Browse the repository at this point in the history
  • Loading branch information
benoit74 committed Nov 27, 2023
1 parent 355f828 commit 039852f
Show file tree
Hide file tree
Showing 2 changed files with 134 additions and 43 deletions.
17 changes: 12 additions & 5 deletions dispatcher/backend/src/routes/requested_tasks/requested_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -223,20 +223,27 @@ def get(self, session: so.Session, token: AccessToken.Payload):
# record we've seen a worker, if applicable
if token and worker_name:
worker = dbm.Worker.get(session, worker_name, WorkerNotFound)

# Update worker properties only if called as the worker itself, not as an
# admin
if worker.user.username == token.username:
worker.last_seen = getnow()

# IP changed since last encounter
if str(worker.last_ip) != worker_ip:
ip_changed = str(worker.last_ip) != worker_ip
if ip_changed:
logger.info(
f"Worker IP changed detected for {worker_name}: "
f"IP changed from {worker.last_ip} to {worker_ip}"
)
worker.last_ip = worker_ip
# commit explicitely since we are not using an explicit transaction,
# and do it before calling Wasabi so that changes are propagated
# quickly and transaction is not blocking
session.commit()

# commit explicitely since we are not using an explicit transaction,
# and do it before calling Wasabi so that changes are propagated
# quickly and transaction is not blocking
session.commit()

if ip_changed:
if constants.USES_WORKERS_IPS_WHITELIST:
try:
record_ip_change(session=session, worker_name=worker_name)
Expand Down
160 changes: 122 additions & 38 deletions dispatcher/backend/src/tests/integration/routes/workers/test_worker.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
from typing import List
from typing import Any, Dict, List

import pytest

import db.models as dbm
from common import constants
from common.external import ExternalIpUpdater, build_workers_whitelist

Expand Down Expand Up @@ -212,32 +213,130 @@ def test_checkin_another_user(


class TestWorkerRequestedTasks:
def test_requested_task_worker_as_admin(self, client, access_token, worker):
@pytest.fixture()
def req_task_query_string(self, worker):
return {
"worker": worker["name"],
"avail_cpu": 4,
"avail_memory": 2048,
"avail_disk": 4096,
}

@pytest.fixture
def make_headers(self):
def _make_headers(access_token: str, client_ip: str) -> Dict[str, Any]:
return {
"Authorization": access_token,
"X-Forwarded-For": client_ip,
}

return _make_headers

@pytest.fixture
def admin_headers(self, make_headers, access_token, default_ip):
def _admin_headers(
access_token: str = access_token, client_ip: str = default_ip
) -> Dict[str, Any]:
return make_headers(access_token=access_token, client_ip=client_ip)

return _admin_headers

@pytest.fixture
def worker_headers(self, make_headers, make_access_token, worker, default_ip):
def _worker_headers(
access_token: str = make_access_token(worker["username"], "worker"),
client_ip: str = default_ip,
) -> Dict[str, Any]:
return make_headers(access_token=access_token, client_ip=client_ip)

return _worker_headers

@pytest.fixture
def default_ip(self):
return "192.168.1.1"

@pytest.fixture
def increase_ip(self):
def _increase_ip(prev_ip):
return f"{str(prev_ip)[:-1]}{int(str(prev_ip)[-1])+1}"

return _increase_ip

def test_requested_task_worker_as_admin(
self,
client,
worker,
req_task_query_string,
admin_headers,
dbsession,
increase_ip,
):
# Retrieve current object from DB
db_worker = dbm.Worker.get(dbsession, worker["name"])
last_seen = db_worker.last_seen
last_ip = db_worker.last_ip

response = client.get(
"/requested-tasks/worker",
query_string=req_task_query_string,
headers=admin_headers(client_ip=increase_ip(last_ip)),
)
assert response.status_code == 200

# Refresh current object from DB
dbsession.expire(db_worker)
db_worker = dbm.Worker.get(dbsession, worker["name"])
# last_seen and last_ip are not updated when endpoint is called as admin
assert last_seen == db_worker.last_seen
assert last_ip == db_worker.last_ip

def test_requested_task_worker_as_worker(
self,
client,
worker,
worker_headers,
req_task_query_string,
increase_ip,
dbsession,
):
# Retrieve current object from DB
db_worker = dbm.Worker.get(dbsession, worker["name"])
last_seen = db_worker.last_seen
last_ip = db_worker.last_ip
new_ip = increase_ip(last_ip)
# Worker checks for requested tasks
response = client.get(
"/requested-tasks/worker",
query_string={
"worker": worker["name"],
"avail_cpu": 4,
"avail_memory": 2048,
"avail_disk": 4096,
},
headers={"Authorization": access_token},
query_string=req_task_query_string,
headers=worker_headers(client_ip=new_ip),
)
assert response.status_code == 200

def test_requested_task_worker_as_worker(self, client, make_access_token, worker):
# Refresh current object from DB
dbsession.expire(db_worker)
db_worker = dbm.Worker.get(dbsession, worker["name"])
# last_seen and last_ip are updated in DB when endpoint is called as worker
assert last_seen != db_worker.last_seen
assert last_ip != db_worker.last_ip
assert str(db_worker.last_ip) == new_ip

# second call will update only the last_seen attribute
last_seen = db_worker.last_seen
last_ip = db_worker.last_ip
response = client.get(
"/requested-tasks/worker",
query_string={
"worker": worker["name"],
"avail_cpu": 4,
"avail_memory": 2048,
"avail_disk": 4096,
},
headers={"Authorization": make_access_token(worker["username"], "worker")},
query_string=req_task_query_string,
headers=worker_headers(client_ip=new_ip),
)
assert response.status_code == 200

# Refresh current object from DB again
dbsession.expire(db_worker)
db_worker = dbm.Worker.get(dbsession, worker["name"])
# last_seen has been updated again but not last_ip which did not changed
assert last_seen != db_worker.last_seen
assert str(db_worker.last_ip) == new_ip

@pytest.mark.parametrize(
"prev_ip, new_ip, external_update_enabled, external_update_fails,"
" external_update_called",
Expand All @@ -251,27 +350,20 @@ def test_requested_task_worker_as_worker(self, client, make_access_token, worker
def test_requested_task_worker_update_ip_whitelist(
self,
client,
make_access_token,
worker,
req_task_query_string,
prev_ip,
new_ip,
worker_headers,
external_update_enabled,
external_update_fails,
external_update_called,
):
# call it once to set prev_ip
response = client.get(
"/requested-tasks/worker",
query_string={
"worker": worker["name"],
"avail_cpu": 4,
"avail_memory": 2048,
"avail_disk": 4096,
},
headers={
"Authorization": make_access_token(worker["username"], "worker"),
"X-Forwarded-For": prev_ip,
},
query_string=req_task_query_string,
headers=worker_headers(client_ip=prev_ip),
)
assert response.status_code == 200

Expand All @@ -293,16 +385,8 @@ def test_requested_task_worker_update_ip_whitelist(
# call it once to set next_ip
response = client.get(
"/requested-tasks/worker",
query_string={
"worker": worker["name"],
"avail_cpu": 4,
"avail_memory": 2048,
"avail_disk": 4096,
},
headers={
"Authorization": make_access_token(worker["username"], "worker"),
"X-Forwarded-For": new_ip,
},
query_string=req_task_query_string,
headers=worker_headers(client_ip=new_ip),
)
if external_update_fails:
assert response.status_code == 503
Expand Down

0 comments on commit 039852f

Please sign in to comment.