-
Notifications
You must be signed in to change notification settings - Fork 193
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
refact: Make topic explicit in message queue API #358
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm, this makes sense. Trying to understand the hierarchy of control now
- task definitions have an
agent_id
specifying what service to send to (this probably needs to be renamed in the future) - messages have a
type
used for judging how a service handles a message - the
topic
for a consumer is defined by the control plane config topic + msg type?- does this mean each service + message type combination has its own topic/queue now?
Slightly fuzzy if there's anything beyond this 😅 I feel like I need to draw this out now
Correct, this didn't change.
Same as before
Correct, this is brand new
Exactly, which makes it possible to share the same Kafka cluster (but really any message queue broker) across multiple control planes
That's really it :) |
@masci thanks for confirming! |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM -- wow, this turned out to be a big one.
Should we add some issues/tickets for making topic
's explicit with the other existing message queue integrations?
# register to control plane | ||
control_plane_url = ( | ||
f"http://{control_plane_config.host}:{control_plane_config.port}" | ||
) | ||
await service.register_to_control_plane(control_plane_url) | ||
|
||
# register to message queue | ||
consumer_fn = await service.register_to_message_queue() | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am not sure if its a pre-requisite to register to the service (and its topic) to the message queue.
They both need to happen, and both are viewed as pre-requisites ofc for a user to send a task to a service:
User sends task -> control plane publishes message with associated topic -> message queue pushes the message to the correct topic queue -> the service consumes the task
|
||
@property | ||
def publish_callback(self) -> Optional[PublishCallback]: | ||
return None | ||
|
||
@abstractmethod | ||
def get_topic(self, msg_type: str) -> str: ... |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think before we were thinking of delegating such responsibilities to the control plane, which should then know about all of the existing topics.
Follow up issues: |
Apologies for the mega-refactoring, breaking this down into smaller PR would take a week 😛
Fixes #349 for good.
Problem
Message publishers and subcribers assumed the "topic" to listen to was the same as the message type. This works in a single-tenant environment but becomes a problem when the same message queue holds data for multiple control planes and services, causing clashes.
Solution
control_plane_one.my_service
so that services with the same name attached to different control planes can coexistFollow up