Skip to content

Commit

Permalink
Merge pull request #106 from ImperialCollegeLondon/messages-in-user-s…
Browse files Browse the repository at this point in the history
…ession

Store Kafka messages in user session
  • Loading branch information
cc-a authored Sep 25, 2024
2 parents 3864185 + 76c440a commit a34e2c9
Show file tree
Hide file tree
Showing 8 changed files with 105 additions and 65 deletions.
26 changes: 20 additions & 6 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,10 @@ services:
app:
build: .
command:
- sh
- bash
- -c
- |
python manage.py migrate
python scripts/kafka_consumer.py &
python manage.py runserver 0:8000
ports:
- 127.0.0.1:8000:8000
Expand All @@ -15,14 +14,12 @@ services:
- db:/usr/src/app/db
environment:
- PROCESS_MANAGER_URL=drunc:10054
- KAFKA_URL=kafka:9092
depends_on:
kafka:
condition: service_healthy
- drunc
drunc:
build: ./drunc_docker_service/
command:
- sh
- bash
- -c
- |
service ssh start &&
Expand All @@ -48,5 +45,22 @@ services:
interval: 1s
timeout: 6s
retries: 20
kafka_consumer:
build: .
command:
- bash
- -c
- |
python manage.py kafka_consumer --debug
environment:
- KAFKA_ADDRESS=kafka:9092
volumes:
- .:/usr/src/app
- db:/usr/src/app/db
depends_on:
kafka:
condition: service_healthy
app:
condition: service_started
volumes:
db:
8 changes: 8 additions & 0 deletions dune_processes/settings/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,12 @@
"default": {
"ENGINE": "django.db.backends.sqlite3",
"NAME": BASE_DIR / "db" / "db.sqlite3",
# avoid database locking issues between Kafka consumer and web app
# https://docs.djangoproject.com/en/5.1/ref/databases/#database-is-locked-errors
"OPTIONS": {
"timeout": 5,
"transaction_mode": "IMMEDIATE",
},
}
}

Expand Down Expand Up @@ -143,3 +149,5 @@
INSTALLED_APPS += ["crispy_forms", "crispy_bootstrap5"]
CRISPY_ALLOWED_TEMPLATE_PACKS = "bootstrap5"
CRISPY_TEMPLATE_PACK = "bootstrap5"

KAFKA_ADDRESS = os.getenv("KAFKA_ADDRESS", "kafka:9092")
1 change: 1 addition & 0 deletions main/management/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
"""Django management module."""
51 changes: 51 additions & 0 deletions main/management/commands/kafka_consumer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
"""Django management command to populate Kafka messages into application database."""

from argparse import ArgumentParser
from typing import Any

from django.conf import settings
from django.contrib.sessions.backends.db import SessionStore
from django.contrib.sessions.models import Session
from django.core.management.base import BaseCommand
from django.db import transaction
from druncschema.broadcast_pb2 import BroadcastMessage
from kafka import KafkaConsumer


class Command(BaseCommand):
"""Consumes messages from Kafka and stores them in active user sessions."""

help = __doc__

def add_arguments(self, parser: ArgumentParser) -> None:
"""Add commandline options."""
parser.add_argument("--debug", action="store_true")

def handle(self, debug: bool = False, **kwargs: Any) -> None: # type: ignore[misc]
"""Command business logic."""
consumer = KafkaConsumer(bootstrap_servers=[settings.KAFKA_ADDRESS])
consumer.subscribe(pattern="control.*.process_manager")
# TODO: determine why the below doesn't work
# consumer.subscribe(pattern="control.no_session.process_manager")

self.stdout.write("Listening for messages from Kafka.")
while True:
for messages in consumer.poll(timeout_ms=500).values():
message_bodies = []
for message in messages:
if debug:
self.stdout.write(f"Message received: {message}")
self.stdout.flush()
bm = BroadcastMessage()
bm.ParseFromString(message.value)
message_bodies.append(bm.data.value.decode("utf-8"))

