Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WiP - Durable functions #646

Draft
wants to merge 3 commits into
base: dev
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions ProtoActor.sln
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,10 @@ EndProjectSection
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "HostedService", "benchmarks\HostedService\HostedService.csproj", "{7840C651-9352-4CB2-9152-4793EC219DE9}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "DurableFunctions", "examples\DurableFunctions\DurableFunctions.csproj", "{181B6946-85C5-4484-B14A-E001A4F9D5E6}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Proto.Cluster.Durable", "src\Proto.Cluster.Durable\Proto.Cluster.Durable.csproj", "{8D11A769-19F5-4458-9A7A-A4B25A5EF92F}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -728,6 +732,30 @@ Global
{7840C651-9352-4CB2-9152-4793EC219DE9}.Release|x64.Build.0 = Release|Any CPU
{7840C651-9352-4CB2-9152-4793EC219DE9}.Release|x86.ActiveCfg = Release|Any CPU
{7840C651-9352-4CB2-9152-4793EC219DE9}.Release|x86.Build.0 = Release|Any CPU
{181B6946-85C5-4484-B14A-E001A4F9D5E6}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{181B6946-85C5-4484-B14A-E001A4F9D5E6}.Debug|Any CPU.Build.0 = Debug|Any CPU
{181B6946-85C5-4484-B14A-E001A4F9D5E6}.Debug|x64.ActiveCfg = Debug|Any CPU
{181B6946-85C5-4484-B14A-E001A4F9D5E6}.Debug|x64.Build.0 = Debug|Any CPU
{181B6946-85C5-4484-B14A-E001A4F9D5E6}.Debug|x86.ActiveCfg = Debug|Any CPU
{181B6946-85C5-4484-B14A-E001A4F9D5E6}.Debug|x86.Build.0 = Debug|Any CPU
{181B6946-85C5-4484-B14A-E001A4F9D5E6}.Release|Any CPU.ActiveCfg = Release|Any CPU
{181B6946-85C5-4484-B14A-E001A4F9D5E6}.Release|Any CPU.Build.0 = Release|Any CPU
{181B6946-85C5-4484-B14A-E001A4F9D5E6}.Release|x64.ActiveCfg = Release|Any CPU
{181B6946-85C5-4484-B14A-E001A4F9D5E6}.Release|x64.Build.0 = Release|Any CPU
{181B6946-85C5-4484-B14A-E001A4F9D5E6}.Release|x86.ActiveCfg = Release|Any CPU
{181B6946-85C5-4484-B14A-E001A4F9D5E6}.Release|x86.Build.0 = Release|Any CPU
{8D11A769-19F5-4458-9A7A-A4B25A5EF92F}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{8D11A769-19F5-4458-9A7A-A4B25A5EF92F}.Debug|Any CPU.Build.0 = Debug|Any CPU
{8D11A769-19F5-4458-9A7A-A4B25A5EF92F}.Debug|x64.ActiveCfg = Debug|Any CPU
{8D11A769-19F5-4458-9A7A-A4B25A5EF92F}.Debug|x64.Build.0 = Debug|Any CPU
{8D11A769-19F5-4458-9A7A-A4B25A5EF92F}.Debug|x86.ActiveCfg = Debug|Any CPU
{8D11A769-19F5-4458-9A7A-A4B25A5EF92F}.Debug|x86.Build.0 = Debug|Any CPU
{8D11A769-19F5-4458-9A7A-A4B25A5EF92F}.Release|Any CPU.ActiveCfg = Release|Any CPU
{8D11A769-19F5-4458-9A7A-A4B25A5EF92F}.Release|Any CPU.Build.0 = Release|Any CPU
{8D11A769-19F5-4458-9A7A-A4B25A5EF92F}.Release|x64.ActiveCfg = Release|Any CPU
{8D11A769-19F5-4458-9A7A-A4B25A5EF92F}.Release|x64.Build.0 = Release|Any CPU
{8D11A769-19F5-4458-9A7A-A4B25A5EF92F}.Release|x86.ActiveCfg = Release|Any CPU
{8D11A769-19F5-4458-9A7A-A4B25A5EF92F}.Release|x86.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand Down Expand Up @@ -795,6 +823,8 @@ Global
{C8C3C2EE-083A-46B4-8F5A-0C0F442C14BE} = {222D5932-627D-406B-9FA5-B60B38FD3019}
{8B241043-F933-4C1F-AB57-BAE8B624F133} = {222D5932-627D-406B-9FA5-B60B38FD3019}
{7840C651-9352-4CB2-9152-4793EC219DE9} = {0F3AB331-C042-4371-A2F0-0AFDFA13DC9F}
{181B6946-85C5-4484-B14A-E001A4F9D5E6} = {59DCCC96-DDAF-469F-9E8E-9BC733285082}
{8D11A769-19F5-4458-9A7A-A4B25A5EF92F} = {771514F1-12AE-4A26-89CB-2646D3EF7034}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {CD0D1E44-8118-4682-8793-6B20ABFA824C}
Expand Down
26 changes: 26 additions & 0 deletions examples/DurableFunctions/DurableFunctions.csproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net5.0</TargetFramework>
</PropertyGroup>

