Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optionally disable sequence numbers using optimistic locking (etags) #362

Draft
wants to merge 12 commits into
base: main
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,8 @@
"Default": "Information",
"Microsoft.AspNetCore": "Warning"
}
},
"CosmosEventSourcing": {
"IsSequenceNumberingDisabled": true
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ public class JobsList : AggregateRoot

public string Username { get; private set; } = null!;

public JobsList(string name, string category, string username)
public JobsList(string name, string category, string username) : base(true)
{
if (string.IsNullOrWhiteSpace(name))
{
Expand Down Expand Up @@ -120,7 +120,7 @@ public static JobsList Replay(List<DomainEvent> domainEvents)
return jobList;
}

private JobsList()
private JobsList() : base(true)
{

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,6 @@ public async ValueTask SaveAsync(JobsList jobList)
jobList.Id))
.ToList();

eventItems.Add(new JobsListEventItem(
jobList.AtomicEvent,
jobList.Id));

await _eventStore.PersistAsync(eventItems);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,8 @@
"Microsoft.Azure.CosmosRepository": "Debug",
"Microsoft.Azure.CosmosEventSourcing": "Debug"
}
},
"CosmosEventSourcing": {
"IsSequenceNumberingDisabled": true
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,17 @@ public abstract class AggregateRoot : IAggregateRoot
private List<DomainEvent> _events = new();
private readonly List<DomainEvent> _newEvents = new();
private AtomicEvent? _atomicEvent;
private readonly bool _isSequenceNumberDisabled;

protected AggregateRoot()
{

}

protected AggregateRoot(bool isSequenceNumberDisabled)
{
_isSequenceNumberDisabled = isSequenceNumberDisabled;
}

/// <inheritdoc />
public IReadOnlyList<DomainEvent> NewEvents =>
Expand All @@ -33,20 +44,23 @@ public abstract class AggregateRoot : IAggregateRoot
/// <remarks>This should only be used when adding NEW events.</remarks>
protected void AddEvent(DomainEvent domainEvent)
{
if (_atomicEvent is null)
if (_isSequenceNumberDisabled is false)
{
CreateAtomicMarkerEvent();
}

if (!_newEvents.Any())
{
UpdateAtomicMarkerEvent();
if (_atomicEvent is null)
{
CreateAtomicMarkerEvent();
}

if (!_newEvents.Any())
{
UpdateAtomicMarkerEvent();
}
}

DomainEvent evt = domainEvent with
{
EventId = Guid.NewGuid().ToString(),
Sequence = _events.Count + 1,
Sequence = _isSequenceNumberDisabled ? -1 : _events.Count + 1,
OccuredUtc = DateTime.UtcNow
};

Expand All @@ -67,17 +81,29 @@ protected void Apply(List<DomainEvent> domainEvents)
throw new DomainEventsRequiredException(GetType());
}

var atomicEvent = domainEvents.SingleOrDefault(x => x is AtomicEvent) as AtomicEvent;
if (_isSequenceNumberDisabled)
{
var orderedEvents = domainEvents
.OrderBy(x => x.OccuredUtc)
.ToList();

orderedEvents.ForEach(Apply);
_events = orderedEvents;
}
else
{
var atomicEvent = domainEvents.SingleOrDefault(x => x is AtomicEvent) as AtomicEvent;

_atomicEvent = atomicEvent ?? throw new AtomicEventRequiredException(GetType());
domainEvents.Remove(atomicEvent);
_atomicEvent = atomicEvent ?? throw new AtomicEventRequiredException(GetType());
domainEvents.Remove(atomicEvent);

var orderedEvents = domainEvents
.OrderBy(x => x.Sequence)
.ToList();
var orderedEvents = domainEvents
.OrderBy(x => x.Sequence)
.ToList();

orderedEvents.ForEach(Apply);
_events = orderedEvents;
orderedEvents.ForEach(Apply);
_events = orderedEvents;
}
}

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,11 @@
using Microsoft.Azure.CosmosEventSourcing.ChangeFeed;
using Microsoft.Azure.CosmosEventSourcing.Converters;
using Microsoft.Azure.CosmosEventSourcing.Events;
using Microsoft.Azure.CosmosEventSourcing.Options;
using Microsoft.Azure.CosmosEventSourcing.Stores;
using Microsoft.Azure.CosmosRepository.ChangeFeed.Providers;
using Microsoft.Azure.CosmosRepository.Options;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;

