-
Notifications
You must be signed in to change notification settings - Fork 64
/
S04_Write_to_stream.cs
128 lines (98 loc) · 4.44 KB
/
S04_Write_to_stream.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
using System;
using System.Linq;
using Newtonsoft.Json;
using Newtonsoft.Json.Bson;
using Streamstone;
using System.Diagnostics;
namespace Example.Scenarios
{
using System.Threading.Tasks;
public class S04_Write_to_stream : Scenario
{
/// <summary>
/// Runs the three write demonstrations in order; each one is awaited
/// to completion before the next one starts.
/// </summary>
/// <returns>A task that completes once the last demonstration has finished.</returns>
public override async Task RunAsync()
{
    // Order matters: the steps are executed exactly as listed here.
    var steps = new Func<Task>[]
    {
        WriteToExistingOrCreateNewStream,
        WriteSequentiallyToExistingStream,
        WriteMultipleStreamsInParallel
    };

    foreach (var step in steps)
    {
        await step();
    }
}
/// <summary>
/// Tries to open the stream stored in <c>Partition</c>; when it is not found,
/// starts from a new <c>Stream</c> for that partition. Two inventory events
/// are then written in a single <c>Stream.WriteAsync</c> call and the
/// resulting ETag and version are printed.
/// </summary>
async Task WriteToExistingOrCreateNewStream()
{
    var lookup = await Stream.TryOpenAsync(Partition);

    Stream target;
    if (lookup.Found)
    {
        target = lookup.Stream;
    }
    else
    {
        target = new Stream(Partition);
    }

    Console.WriteLine("Writing to new stream in partition '{0}'", target.Partition);

    // Both events go out together in one write.
    var batch = new[]
    {
        Event(new InventoryItemCreated(Id, "iPhone6")),
        Event(new InventoryItemCheckedIn(Id, 100))
    };
    var outcome = await Stream.WriteAsync(target, batch);

    var written = outcome.Stream;
    Console.WriteLine("Successfully written to new stream.\r\nEtag: {0}, Version: {1}",
        written.ETag, written.Version);
}
/// <summary>
/// Opens the stream in <c>Partition</c> and appends ten check-in events one
/// at a time (quantities 100, 200, ... 1000), printing the event id/version
/// and the stream's ETag/version after every write.
/// </summary>
async Task WriteSequentiallyToExistingStream()
{
    var current = await Stream.OpenAsync(Partition);

    Console.WriteLine("Writing sequentially to existing stream in partition '{0}'", current.Partition);
    Console.WriteLine("Etag: {0}, Version: {1}", current.ETag, current.Version);

    foreach (var step in Enumerable.Range(1, 10))
    {
        var checkedIn = Event(new InventoryItemCheckedIn(Id, step * 100));
        var outcome = await Stream.WriteAsync(current, checkedIn);

        var stored = outcome.Events[0];
        Console.WriteLine("Successfully written event '{0}' under version '{1}'",
            stored.Id, stored.Version);

        // The next write must be issued against the stream header returned by this one.
        current = outcome.Stream;
        Console.WriteLine("Etag: {0}, Version: {1}",
            current.ETag, current.Version);
    }
}
/// <summary>
/// Writes to several streams concurrently. For each stream index a dedicated
/// partition named <c>WriteMultipleStreamsInParallel-{index}</c> is opened (or
/// a new stream is started when none is found), a fixed number of event
/// batches is written to it sequentially, and the elapsed time is printed
/// together with the number of events actually written.
/// </summary>
/// <returns>A task that completes when every per-stream writer has finished.</returns>
async Task WriteMultipleStreamsInParallel()
{
    const int streamsToWrite = 5;
    const int batchesPerStream = 5;
    const int eventsPerBatch = 10;

    await Task.WhenAll(Enumerable.Range(1, streamsToWrite).Select(async streamIndex =>
    {
        var partition = new Partition(Partition.Table, $"WriteMultipleStreamsInParallel-{streamIndex}");

        var existent = await Stream.TryOpenAsync(partition);
        var stream = existent.Found
            ? existent.Stream
            : new Stream(partition);

        Console.WriteLine("Writing to new stream in partition '{0}'", partition);

        var stopwatch = Stopwatch.StartNew();
        var eventsWritten = 0;

        for (var batch = 1; batch <= batchesPerStream; batch++)
        {
            // Computed once per batch so the lambda below does not capture the loop variable.
            var quantity = batch * 1000 + streamIndex;

            var events = Enumerable.Range(1, eventsPerBatch)
                .Select(_ => Event(new InventoryItemCheckedIn(partition.Key, quantity)))
                .ToArray();

            var result = await Stream.WriteAsync(stream, events);

            // Subsequent batches are written against the header returned by the previous write.
            stream = result.Stream;
            eventsWritten += events.Length;
        }

        stopwatch.Stop();

        // Report the real count (batchesPerStream * eventsPerBatch) instead of a hard-coded figure.
        Console.WriteLine("Finished writing {0} events to new stream in partition '{1}' in {2}ms",
            eventsWritten, stream.Partition, stopwatch.ElapsedMilliseconds);
    }));
}
/// <summary>
/// Wraps an arbitrary event object into an <c>EventData</c>: generates a new
/// GUID for it and attaches the GUID, the runtime type name, and the JSON and
/// BSON serializations of the object as event properties.
/// </summary>
/// <param name="e">The event object to wrap; its runtime type name is recorded.</param>
/// <returns>The <c>EventData</c> built from the generated id and the properties.</returns>
static EventData Event(object e)
{
    var eventId = Guid.NewGuid();

    var payload = new
    {
        Id = eventId,             // the id passed to the EventData ctor serves only duplicate event detection
        Type = e.GetType().Name,  // any number of custom properties may accompany the event
        Data = JSON(e),           // the name of the data property is entirely up to you
        Bin = BSON(e)             // as is the storage format: binary, string, whatever (any EdmType)
    };

    var key = EventId.From(eventId);
    var properties = EventProperties.From(payload);

    return new EventData(key, properties);
}
/// <summary>
/// Serializes <paramref name="data"/> to a JSON string via <c>JsonConvert.SerializeObject</c>.
/// </summary>
/// <param name="data">The object to serialize.</param>
/// <returns>The JSON text produced by the serializer.</returns>
static string JSON(object data) => JsonConvert.SerializeObject(data);
/// <summary>
/// Serializes <paramref name="data"/> to BSON by writing it through a
/// <c>BsonDataWriter</c> into an in-memory buffer with a default-constructed
/// <c>JsonSerializer</c>.
/// </summary>
/// <param name="data">The object to serialize.</param>
/// <returns>The bytes accumulated in the buffer.</returns>
static byte[] BSON(object data)
{
    var buffer = new System.IO.MemoryStream();

    using (var bsonWriter = new BsonDataWriter(buffer))
    {
        new JsonSerializer().Serialize(bsonWriter, data);
    }

    // Read the buffer only after the writer has been disposed (presumably so
    // that any pending output has been flushed into it).
    return buffer.ToArray();
}
}
}