-
Notifications
You must be signed in to change notification settings - Fork 0
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Store Kafka messages in user session #106
Conversation
Codecov ReportAll modified and coverable lines are covered by tests ✅
Additional details and impacted files@@ Coverage Diff @@
## main #106 +/- ##
=========================================
Coverage 100.00% 100.00%
=========================================
Files 5 5
Lines 43 44 +1
=========================================
+ Hits 43 44 +1 ☔ View full report in Codecov by Sentry. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Some v minor suggestions, but otherwise looks good 👍.
@@ -2,11 +2,10 @@ services: | |||
app: | |||
build: . | |||
command: | |||
- sh | |||
- bash |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Did plain old sh
not work?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not a strictly necessary change in this PR. I found that bash seems to be better at forwarding interrupt signals to child processes so services shutdown more quickly and cleanly.
"""Add commandline options.""" | ||
parser.add_argument("--debug", action="store_true") | ||
|
||
def handle(self, debug: bool = False, **kwargs: Any) -> None: # type: ignore[misc] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Presumably this is because we disallow explicit Any
? Tbh I'm not sure that setting is super helpful -- there are cases (like this one) where an explicit Any
makes sense. Maybe we should change that?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I actually like to disallow explicity Any as I think Any's can be a bit of a crutch so prefer that we think carefully about where we use them and explicitly silence the check where necessary.
self.stdout.flush() | ||
bm = BroadcastMessage() | ||
bm.ParseFromString(message.value) | ||
message_bodies.append(bm.data.value.decode("utf-8")) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
message_bodies.append(bm.data.value.decode("utf-8")) | |
message_bodies.append(bm.data.value.decode()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks very neat!
kafka_consumer: | ||
build: . | ||
command: | ||
- bash | ||
- -c | ||
- | | ||
python manage.py kafka_consumer --debug |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So, just to make sure I get this, we are using the very same code AND database in both apps, but in one we run the server and in the other we run the kafka_consumer. Is that to better use the compute resources or to better manage the different components in case one of them fail?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's right. Ideally Docker Compose is built around the idea of only having one process running in each container. Whilst we could run this is the background under the app
service we would indeed struggle with handling error states. I also think it's cleaner to have the consumer logs in a separate feed.
Ideally we might also run the ssh server in the drunc
service as a separate service but there is a hard coded restriction in the dummy_boot command that the process must be started on localhost.
It's also a nice proof of principle that these things can be run in separate places which will be important for deployment where we may have multiple copies of the app running but will only ever want one consumer.
Thanks for the reviews so far. I'm planning to merge this towards the end of the day. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Okay, I follow the logic. Apologies; pile of questions as usual.
Messages are polled from Kafka by a special meta/admin Django function non-stop.
Users who are logged in at a given moment have their sessions atomically updated with messages.
This means that if one user pops the messages, they are not popped for all users.
sessions = Session.objects.all() | ||
for session in sessions: | ||
store = SessionStore(session_key=session.session_key) | ||
store.setdefault("messages", []).extend(message_bodies) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So setdefault
handles where messages
hasn't been added to session, and extend
is a funny way of saying append
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Right. setdefault
is shorthand for the below pattern:
value = mydict.get("key")
if value is None:
value = []
mydict["key"] = value
Extend is basically append but it takes an iterator of values to add rather than a single one. The key thing is that it mutates the existing list.
with transaction.atomic(): | ||
# atomic here to prevent race condition with messages being | ||
# popped by the web application | ||
sessions = Session.objects.all() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This gets all currently logged in users' sessions?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It gets all sessions in the database. This broadly corresponds to logged in users subject to the caveat I mention above.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This command file is in some magic directory which is automatically loaded by Django?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Description
Moves from storing Kafka messages in a hacky global variable to using the Session data of active users. This works by:
This approach is a fairly basic but gives some nice properties:
One caveat is the need to clear the session store in deployment - https://docs.djangoproject.com/en/5.1/topics/http/sessions/#clearing-the-session-store. We should also agree a session expiry period with the project team.
Fixes #76.
Type of change
Key checklist
python -m pytest
)python -m sphinx -b html docs docs/build
)pre-commit run --all-files
)Further checks