Skip to content

Commit

Permalink
wip: orchestrator update
Browse files Browse the repository at this point in the history
  • Loading branch information
patrick borowy committed Mar 20, 2024
1 parent 484940b commit c4ad24f
Show file tree
Hide file tree
Showing 3 changed files with 213 additions and 28 deletions.
168 changes: 141 additions & 27 deletions src/update.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,9 @@ async def get_image_from_container(container):
return container, details['Config']['Image']

async def images_of_containers(containers):
r = await asyncio.gather(*[get_image_from_container(container) for container in containers])
r = await asyncio.gather(
*[get_image_from_container(container) for container in containers]
)
return list(set(r))

async def get_digests_for_imgs(imgs: list[str]):
Expand All @@ -85,60 +87,172 @@ async def safe_get_image_manifest(image):
logging.exception(f"getting manifest for '{image}': {e}")
return image, None

results = await asyncio.gather(*[safe_get_image_manifest(image) for __container__, image in imgs])
results = await asyncio.gather(
*[safe_get_image_manifest(image) for __container__, image in imgs]
)

# Filter out None results
filtered_results = filter(lambda x: x[1] is not None, results)

return {img: digest for img, digest in filtered_results}

async def update_orchestrator(
existing_container, details, module_digest_map, last_pull_times
):
"""
The dificulty is that closing the orchestrator process would close this
running thread.
A solution is to create the new container first and then destroy the old one.
We can pass,
- `module_digest_map` (MODULE_DIGEST_MAP),
- `last_pull_times` (LAST_PULL_TIMES)
- the `container_id` (CLOSE_CONTAINER_ID)
to the newly created container.
Which allows the new orchestrator to sync on the job,
- delete the parent properly
(in-proc closing prevents delete, and delete requires closes).
- continue on the same "phase"
It's a hassle to use the container_id in order to broadcast a selfupdate
procedure to N orchestrators. Running multiple orchestrator has no benefit
and this is therfor not supported. Doing this would result in clones and
conflicting orch.
"""

def increment_name(existing_name:str) -> str:
"""
We cannot use the same name because two containers have to live at the
same time.
We embrace the problem by adding a incrementation marker. This let
users know how many times the service has updated.
The format is
`{image_name}_number`
if there is no `_number` we start at 1 which should result in
`{image_name}_1`
else we increment the number
"""
# Check if the existing name ends with a numerical suffix
if "_" in existing_name and existing_name.rsplit("_", 1)[1].isdigit():
# Split the name from the number and increment the number
name_part, number_part = existing_name.rsplit("_", 1)
new_name = f"{name_part}_{int(number_part) + 1}"
else:
# If no numerical suffix, append '_1'
new_name = f"{existing_name}_1"

return new_name

docker = Docker()
existing_configuration = details["Config"]
logging.info("RECREATE ORCHESTRATOR")
logging.info(existing_configuration)
logging.info("RECREATE ORCHESTRATOR")
new_container = await docker.containers.create_or_replace(
name=increment_name(details['Name'][1:]),
config=existing_configuration
)
await new_container.start()


async def recreate_container(
docker, container, module_digest_map, last_pull_times
):
"""Recreates a container asynchronously."""
container_id = container.id
details = await container.show()
config = details["Config"]
if config['Image'] == "exordelabs/orchestrator":
await update_orchestrator(
container, details, module_digest_map, last_pull_times
)
await asyncio.sleep(60*2) # pause this after self update trigger
return
logging.info(f"Recreating container {container_id}")
logging.info(f"{json.dumps(details, indent=4)}")

new_container = await docker.containers.create_or_replace(
name=details['Name'][1:], config=config
)
await new_container.start()


def build_update_function(delay: int, validity_threshold_seconds: int):
"""Builds and returns an asynchronous function to update containers with specified delay and validity threshold for image pulls."""
"""
Builds and returns an asynchronous function to update containers with
specified delay and validity threshold for image pulls.
"""
images_to_update = {}
last_pull_times = {}

preloaded_last_pull_times = os.getenv('LAST_PULL_TIMES', '')
if preloaded_last_pull_times != '':
last_pull_times = json.loads(preloaded_last_pull_times)
else:
last_pull_times = {}

async def pull_image_if_needed(docker, image):
"""Pulls a Docker image if it hasn't been pulled recently."""
now = datetime.now()
if image not in last_pull_times or now - last_pull_times[image] > timedelta(seconds=validity_threshold_seconds):
print(f"Pulling image {image}...")
if (
image not in last_pull_times or
now - last_pull_times[image] > timedelta(
seconds=validity_threshold_seconds
)
):
logging.info(f"Pulling image {image}...")
await docker.images.pull(image)
print(f"Image {image} pulled.")
logging.info(f"Image {image} pulled.")
last_pull_times[image] = now
else:
print(f"Image {image} pull skipped due to recent pull.")

