Skip to content

Commit

Permalink
Rework / simplify handling of external IP update
Browse files Browse the repository at this point in the history
  • Loading branch information
benoit74 committed Nov 14, 2023
1 parent 7b534de commit 4fadd57
Show file tree
Hide file tree
Showing 4 changed files with 96 additions and 30 deletions.
12 changes: 1 addition & 11 deletions dispatcher/backend/src/common/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,6 @@

from common.enum import SchedulePeriodicity


def refreshable_constant(fn):
"""Refreshable constants helper for those we have interest to live update"""
return fn


OPENSSL_BIN = os.getenv("OPENSSL_BIN", "/usr/bin/openssl")
MESSAGE_VALIDITY = 60 # number of seconds before a message expire

Expand Down Expand Up @@ -72,11 +66,7 @@ def refreshable_constant(fn):
# using the following, it is possible to automate
# the update of a whitelist of workers IPs on Wasabi (S3 provider)
# enable this feature (default is off)
# Nota: this is a refreshable constant so that it can be dynamically updated
# (including in tests)
USES_WORKERS_IPS_WHITELIST = refreshable_constant(
lambda: bool(os.getenv("USES_WORKERS_IPS_WHITELIST", ""))
)
USES_WORKERS_IPS_WHITELIST = bool(os.getenv("USES_WORKERS_IPS_WHITELIST"))
MAX_WORKER_IP_CHANGES_PER_DAY = 4
# wasabi URL with credentials to update policy
WASABI_URL = os.getenv("WASABI_URL", "")
Expand Down
4 changes: 2 additions & 2 deletions dispatcher/backend/src/common/external.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@

def update_workers_whitelist(session: so.Session):
"""update whitelist of workers on external services"""
IpUpdater.update_fn(build_workers_whitelist(session=session))
ExternalIpUpdater.update_fn(build_workers_whitelist(session=session))


def build_workers_whitelist(session: so.Session) -> typing.List[str]:
Expand Down Expand Up @@ -149,7 +149,7 @@ def get_statement():
)


class IpUpdater:
class ExternalIpUpdater:
update_fn = update_wasabi_whitelist


Expand Down
10 changes: 3 additions & 7 deletions dispatcher/backend/src/routes/requested_tasks/requested_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,8 @@
from marshmallow import ValidationError

import db.models as dbm
from common import WorkersIpChangesCounts, getnow
from common.constants import (
ENABLED_SCHEDULER,
MAX_WORKER_IP_CHANGES_PER_DAY,
USES_WORKERS_IPS_WHITELIST,
)
from common import WorkersIpChangesCounts, constants, getnow
from common.constants import ENABLED_SCHEDULER, MAX_WORKER_IP_CHANGES_PER_DAY
from common.external import update_workers_whitelist
from common.schemas.orms import RequestedTaskFullSchema, RequestedTaskLightSchema
from common.schemas.parameters import (
Expand All @@ -25,7 +21,7 @@
)
from common.utils import task_event_handler
from db import count_from_stmt, dbsession, dbsession_manual
from errors.http import InvalidRequestJSON, TaskNotFound, WorkerNotFound, HTTPBase
from errors.http import HTTPBase, InvalidRequestJSON, TaskNotFound, WorkerNotFound
from routes import auth_info_if_supplied, authenticate, require_perm, url_uuid
from routes.base import BaseRoute
from routes.errors import NotFound
Expand Down
100 changes: 90 additions & 10 deletions dispatcher/backend/src/tests/integration/routes/workers/test_worker.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import os
from typing import List

import pytest

from common.external import IpUpdater, build_workers_whitelist
from common import constants
from common.external import ExternalIpUpdater, build_workers_whitelist


class TestWorkersCommon:
Expand Down Expand Up @@ -212,6 +212,8 @@ def test_checkin_another_user(


class TestWorkerRequestedTasks:
new_ip_address = "88.88.88.88"

def test_requested_task_worker_as_admin(self, client, access_token, worker):
response = client.get(
"/requested-tasks/worker",
Expand All @@ -238,18 +240,35 @@ def test_requested_task_worker_as_worker(self, client, make_access_token, worker
)
assert response.status_code == 200

new_ip_address = "88.88.88.88"

def custom_ip_update(self, ip_addresses: List):
self.ip_updated = True
assert TestWorkerRequestedTasks.new_ip_address in ip_addresses

def custom_failing_ip_update(self, ip_addresses: List):
raise Exception()

@pytest.mark.parametrize(
"prev_ip, new_ip, external_update_enabled, external_update_fails,"
" external_update_called",
[
("77.77.77.77", "88.88.88.88", False, False, False), # ip update disabled
("77.77.77.77", "77.77.77.77", True, False, False), # ip did not changed
("77.77.77.77", "88.88.88.88", True, False, True), # ip should be updated
("77.77.77.77", "88.88.88.88", True, True, False), # ip update fails
],
)
def test_requested_task_worker_update_ip_whitelist(
self, client, make_access_token, worker
self,
client,
make_access_token,
worker,
prev_ip,
new_ip,
external_update_enabled,
external_update_fails,
external_update_called,
):
self.ip_updated = False
IpUpdater.update_fn = self.custom_ip_update
os.environ["USES_WORKERS_IPS_WHITELIST"] = "1"
# call it once to set prev_ip
response = client.get(
"/requested-tasks/worker",
query_string={
Expand All @@ -260,8 +279,69 @@ def test_requested_task_worker_update_ip_whitelist(
},
headers={
"Authorization": make_access_token(worker["username"], "worker"),
"X-Forwarded-For": TestWorkerRequestedTasks.new_ip_address,
"X-Forwarded-For": prev_ip,
},
)
assert response.status_code == 200
assert self.ip_updated

# check prev_ip has been set
response = client.get("/workers/")
assert response.status_code == 200
response_data = response.get_json()
for item in response_data["items"]:
if item["name"] != worker["name"]:
continue
assert item["last_ip"] == prev_ip

# setup custom ip updater to intercept Wasabi operations
updater = IpUpdaterAndChecker(should_fail=external_update_fails)
assert TestWorkerRequestedTasks.new_ip_address not in updater.ip_addresses
ExternalIpUpdater.update_fn = updater.ip_update
constants.USES_WORKERS_IPS_WHITELIST = external_update_enabled

# call it once to set next_ip
response = client.get(
"/requested-tasks/worker",
query_string={
"worker": worker["name"],
"avail_cpu": 4,
"avail_memory": 2048,
"avail_disk": 4096,
},
headers={
"Authorization": make_access_token(worker["username"], "worker"),
"X-Forwarded-For": new_ip,
},
)
if external_update_fails:
assert response.status_code == 503
else:
assert response.status_code == 200
assert updater.ips_updated == external_update_called
if external_update_called:
assert new_ip in updater.ip_addresses

# check new_ip has been set (even if ip update is disabled or has failed)
response = client.get("/workers/")
assert response.status_code == 200
response_data = response.get_json()
for item in response_data["items"]:
if item["name"] != worker["name"]:
continue
assert item["last_ip"] == new_ip


class IpUpdaterAndChecker:
"""Helper class to intercept Wasabi operations and perform assertions"""

def __init__(self, should_fail: bool) -> None:
self.ips_updated = False
self.should_fail = should_fail
self.ip_addresses = []

def ip_update(self, ip_addresses: List):
if self.should_fail:
raise Exception()
else:
self.ips_updated = True
self.ip_addresses = ip_addresses

0 comments on commit 4fadd57

Please sign in to comment.