namespace Microsoft.Azure.CosmosEventSourcing.Extensions;
Expand All @@ -25,11 +28,18 @@ public static class ServiceCollectionExtensions
/// <returns>The instance of the <see cref="IServiceCollection"/></returns>
public static IServiceCollection AddCosmosEventSourcing(
this IServiceCollection services,
Action<ICosmosEventSourcingBuilder> eventSourcingBuilder)
Action<ICosmosEventSourcingBuilder> eventSourcingBuilder,
Action<CosmosEventSourcingOptions>? setupAction = null)
{
DefaultCosmosEventSourcingBuilder builder = new(services);
DomainEventConverter.ConvertableTypes.Add(typeof(AtomicEvent));
eventSourcingBuilder.Invoke(builder);

services.AddOptions<CosmosEventSourcingOptions>()
.Configure<IConfiguration>(
(settings, configuration) =>
configuration.GetSection(CosmosEventSourcingOptions.SectionName).Bind(settings));

services.AddScoped<IContextService, DefaultContextService>();
services.AddScoped(typeof(IEventStore<>), typeof(DefaultEventStore<>));
services.AddScoped(typeof(IWriteOnlyEventStore<>), typeof(DefaultEventStore<>));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
// Copyright (c) David Pine. All rights reserved.
// Licensed under the MIT License.

namespace Microsoft.Azure.CosmosEventSourcing.Options;

public class CosmosEventSourcingOptions
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We'll need triple slash for these public APIs.

{
public const string SectionName = "CosmosEventSourcing";

public bool IsSequenceNumberingDisabled { get; set; }
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,18 @@

using System.Runtime.CompilerServices.Context;
using Microsoft.Azure.CosmosEventSourcing.Items;
using Microsoft.Azure.CosmosEventSourcing.Options;
using Microsoft.Azure.CosmosRepository;
using Microsoft.Extensions.Options;

namespace Microsoft.Azure.CosmosEventSourcing.Stores;

internal partial class DefaultEventStore<TEventItem>(
IBatchRepository<TEventItem> batchRepository,
IReadOnlyRepository<TEventItem> readOnlyRepository,
IContextService contextService) :
IContextService contextService,
IOptionsMonitor<CosmosEventSourcingOptions> optionsMonitor) :
IEventStore<TEventItem> where TEventItem : EventItem
{
private readonly IOptionsMonitor<CosmosEventSourcingOptions> _optionsMonitor = optionsMonitor;
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This field isn't needed, you can access the optionsMonitor anywhere within the partial class. These are primary constructors.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ahhh that makes sense now, I'll update this cheers!

}
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,12 @@ public async ValueTask PersistAsync(
return;
}

if (eventItems.Count(x => x.EventName is nameof(AtomicEvent)) is not 1)
if(_optionsMonitor.CurrentValue.IsSequenceNumberingDisabled is false)
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if(_optionsMonitor.CurrentValue.IsSequenceNumberingDisabled is false)
if (optionsMonitor.CurrentValue.IsSequenceNumberingDisabled is false)

{
throw new AtomicEventRequiredException();
if (eventItems.Count(x => x.EventName is nameof(AtomicEvent)) is not 1)
{
throw new AtomicEventRequiredException();
}
}

await batchRepository.UpdateAsBatchAsync(
Expand Down Expand Up @@ -78,7 +81,7 @@ private static IEnumerable<TEventItem> SetCorrelationId(
return items;
}

private static IEnumerable<TEventItem> BuildEvents(
private IEnumerable<TEventItem> BuildEvents(
IAggregateRoot aggregateRoot,
string partitionKey)
{
Expand All @@ -90,10 +93,13 @@ private static IEnumerable<TEventItem> BuildEvents(
partitionKey) as TEventItem)
.ToList();

events.Add(Activator.CreateInstance(
typeof(TEventItem),
aggregateRoot.AtomicEvent,
partitionKey) as TEventItem);
if(_optionsMonitor.CurrentValue.IsSequenceNumberingDisabled is false)
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if(_optionsMonitor.CurrentValue.IsSequenceNumberingDisabled is false)
if (optionsMonitor.CurrentValue.IsSequenceNumberingDisabled is false)

{
events.Add(Activator.CreateInstance(
typeof(TEventItem),
aggregateRoot.AtomicEvent,
partitionKey) as TEventItem);
}

return events.Any(x => x is null)
? throw new InvalidOperationException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,32 @@ private TestAggregate()
}
}

private class TestAggregateNoSequencing : AggregateRoot
{
public int ReplayedEvents { get; private set; }

protected override void Apply(DomainEvent domainEvent)
{
switch (domainEvent)
{
case ReplayableEvent:
ReplayedEvents++;
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any race condition concerns with multiple threads calling apply at the same time, and reads of the ReplayedEvents?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes there will be, kind of a long story to this change, I plan to update the docs alongside this as well. But in short, the sequence number and the strong consistency provided by the atomic event is great in a single region scenario.

However, when we have been assessing the ability to enable multi region writes where traffic would be load balanced across two regions both writing to there local cosmos db data store, we could not guarantee that this sequence number remains in sync and in some use cases for event sourcing it doesn't really matter. The plan here is to handle simultaneous writes, as long as that event is valid at the point in time we are going to let it go ahead.

This diagram is covering our use case, but I am still fleshing out all the details of whether this is 100% suitable via some POCs.

Screenshot 2023-11-02 at 18 05 29

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm good with it if you are, let's :shipit:

break;
}
}

public static TestAggregateNoSequencing Replay(List<DomainEvent> domainEvents)
{
TestAggregateNoSequencing a = new();
a.Apply(domainEvents);
return a;
}

private TestAggregateNoSequencing() : base(isSequenceNumberDisabled: true)
{
}
}

private class TestAggregateRootMapper : IAggregateRootMapper<TestAggregate, ReplayableEventItem>
{
public IEnumerable<ReplayableEventItem> MapFrom(TestAggregate aggregateRoot) => throw new NotImplementedException();
Expand All @@ -71,67 +97,19 @@ public TestAggregate MapTo(IEnumerable<ReplayableEventItem> domainEvents) =>
TestAggregate.Replay(domainEvents.Select(x => x.DomainEvent).ToList());
}

private class AggregateWithNoReplayMethod : AggregateRoot
{
protected override void Apply(DomainEvent domainEvent) => throw new NotImplementedException();
}

[Fact]
public async Task ReadAggregateAsync_AggregateWithReplayMethod_ReplaysEvents()
private class TestAggregateRootNoSequencingMapper : IAggregateRootMapper<TestAggregateNoSequencing, ReplayableEventItem>
{
//Arrange
IEventStore<ReplayableEventItem> sut = _autoMocker.CreateInstance<DefaultEventStore<ReplayableEventItem>>();
public IEnumerable<ReplayableEventItem> MapFrom(
TestAggregateNoSequencing aggregateRoot) => throw new NotImplementedException();

Mock<IReadOnlyRepository<ReplayableEventItem>> repository = _autoMocker.GetMock<IReadOnlyRepository<ReplayableEventItem>>();

ReplayableEventItem atomicEvent = new(new AtomicEvent(Guid.Empty.ToString(), "etag"), "A");
atomicEvent.SetPrivatePropertyValue(nameof(FullItem.Etag), Guid.NewGuid().ToString());

List<ReplayableEventItem> events = new()
{
new ReplayableEventItem(new ReplayableEvent(), "A"),
atomicEvent,
};

repository
.Setup(o =>
o.GetAsync(x => x.PartitionKey == "A", default))
.ReturnsAsync(events);

//Act
TestAggregate a = await sut.ReadAggregateAsync<TestAggregate>("A");

//Assert
a.ReplayedEvents.Should().Be(1);
public TestAggregateNoSequencing MapTo(
IEnumerable<ReplayableEventItem> domainEvents) =>
TestAggregateNoSequencing.Replay(domainEvents.Select(x => x.DomainEvent).ToList());
}

[Fact]
public async Task ReadAggregateAsync_AggregateMapper_MapsAggregateCorrectly()
private class AggregateWithNoReplayMethod : AggregateRoot
{
//Arrange
IEventStore<ReplayableEventItem> sut = _autoMocker.CreateInstance<DefaultEventStore<ReplayableEventItem>>();

Mock<IReadOnlyRepository<ReplayableEventItem>> repository = _autoMocker.GetMock<IReadOnlyRepository<ReplayableEventItem>>();

ReplayableEventItem atomicEvent = new(new AtomicEvent(Guid.Empty.ToString(), "etag"), "A");
atomicEvent.SetPrivatePropertyValue(nameof(FullItem.Etag), Guid.NewGuid().ToString());

List<ReplayableEventItem> events = new()
{
new ReplayableEventItem(new ReplayableEvent(), "A"),
atomicEvent
};

repository
.Setup(o =>
o.GetAsync(x => x.PartitionKey == "A", default))
.ReturnsAsync(events);

//Act
TestAggregate a = await sut.ReadAggregateAsync("A", new TestAggregateRootMapper());

//Assert
a.ReplayedEvents.Should().Be(1);
protected override void Apply(DomainEvent domainEvent) => throw new NotImplementedException();
}

[Fact]
Expand Down
Loading