Add Code to prevent collision on Repository implementation #8

Open
alkampfergit opened this issue Jul 12, 2016 · 8 comments

Comments

@alkampfergit
Contributor

We have already tested code in a private implementation of Repository to reduce ConcurrencyException. Using a ConcurrentDictionary, we can make a thread wait if another thread is using the same aggregate.

The idea is this: with multi-threaded execution, if three threads need to issue a command on the same aggregate and all three read the aggregate at the same version, the first one that saves the aggregate will succeed, but the other two will fail with a ConcurrencyException.

Before loading the aggregate, the repository can check whether other threads are working with the same aggregate and, if so, sleep for a little while, reducing the risk of a ConcurrencyException.
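A minimal sketch of the idea described above, written in Python rather than the project's C#. The `AggregateGate` class, its method names, and the 50 ms default are assumptions made for illustration, not the repository's actual code; a set guarded by a condition variable stands in for the ConcurrentDictionary mentioned above. A caller that times out simply proceeds, so the store's optimistic concurrency check remains the real safeguard.

```python
import threading
import time


class AggregateGate:
    """Tracks which aggregate ids are in use inside this process (hypothetical helper)."""

    def __init__(self, max_wait_seconds=0.05):
        self._max_wait = max_wait_seconds
        self._cond = threading.Condition()
        self._busy = set()

    def enter(self, aggregate_id):
        """Wait briefly while another thread uses the aggregate.

        Returns True if the aggregate was free (or became free in time) and is
        now marked as busy by the caller. Returns False if the wait timed out;
        the caller still proceeds, but must not call leave().
        """
        deadline = time.monotonic() + self._max_wait
        with self._cond:
            while aggregate_id in self._busy:
                remaining = deadline - time.monotonic()
                if remaining <= 0:
                    return False
                self._cond.wait(remaining)
            self._busy.add(aggregate_id)
            return True

    def leave(self, aggregate_id):
        """Mark the aggregate as free and wake up any waiting threads."""
        with self._cond:
            self._busy.discard(aggregate_id)
            self._cond.notify_all()


# Usage inside a repository or command handler:
gate = AggregateGate(max_wait_seconds=0.05)
acquired = gate.enter("container-42")
try:
    pass  # load the aggregate, execute the command, save
finally:
    if acquired:
        gate.leave("container-42")
```

Because the gate lives in process memory, it only helps threads of the same process; separate processes never see each other's markers.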

This will be even more useful once ICache is implemented, because avoiding the exception lets the repository immediately read from RAM the new IMemento produced by the previous save.

@mauroservienti

Even though I like this from a technical perspective, I don't see it as a mandatory thing to have. Let me try to convince you, @alkampfergit.

If we think of Event Sourcing as a way to solve concurrency issues in high-contention domains, then the only thing we should do is postpone concurrency handling to a later time, asynchronously. It should really behave like an append-only store, and trying to serialize access to the shared aggregate resource in any way can only make things worse.

On the other hand, what we should do when designing our systems is correctly identify boundaries. If boundaries are correctly identified, there is a high chance that, as a side effect, we reduce contention and concurrency.

In all other scenarios I believe that handling ConcurrencyException and introducing a retry mechanism is good enough.
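The retry mechanism mentioned above could look roughly like the following sketch. It is written in Python for illustration; `ConcurrencyException` here is a local stand-in for the store's exception, and `execute_with_retry`, `max_attempts`, and `base_delay` are hypothetical names, not part of NEventStore.

```python
import random
import time


class ConcurrencyException(Exception):
    """Stand-in for the exception the event store raises on a version conflict."""


def execute_with_retry(handle_command, max_attempts=5, base_delay=0.01):
    """Run handle_command, retrying when it raises ConcurrencyException.

    handle_command must reload the aggregate on every call, so that each
    attempt works against the latest version of the stream.
    """
    for attempt in range(1, max_attempts + 1):
        try:
            return handle_command()
        except ConcurrencyException:
            if attempt == max_attempts:
                raise  # give up and let the caller see the conflict
            # Small randomized backoff so competing workers do not retry in lockstep.
            time.sleep(base_delay * attempt * random.random())
```

Unlike an in-process wait, this approach behaves the same whether the competing writer is another thread or another process.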

Thoughts?

@AGiorgetti
Member

AGiorgetti commented Jul 13, 2016

If all the commands are unrelated to each other I don't see any problem in doing this, but let's consider this:
The starting situation is: Aggregate at Version 1.
Thread 1 -> issues a command to Aggregate (to be executed against Version 1? Where do you get this information? From a potentially stale read model?)
Thread 2 -> issues a command to Aggregate (to be executed against Version 1? etc.)
Thread 3 -> does the same.
The normal outcome is: one thread wins, the other two receive a ConcurrencyException.

Now, if we implement this feature, we can totally avoid this situation: when we load the aggregates to execute the commands, we can wait a bit for the 'lock' imposed by the first thread to expire.

If we do not perform any 'version checking' between the current aggregate version (which is incremented at each command execution) and an 'expected version' that we could add to each command header, things will work, but is this acceptable?
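To make the question concrete, here is a small Python sketch of what such a check might look like. The `expected_version` header, the `StaleCommandError` exception, and the `check_version` switch are all hypothetical; nothing in the thread says the library provides them.

```python
from dataclasses import dataclass


class StaleCommandError(Exception):
    """Raised when a command was issued against an older aggregate version."""


@dataclass
class Command:
    aggregate_id: str
    expected_version: int  # hypothetical header: the version the sender saw
    payload: str


@dataclass
class Aggregate:
    id: str
    version: int = 1

    def apply(self, payload):
        # Every executed command raises an event, so the version advances.
        self.version += 1


def handle(aggregate, command, check_version=True):
    """Execute the command, optionally rejecting it if the sender saw a stale version."""
    if check_version and aggregate.version != command.expected_version:
        raise StaleCommandError(
            f"expected v{command.expected_version}, aggregate is at v{aggregate.version}"
        )
    aggregate.apply(command.payload)
    return aggregate.version
```

With the check enabled, a command that waited for the 'lock' and then loaded a newer version is rejected anyway; with the check disabled, it runs against a state its sender never saw, which is only safe when the commands are unrelated.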

It is if the commands you are issuing are not related to each other (and that's what @mauroservienti means when he talks about designing your contexts carefully, I think), but what if the commands 'overlap'?

I think this can be a very specific domain optimization.

@mauroservienti

This works until you scale out. As soon as you scale out, a thread semaphore working at the process level becomes useless.

> but what if the commands 'overlap'?

Is there a design/context map bug?

@alkampfergit
Contributor Author

alkampfergit commented Jul 13, 2016

Actually, if you use the same aggregate from two threads, the second thread in NEventStore will always get a ConcurrencyException.

The problem is this: Thread A and Thread B load aggregate X, which is at a given state.

Thread A calls a method that changes state, raising event Z.
Thread B calls a method that changes state, raising event K.

Thread A saves its events to the event stream, but now Thread B cannot save its events, because they were raised by aggregate X in a state that is no longer valid: the stream of events has changed, so NEventStore rejects the events with a ConcurrencyException.
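The conflict described above can be reproduced with a toy append-only stream. This is a Python sketch for illustration only; `InMemoryStream` and its `expected_revision` argument are assumptions and do not reflect NEventStore's real API.

```python
import threading


class ConcurrencyException(Exception):
    """Stand-in for the exception raised when the stream has moved on."""


class InMemoryStream:
    """Toy append-only event stream with an optimistic revision check."""

    def __init__(self):
        self._events = []
        self._lock = threading.Lock()

    @property
    def revision(self):
        return len(self._events)

    def read(self):
        with self._lock:
            return list(self._events)

    def append(self, expected_revision, new_events):
        """Append only if nobody else has written since expected_revision."""
        with self._lock:
            if len(self._events) != expected_revision:
                raise ConcurrencyException(
                    f"stream is at revision {len(self._events)}, "
                    f"writer expected {expected_revision}"
                )
            self._events.extend(new_events)
            return len(self._events)


stream = InMemoryStream()
stream.append(0, ["X created"])

# Threads A and B both load aggregate X at the same revision.
seen_by_a = stream.revision
seen_by_b = stream.revision

stream.append(seen_by_a, ["Z"])      # A saves first and succeeds
try:
    stream.append(seen_by_b, ["K"])  # B's events are based on a stale state
    conflict = None
except ConcurrencyException as exc:
    conflict = str(exc)
```

After this runs, the stream contains only A's event on top of the initial one, and B has to reload the aggregate and execute its command again.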

I agree that this kind of situation "could" be handled by merges or asynchronously, but this is how NEventStore and CommonDomain were designed, and it limits the complexity of guaranteeing aggregate consistency.

This check only avoids wasting time. When you scale out to multiple processes, the optimization simply does not apply, and you get the standard ConcurrencyException.

@mauroservienti

What about timeouts? Introducing a lock-like thing requires adding lock timeout logic, otherwise it is very easy to lock forever.

> Before loading the Aggregate, the repository can check if other threads are working with the same Aggregate and then sleep for a little while reducing the risk of ConcurrencyException.

Waiting for a certain amount of time will only add complexity. Wouldn't it be much easier to let the user handle this sort of scenario?

Building a synchronous in-memory queue using RX BlockingCollection is super trivial, and a user who knows the use case well can easily build a funnel that turns the multi-threaded incoming message pipe into single-threaded handling per aggregate.
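A funnel of this kind might be sketched as follows. BlockingCollection is a .NET type; this Python version swaps in the standard `queue.Queue` as a rough analogue, and `AggregateFunnel`, `submit`, and `close` are hypothetical names. It assumes the handler does not raise; a real implementation would need error handling so a failing command does not kill its worker.

```python
import queue
import threading
import zlib


class AggregateFunnel:
    """Routes commands so one aggregate is always handled by the same worker."""

    def __init__(self, handler, worker_count=4):
        self._handler = handler
        self._queues = [queue.Queue() for _ in range(worker_count)]
        self._threads = [
            threading.Thread(target=self._drain, args=(q,), daemon=True)
            for q in self._queues
        ]
        for thread in self._threads:
            thread.start()

    def submit(self, aggregate_id, command):
        # A stable hash keeps every command for an aggregate on one queue,
        # so commands for that aggregate run one at a time, in order.
        index = zlib.crc32(aggregate_id.encode("utf-8")) % len(self._queues)
        self._queues[index].put((aggregate_id, command))

    def _drain(self, q):
        while True:
            item = q.get()
            if item is None:  # sentinel: stop this worker
                return
            self._handler(*item)

    def close(self):
        """Let the workers finish pending commands, then stop them."""
        for q in self._queues:
            q.put(None)
        for thread in self._threads:
            thread.join()
```

Different aggregates can still be processed in parallel on different workers, while two commands for the same aggregate never run concurrently within the process.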

@alkampfergit
Contributor Author

alkampfergit commented Jul 13, 2016

The lock lasts for a very small, configurable time. If the timeout expires, the command simply goes ahead and executes. This is only a small optimization.

We have processes that perform massive loads of documents into the system, generating a high number of commands in the queue. Each time a Document is inserted, a process manager reacts by sending various commands to the queue. Since the queue has multiple workers, it can easily happen that two or more workers are working with the very same aggregate at the same time, and there is little we can do about this without adding extra complexity.

As an example, a process manager sends a command to the container of the document, informing it that another document was added. When we load 20,000 documents into the same container, we basically have all 10 workers working with the very same aggregate :).

Without the lock everything works without a single problem, but the database is hammered with writes that generate concurrency exceptions, so we are wasting time.

This lock is not meant to guarantee ordering, nor does it have any business meaning. It just avoids unnecessary ConcurrencyExceptions given how NEventStore works internally: NES does some work when a ConcurrencyException is thrown, and I want to avoid that work if I know upfront that command execution will fail. :)

@mauroservienti

By queue, do you mean a queuing infrastructure? Isn't it much easier to handle this at the database/repository implementation level?

@alkampfergit
Contributor Author

alkampfergit commented Jul 13, 2016

Yes, we have clients that send messages through a queue, and several workers then use command handlers to execute the commands. The usual pattern of a command handler is:

Load an aggregate with Repository
Call some methods on the aggregate
Save the aggregate

If several commands regarding the very same aggregate arrive on the bus, several command handlers will execute concurrently, each loading the same aggregate in memory, and NEventStore will throw a ConcurrencyException during Save.

To avoid this, in the implementation of the Repository, before loading the aggregate, if another thread is already interacting with the same aggregate, the repository waits a little, avoiding a ConcurrencyException.

When you scale out, this optimization does not work across processes, and you fall back to the basic behavior: intercept the ConcurrencyException and retry handling the command.
