From 4fadd5793f013d95fc560b8b9f238154ddbac615 Mon Sep 17 00:00:00 2001 From: benoit74 Date: Tue, 14 Nov 2023 15:54:37 +0100 Subject: [PATCH] Rework / simplify handling of external IP update --- dispatcher/backend/src/common/constants.py | 12 +-- dispatcher/backend/src/common/external.py | 4 +- .../routes/requested_tasks/requested_task.py | 10 +- .../integration/routes/workers/test_worker.py | 100 ++++++++++++++++-- 4 files changed, 96 insertions(+), 30 deletions(-) diff --git a/dispatcher/backend/src/common/constants.py b/dispatcher/backend/src/common/constants.py index fc8ac106..c61ce894 100644 --- a/dispatcher/backend/src/common/constants.py +++ b/dispatcher/backend/src/common/constants.py @@ -3,12 +3,6 @@ from common.enum import SchedulePeriodicity - -def refreshable_constant(fn): - """Refreshable constants helper for those we have interest to live update""" - return fn - - OPENSSL_BIN = os.getenv("OPENSSL_BIN", "/usr/bin/openssl") MESSAGE_VALIDITY = 60 # number of seconds before a message expire @@ -72,11 +66,7 @@ def refreshable_constant(fn): # using the following, it is possible to automate # the update of a whitelist of workers IPs on Wasabi (S3 provider) # enable this feature (default is off) -# Nota: this is a refreshable constant so that it can be dynamically updated -# (including in tests) -USES_WORKERS_IPS_WHITELIST = refreshable_constant( - lambda: bool(os.getenv("USES_WORKERS_IPS_WHITELIST", "")) -) +USES_WORKERS_IPS_WHITELIST = bool(os.getenv("USES_WORKERS_IPS_WHITELIST")) MAX_WORKER_IP_CHANGES_PER_DAY = 4 # wasabi URL with credentials to update policy WASABI_URL = os.getenv("WASABI_URL", "") diff --git a/dispatcher/backend/src/common/external.py b/dispatcher/backend/src/common/external.py index 5df271ab..fd5de704 100644 --- a/dispatcher/backend/src/common/external.py +++ b/dispatcher/backend/src/common/external.py @@ -29,7 +29,7 @@ def update_workers_whitelist(session: so.Session): """update whitelist of workers on external services""" - IpUpdater.update_fn(build_workers_whitelist(session=session)) + ExternalIpUpdater.update_fn(build_workers_whitelist(session=session)) def build_workers_whitelist(session: so.Session) -> typing.List[str]: @@ -149,7 +149,7 @@ def get_statement(): ) -class IpUpdater: +class ExternalIpUpdater: update_fn = update_wasabi_whitelist diff --git a/dispatcher/backend/src/routes/requested_tasks/requested_task.py b/dispatcher/backend/src/routes/requested_tasks/requested_task.py index 6474e7b6..68805193 100644 --- a/dispatcher/backend/src/routes/requested_tasks/requested_task.py +++ b/dispatcher/backend/src/routes/requested_tasks/requested_task.py @@ -9,12 +9,8 @@ from marshmallow import ValidationError import db.models as dbm -from common import WorkersIpChangesCounts, getnow -from common.constants import ( - ENABLED_SCHEDULER, - MAX_WORKER_IP_CHANGES_PER_DAY, - USES_WORKERS_IPS_WHITELIST, -) +from common import WorkersIpChangesCounts, constants, getnow +from common.constants import ENABLED_SCHEDULER, MAX_WORKER_IP_CHANGES_PER_DAY from common.external import update_workers_whitelist from common.schemas.orms import RequestedTaskFullSchema, RequestedTaskLightSchema from common.schemas.parameters import ( @@ -25,7 +21,7 @@ ) from common.utils import task_event_handler from db import count_from_stmt, dbsession, dbsession_manual -from errors.http import InvalidRequestJSON, TaskNotFound, WorkerNotFound, HTTPBase +from errors.http import HTTPBase, InvalidRequestJSON, TaskNotFound, WorkerNotFound from routes import auth_info_if_supplied, authenticate, require_perm, url_uuid from routes.base import BaseRoute from routes.errors import NotFound diff --git a/dispatcher/backend/src/tests/integration/routes/workers/test_worker.py b/dispatcher/backend/src/tests/integration/routes/workers/test_worker.py index fac50461..3be8cf17 100644 --- a/dispatcher/backend/src/tests/integration/routes/workers/test_worker.py +++ b/dispatcher/backend/src/tests/integration/routes/workers/test_worker.py @@ -1,9 +1,9 @@ -import os from typing import List import pytest -from common.external import IpUpdater, build_workers_whitelist +from common import constants +from common.external import ExternalIpUpdater, build_workers_whitelist class TestWorkersCommon: @@ -212,6 +212,8 @@ def test_checkin_another_user( class TestWorkerRequestedTasks: + new_ip_address = "88.88.88.88" + def test_requested_task_worker_as_admin(self, client, access_token, worker): response = client.get( "/requested-tasks/worker", @@ -238,18 +240,35 @@ def test_requested_task_worker_as_worker(self, client, make_access_token, worker ) assert response.status_code == 200 - new_ip_address = "88.88.88.88" - def custom_ip_update(self, ip_addresses: List): self.ip_updated = True assert TestWorkerRequestedTasks.new_ip_address in ip_addresses + def custom_failing_ip_update(self, ip_addresses: List): + raise Exception() + + @pytest.mark.parametrize( + "prev_ip, new_ip, external_update_enabled, external_update_fails," + " external_update_called", + [ + ("77.77.77.77", "88.88.88.88", False, False, False), # ip update disabled + ("77.77.77.77", "77.77.77.77", True, False, False), # ip did not changed + ("77.77.77.77", "88.88.88.88", True, False, True), # ip should be updated + ("77.77.77.77", "88.88.88.88", True, True, False), # ip update fails + ], + ) def test_requested_task_worker_update_ip_whitelist( - self, client, make_access_token, worker + self, + client, + make_access_token, + worker, + prev_ip, + new_ip, + external_update_enabled, + external_update_fails, + external_update_called, ): - self.ip_updated = False - IpUpdater.update_fn = self.custom_ip_update - os.environ["USES_WORKERS_IPS_WHITELIST"] = "1" + # call it once to set prev_ip response = client.get( "/requested-tasks/worker", query_string={ @@ -260,8 +279,69 @@ def test_requested_task_worker_update_ip_whitelist( }, headers={ "Authorization": make_access_token(worker["username"], "worker"), - "X-Forwarded-For": TestWorkerRequestedTasks.new_ip_address, + "X-Forwarded-For": prev_ip, }, ) assert response.status_code == 200 - assert self.ip_updated + + # check prev_ip has been set + response = client.get("/workers/") + assert response.status_code == 200 + response_data = response.get_json() + for item in response_data["items"]: + if item["name"] != worker["name"]: + continue + assert item["last_ip"] == prev_ip + + # setup custom ip updater to intercept Wasabi operations + updater = IpUpdaterAndChecker(should_fail=external_update_fails) + assert TestWorkerRequestedTasks.new_ip_address not in updater.ip_addresses + ExternalIpUpdater.update_fn = updater.ip_update + constants.USES_WORKERS_IPS_WHITELIST = external_update_enabled + + # call it once to set next_ip + response = client.get( + "/requested-tasks/worker", + query_string={ + "worker": worker["name"], + "avail_cpu": 4, + "avail_memory": 2048, + "avail_disk": 4096, + }, + headers={ + "Authorization": make_access_token(worker["username"], "worker"), + "X-Forwarded-For": new_ip, + }, + ) + if external_update_fails: + assert response.status_code == 503 + else: + assert response.status_code == 200 + assert updater.ips_updated == external_update_called + if external_update_called: + assert new_ip in updater.ip_addresses + + # check new_ip has been set (even if ip update is disabled or has failed) + response = client.get("/workers/") + assert response.status_code == 200 + response_data = response.get_json() + for item in response_data["items"]: + if item["name"] != worker["name"]: + continue + assert item["last_ip"] == new_ip + + +class IpUpdaterAndChecker: + """Helper class to intercept Wasabi operations and perform assertions""" + + def __init__(self, should_fail: bool) -> None: + self.ips_updated = False + self.should_fail = should_fail + self.ip_addresses = [] + + def ip_update(self, ip_addresses: List): + if self.should_fail: + raise Exception() + else: + self.ips_updated = True + self.ip_addresses = ip_addresses