Skip to content

Commit

Permalink
Add system text json support for actor serialization
Browse files Browse the repository at this point in the history
Signed-off-by: Erik O'Leary <[email protected]>
  • Loading branch information
onionhammer committed Aug 22, 2023
1 parent c99475b commit 088eee8
Show file tree
Hide file tree
Showing 19 changed files with 506 additions and 111 deletions.
11 changes: 10 additions & 1 deletion src/Dapr.Actors/Client/ActorProxyFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ namespace Dapr.Actors.Client
using System;
using System.Net.Http;
using Dapr.Actors.Builder;
using Dapr.Actors.Communication;
using Dapr.Actors.Communication.Client;

/// <summary>
Expand Down Expand Up @@ -79,7 +80,15 @@ public object CreateActorProxy(ActorId actorId, Type actorInterfaceType, string
options ??= this.DefaultOptions;

var daprInteractor = new DaprHttpInteractor(this.handler, options.HttpEndpoint, options.DaprApiToken, options.RequestTimeout);
var remotingClient = new ActorRemotingClient(daprInteractor);

// provide a serializer if 'useJsonSerialization' is true and no serialization provider is provided.
IActorMessageBodySerializationProvider serializationProvider = null;
if (options.UseJsonSerialization)
{
serializationProvider = new ActorMessageBodyJsonSerializationProvider(options.JsonSerializerOptions);
}

var remotingClient = new ActorRemotingClient(daprInteractor, serializationProvider);
var proxyGenerator = ActorCodeBuilder.GetOrCreateProxyGenerator(actorInterfaceType);
var actorProxy = proxyGenerator.CreateActorProxy();
actorProxy.Initialize(remotingClient, actorId, actorType, options);
Expand Down
5 changes: 5 additions & 0 deletions src/Dapr.Actors/Client/ActorProxyOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -62,5 +62,10 @@ public JsonSerializerOptions JsonSerializerOptions
/// The timeout allowed for an actor request. Can be set to System.Threading.Timeout.InfiniteTimeSpan to disable any timeouts.
/// </summary>
public TimeSpan? RequestTimeout { get; set; } = null;

/// <summary>
/// Enable JSON serialization for actor proxy message serialization in both remoting and non-remoting invocations.
/// </summary>
public bool UseJsonSerialization { get; set; }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ namespace Dapr.Actors.Communication
using System.Collections.Generic;
using System.IO;
using System.Runtime.Serialization;
using System.Threading.Tasks;
using System.Xml;

/// <summary>
Expand Down Expand Up @@ -185,21 +186,21 @@ byte[] IActorRequestMessageBodySerializer.Serialize(IActorRequestMessageBody act
return stream.ToArray();
}

IActorRequestMessageBody IActorRequestMessageBodySerializer.Deserialize(Stream stream)
ValueTask<IActorRequestMessageBody> IActorRequestMessageBodySerializer.DeserializeAsync(Stream stream)
{
if (stream == null)
{
return null;
return default;
}

if (stream.Length == 0)
{
return null;
return default;
}

stream.Position = 0;
using var reader = this.CreateXmlDictionaryReader(stream);
return (TRequest)this.serializer.ReadObject(reader);
return new ValueTask<IActorRequestMessageBody>((TRequest)this.serializer.ReadObject(reader));
}

byte[] IActorResponseMessageBodySerializer.Serialize(IActorResponseMessageBody actorResponseMessageBody)
Expand All @@ -217,11 +218,11 @@ byte[] IActorResponseMessageBodySerializer.Serialize(IActorResponseMessageBody a
return stream.ToArray();
}

IActorResponseMessageBody IActorResponseMessageBodySerializer.Deserialize(Stream messageBody)
ValueTask<IActorResponseMessageBody> IActorResponseMessageBodySerializer.DeserializeAsync(Stream messageBody)
{
if (messageBody == null)
{
return null;
return default;
}

// TODO check performance
Expand All @@ -231,11 +232,11 @@ IActorResponseMessageBody IActorResponseMessageBodySerializer.Deserialize(Stream

if (stream.Capacity == 0)
{
return null;
return default;
}

using var reader = this.CreateXmlDictionaryReader(stream);
return (TResponse)this.serializer.ReadObject(reader);
return new ValueTask<IActorResponseMessageBody>((TResponse)this.serializer.ReadObject(reader));
}

/// <summary>
Expand Down
100 changes: 100 additions & 0 deletions src/Dapr.Actors/Communication/ActorMessageBodyJsonConverter.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
// ------------------------------------------------------------------------
// Copyright 2021 The Dapr Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// ------------------------------------------------------------------------

using System;
using System.Collections.Generic;
using System.Text.Json;
using System.Text.Json.Serialization;

namespace Dapr.Actors.Communication
{
internal class ActorMessageBodyJsonConverter<T> : JsonConverter<T>
{
private readonly List<Type> methodRequestParameterTypes;
private readonly List<Type> wrappedRequestMessageTypes;
private readonly Type wrapperMessageType;

public ActorMessageBodyJsonConverter(
List<Type> methodRequestParameterTypes,
List<Type> wrappedRequestMessageTypes = null
)
{
this.methodRequestParameterTypes = methodRequestParameterTypes;
this.wrappedRequestMessageTypes = wrappedRequestMessageTypes;

if (this.wrappedRequestMessageTypes != null && this.wrappedRequestMessageTypes.Count == 1)
{
this.wrapperMessageType = this.wrappedRequestMessageTypes[0];
}

// If T is of WrappedMessageBody, then get the 'Value' property.
if (typeof(T).IsAssignableFrom(typeof(WrappedMessageBody)))
{
}
}

public override T Read(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options)
{
// Ensure start-of-object, then advance
if (reader.TokenType != JsonTokenType.StartObject) throw new JsonException();
reader.Read();

// Ensure property name, then advance
if (reader.TokenType != JsonTokenType.PropertyName || reader.GetString() != "value") throw new JsonException();
reader.Read();

// If the value is null, return null.
if (reader.TokenType == JsonTokenType.Null)
{
// Read the end object token.
reader.Read();
return default;
}

// If the value is an object, deserialize it to wrapper message type
if (this.wrapperMessageType != null)
{
var value = JsonSerializer.Deserialize(ref reader, this.wrapperMessageType, options);

// Construct a new WrappedMessageBody with the deserialized value.
var wrapper = new WrappedMessageBody()
{
Value = value,
};

// Read the end object token.
reader.Read();

// Coerce the type to T; required because WrappedMessageBody inherits from two separate interfaces, which
// cannot both be used as generic constraints
return (T)((object)wrapper);
}

return JsonSerializer.Deserialize<T>(ref reader, options);
}

public override void Write(Utf8JsonWriter writer, T value, JsonSerializerOptions options)
{
writer.WriteStartObject();
writer.WritePropertyName("value");

if (value is WrappedMessageBody body)
{
JsonSerializer.Serialize(writer, body.Value, body.Value.GetType(), options);
}
else
writer.WriteNullValue();
writer.WriteEndObject();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
// ------------------------------------------------------------------------
// Copyright 2021 The Dapr Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// ------------------------------------------------------------------------

namespace Dapr.Actors.Communication
{
using System;
using System.Collections.Generic;
using System.IO;
using System.Text.Json;
using System.Threading.Tasks;
using System.Xml;

/// <summary>
/// This is the implmentation for <see cref="IActorMessageBodySerializationProvider"/>used by remoting service and client during
/// request/response serialization . It uses request Wrapping and data contract for serialization.
/// </summary>
internal class ActorMessageBodyJsonSerializationProvider : IActorMessageBodySerializationProvider
{
public JsonSerializerOptions Options { get; }

/// <summary>
/// Initializes a new instance of the <see cref="ActorMessageBodyJsonSerializationProvider"/> class.
/// </summary>
public ActorMessageBodyJsonSerializationProvider(JsonSerializerOptions options)
{
Options = options;
}

/// <summary>
/// Creates a MessageFactory for Wrapped Message Json Remoting Types. This is used to create Remoting Request/Response objects.
/// </summary>
/// <returns>
/// <see cref="IActorMessageBodyFactory" /> that provides an instance of the factory for creating
/// remoting request and response message bodies.
/// </returns>
public IActorMessageBodyFactory CreateMessageBodyFactory()
{
return new WrappedRequestMessageFactory();
}

/// <summary>
/// Creates IActorRequestMessageBodySerializer for a serviceInterface using Wrapped Message Json implementation.
/// </summary>
/// <param name="serviceInterfaceType">The remoted service interface.</param>
/// <param name="methodRequestParameterTypes">The union of parameter types of all of the methods of the specified interface.</param>
/// <param name="wrappedRequestMessageTypes">Wrapped Request Types for all Methods.</param>
/// <returns>
/// An instance of the <see cref="IActorRequestMessageBodySerializer" /> that can serialize the service
/// actor request message body to a messaging body for transferring over the transport.
/// </returns>
public IActorRequestMessageBodySerializer CreateRequestMessageBodySerializer(
Type serviceInterfaceType,
IEnumerable<Type> methodRequestParameterTypes,
IEnumerable<Type> wrappedRequestMessageTypes = null)
{
return new MemoryStreamMessageBodySerializer<WrappedMessageBody, WrappedMessageBody>(Options, serviceInterfaceType, methodRequestParameterTypes, wrappedRequestMessageTypes);
}

/// <summary>
/// Creates IActorResponseMessageBodySerializer for a serviceInterface using Wrapped Message Json implementation.
/// </summary>
/// <param name="serviceInterfaceType">The remoted service interface.</param>
/// <param name="methodReturnTypes">The return types of all of the methods of the specified interface.</param>
/// <param name="wrappedResponseMessageTypes">Wrapped Response Types for all remoting methods.</param>
/// <returns>
/// An instance of the <see cref="IActorResponseMessageBodySerializer" /> that can serialize the service
/// actor response message body to a messaging body for transferring over the transport.
/// </returns>
public IActorResponseMessageBodySerializer CreateResponseMessageBodySerializer(
Type serviceInterfaceType,
IEnumerable<Type> methodReturnTypes,
IEnumerable<Type> wrappedResponseMessageTypes = null)
{
return new MemoryStreamMessageBodySerializer<WrappedMessageBody, WrappedMessageBody>(Options, serviceInterfaceType, methodReturnTypes, wrappedResponseMessageTypes);
}

/// <summary>
/// Default serializer for service remoting request and response message body that uses the
/// memory stream to create outgoing message buffers.
/// </summary>
private class MemoryStreamMessageBodySerializer<TRequest, TResponse> :
IActorRequestMessageBodySerializer,
IActorResponseMessageBodySerializer
where TRequest : IActorRequestMessageBody
where TResponse : IActorResponseMessageBody
{
private readonly JsonSerializerOptions serializerOptions;

public MemoryStreamMessageBodySerializer(
JsonSerializerOptions serializerOptions,
Type serviceInterfaceType,
IEnumerable<Type> methodRequestParameterTypes,
IEnumerable<Type> wrappedRequestMessageTypes = null)
{
var _methodRequestParameterTypes = new List<Type>(methodRequestParameterTypes);
var _wrappedRequestMessageTypes = new List<Type>(wrappedRequestMessageTypes);

this.serializerOptions = new(serializerOptions)
{
// Workaround since WrappedMessageBody creates an object
// with parameters as fields
IncludeFields = true,
};

this.serializerOptions.Converters.Add(new ActorMessageBodyJsonConverter<TRequest>(_methodRequestParameterTypes, _wrappedRequestMessageTypes));
this.serializerOptions.Converters.Add(new ActorMessageBodyJsonConverter<TResponse>(_methodRequestParameterTypes, _wrappedRequestMessageTypes));
}

byte[] IActorRequestMessageBodySerializer.Serialize(IActorRequestMessageBody actorRequestMessageBody)
{
if (actorRequestMessageBody == null)
{
return null;
}

using var stream = new MemoryStream();
using var writer = new Utf8JsonWriter(stream);
JsonSerializer.Serialize<object>(writer, actorRequestMessageBody, this.serializerOptions);
writer.Flush();

return stream.ToArray();
}

async ValueTask<IActorRequestMessageBody> IActorRequestMessageBodySerializer.DeserializeAsync(Stream stream)
{
if (stream == null)
{
return default;
}

if (stream.Length == 0)
{
return default;
}

stream.Position = 0;
return await JsonSerializer.DeserializeAsync<TRequest>(stream, this.serializerOptions);
}

byte[] IActorResponseMessageBodySerializer.Serialize(IActorResponseMessageBody actorResponseMessageBody)
{
if (actorResponseMessageBody == null)
{
return null;
}

using var stream = new MemoryStream();
using var writer = new Utf8JsonWriter(stream);
JsonSerializer.Serialize<object>(writer, actorResponseMessageBody, this.serializerOptions);
writer.Flush();

return stream.ToArray();
}

async ValueTask<IActorResponseMessageBody> IActorResponseMessageBodySerializer.DeserializeAsync(Stream messageBody)
{
if (messageBody == null)
{
return null;
}

using var stream = new MemoryStream();
messageBody.CopyTo(stream);
stream.Position = 0;

if (stream.Capacity == 0)
{
return null;
}

return await JsonSerializer.DeserializeAsync<TResponse>(stream, this.serializerOptions);
}
}
}
}
Loading

0 comments on commit 088eee8

Please sign in to comment.