Minimal Reactive / Async Streams Socket Implementation
- .NET 8.0 library
- connect: asynchronous
- send: synchronous
- receive: async enumerable or observable
- accept: async enumerable or observable
- simple and intuitive API
- dependencies: System.Reactive, Microsoft.Extensions.Logging
PM> Install-Package RxSockets
using System;
using System.Linq;
using System.Reactive.Linq;
using System.Threading.Tasks;
using Xunit;
using RxSockets;
interface IRxSocketServer : IAsyncDisposable
{
EndPoint LocalEndPoint { get; }
IAsyncEnumerable<IRxSocketClient> AcceptAllAsync { get; }
}
// Create a server using an available port on the local machine.
IRxSocketServer server = RxSocketServer.Create();
// Prepare to start accepting connections from clients.
server
.AcceptAllAsync
.ToObservableFromAsyncEnumerable()
.Subscribe(onNext: acceptClient =>
{
// After the server accepts a client connection,
// start receiving messages from the client and ...
acceptClient
.ReceiveAllAsync
.ToObservableFromAsyncEnumerable()
.ToStrings()
.Subscribe(onNext: message =>
{
// Echo each message received back to the client.
acceptClient.Send(message.ToByteArray());
});
});
interface IRxSocketClient : IAsyncDisposable
{
EndPoint RemoteEndPoint { get; }
bool Connected { get; }
int Send(ReadOnlySpan<byte> buffer);
IAsyncEnumerable<byte> ReceiveAllAsync { get; }
}
// Create a client connected to EndPoint of the server.
IRxSocketClient client = await server.LocalEndPoint.CreateRxSocketClientAsync();
// Send the message "Hello!" to the server,
// which the server will then echo back to the client.
client.Send("Hello!".ToByteArray());
// Receive the message from the server.
string message = await client.ReceiveAllAsync.ToStrings().FirstAsync();
Assert.Equal("Hello!", message);
await client.DisposeAsync();
await server.DisposeAsync();
IObservable<T> ToObservableFromAsyncEnumerable<T>(this IAsyncEnumerable<T> source)
The extension method ToObservableFromAsyncEnumerable()
may be used to create observables from the async enumerables IRxSocketClient.ReceiveAllAsync
and IRxSocketServer.AcceptAllAsync
.
Observable.Publish()[.RefCount() | .AutoConnect()]
may be used to support multiple simultaneous observers.
To communicate using strings (see example above), the following extension methods are provided:
byte[] ToByteArray(this string source);
byte[] ToByteArray(this IEnumerable<string> source)
IEnumerable<string> ToStrings(this IEnumerable<byte> source)
IAsyncEnumerable<string> ToStrings(this IAsyncEnumerable<byte> source)
IObservable<string> ToStrings(this IObservable<byte> source)
To communicate using byte arrays with a 4 byte BigEndian integer length prefix, the following extension methods are provided:
byte[] ToByteArrayWithLengthPrefix(this byte[] source)
IEnumerable<byte[]> ToArraysFromBytesWithLengthPrefix(this IEnumerable<byte> source)
IAsyncEnumerable<byte[]> ToArraysFromBytesWithLengthPrefix(this IAsyncEnumerable<byte> source)
IObservable<byte[]> ToArraysFromBytesWithLengthPrefix(this IObservable<byte> source)