async def recreate_container(docker, container, image):
"""Recreates a container asynchronously."""
container_id = container.id # Adjusted to access the container's ID property
print(f"Recreating container {container_id} with image {image}...")
await container.stop()
await container.delete()
print(f"Container {container_id} stopped and deleted.")

async def schedule_update(container, image: str):
"""Schedules an update for a container, ensuring the image is pulled only once within the validity window."""
logging.info(f"Image {image} pull skipped due to recent pull.")

async def schedule_update(container, image: str, module_digest_map):
"""
Schedules an update for a container, ensuring the image is pulled only
once within the validity window.
"""
if image not in images_to_update:
images_to_update[image] = []
images_to_update[image].append(container)

# Pull and recreate immediately within the context of scheduling to honor delay and validity
# Pull and recreate immediately within the context of scheduling
# to honor delay and validity
async with Docker() as docker:
await pull_image_if_needed(docker, image)
await asyncio.sleep(delay) # Delay before recreating the container
await recreate_container(docker, container, image)
await recreate_container(
docker, container, module_digest_map, last_pull_times
)

return schedule_update
# Example usage:
schedule_update = build_update_function(delay=5, validity_threshold_seconds=30) # Set delay to 5 seconds and validity threshold to 30 seconds

schedule_update = build_update_function(
delay=120, validity_threshold_seconds=30
)

def build_updater():
module_digest_map = {} # stores the digests
preloaded_module_digest_map = os.getenv('MODULE_DIGEST_MAP', '')
if preloaded_module_digest_map != '':
module_digest_map = json.loads(preloaded_module_digest_map)
else:
module_digest_map = {}
async def enforce_versioning(client):
"""
1. retrieve the latest digest for each currently running container (module_digest_map)
2. if a digest is different in current module_digest_map (or empty) -> trigger update for that container
1. retrieve the latest digest for each currently running container
(module_digest_map)
2. if a digest is different in current module_digest_map (or empty)
-> trigger update for that container
3. update module_digest_map
"""
logging.info("Enforcing versioning")
Expand All @@ -156,7 +270,7 @@ async def enforce_versioning(client):
# If digest is different or container is not in module_digest_map, trigger update
if current_digest is None or current_digest != latest_digest:
logging.info(f"Scheduling an update for {img}")
await schedule_update(container, img)
await schedule_update(container, img, module_digest_map)
# Update the module_digest_map with the latest digest
module_digest_map[container] = latest_digest
logging.info("Versioning loop complete")
Expand Down
71 changes: 71 additions & 0 deletions tests/test_container_recreation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
import pytest
import json
import logging
from aiodocker import Docker
from update import recreate_container
import asyncio


@pytest.mark.asyncio
async def test_get_container_configuration():
client = Docker()
container = await client.containers.create_or_replace(
name="test_container",
config={
"Image": "exordelabs/orchestrator",
"Labels": {
"network.exorde.test": "true"
}
}
)
await container.start()

info = await container.show()
config = info["Config"]

logging.info(f"{json.dumps(config, indent=4)}")

await container.stop()
await container.delete()
await client.close()

@pytest.fixture
def container_cleanup():
yield
async def run():
logging.info("Removing dangling test containers")
client = Docker()
containers = await client.containers.list(
filters={'label': ["network.exorde.test=true"] }, all=True
)
for container in containers:
await container.stop()
await container.delete()
await client.close()
asyncio.run(run())

@pytest.mark.asyncio
async def test_container_recreation(container_cleanup):
client = Docker()
container = await client.containers.create_or_replace(
name="test_container",
config={
"Image": "exordelabs/transactioneer",
"Labels": {
"network.exorde.test": "true"
}
}
)
await container.start()

containers = await client.containers.list(
filters={'label': ["network.exorde.test=true"] }, all=True
)
assert len(containers) == 1
await recreate_container(client, container)

containers = await client.containers.list(
filters={'label': ["network.exorde.test=true"] }, all=True
)
assert len(containers) == 1
await client.close()
2 changes: 1 addition & 1 deletion tests/test_update.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,6 @@ async def test_update():

await enforce_versioning(client)

# enforce_versioning should kill the online containers after a pull
# enforce_versioning should recreate containers after a pull
containers_to_watch = await retrieve_list_of_containers_to_watch(client)
assert len(containers_to_watch) == 0

0 comments on commit c4ad24f

Please sign in to comment.