Get the power of parallel processing with TPL Dataflow
Package | Version | Changelog |
---|---|---|
BiDaFlow | 0.2.0 | CHANGELOG.md |
BiDaFlow.AsyncEnumerable | 0.2.2 | CHANGELOG.md |

```csharp
await Enumerable.Range(1, 100).AsSourceBlock()
    // RunWith connects to an ITargetBlock and returns a single IDataflowBlock
    .RunWith(
        new BatchBlock<int>(5, new GroupingDataflowBlockOptions { BoundedCapacity = 5 })
            // Chain makes a new IPropagatorBlock linking the upstream and downstream blocks
            .Chain(new TransformBlock<int[], int>(
                x => x.Sum(),
                new ExecutionDataflowBlockOptions
                {
                    BoundedCapacity = 4,
                    MaxDegreeOfParallelism = 4,
                    SingleProducerConstrained = true,
                }))
            // ChainToTarget makes a new ITargetBlock linking the upstream and downstream blocks
            .ChainToTarget(new ActionBlock<int>(x => Console.WriteLine(x)))
    )
    .Completion;
```

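
For comparison, here is a rough sketch of a pipeline of the same shape written with plain TPL Dataflow only (`LinkTo`, `SendAsync`, `Complete`). It is not BiDaFlow's implementation, and whether BiDaFlow wires things up this way internally is an assumption. The class name `PlainDataflowSketch` is made up for illustration, and the sums are collected into a list instead of being printed so the result can be inspected.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical name, for illustration only.
public static class PlainDataflowSketch
{
    // Batches 1..100 into groups of 5, sums each group in parallel,
    // and returns the sums once the whole pipeline has completed.
    public static async Task<List<int>> RunAsync()
    {
        var results = new List<int>();
        var link = new DataflowLinkOptions { PropagateCompletion = true };

        var batch = new BatchBlock<int>(5, new GroupingDataflowBlockOptions { BoundedCapacity = 5 });
        var sum = new TransformBlock<int[], int>(
            x => x.Sum(),
            new ExecutionDataflowBlockOptions { BoundedCapacity = 4, MaxDegreeOfParallelism = 4 });
        // ActionBlock runs one delegate at a time by default, so the list needs no lock.
        var sink = new ActionBlock<int>(x => results.Add(x));

        // Every link has to be created, and completion propagated, by hand.
        batch.LinkTo(sum, link);
        sum.LinkTo(sink, link);

        foreach (var i in Enumerable.Range(1, 100))
        {
            // SendAsync waits while the bounded block is full (back pressure).
            await batch.SendAsync(i);
        }
        batch.Complete();

        await sink.Completion;
        return results;
    }
}
```

Calling `await PlainDataflowSketch.RunAsync()` should yield 20 sums. The manual linking and completion handling is the part that the `RunWith`, `Chain` and `ChainToTarget` calls above express as a single fluent chain.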
Need a more customized block? Use `FluentDataflow.EncapsulateAsDataflowBlock`.
See the API documentation and get the power to build blocks freely.
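
The signature of `FluentDataflow.EncapsulateAsDataflowBlock` is not shown here, so the sketch below does not use it. It uses the standard TPL Dataflow primitive `DataflowBlock.Encapsulate` instead, only to illustrate the general idea of hiding several linked blocks behind one custom block. `ParseIntsBlockSketch` and its methods are hypothetical names.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical name, for illustration only.
public static class ParseIntsBlockSketch
{
    // Builds one propagator that accepts strings and emits the integers
    // it could parse; unparsable inputs are dropped.
    public static IPropagatorBlock<string, int> Create()
    {
        var trim = new TransformBlock<string, string>(s => s.Trim());
        var parse = new TransformManyBlock<string, int>(
            s => int.TryParse(s, out var n) ? new[] { n } : Array.Empty<int>());
        trim.LinkTo(parse, new DataflowLinkOptions { PropagateCompletion = true });

        // Callers see `trim` as the target side and `parse` as the source side.
        return DataflowBlock.Encapsulate(trim, parse);
    }

    // Feeds the inputs through a fresh block and collects everything it emits.
    public static async Task<List<int>> RunAsync(IEnumerable<string> inputs)
    {
        var block = Create();
        foreach (var input in inputs)
        {
            await block.SendAsync(input);
        }
        block.Complete();

        var results = new List<int>();
        while (await block.OutputAvailableAsync())
        {
            results.Add(block.Receive());
        }
        return results;
    }
}
```

For example, `await ParseIntsBlockSketch.RunAsync(new[] { " 1 ", "x", "22" })` should return the two parsed values and skip `"x"`.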
`IAsyncEnumerable` is the key interface for data flow with back pressure in .NET.
BiDaFlow.AsyncEnumerable lets an `IAsyncEnumerable` process its data in parallel, in the manner of task-based async code.

```csharp
await AsyncEnumerable.Range(1, 100)
    // Process elements in parallel with IPropagatorBlock
    .RunThroughDataflowBlock(() =>
        new TransformBlock<int, int>(
            x => x * 10,
            new ExecutionDataflowBlockOptions
            {
                BoundedCapacity = 6,
                MaxDegreeOfParallelism = 4,
                EnsureOrdered = false,
                SingleProducerConstrained = true,
            })
    )
    // The result is an IAsyncEnumerable
    // Subsequent processing can be written with System.Linq.Async
    .ForEachAsync(x => Console.WriteLine(x));
```

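
To make the back pressure idea concrete, the following is a minimal sketch of how an `IAsyncEnumerable` could be bridged through a bounded `TransformBlock` by hand. It is not how `RunThroughDataflowBlock` is implemented; `AsyncEnumerableBridgeSketch`, `ThroughBlock`, `Range` and `DemoAsync` are hypothetical names. It assumes a runtime where `DataflowBlock.ReceiveAllAsync` is available (.NET Core 3.0 or later), and it does not handle a consumer that stops enumerating early.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical names, for illustration only.
public static class AsyncEnumerableBridgeSketch
{
    // Streams `source` through `block` and yields the block's output.
    public static async IAsyncEnumerable<TOut> ThroughBlock<TIn, TOut>(
        IAsyncEnumerable<TIn> source, TransformBlock<TIn, TOut> block)
    {
        // Feed the block in the background. SendAsync waits while the block
        // is at its BoundedCapacity, which slows the source down.
        var pump = Task.Run(async () =>
        {
            try
            {
                await foreach (var item in source)
                {
                    // A declined message means the block has completed or faulted.
                    if (!await block.SendAsync(item)) break;
                }
                block.Complete();
            }
            catch (Exception ex)
            {
                ((IDataflowBlock)block).Fault(ex);
            }
        });

        await foreach (var result in block.ReceiveAllAsync())
        {
            yield return result;
        }

        await pump;
        await block.Completion; // rethrows if the source or the transform failed
    }

    // A small async source so the sketch does not depend on System.Linq.Async.
    public static async IAsyncEnumerable<int> Range(int start, int count)
    {
        for (var i = 0; i < count; i++)
        {
            await Task.Yield();
            yield return start + i;
        }
    }

    public static async Task<List<int>> DemoAsync()
    {
        var block = new TransformBlock<int, int>(
            x => x * 10,
            new ExecutionDataflowBlockOptions
            {
                BoundedCapacity = 6,
                MaxDegreeOfParallelism = 4,
                EnsureOrdered = false,
            });

        var results = new List<int>();
        await foreach (var x in ThroughBlock(Range(1, 100), block))
        {
            results.Add(x);
        }
        results.Sort(); // EnsureOrdered = false, so arrival order is not guaranteed
        return results;
    }
}
```

Because the block holds at most 6 items, the pump cannot run far ahead of the consumer; that is the same pressure an `IAsyncEnumerable` consumer applies by not asking for the next element.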
- BiDaFlow.ReactiveStreams - integration with Reactive Streams, TPL Dataflow and AsyncEnumerable