<ItemGroup>
<ProjectReference Include="..\..\src\Proto.Cluster.Consul\Proto.Cluster.Consul.csproj" />
<ProjectReference Include="..\..\src\Proto.Cluster.Durable\Proto.Cluster.Durable.csproj" />
<ProjectReference Include="..\..\src\Proto.Remote.GrpcCore\Proto.Remote.GrpcCore.csproj" />
<ProjectReference Include="..\..\src\Proto.Cluster.Identity.MongoDb\Proto.Cluster.Identity.MongoDb.csproj" />
</ItemGroup>

<ItemGroup>
<PackageReference Include="Grpc.Tools" Version="2.23.0" PrivateAssets="All" />
<PackageReference Include="Microsoft.Extensions.Logging.Console" Version="5.0.0-preview.7.20364.11" />
<PackageReference Include="MongoDB.Driver" Version="2.11.4" />
<PackageReference Include="Newtonsoft.Json" Version="12.0.3" />
<PackageReference Include="Serilog" Version="2.10.1-dev-01256" />
<PackageReference Include="Serilog.Extensions.Logging" Version="3.0.2-dev-10281" />
<PackageReference Include="Serilog.Sinks.Console" Version="4.0.0-dev-00839" />
</ItemGroup>


</Project>
99 changes: 99 additions & 0 deletions examples/DurableFunctions/Program.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using MongoDB.Driver;
using Proto;
using Proto.Cluster;
using Proto.Cluster.Consul;
using Proto.Cluster.Durable;
using Proto.Cluster.Identity;
using Proto.Cluster.Identity.MongoDb;
using Proto.Remote.GrpcCore;
using Serilog;
using Serilog.Events;
using Log = Serilog.Log;

namespace DurableFunctions
{
class Program
{
static async Task Main(string[] args)
{
SetupLogger();

var db = GetMongo();
var pids = db.GetCollection<PidLookupEntity>("pids");

var identity = new IdentityStorageLookup(new MongoIdentityStorage("mycluster", pids));
var provider = new ConsulProvider(new ConsulProviderConfig());
var system = new ActorSystem()
.WithRemote(
GrpcCoreRemoteConfig
.BindToLocalhost()
)
.WithCluster(
ClusterConfig
.Setup("mycluster",provider,identity)
.WithClusterKind("MyFunc", Props.FromProducer(() => new MyFunction()))
.WithClusterKind("SomeActor",Props.FromProducer(() => new SomeActor()))
)
.WithDurableFunctions();

await system
.Cluster()
.StartMemberAsync();

await system.Cluster().RequestAsync<int>("foo", "MyFunc", 123, CancellationToken.None);

Console.ReadLine();
}

private static void SetupLogger()
{
Log.Logger = new LoggerConfiguration()
.WriteTo.Console(LogEventLevel.Information, "[{Timestamp:HH:mm:ss} {Level:u3}] {Message:lj}{NewLine}")
.CreateLogger();

var l = LoggerFactory.Create(l =>
l.AddSerilog()
);

Proto.Log.SetLoggerFactory(l);
}

private static IMongoDatabase GetMongo()
{
var connectionString = "mongodb://127.0.0.1:27017/ProtoMongo";
var url = MongoUrl.Create(connectionString);
var settings = MongoClientSettings.FromUrl(url);
var client = new MongoClient(settings);
var database = client.GetDatabase("DurableFunctions");
return database;
}
}

public class MyFunction : DurableFunction
{
protected override async Task Run(DurableContext context)
{
var x = await context.RequestAsync<int>("foo", "SomeActor", 222);
var y = await context.RequestAsync<int>("foo", "SomeActor", 333);
Console.WriteLine($"result {x * y}");
}
}

public class SomeActor : IActor
{
public Task ReceiveAsync(IContext context)
{
if (context.Message is int i)
{
Console.WriteLine($"got call for {i}");
context.Respond(i*2);
}

return Task.CompletedTask;
}
}
}
52 changes: 52 additions & 0 deletions src/Proto.Cluster.Durable/DurableContext.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
// -----------------------------------------------------------------------
// <copyright file="DurableContext.cs" company="Asynkron AB">
// Copyright (C) 2015-2020 Asynkron AB All rights reserved
// </copyright>
// -----------------------------------------------------------------------
using System;
using System.Threading.Tasks;

namespace Proto.Cluster.Durable
{
public class DurableContext
{
private readonly Cluster _cluster;
private readonly ClusterIdentity _identity;
public object Message { get; set; }

public DurableContext(Cluster cluster, ClusterIdentity identity)
{
_cluster = cluster;
_identity = identity;
}

public Task<T> WaitForExternalEvent<T>()
{
return null;
}

public Task CreateTimer()
{
return null;
}

public async Task<T> RequestAsync<T>(string identity, string kind, object message)
{
//send request to local orchestrator
//orchestrator saves request to DB

//await response from orchestrator
var target = new ClusterIdentity
{
Identity = identity,
Kind = kind,
};

var request = new DurableRequest(_identity, target, message);

var response = await _cluster.DurableRequestAsync(request);
var m = response.Message;
return (T) m;
}
}
}
26 changes: 26 additions & 0 deletions src/Proto.Cluster.Durable/DurableExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
// -----------------------------------------------------------------------
// <copyright file="DurableExtensions.cs" company="Asynkron AB">
// Copyright (C) 2015-2020 Asynkron AB All rights reserved
// </copyright>
// -----------------------------------------------------------------------
using System.Threading.Tasks;

