This repository has been archived by the owner on Nov 22, 2024. It is now read-only.

Support for polymorphic ADTs as protocol between two Streamlets #203

Open
wjglerum opened this issue Mar 29, 2020 · 1 comment
Labels
kind/bug Something isn't working. kind/enhancement New feature or request. kind/maintenance Refactoring task or improving code health status.

Comments

@wjglerum

We would like to use a polymorphic ADT as the message protocol between two Streamlets. For example, say I would like to design a SubscriptionProtocol with the following commands:

  • SubscribeCmd
  • UnsubscribeCmd

I would like to use this as the protocol between two Streamlets with Avro. We can configure avrohugger in our build.sbt to generate ADTs from Avro IDL files. Take the following example protocol:

@namespace("subscriptions.avro")
protocol SubscriptionProtocol {

  record SubscribeCmd {
    string subscriberId;
    string topicId;
  }

  record UnsubscribeCmd {
    string subscriptionId;
    string subscriberId;
  }
}

And the following sbt configuration for our datamodel module:

lazy val datamodel = appModule("datamodel")
  .enablePlugins(CloudflowLibraryPlugin)
  .settings(
    commonSettings,
    avroScalaSpecificCustomTypes in Compile :=
      avrohugger.format.SpecificRecord.defaultTypes.copy(protocol = avrohugger.types.ScalaADT),
    (sourceGenerators in Compile) += (avroScalaGenerateSpecific in Compile).taskValue
  )

This will generate the following code for us (note: simplified):

sealed trait SubscriptionProtocol extends org.apache.avro.specific.SpecificRecordBase with Product with Serializable
final case class SubscribeCmd(var subscriberId: String, var topicId: String) extends org.apache.avro.specific.SpecificRecordBase with SubscriptionProtocol { ... }
final case class UnsubscribeCmd(var subscriptionId: String, var subscriberId: String) extends org.apache.avro.specific.SpecificRecordBase with SubscriptionProtocol { ... }
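
To illustrate why the sealed trait matters on the consuming side, here is a minimal sketch in plain Scala. It assumes the same shape as the generated code but leaves out the Avro base class so that it runs standalone; `SubscriptionProtocolDemo` and `describe` are hypothetical names, not part of what avrohugger generates.

```scala
// Plain-Scala stand-in for the generated ADT (SpecificRecordBase omitted on purpose).
sealed trait SubscriptionProtocol extends Product with Serializable
final case class SubscribeCmd(subscriberId: String, topicId: String) extends SubscriptionProtocol
final case class UnsubscribeCmd(subscriptionId: String, subscriberId: String) extends SubscriptionProtocol

object SubscriptionProtocolDemo {
  // Hypothetical helper: because the trait is sealed, the compiler can warn
  // when a command is added to the protocol but not handled here.
  def describe(cmd: SubscriptionProtocol): String = cmd match {
    case SubscribeCmd(subscriber, topic)          => s"$subscriber subscribes to $topic"
    case UnsubscribeCmd(subscription, subscriber) => s"$subscriber cancels $subscription"
  }
}
```

A downstream Streamlet could then handle every command with a single match instead of inspecting optional fields.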

However, when we use this as the protocol in the Inlets and Outlets of our Streamlets, it doesn't work:

class ExampleStreamlet extends AkkaServerStreamlet {
  val out = AvroOutlet[SubscriptionProtocol]("out")
  final override val shape = StreamletShape.withOutlets(out)

  final override def createLogic = HttpServerLogic.default(this, out)
}

This fails with the following error:

Streamlet 'com.example.ExampleStreamlet' could not be introspected. Its descriptor method threw an exception: null

So I was stuck on this approach.

Alternative approach

Next I took another approach, wrapping the different commands inside a generic event, which looks something like this:

@namespace("subscriptions.avro")
protocol SubscriptionProtocolV2 {

  enum EventType {
    SUBSCRIBE, UNSUBSCRIBE
  }

  record SubscribeCmdV2 {
    string subscriberId;
    string topicId;
  }

  record UnsubscribeCmdV2 {
    string subscriptionId;
    string subscriberId;
  }

  record SubscribeEvent {
    EventType eventType;
    union { SubscribeCmdV2, UnsubscribeCmdV2 } payload;
  }
}

This also fails to compile due to a limitation in the avrohugger plugin: it can't generate a union of different records, only nullable fields. Also see julianpeeters/avrohugger#116 and the error below:

[error] (datamodel / Compile / avroScalaGenerateSpecific) Unions beyond nullable fields are not supported

We could fix this by using a GenericRecord, however AvroInlet and AvroOutlet require a protocol which extends SpecificRecordBase. So we are stuck here too.

Workaround

We now use a temporary workaround by defining optional fields for each command:

  record SubscribeEvent {
    EventType eventType;
    union { null, SubscribeCmdV2 } subscribeCmd;
    union { null, UnsubscribeCmdV2 } unsubscribeCmd;
  }

This generates the following Scala code:

object EventType extends Enumeration {
  type EventType = Value
  val SUBSCRIBE, UNSUBSCRIBE = Value
}
case class SubscribeCmdV2(subscriberId: String, topicId: String)
case class UnsubscribeCmdV2(subscriptionId: String, subscriberId: String)
case class SubscribeEvent(eventType: EventType.Value, subscribeCmd: Option[SubscribeCmdV2], unsubscribeCmd: Option[UnsubscribeCmdV2])

This works, but it isn't pretty and doesn't extend well when we want to add more commands to this event.
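
One way to contain the awkwardness is to convert the wrapper event to a sealed trait at the edge of each Streamlet. The sketch below assumes plain-Scala copies of the generated classes (Avro plumbing omitted so it runs standalone); the `Command` ADT and the `fromEvent` / `toEvent` helpers are hypothetical names introduced for illustration.

```scala
// Plain-Scala copies of the generated workaround classes (Avro plumbing omitted).
object EventType extends Enumeration {
  val SUBSCRIBE, UNSUBSCRIBE = Value
}
final case class SubscribeCmdV2(subscriberId: String, topicId: String)
final case class UnsubscribeCmdV2(subscriptionId: String, subscriberId: String)
final case class SubscribeEvent(
    eventType: EventType.Value,
    subscribeCmd: Option[SubscribeCmdV2],
    unsubscribeCmd: Option[UnsubscribeCmdV2])

// Hypothetical domain ADT used inside the Streamlet logic.
sealed trait Command
final case class Subscribe(cmd: SubscribeCmdV2) extends Command
final case class Unsubscribe(cmd: UnsubscribeCmdV2) extends Command

object Command {
  // Wire format -> ADT. Rejects events whose tag and payload fields disagree.
  def fromEvent(event: SubscribeEvent): Either[String, Command] =
    (event.eventType, event.subscribeCmd, event.unsubscribeCmd) match {
      case (EventType.SUBSCRIBE, Some(cmd), None)   => Right(Subscribe(cmd))
      case (EventType.UNSUBSCRIBE, None, Some(cmd)) => Right(Unsubscribe(cmd))
      case _                                        => Left(s"inconsistent event: $event")
    }

  // ADT -> wire format. Always sets exactly one of the optional fields.
  def toEvent(command: Command): SubscribeEvent = command match {
    case Subscribe(cmd)   => SubscribeEvent(EventType.SUBSCRIBE, Some(cmd), None)
    case Unsubscribe(cmd) => SubscribeEvent(EventType.UNSUBSCRIBE, None, Some(cmd))
  }
}
```

Adding a command still means touching the schema and both helpers, so this only hides the optional fields from the business logic; it does not remove the extensibility problem.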

Question

How could we support polymorphic ADTs in Cloudflow? Thanks!

Also see the discussion on Gitter: https://gitter.im/lightbend/cloudflow?at=5e7ba8efaf5fed7748602ca8

@RayRoestenburg RayRoestenburg added the kind/enhancement New feature or request. label Mar 31, 2020
@RayRoestenburg RayRoestenburg added kind/bug Something isn't working. kind/maintenance Refactoring task or improving code health status. labels Sep 24, 2020
@SemanticBeeng

SemanticBeeng commented Jan 1, 2022

I relate to this request because the business logic should speak the ubiquitous language of the domain model and not Avro.
Here's my take on the potential solution:

AvroInlet does indeed depend on SpecificRecordBase, but Cloudflow itself does not. This can be seen by looking at CodecInlet and Codec, and at how the streamlets call Codec. So Cloudflow has no direct dependency on SpecificRecordBase.

This allows us to implement avro <-> Scala serde using GenericRecords.
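
To make the idea concrete, here is a rough sketch of a codec for a sealed trait that needs nothing beyond an encode/decode pair. Everything in it is an assumption made for illustration: `AdtCodec` is a hypothetical stand-in and not Cloudflow's actual Codec trait, the ADT names are invented, and a hand-rolled tag-byte format is swapped in for Avro so the example runs without dependencies. A real implementation would presumably write an Avro union of GenericRecords at the same two points.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}
import scala.util.Try

// Hypothetical stand-in for a codec contract; NOT Cloudflow's actual trait.
trait AdtCodec[T] {
  def encode(value: T): Array[Byte]
  def decode(bytes: Array[Byte]): Try[T]
}

// Hypothetical domain ADT with no Avro base class.
sealed trait SubscriptionCommand
final case class Subscribe(subscriberId: String, topicId: String) extends SubscriptionCommand
final case class Unsubscribe(subscriptionId: String, subscriberId: String) extends SubscriptionCommand

// Hand-rolled binary format: one tag byte, then the two string fields.
// Avro GenericRecords would replace the DataOutputStream/DataInputStream calls.
object SubscriptionCommandCodec extends AdtCodec[SubscriptionCommand] {
  def encode(value: SubscriptionCommand): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out   = new DataOutputStream(bytes)
    value match {
      case Subscribe(subscriberId, topicId) =>
        out.writeByte(0); out.writeUTF(subscriberId); out.writeUTF(topicId)
      case Unsubscribe(subscriptionId, subscriberId) =>
        out.writeByte(1); out.writeUTF(subscriptionId); out.writeUTF(subscriberId)
    }
    out.flush()
    bytes.toByteArray
  }

  // Any malformed input (unknown tag, truncated payload) ends up as a Failure.
  def decode(bytes: Array[Byte]): Try[SubscriptionCommand] = Try {
    val in = new DataInputStream(new ByteArrayInputStream(bytes))
    in.readByte().toInt match {
      case 0   => Subscribe(in.readUTF(), in.readUTF())
      case 1   => Unsubscribe(in.readUTF(), in.readUTF())
      case tag => throw new IllegalArgumentException(s"unknown tag: $tag")
    }
  }
}
```

The point is only that the inlet and outlet need a codec for the ADT as a whole, which is where the union handling can live.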

This article has a very nice approach using avrohugger, avro4s and shapeless, with Coproducts and polymorphic functions:
https://bitrock.it/blog/polymorphic-messages-in-kafka-streams.html

Avro doesn’t support inheritance between records, so any OOP strategy to have assets inherit properties from a common ancestor is unfortunately not viable.

@wjglerum @RayRoestenburg @michaelpnash please review and advise if this is agreeable.

Maybe we do not need to expect the main Cloudflow code base to handle this, because support can be implemented externally, in cloudflow-contrib for example.
