-
Notifications
You must be signed in to change notification settings - Fork 0
/
Program.cs
105 lines (86 loc) · 3.06 KB
/
Program.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
using System.Buffers;
using System.Runtime.InteropServices;
using Nexus.DataModel;
using Nexus.Extensibility;
using Nexus.Remoting;
// Entry point. Expected command line: <address> <port>
// Both values are handed to RemoteCommunicator together with a DotnetDataSource instance.
if (args.Length < 2)
{
    throw new ArgumentException("No argument for address and/or port was specified.");
}

var address = args[0];

// TryParse avoids using exceptions for control flow, and the invariant culture keeps
// the accepted number format independent of the machine's regional settings.
// The range check rejects values such as -1 or 70000 up front, which would otherwise
// parse fine as an int but can never be a port number.
if (!int.TryParse(
        args[1],
        System.Globalization.NumberStyles.Integer,
        System.Globalization.CultureInfo.InvariantCulture,
        out var port)
    || port < System.Net.IPEndPoint.MinPort
    || port > System.Net.IPEndPoint.MaxPort)
{
    throw new ArgumentException("The second command line argument must be a valid port number.");
}

var communicator = new RemoteCommunicator(new DotnetDataSource(), address, port);
await communicator.RunAsync();
/// <summary>
/// Sample data source that offers one test catalog, <c>/A/B/C</c>, containing a single
/// resource whose values are the input resource's values multiplied by two.
/// </summary>
public class DotnetDataSource : SimpleDataSource
{
    /// <summary>Identifier of the only catalog this data source knows.</summary>
    private const string TestCatalogId = "/A/B/C";

    /// <summary>Resource path passed to the read-data handler to fetch the input values.</summary>
    private const string InputResourcePath = "/SAMPLE/LOCAL/T1/1_s";

    /// <summary>
    /// Sample period of the offered representation; the same period is used to work out
    /// how many input samples a read covers.
    /// </summary>
    private static readonly TimeSpan SamplePeriod = TimeSpan.FromSeconds(1);

    /// <summary>
    /// Returns the catalog registrations for <paramref name="path"/>.
    /// </summary>
    /// <param name="path">The path being queried; only <c>"/"</c> yields a registration.</param>
    /// <param name="cancellationToken">Not used by this implementation.</param>
    /// <returns>
    /// A completed task holding the single test catalog registration for the root path,
    /// or an empty array for any other path.
    /// </returns>
    public override Task<CatalogRegistration[]> GetCatalogRegistrationsAsync(string path, CancellationToken cancellationToken)
    {
        var registrations = path == "/"
            ? new CatalogRegistration[] { new CatalogRegistration(TestCatalogId, "Test catalog /A/B/C.") }
            : new CatalogRegistration[0];

        return Task.FromResult(registrations);
    }

    /// <summary>
    /// Builds the catalog identified by <paramref name="catalogId"/>.
    /// </summary>
    /// <param name="catalogId">The requested catalog identifier.</param>
    /// <param name="cancellationToken">Not used by this implementation.</param>
    /// <returns>
    /// A completed task holding the test catalog, which contains one resource
    /// (<c>resource1</c>, unit <c>°C</c>, group <c>group1</c>) with a FLOAT64 representation
    /// at a one-second sample period.
    /// </returns>
    /// <exception cref="Exception">
    /// Thrown synchronously when <paramref name="catalogId"/> is not the test catalog.
    /// </exception>
    public override Task<ResourceCatalog> GetCatalogAsync(string catalogId, CancellationToken cancellationToken)
    {
        // Guard clause: every identifier other than the test catalog is rejected.
        if (catalogId != TestCatalogId)
        {
            throw new Exception("Unknown catalog identifier.");
        }

        var float64PerSecond = new Representation(NexusDataType.FLOAT64, SamplePeriod);

        var testResource = new ResourceBuilder("resource1")
            .WithUnit("°C")
            .WithGroups("group1")
            .AddRepresentation(float64PerSecond)
            .Build();

        var testCatalog = new ResourceCatalogBuilder(TestCatalogId)
            .AddResource(testResource)
            .Build();

        return Task.FromResult(testCatalog);
    }

    /// <summary>
    /// Fetches the input resource once for the requested period and fills every request
    /// with those values multiplied by two.
    /// </summary>
    /// <param name="begin">Start of the period to read.</param>
    /// <param name="end">End of the period to read.</param>
    /// <param name="requests">The requests whose data and status buffers are filled.</param>
    /// <param name="readData">Handler used to load the input resource's values.</param>
    /// <param name="progress">Not used by this implementation.</param>
    /// <param name="cancellationToken">Forwarded to <paramref name="readData"/>.</param>
    public override async Task ReadAsync(
        DateTime begin,
        DateTime end,
        ReadRequest[] requests,
        ReadDataHandler readData,
        IProgress<double> progress,
        CancellationToken cancellationToken)
    {
        // Number of whole sample periods between begin and end.
        var sampleCount = (int)((end - begin).Ticks / SamplePeriod.Ticks);

        // The pool may hand out a larger buffer than requested, so trim it to the exact size.
        using var rentedBuffer = MemoryPool<double>.Shared.Rent(sampleCount);
        var inputValues = rentedBuffer.Memory.Slice(0, sampleCount);

        await readData.Invoke(InputResourcePath, begin, end, inputValues, cancellationToken);

        foreach (var request in requests)
        {
            FillRequest(inputValues, request);
        }
    }

    /// <summary>
    /// Writes the doubled input values into the request's data buffer and marks all of
    /// its status entries as valid.
    /// </summary>
    /// <remarks>
    /// Kept as a separate synchronous method because spans cannot be accessed in async methods.
    /// </remarks>
    private static void FillRequest(Memory<double> inputValues, ReadRequest request)
    {
        /* generate data */
        var input = inputValues.Span;
        var output = MemoryMarshal.Cast<byte, double>(request.Data.Span);

        for (var index = 0; index < output.Length; index++)
        {
            /* example: multiply by two */
            output[index] = input[index] * 2;
        }

        /* mark all data as valid */
        request.Status.Span.Fill(1);
    }
}