namespace Proto.Cluster.Durable
{
public static class DurableExtensions
{
public static async Task<DurableResponse> DurableRequestAsync(this Cluster self, DurableRequest message)
{
var d = self.System.Extensions.Get<DurablePlugin>();
var response = await d.DurableRequestAsync(message);
return response;
}

public static ActorSystem WithDurableFunctions(this ActorSystem system)
{
var p = new DurablePlugin(system.Cluster());
system.Extensions.Register(p);
return system;
}
}
}
39 changes: 39 additions & 0 deletions src/Proto.Cluster.Durable/DurableFunction.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
// -----------------------------------------------------------------------
// <copyright file="DurableFunction.cs" company="Asynkron AB">
// Copyright (C) 2015-2020 Asynkron AB All rights reserved
// </copyright>
// -----------------------------------------------------------------------
using System;
using System.Threading.Tasks;
using JetBrains.Annotations;

namespace Proto.Cluster.Durable
{
[PublicAPI]
public abstract class DurableFunction : IActor
{
private ClusterIdentity? _identity;
private DurableContext? _durableContext;

async Task IActor.ReceiveAsync(IContext context)
{
if (context.Message is ClusterInit init)
{
_identity = init.ClusterIdentity;
_durableContext = new DurableContext(init.Cluster, _identity);
}

if (_durableContext != null && context.Sender != null)
{
//if workflow not exists, save new workflow, also save message

context.Respond(123); //this should be a real message like "FunctionStarted" or something

_durableContext.Message = context.Message!; //use the saved message here
await Run(_durableContext);
}
}

protected abstract Task Run(DurableContext context);
}
}
34 changes: 34 additions & 0 deletions src/Proto.Cluster.Durable/DurablePlugin.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
// -----------------------------------------------------------------------
// <copyright file="DurablePlugin.cs" company="Asynkron AB">
// Copyright (C) 2015-2020 Asynkron AB All rights reserved
// </copyright>
// -----------------------------------------------------------------------
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Proto.Extensions;

namespace Proto.Cluster.Durable
{
public class DurablePlugin : IActorSystemExtension<DurablePlugin>
{
private readonly Dictionary<DurableRequest, DurableResponse> _cache = new();
private readonly Cluster _cluster;

public DurablePlugin(Cluster cluster)
{
_cluster = cluster;
}

public async Task<DurableResponse> DurableRequestAsync(DurableRequest request)
{
if (_cache.TryGetValue(request, out var response)) return response;

var responseMessage = await _cluster.RequestAsync<object>(request.Target.Identity, request.Target.Kind, request.Message, CancellationToken.None);
response = new DurableResponse(responseMessage);
_cache.TryAdd(request, response);

return response;
}
}
}
9 changes: 9 additions & 0 deletions src/Proto.Cluster.Durable/DurableRequest.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
// -----------------------------------------------------------------------
// <copyright file="DurableRequest.cs" company="Asynkron AB">
// Copyright (C) 2015-2020 Asynkron AB All rights reserved
// </copyright>
// -----------------------------------------------------------------------
namespace Proto.Cluster.Durable
{
public record DurableRequest(ClusterIdentity Sender, ClusterIdentity Target, object Message);
}
9 changes: 9 additions & 0 deletions src/Proto.Cluster.Durable/DurableResponse.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
// -----------------------------------------------------------------------
// <copyright file="DurableResponse.cs" company="Asynkron AB">
// Copyright (C) 2015-2020 Asynkron AB All rights reserved
// </copyright>
// -----------------------------------------------------------------------
namespace Proto.Cluster.Durable
{
public record DurableResponse(object Message);
}
20 changes: 20 additions & 0 deletions src/Proto.Cluster.Durable/Proto.Cluster.Durable.csproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFrameworks>net5.0;netstandard2.1</TargetFrameworks>
<LangVersion>9</LangVersion>
</PropertyGroup>

<ItemGroup>
<ProjectReference Include="..\Proto.Cluster\Proto.Cluster.csproj" />
</ItemGroup>

<ItemGroup>
<PackageReference Include="Google.Protobuf" Version="3.14.0" />
<PackageReference Include="Grpc.Tools" Version="2.35.0-pre1">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageReference>
</ItemGroup>

</Project>
5 changes: 5 additions & 0 deletions src/Proto.Cluster.Durable/protos.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
syntax = "proto3";
package cluster;
option csharp_namespace = "Proto.Cluster";

import "Proto.Actor/Protos.proto";
Loading