
Pubsub delayed scheduling proposal #12

Open
wants to merge 2 commits into base: main

Conversation

yaron2
Member

@yaron2 yaron2 commented Nov 23, 2022

Signed-off-by: yaron2 [email protected]

daixiang0
daixiang0 previously approved these changes Nov 30, 2022
Member

@daixiang0 daixiang0 left a comment


LGTM

@daixiang0
Member

+1 binding

@yaron2
Member Author

yaron2 commented Nov 30, 2022

+1 binding

Contributor

@mukundansundar mukundansundar left a comment


One more question @yaron2 is what happens to the publishingScopes and subscribingScopes definition that we have for limiting topic access? https://docs.dapr.io/developing-applications/building-blocks/pubsub/pubsub-scopes/

How will that work with <app>-delayed topic?

0005-R-pubsub-delayed-scheduling.md (two review threads, resolved)
@yaron2
Member Author

yaron2 commented Nov 30, 2022

One more question @yaron2 is what happens to the publishingScopes and subscribingScopes definition that we have for limiting topic access? https://docs.dapr.io/developing-applications/building-blocks/pubsub/pubsub-scopes/

How will that work with <app>-delayed topic?

It works as is, unchanged. Again, this is an implementation detail but Dapr checks if the app is allowed to publish before it sends it to any topic, delayed or not. On the receiving side, Dapr checks if the app is allowed to subscribe regardless of delayed topics as they only act as an intermediary.

@mukundansundar
Contributor

One more question @yaron2 is what happens to the publishingScopes and subscribingScopes definition that we have for limiting topic access? https://docs.dapr.io/developing-applications/building-blocks/pubsub/pubsub-scopes/
How will that work with <app>-delayed topic?

It works as is, unchanged. Again, this is an implementation detail but Dapr checks if the app is allowed to publish before it sends it to any topic, delayed or not. On the receiving side, Dapr checks if the app is allowed to subscribe regardless of delayed topics as they only act as an intermediary.

The scoped topics won't change, but in addition to that, will the application need access to an <app>-delayed topic?

@yaron2
Member Author

yaron2 commented Nov 30, 2022

One more question @yaron2 is what happens to the publishingScopes and subscribingScopes definition that we have for limiting topic access? https://docs.dapr.io/developing-applications/building-blocks/pubsub/pubsub-scopes/
How will that work with <app>-delayed topic?

It works as is, unchanged. Again, this is an implementation detail but Dapr checks if the app is allowed to publish before it sends it to any topic, delayed or not. On the receiving side, Dapr checks if the app is allowed to subscribe regardless of delayed topics as they only act as an intermediary.

The scoped topics won't change, but in addition to that, will the application need access to an <app>-delayed topic?

No, this is a topic used internally by Dapr.

curl -X "POST" http://localhost:3500/v1.0/publish/pubsub/singularity?metadata.scheduledSendTime=2045-11-05T08:15:30-05:00 -H "Content-Type: application/json" -d '{"execute-order": "66"}'
```

Upon receiving a delayed message, the Dapr runtime will examine the message's due time and publish it to the target topic if it is due. If it isn't due, the Dapr runtime will hold the message, without ACKing back to the broker, until it is time to send it. If the Dapr instance crashes, the message will remain in the broker and be consumed by a different instance.
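
For readers skimming the thread, here is a minimal sketch of the hold-until-due behavior the excerpt describes (the `DelayedMessage` type and the `publish`/`ack` callbacks are hypothetical stand-ins, not Dapr's actual component interfaces; the review comments below discuss why holding the ack is problematic for some brokers):

```go
package main

import (
	"fmt"
	"time"
)

// DelayedMessage is a hypothetical envelope for a message consumed from the
// <app>-delayed topic; ScheduledSendTime corresponds to the
// metadata.scheduledSendTime publish parameter shown in the excerpt.
type DelayedMessage struct {
	TargetTopic       string
	Data              []byte
	ScheduledSendTime time.Time
}

// handleDelayed holds a message until it is due, publishes it to the target
// topic, and only then acks it back to the broker.
func handleDelayed(msg DelayedMessage, publish func(topic string, data []byte) error, ack func() error) error {
	if wait := time.Until(msg.ScheduledSendTime); wait > 0 {
		time.Sleep(wait) // hold the message; no ack yet
	}
	if err := publish(msg.TargetTopic, msg.Data); err != nil {
		return err // leave unacked so another instance can retry
	}
	return ack()
}

func main() {
	msg := DelayedMessage{
		TargetTopic:       "singularity",
		Data:              []byte(`{"execute-order": "66"}`),
		ScheduledSendTime: time.Now().Add(2 * time.Second),
	}
	_ = handleDelayed(msg,
		func(topic string, data []byte) error { fmt.Printf("published to %s: %s\n", topic, data); return nil },
		func() error { fmt.Println("acked"); return nil },
	)
}
```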
Member

@artursouza artursouza Dec 6, 2022


This will not work in every broker. Not acking a message can cause no other messages to be consumed in some configurations. In Kafka, there is in-order guarantee per partition, so not acking a message will clog the partition for other messages - even for those that are already due.

Instead, this should implement an outbox pattern. In the outbox pattern, messages are only published after a business transaction takes place - in this case, the business transaction is done by the clock. Then, the component must scan states in a state store to identify which messages are due and publish those, deleting the corresponding record.

The state store keys would basically be in segments of 1h (or any other granularity) and contain the ids for every message that must be delivered in that hour. Then, daprd would look at the current hour and previous (configurable by the user to be -n hours) and publish every message that is due in those time windows.

This will make this solution work in a predictable way without depending on broker-specific handling of "not acking messages" by just relying on the state store pattern underneath.
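
A rough sketch of the hour-bucketed outbox scan described above is shown next (the key layout, the in-memory map standing in for a state store, and the lookback window are illustrative assumptions for this thread, not a settled design):

```go
package main

import (
	"fmt"
	"time"
)

// entry is a hypothetical outbox record: a message id plus the time at which
// the message becomes due.
type entry struct {
	ID  string
	Due time.Time
}

// bucketKey groups records into 1h segments, as suggested above; the
// "delayed||" prefix and hour format are arbitrary choices for this sketch.
func bucketKey(due time.Time) string {
	return "delayed||" + due.UTC().Truncate(time.Hour).Format("2006-01-02T15")
}

// scanAndPublish looks at the current hour plus `lookback` previous hours,
// publishes every record that is due, deletes published records, and keeps
// the not-yet-due ones. The map stands in for a real state store.
func scanAndPublish(store map[string][]entry, now time.Time, lookback int, publish func(id string)) {
	for h := 0; h <= lookback; h++ {
		key := bucketKey(now.Add(-time.Duration(h) * time.Hour))
		var remaining []entry
		for _, e := range store[key] {
			if !e.Due.After(now) {
				publish(e.ID) // due: hand off to the target topic
			} else {
				remaining = append(remaining, e) // not due yet: keep the record
			}
		}
		if len(remaining) == 0 {
			delete(store, key)
		} else {
			store[key] = remaining
		}
	}
}

func main() {
	now := time.Now()
	store := map[string][]entry{
		bucketKey(now):                     {{ID: "msg-1", Due: now.Add(-time.Minute)}, {ID: "msg-2", Due: now.Add(30 * time.Minute)}},
		bucketKey(now.Add(-1 * time.Hour)): {{ID: "msg-0", Due: now.Add(-time.Hour)}},
	}
	scanAndPublish(store, now, 1, func(id string) { fmt.Println("publishing", id) })
}
```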

Member


@yaron2 has this comment been integrated yet? If so can you resolve the conversation :)

Member Author


It hasn't been resolved yet

@ItalyPaleAle
Contributor

I second @artursouza's concerns that it may cause some issues with a variety of brokers. Artur mentioned Kafka. A few more examples:

  • Azure Service Bus requires consumers to acquire and constantly renew a lock, or the message will automatically be nack'd. Renewing locks can become expensive when we have a lot of messages that are delayed.
  • MQTT's relationship with ACKs is complex, to say the least.
  • Some brokers automatically delete messages that have been in the queue for longer than N days and have not been ack'd.

I also have a question about what happens if the consumer is scaled horizontally. Among all PubSub components we support right now, we observe a variety of behaviors, but many (not all) deliver all messages to all subscribers. When the app is scaled horizontally, then, how do we coordinate this?

I definitely do see the value in having delayed messages and it's a big pain point. However, I share the concerns about this proposed implementation's feasibility (and long-term supportability across all brokers). Ideally, we should encourage users to rely on the native support for delayed messages that brokers (like Service Bus) offer whenever possible.

Alternatively, if we need a Dapr-specific implementation, I would consider something that uses a state store to temporarily persist messages until they're ready to be published.

@artursouza
Member

I would also consider making the state store bound to a particular topic and appID combination to avoid contention between multiple apps.
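
Something along these lines would do that; the key prefix and separator are purely illustrative, not an agreed format:

```go
package main

import (
	"fmt"
	"time"
)

// delayedKey scopes an hour bucket to a specific appID and topic so that
// apps (and topics) never contend on the same state store records.
func delayedKey(appID, topic string, due time.Time) string {
	return fmt.Sprintf("delayed||%s||%s||%s", appID, topic, due.UTC().Truncate(time.Hour).Format("2006-01-02T15"))
}

func main() {
	fmt.Println(delayedKey("checkout", "orders", time.Now().Add(72*time.Hour)))
}
```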

@yaron2
Member Author

yaron2 commented Dec 14, 2022

When the app is scaled horizontally, then, how do we coordinate this?

Coordination isn't needed as this would satisfy the at-least-once guarantee.

The concern about queue clogging is real and using a general purpose state store makes sense, I'll update the proposal for that section.

@yaron2 yaron2 dismissed daixiang0’s stale review December 14, 2022 21:10

Reworking feedback

@olitomlinson

olitomlinson commented Feb 21, 2023

@yaron2

Personally, I would advocate utilising Dapr Workflows to handle the time-bound aspects of business logic, and not push that responsibility into the PubSub broker.

Pros

  • From an end-user perspective, this model is more flexible as you can have various strategies for how to delay processing. And have many strategies in play at once, even on a per-message basis if needed.

  • From a maintenance and reasoning perspective, it doesn't complicate any of the mechanics of the PubSub implementations.

Cons

  • Shifts the burden onto the developer, who has to apply some additional effort to introduce workflows. Counter: IMO this is short-term pain for a long-term gain.

  • Latency penalty due to spinning up a Workflow instance per message.



Assuming that a DurableTimers equivalent makes it into the Dapr embedded workflow API surface.

Example

PubSub Subscriber starts a workflow instance for each message.

// Dapr subscription in [Topic] routes the orders topic to this route;
// workflowClient (a DaprWorkflowClient) is assumed to be resolved from DI
app.MapPost("/orders", [Topic("orderpubsub", "orders")] async (Order order, DaprWorkflowClient workflowClient) => {
    await workflowClient.ScheduleNewWorkflowAsync(
        name: nameof(OrderProcessingWorkflow),
        instanceId: order.orderId,
        input: order);
    return Results.Ok(order);
});

Use a workflow to host the logic, then use a Timer to defer the work until the desired time.

class OrderProcessingWorkflow : Workflow<OrderPayload, OrderResult>
{
    public override async Task<OrderResult> RunAsync(WorkflowContext context, OrderPayload order)
    {
        // Put the workflow to sleep for 72 hours because that's how the business wants to handle this message type
        DateTime dueTime = context.CurrentUtcDateTime.AddHours(72);
        await context.CreateTimer(dueTime, CancellationToken.None);

        // Notify the user that an order has come through (the instance id is the order id used when scheduling the workflow)
        await context.CallActivityAsync(
            nameof(NotifyActivity),
            new Notification($"Received order {context.InstanceId} for {order.Quantity} {order.Name} at ${order.TotalCost}"));

       ....
    }
}

@kayvonkhosrowpour

+1 for pubsub publishing with delay

@ItalyPaleAle
Contributor

What if we implemented this on top of actors instead?

We could leverage the actor reminder subsystem to publish messages at a specific time. We'd also get "for free" the ability to publish messages with a delay.
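
For what it's worth, a toy sketch of how a reminder-backed approach could be shaped (the `schedulerActor` and `reminder` types are hypothetical; real actor reminders are persisted and distributed, which the in-process timer used here obviously is not):

```go
package main

import (
	"fmt"
	"time"
)

// reminder is a hypothetical stand-in for an actor reminder: it fires once at
// DueTime and carries the payload needed to publish the delayed message.
type reminder struct {
	DueTime time.Time
	Topic   string
	Data    []byte
}

// schedulerActor mimics the internal actor suggested above: publishing a
// delayed message becomes "register a reminder", and the reminder callback
// does the actual publish. This does not map to real Dapr actor APIs; it only
// illustrates the shape of the idea.
type schedulerActor struct {
	publish func(topic string, data []byte)
}

func (a *schedulerActor) registerReminder(r reminder) {
	time.AfterFunc(time.Until(r.DueTime), func() {
		// A real implementation would persist the reminder so it survives
		// restarts; here it is just an in-process timer.
		a.publish(r.Topic, r.Data)
	})
}

func main() {
	actor := &schedulerActor{publish: func(topic string, data []byte) {
		fmt.Printf("published to %s: %s\n", topic, data)
	}}
	actor.registerReminder(reminder{
		DueTime: time.Now().Add(2 * time.Second),
		Topic:   "singularity",
		Data:    []byte(`{"execute-order": "66"}`),
	})
	time.Sleep(3 * time.Second) // keep the demo alive until the reminder fires
}
```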

@yaron2
Member Author

yaron2 commented Mar 28, 2023

What if we implemented this on top of actors instead?

We could leverage the actor reminder subsystem to publish messages at a specific time. We'd also get "for free" the ability to publish messages with a delay.

That's an idea I've been looking at recently, but would have to defer this to reminders v2. We're planning a distributed scheduling solution in Dapr to serve higher-level APIs like actor reminders, delayed pub/sub, and an upcoming outbox pattern API.

@ItalyPaleAle
Contributor

ItalyPaleAle commented Mar 28, 2023

Yes, agree "reminders v1" would likely not offer the perf this would require.

However, it would be nice to see what this could look like implemented on top of an internal actor, like workflows, even if the implementation weren't possible until "reminders v2".

@olitomlinson

@yaron2 Does a public forum exist for discussing the 'new distributed scheduling solution'?

@ulf-melin-sveasolar

What is happening to this?

@yaron2
Member Author

yaron2 commented Mar 20, 2024

What is happening to this?

We are working on a distributed scheduling system for Dapr which, among other consumers (Actors and Workflows), could also be used to support this delayed messaging feature. This proposal will be updated once we have the new system in place.