if message_bodies:
with transaction.atomic():
# atomic here to prevent race condition with messages being
# popped by the web application
sessions = Session.objects.all()
for session in sessions:
store = SessionStore(session_key=session.session_key)
store.setdefault("messages", []).extend(message_bodies)
store.save()
1 change: 0 additions & 1 deletion main/urls.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,4 @@
path("process_action/", views.process_action, name="process_action"),
path("logs/<uuid:uuid>", views.logs, name="logs"),
path("boot_process/", views.BootProcessView.as_view(), name="boot_process"),
path("message/", views.deposit_message, name="message"),
]
31 changes: 6 additions & 25 deletions main/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,15 @@
import asyncio
import uuid
from enum import Enum
from http import HTTPStatus

import django_tables2
from django.conf import settings
from django.contrib.auth.decorators import login_required, permission_required
from django.contrib.auth.mixins import PermissionRequiredMixin
from django.db import transaction
from django.http import HttpRequest, HttpResponse, HttpResponseRedirect
from django.shortcuts import render
from django.urls import reverse, reverse_lazy
from django.views.decorators.csrf import csrf_exempt
from django.views.decorators.http import require_POST
from django.views.generic.edit import FormView
from drunc.process_manager.process_manager_driver import ProcessManagerDriver
from drunc.utils.shell_utils import DecodedResponse, create_dummy_token_from_uname
Expand All @@ -28,11 +26,6 @@
from .forms import BootProcessForm
from .tables import ProcessTable

# extreme hackiness suitable only for demonstration purposes
# TODO: replace this with per-user session storage - once we've added auth
MESSAGES: list[str] = []
"""Broadcast messages to display to the user."""


def get_process_manager_driver() -> ProcessManagerDriver:
"""Get a ProcessManagerDriver instance."""
Expand Down Expand Up @@ -77,8 +70,11 @@ def index(request: HttpRequest) -> HttpResponse:
table_configurator = django_tables2.RequestConfig(request)
table_configurator.configure(table)

global MESSAGES
MESSAGES, messages = [], MESSAGES
with transaction.atomic():
# atomic to avoid race condition with kafka consumer
messages = request.session.load().get("messages", [])
request.session.pop("messages", [])
request.session.save()

context = {"table": table, "messages": messages}
return render(request=request, context=context, template_name="main/index.html")
Expand Down Expand Up @@ -203,18 +199,3 @@ def form_valid(self, form: BootProcessForm) -> HttpResponse:
"""
asyncio.run(_boot_process("root", form.cleaned_data))
return super().form_valid(form)


@require_POST
@csrf_exempt
def deposit_message(request: HttpRequest) -> HttpResponse:
"""Upload point for broadcast messages for display to end user.
Args:
request: the triggering request.
Returns:
A NO_CONTENT response.
"""
MESSAGES.append(request.POST["message"])
return HttpResponse(status=HTTPStatus.NO_CONTENT)
33 changes: 0 additions & 33 deletions scripts/kafka_consumer.py

This file was deleted.

19 changes: 19 additions & 0 deletions tests/test_views.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,25 @@ def test_index_view_admin(self, admin_client, mocker):
assert "table" in response.context
assertContains(response, "Boot</a>")

def test_session_messages(self, auth_client, mocker):
"""Test the rendering of messages from the user session into the view."""
from django.contrib.sessions.backends.db import SessionStore
from django.contrib.sessions.models import Session

mocker.patch("main.views.get_session_info")
session = Session.objects.get()
message_data = ["message 1", "message 2"]
store = SessionStore(session_key=session.session_key)
store["messages"] = message_data
store.save()

response = auth_client.get(self.endpoint)
assert response.status_code == HTTPStatus.OK

# messages have been removed from the session and added to the context
assert response.context["messages"] == message_data
assert "messages" not in store.load()


class TestLogsView(LoginRequiredTest):
"""Tests for the logs view."""
Expand Down

0 comments on commit a34e2c9

Please sign in to comment.