From a566c5f2323fba982507f4a1436b212246b7104d Mon Sep 17 00:00:00 2001 From: "Kasper B. Graversen" Date: Tue, 27 Feb 2024 00:00:41 +0100 Subject: [PATCH] describe various integration patterns --- README.md | 193 ++++++++++++++++-- .../AdoSingletonStepTests.cs | 2 +- .../MicroWorkflow.Tests/DocumentationTests.cs | 103 ++++++++++ src/Product/MicroWorkflow/Step.cs | 2 +- 4 files changed, 281 insertions(+), 19 deletions(-) create mode 100644 src/Demos/MicroWorkflow.Tests/DocumentationTests.cs diff --git a/README.md b/README.md index 0cf5f81..ff9fb4f 100644 --- a/README.md +++ b/README.md @@ -1,14 +1,20 @@ # MicroWorkflow .net [![Stats](https://img.shields.io/badge/Code_lines-1,7_K-ff69b4.svg)]() -[![Stats](https://img.shields.io/badge/Test_lines-1,1_K-69ffb4.svg)]() -[![Stats](https://img.shields.io/badge/Doc_lines-453-ffb469.svg)]() +[![Stats](https://img.shields.io/badge/Test_lines-1,2_K-69ffb4.svg)]() +[![Stats](https://img.shields.io/badge/Doc_lines-594-ffb469.svg)]() +Micro Workflow is a very fast, small, embedable and distributed workflow system primarily for .Net developers. -# 0. +The code base is so small every one can read and understand the inner workings if necesarry. + +# 1. Why use Micro Workflow + +You should consider using Micro Workflow due to one or more of the following reason + +when you have a need for a queue, scheduling of code to execute, or a business process that needs to be robus. We provide many examples in ["integration patterns"](https://github.com/kbilsted/MicroWorkflow.net/tree/feature/doc?tab=readme-ov-file#5-integration-patterns) -# 1. Design goals **Simplicity** * We model only the steps in a workflow, *not* the transitions between them @@ -18,15 +24,19 @@ **We use C# all the way** * We don't want to invent a new language - we love C#! -* Workflow code is *readable*, *debugable*, and *testable* - like the rest of your code base -* You can use existing best practices for logging, IOC containers etc. -* Workflow code is *easy to commit and merge* use your existing *branching strategies* +* Workflow code is *readable*, *debugable*, and *testable* - like the rest of your code base. +* You can use existing best practices for logging, IOC containers etc. of your choice +* Since Workflow code is just C# it is *easy to commit and merge* use your existing *branching strategies* * You *do not* need a special graphical editor for specifying flows **The datamodel is simple - just three DB tables** * If things break in production, it is easy for you to figure out what has happened and remedy the problem * You can reason about the consequences of versioning the step implementations vs. simply change the existing flows +**Distributed mindset** +* Supports *Fail-over setup* To improve up-time applications/integrations are often running on multiple servers at the same time. This is a common scenario is supported with no special setup. +* Supports incremental deployments across more instances. When deploying multiple instances, the roll-out is typical gradual. Hence we support that steps may be added that is only known to a sub-set of the running workflows. + **Scalable** * You can add more workers in the workflow engine (vertical scaling) * You can add more servers each running a workflow engine (horizontal scaling) @@ -54,7 +64,7 @@ This is how you control ordering of events. There are no restrictions on the names of steps, but we found using a scheme similar to REST api's is beneficial. Hence we recommend you to use `{version}/{business domain}/{workflow name}/{workflow step}`. -``` +```C# [StepName(Name)] class FetchData : IStepImplementation { @@ -62,9 +72,12 @@ class FetchData : IStepImplementation public async Task ExecuteAsync(Step step) { - ... + var url = step.State; + var result = await new HttpClient().GetAsync(url); + var content = result.Content.ReadAsStringAsync(); + return step.Done() - .With(new Step(AnalyzeWords.Name, state-for-step)); + .With(new Step(AnalyzeWords.Name, content)); } } @@ -81,19 +94,28 @@ class AnalyzeWords : IStepImplementation } ``` +And then to start everyting we instantiate a workflow engine and add concrete steps + +```C# +var engine = new WorkflowEngine(...); +engine.Data.Add(new Step(FetchData.Name, "https://www.acme.company/rss")); +``` + + Below are some more elaborate exaples. -### Simple console demo + +**Simple console demo** A fully working C# example in one file: https://github.com/kbilsted/MicroWorkflow.net/blob/master/src/Demos/ConsoleDemo/Program.cs -### Webapi demo +**Webapi demo** Now that you have understood the basics, lets make an example using a real IOC container and a database see https://github.com/kbilsted/MicroWorkflow.net/tree/master/src/Demos/WebApiDemo -### IOC container +**IOC container** You can use any IOC container that supports named instances. We use Autofac. For more information see https://github.com/kbilsted/MicroWorkflow.net/tree/master/src/Product/MicroWorkflow.Ioc.Autofac @@ -104,6 +126,7 @@ You likely want to persist workflows in a database. We currently use Microsoft S + # 4. Core concepts in Micro Workflow The model revolves around the notion of a *step*. A step is in traditional workfow litterature referred to as an activity. Where activities live in a workflow. The workflow has identity and state and so forth. @@ -141,8 +164,144 @@ Operations you can do on steps +# 5. Integration patterns + +We discuss some of the typical integration patterns between services + + +## A queue with unlimited retries and no strict FIFO ordering + +A queue implementation where elements are placed in a persistent storage and processed one by one. In case of an error, the element is retried later whilst other elements are processed. This queue is mostly FIFO (first in first out), but due to the nature of the retry, it is not guaranteed. + +This implementation is also known as an "outbox pattern". + +The Micro Workflow uses a dynamic number of workers for execution so scale up easily. + +The implementation roughly + +```C# +[StepName(Name)] +class SendMemberEnrollmentToAcmeCompany(HttpClient client) : IStepImplementation +{ + public const string Name = "v1/Send-member-enrollment-to-Acme-company"; + + public async Task ExecuteAsync(Step step) + { + var result = await client.PostAsync("...", null); + result.EnsureSuccessStatusCode(); + return step.Done(); + } +} +``` + + +## A queue with limited and intelligent retries and no strict FIFO ordering + +Similar to the above example except + +We retry only 5 times, and we retry only if the receiving service fails with a http 500 range. A http 400 range signals the payload is wrong and thus we fail the job. + + +```C# +[StepName(Name)] +class SendMemberEnrollmentToAcmeCompanyLimitedRetry(HttpClient client) : IStepImplementation +{ + public const string Name = "v1/Send-member-enrollment-to-Acme-company-limited-retry"; + + public async Task ExecuteAsync(Step step) + { + if (step.ExecutionCount >= 5) + return step.Fail("too many retries"); + + var result = await client.PostAsync("...", null); + + switch ((int)result.StatusCode) + { + case >= 200 and < 300: + return step.Done(); + + case >= 400 and < 500: + return step.Fail("Wrong payload " + result.ToString()); + + case >= 500: + return step.Rerun(description: $"Upstream error {result}"); + + default: throw new NotImplementedException(); + } + } +} +``` + + +## A queue that may only perform operations in a limited time window + +Some integrations are prohibited from running 24/7. For examples systems running nightly batch processing. Similar, integration with customers are sometimes implemeted such that the customer is not disturbed in the middle of the night with notifications but only within "reasonable hours" e.g between 7-20. + +```C# +[StepName(Name)] +class SendMemberEnrollmentToAcmeCompanyLimitedTimewindow() : IStepImplementation +{ + public const string Name = "v1/Send-member-enrollment-to-Acme-company-limited-from-0700-to-2000"; + + public async Task ExecuteAsync(Step step) + { + // ensure window of 0700 - 2000 + var now = DateTime.Now; + if (now.Hour < 7) + return step.Rerun(scheduleTime: now.Date.AddHours(7)); + + if (now.Hour >= 20) + return step.Rerun(scheduleTime: now.Date.AddDays(1).AddHours(7)); + + // ... do stuff + + return step.Done(); + } +} +``` + + +## A scheduled task Once every hour tp fetch latest data from source + +For this scenario to work we run the same code block over and over again with a set interval. We utilize that a step may defined as `singleton` meaning that it can only exist once in the ready queue. Thus we cannot +accidently add the step twice (i.e. in a distributed environment). We use `AddStepIfNotExists()` but could have used `Add()` with a `try..catch`. + +```C# +public void ScheduleDataFetch(WorkflowEngine engine) +{ + var step = new Step(ScheduledFetchDataOnceAnHour.Name) + { + Singleton = true, + ScheduleTime = DateTime.Now.Date.AddHours(DateTime.Now.Hour) + }; + + engine.Data.AddStepIfNotExists(step, new SearchModel(Name: ScheduledFetchDataOnceAnHour.Name)); +} +``` + +And for fun, we fail the step if it has not been created correctly, to ensure we don't mess up. + +```C# +[StepName(Name)] +class ScheduledFetchDataOnceAnHour() : IStepImplementation +{ + public const string Name = "v1/fetch-data-from-acme-once-an-hour"; + + public async Task ExecuteAsync(Step step) + { + if (!step.Singleton) + throw new FailCurrentStepException("Must be a singleton step!"); + + // ... fetch data + + return step.Rerun(scheduleTime: step.ExecutionStartTime!.Value.AddHours(1)); + } +} +``` + + -# 5. Performance +# 6. Performance Simplicify is the focus of the code base. Performance is simply a side-effect of keeping things simple. @@ -154,7 +313,7 @@ You can take outset in some simple test scenarios at https://github.com/kbilsted -# 6. Flow versioning +# 7. Flow versioning Since each step may be regarded as being part of a flow, or as a single independent step, there is no notion of versions. However, you can use a version number in steps (similar to using version in REST api's). This enable you to create a new version with new steps that has a different implementation to the old. @@ -163,7 +322,7 @@ If all steps need to execute on the new code, simply use multiple step names for -# 7. Retries and ordering +# 8. Retries and ordering The automatic retry of a step in case of a failure is key feature. You can control ordering to some degree by putting longer and longer delays when retrying a failing step. This technique is sometimes called exponential back-off, since the time between retries exponentially increase to ensure throughput of succesful jobs. The default retry delay is calculated as `delay = retries^3 seconds`. If you want to stop retrying either return a `step.Fail()` or `throw FailCurrentStepException`. @@ -173,7 +332,7 @@ Step execution is only orderes by an earliest execution time. If you need to con -# 8. Micro Workflow and related concepts +# 9. Micro Workflow and related concepts Another way to gain conceptual insights into the framework, we explain why Micro Workflow is a good implementation fit to many concepts. diff --git a/src/Demos/MicroWorkflow.Tests/AdoSingletonStepTests.cs b/src/Demos/MicroWorkflow.Tests/AdoSingletonStepTests.cs index 19fa362..65c05c5 100644 --- a/src/Demos/MicroWorkflow.Tests/AdoSingletonStepTests.cs +++ b/src/Demos/MicroWorkflow.Tests/AdoSingletonStepTests.cs @@ -27,7 +27,7 @@ public void When_creating_a_singleton_Then_it_is_created() helper.StepHandlers = [Handle(name, step => { stepResult = $"hello"; stepResultIsSingleton = step.Singleton; - return ExecutionResult.Done(); + return step.Done(); })]; helper.StopWhenNoWork().BuildAndStart(); diff --git a/src/Demos/MicroWorkflow.Tests/DocumentationTests.cs b/src/Demos/MicroWorkflow.Tests/DocumentationTests.cs new file mode 100644 index 0000000..1f5c8d7 --- /dev/null +++ b/src/Demos/MicroWorkflow.Tests/DocumentationTests.cs @@ -0,0 +1,103 @@ +namespace MicroWorkflow; + +/// +/// not real tests just a place for code used in the readme.md +/// +class DocumentationTests +{ + [StepName(Name)] + class SendMemberEnrollmentToAcmeCompany(HttpClient client) : IStepImplementation + { + public const string Name = "v1/send-member-enrollment-to-acme-company"; + + public async Task ExecuteAsync(Step step) + { + var result = await client.PostAsync("...", null); + result.EnsureSuccessStatusCode(); + return step.Done(); + } + } + + [StepName(Name)] + class SendMemberEnrollmentToAcmeCompanyLimitedRetry(HttpClient client) : IStepImplementation + { + public const string Name = "v1/send-member-enrollment-to-acme-company-limited-retry"; + + public async Task ExecuteAsync(Step step) + { + if (step.ExecutionCount >= 5) + return step.Fail("too many retries"); + + var result = await client.PostAsync("...", null); + + switch ((int)result.StatusCode) + { + case >= 200 and < 300: + return step.Done(); + + case >= 400 and < 500: + return step.Fail("Wrong payload " + result.ToString()); + + case >= 500: + return step.Rerun(description: $"Upstream error {result}"); + + default: throw new NotImplementedException(); + } + } + } + + + + [StepName(Name)] + class SendMemberEnrollmentToAcmeCompanyLimitedTimewindow() : IStepImplementation + { + public const string Name = "v1/send-member-enrollment-to-acme-company-limited-from-0700-to-2000"; + + public async Task ExecuteAsync(Step step) + { + // ensure window of 0700 - 2000 + var now = DateTime.Now; + if (now.Hour < 7) + return step.Rerun(scheduleTime: now.Date.AddHours(7)); + + if (now.Hour >= 20) + return step.Rerun(scheduleTime: now.Date.AddDays(1).AddHours(7)); + + // ... stuff + + return step.Done(); + } + } + + + [StepName(Name)] + class ScheduledFetchDataOnceAnHour() : IStepImplementation + { + public const string Name = "v1/fetch-data-from-acme-once-an-hour"; + + public async Task ExecuteAsync(Step step) + { + if (!step.Singleton) + throw new FailCurrentStepException("Must be a singleton step!"); + + // ... fetch data + + return step.Rerun(scheduleTime: step.ExecutionStartTime!.Value.AddHours(1)); + } + } + + + class AddScheduler + { + public void ScheduleDataFetch(WorkflowEngine engine) + { + var step = new Step(ScheduledFetchDataOnceAnHour.Name) + { + Singleton = true, + ScheduleTime = DateTime.Now.Date.AddHours(DateTime.Now.Hour) + }; + + engine.Data.AddStepIfNotExists(step, new SearchModel(Name: ScheduledFetchDataOnceAnHour.Name)); + } + } +} diff --git a/src/Product/MicroWorkflow/Step.cs b/src/Product/MicroWorkflow/Step.cs index b1f1af4..cca1571 100644 --- a/src/Product/MicroWorkflow/Step.cs +++ b/src/Product/MicroWorkflow/Step.cs @@ -39,7 +39,7 @@ public class Step /// the arguments for an activation as it is formatted and persisted in the persistencelayer public string? ActivationArgs { get; set; } - /// The time when the latest execution took place + /// During a step implementation, this is the time when the engine started the execution. When queried outside this scope it may not yet have been executed (then it is null) or in case of a step that is being rerun, it is the last execution time. public DateTime? ExecutionStartTime { get; set; } /// The elapsed time of the latest execution