feat: Introduce Automated UATs (#1)
* Add the following test notebooks:
  * Test Katib Integration
  * Test KFP Integration
  * Test MinIO Integration
  * Test MLFlow Integration
* Add a driver to automate running the test notebooks included in the
  'notebooks' directory using Pytest
* Add README with instructions for running the test notebooks in an
  automated manner and CONTRIBUTING.md with guidelines around
  contributing test notebooks
* Add a driver to allow running the UATs included in the 'tests'
  directory in an automated manner
* Add README at the top-level directory including general information
  around the design of the UATs and instructions for running them.

Signed-off-by: Phoevos Kalemkeris <[email protected]>
phoevos authored Jul 31, 2023
1 parent 7f2521e commit 53e0389
Showing 26 changed files with 3,043 additions and 0 deletions.
7 changes: 7 additions & 0 deletions .gitignore
@@ -0,0 +1,7 @@
.tox/
venv/
__pycache__/
.pytest_cache/
.ipynb_checkpoints/
.idea/
.vscode/
107 changes: 107 additions & 0 deletions README.md
@@ -0,0 +1,107 @@
# Charmed Kubeflow Automated UATs

Automated User Acceptance Tests (UATs) are essential for evaluating the stability of Charmed
Kubeflow, as well as for catching issues early, and they are intended to be an invaluable testing
tool both pre-release and post-installation. They combine different components of Charmed Kubeflow
in a way that gives us confidence that everything works as expected, and are meant to be used by
end users and developers alike.

Charmed Kubeflow UATs are broken down into test scenarios implemented as Python notebooks, which
are easy to share, understand, and maintain. We provide a **standalone** test suite included in
`tests` that users can run directly from inside a Notebook with `pytest`, as well as a `driver`
that automates the execution on an existing Kubeflow cluster. More details on running the tests
can be found in the [Run the tests](#run-the-tests) section.

## Prerequisites

Executing the UATs requires a deployed Kubeflow cluster. That said, the deployment and
configuration steps are outside the scope of this project; the automated tests assume programmatic
access to an existing Kubeflow installation. Such a deployment consists (at the very least) of the
following pieces:

* A **Kubernetes cluster**, e.g.
  * MicroK8s
  * Charmed Kubernetes
  * EKS cluster
* **Charmed Kubeflow** deployed on top of it
* **MLFlow (optional)** deployed alongside Kubeflow
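
A quick way to sanity-check that this kind of access is in place is sketched below. This is only
an illustrative example; it assumes Charmed Kubeflow was deployed into a Juju model (and hence a
Kubernetes namespace) named `kubeflow`, so adjust the names to match your setup:

```bash
# confirm kubectl talks to the right cluster and the Charmed Kubeflow workloads are up
kubectl get pods -n kubeflow
```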

For instructions on deploying and getting started with Charmed Kubeflow, we recommend that you
start with [this guide](https://charmed-kubeflow.io/docs/get-started-with-charmed-kubeflow).

The UATs include tests that assume MLFlow is deployed alongside Kubeflow; those tests will fail
otherwise. For instructions on deploying MLFlow you can start with [this
guide](https://discourse.charmhub.io/t/deploying-charmed-mlflow-v2-and-kubeflow-to-eks/10973),
ignoring the EKS-specific steps.
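
If MLFlow is not deployed, one option is to deselect those tests when invoking `pytest`. The
sketch below is an assumption rather than a documented workflow: it relies on the MLFlow notebook
tests being selectable via an `mlflow` keyword, so inspect the collected test IDs first and adjust
the expression accordingly:

```bash
# list the collected tests, then run everything except the MLFlow-related ones
pytest --collect-only -q
pytest -k "not mlflow"
```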

## Run the tests

As mentioned before, when it comes to running the tests, you have two options:
* Running the `tests` suite directly with `pytest` inside a Jupyter Notebook
* Running the tests on an existing cluster using the `driver` along with the provided automation

### Running inside a Notebook

* Create a new Notebook using the `jupyter-scipy` image:
  * Navigate to `Advanced options` > `Configurations`
  * Select all available configurations in order for Kubeflow integrations to work as expected
* Launch the Notebook and wait for it to be created
* Start a new terminal session and clone this repo locally:

  ```bash
  git clone https://github.com/canonical/charmed-kubeflow-uats.git
  ```
* Navigate to the `tests` directory:

  ```bash
  cd charmed-kubeflow-uats/tests
  ```
* Follow the instructions in the provided [README.md](tests/README.md) to execute the test suite
  with `pytest`, as sketched below
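
  The exact commands live in that README. For orientation, the `driver`'s Job (see
  `assets/test-job.yaml.j2` further down) runs essentially the following from within the `tests`
  directory, so a manual run from the Notebook terminal is expected to look roughly like this:

  ```bash
  # install the test dependencies, then run the full suite
  pip install -r requirements.txt
  pytest
  ```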

### Running from a configured management environment using the `driver`

To run the tests using the `driver`:
* Clone this repo locally and navigate to the repo directory:

  ```bash
  git clone https://github.com/canonical/charmed-kubeflow-uats.git
  cd charmed-kubeflow-uats/
  ```
* Set up `tox` inside a virtual environment:

  ```bash
  python3 -m venv venv
  source venv/bin/activate
  pip install tox
  ```
* Run the UATs (see the pre-flight check sketched right after this list):

  ```bash
  # assumes an existing `kubeflow` Juju model
  tox -e uats
  ```
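
As noted in the comment above, the `uats` environment assumes a Charmed Kubeflow deployment in a
Juju model named `kubeflow`. A quick pre-flight check from the same machine might look like the
following sketch (the model name is an assumption, so adjust it as needed):

```bash
# verify the assumed Juju model is present and reachable
juju status --model kubeflow
```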

#### Developer Notes

Any environment that can be used to access and configure the Charmed Kubeflow deployment is
considered a configured management environment; in practice, that means any machine with `kubectl`
access to the underlying Kubernetes cluster. This is crucial, since the `driver` relies on a
Kubernetes Job to run the tests. More specifically, the `driver` executes the following steps:
1. Create a Kubeflow Profile (i.e. `test-kubeflow`) to run the tests in
2. Submit a Kubernetes Job (i.e. `test-kubeflow`) that runs `tests`. The Job performs the
   following:
   * Mount the local `tests` directory to a Pod that uses `jupyter-scipy` as the container image
   * Install Python dependencies specified in the [requirements.txt](tests/requirements.txt)
   * Run the test suite by executing `pytest`
3. Wait until the Job completes (regardless of the outcome)
4. Collect and report its logs, corresponding to the `pytest` execution of `tests`
5. Clean up (remove the created Job and Profile)
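
While the `driver` is running, the intermediate resources from steps 1 and 2 can be inspected from
the same management environment. A minimal sketch, assuming the default `test-kubeflow` names used
above:

```bash
# the Profile created in step 1, and the Job (plus its Pod) submitted in step 2
kubectl get profiles.kubeflow.org test-kubeflow
kubectl get jobs,pods -n test-kubeflow
```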

##### Limitations

With the current implementation we have to wait until the Job completes before fetching its logs.
This makes for a suboptimal UX, since the user might have to wait a long time before learning the
outcome of their tests. Ideally, the Job logs would be streamed directly to the `pytest` output,
providing real-time insight. This is a known limitation that will be addressed in a future
iteration.
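
Until then, a practical workaround is to follow the Job logs manually from another terminal while
the tests run. A minimal sketch, assuming the default `test-kubeflow` Job and namespace names and
`kubectl` access from the management environment:

```bash
# stream the pytest output of the test Job in real time
kubectl logs -n test-kubeflow job/test-kubeflow -f
```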
37 changes: 37 additions & 0 deletions assets/test-job.yaml.j2
@@ -0,0 +1,37 @@
apiVersion: batch/v1
kind: Job
metadata:
  name: {{ job_name }}
spec:
  backoffLimit: 0
  template:
    metadata:
      labels:
        access-minio: "true"
        access-ml-pipeline: "true"
        mlflow-server-minio: "true"
    spec:
      serviceAccountName: default-editor
      containers:
        - name: {{ job_name }}
          image: {{ test_image }}
          command:
            - bash
            - -c
          args:
            - |
              cd /tests;
              pip install -r requirements.txt >/dev/null;
              pytest;
              # Kill Istio Sidecar after workload completes to have the Job status properly updated
              # https://github.com/istio/istio/issues/6324
              x=$(echo $?);
              curl -fsI -X POST http://localhost:15020/quitquitquit >/dev/null && exit $x;
          volumeMounts:
            - name: test-volume
              mountPath: /tests
      volumes:
        - name: test-volume
          hostPath:
            path: {{ test_dir }}
      restartPolicy: Never
8 changes: 8 additions & 0 deletions assets/test-profile.yaml.j2
@@ -0,0 +1,8 @@
apiVersion: kubeflow.org/v1
kind: Profile
metadata:
  name: {{ namespace }}
spec:
  owner:
    kind: User
    name: {{ namespace }}@email.com
110 changes: 110 additions & 0 deletions driver/test_kubeflow_workloads.py
@@ -0,0 +1,110 @@
# Copyright 2023 Canonical Ltd.
# See LICENSE file for licensing details.

import logging
import os
from pathlib import Path

import pytest
from lightkube import ApiError, Client, codecs
from lightkube.generic_resource import create_global_resource, load_in_cluster_generic_resources
from utils import assert_namespace_active, delete_job, fetch_job_logs, wait_for_job

log = logging.getLogger(__name__)

ASSETS_DIR = Path("assets")
JOB_TEMPLATE_FILE = ASSETS_DIR / "test-job.yaml.j2"
PROFILE_TEMPLATE_FILE = ASSETS_DIR / "test-profile.yaml.j2"

TESTS_DIR = os.path.abspath(Path("tests"))
TESTS_IMAGE = "kubeflownotebookswg/jupyter-scipy:v1.7.0"

NAMESPACE = "test-kubeflow"
PROFILE_RESOURCE = create_global_resource(
    group="kubeflow.org",
    version="v1",
    kind="profile",
    plural="profiles",
)

JOB_NAME = "test-kubeflow"


@pytest.fixture(scope="module")
def lightkube_client():
"""Initialise Lightkube Client."""
lightkube_client = Client()
load_in_cluster_generic_resources(lightkube_client)
return lightkube_client


@pytest.fixture(scope="module")
def create_profile(lightkube_client):
"""Create Profile and handle cleanup at the end of the module tests."""
log.info(f"Creating Profile {NAMESPACE}...")
resources = list(
codecs.load_all_yaml(
PROFILE_TEMPLATE_FILE.read_text(),
context={"namespace": NAMESPACE},
)
)
assert len(resources) == 1, f"Expected 1 Profile, got {len(resources)}!"
lightkube_client.create(resources[0])

yield

# delete the Profile at the end of the module tests
log.info(f"Deleting Profile {NAMESPACE}...")
lightkube_client.delete(PROFILE_RESOURCE, name=NAMESPACE)


@pytest.mark.abort_on_fail
async def test_create_profile(lightkube_client, create_profile):
    """Test Profile creation.

    This test relies on the create_profile fixture, which handles the Profile creation and
    is responsible for cleaning up at the end.
    """
    try:
        profile_created = lightkube_client.get(
            PROFILE_RESOURCE,
            name=NAMESPACE,
        )
    except ApiError as e:
        # lightkube raises an ApiError carrying a Status object; a 404 means the Profile is missing
        if e.status.code == 404:
            profile_created = False
        else:
            raise
    assert profile_created, f"Profile {NAMESPACE} not found!"

    assert_namespace_active(lightkube_client, NAMESPACE)


def test_kubeflow_workloads(lightkube_client):
    """Run a K8s Job to execute the notebook tests."""
    log.info(f"Starting Kubernetes Job {NAMESPACE}/{JOB_NAME} to run notebook tests...")
    resources = list(
        codecs.load_all_yaml(
            JOB_TEMPLATE_FILE.read_text(),
            context={"job_name": JOB_NAME, "test_dir": TESTS_DIR, "test_image": TESTS_IMAGE},
        )
    )
    assert len(resources) == 1, f"Expected 1 Job, got {len(resources)}!"
    lightkube_client.create(resources[0], namespace=NAMESPACE)

    try:
        wait_for_job(lightkube_client, JOB_NAME, NAMESPACE)
    except ValueError:
        pytest.fail(
            f"Something went wrong while running Job {NAMESPACE}/{JOB_NAME}. Please inspect the"
            " attached logs for more info..."
        )
    finally:
        log.info("Fetching Job logs...")
        fetch_job_logs(JOB_NAME, NAMESPACE)


def teardown_module():
    """Cleanup resources."""
    log.info(f"Deleting Job {NAMESPACE}/{JOB_NAME}...")
    delete_job(JOB_NAME, NAMESPACE)
91 changes: 91 additions & 0 deletions driver/utils.py
@@ -0,0 +1,91 @@
# Copyright 2023 Canonical Ltd.
# See LICENSE file for licensing details.

import logging
import subprocess

import tenacity
from lightkube import Client
from lightkube.resources.batch_v1 import Job
from lightkube.resources.core_v1 import Namespace

log = logging.getLogger(__name__)


@tenacity.retry(
    wait=tenacity.wait_exponential(multiplier=2, min=1, max=10),
    stop=tenacity.stop_after_attempt(30),
    reraise=True,
)
def assert_namespace_active(
    client: Client,
    namespace: str,
):
    """Test that the provided namespace is Active.

    Retries multiple times to allow for the K8s namespace to be created and reach Active status.
    """
    # raises a 404 ApiError if the namespace doesn't exist
    ns = client.get(Namespace, namespace)
    phase = ns.status.phase

    log.info(f"Waiting for namespace {namespace} to become 'Active': phase == {phase}")
    assert phase == "Active", f"Waited too long for namespace {namespace}!"


def _log_before_sleep(retry_state):
    """Custom callback to log the number of seconds before the next attempt."""
    next_attempt = retry_state.attempt_number
    delay = retry_state.next_action.sleep
    log.info(f"Retrying in {int(delay)} seconds (attempts: {next_attempt})")


@tenacity.retry(
    wait=tenacity.wait_exponential(multiplier=2, min=1, max=32),
    retry=tenacity.retry_if_not_result(lambda result: result),
    stop=tenacity.stop_after_delay(60 * 60),
    before_sleep=_log_before_sleep,
    reraise=True,
)
def wait_for_job(
    client: Client,
    job_name: str,
    namespace: str,
):
    """Wait for a Kubernetes Job to complete.

    Keep retrying (up to a maximum of 3600 seconds) while the Job is active or just not yet ready,
    and stop once it becomes successful. This is implemented using the built-in
    `retry_if_not_result` tenacity function, along with `wait_for_job` returning False or True,
    respectively.

    If the Job fails or lands in an unexpected state, this function will raise a ValueError and
    fail immediately.
    """
    # raises a 404 ApiError if the Job doesn't exist
    job = client.get(Job, name=job_name, namespace=namespace)
    if job.status.succeeded:
        # stop retrying, Job succeeded
        log.info(f"Job {namespace}/{job_name} completed successfully!")
        return True
    elif job.status.failed:
        raise ValueError(f"Job {namespace}/{job_name} failed!")
    elif not job.status.ready or job.status.active:
        # continue retrying
        status = "active" if job.status.active else "not ready"
        log.info(f"Waiting for Job {namespace}/{job_name} to complete (status == {status})")
        return False
    else:
        raise ValueError(f"Unknown status {job.status} for Job {namespace}/{job_name}!")


def fetch_job_logs(job_name, namespace):
    """Fetch the logs produced by a Kubernetes Job."""
    command = ["kubectl", "logs", "-n", namespace, f"job/{job_name}"]
    subprocess.check_call(command)


def delete_job(job_name, namespace, lightkube_client=None):
    """Delete a Kubernetes Job."""
    client = lightkube_client or Client()
    client.delete(Job, name=job_name, namespace=namespace)