Merge pull request #2 from xebia-functional/sharding-reafa
connecting sharding to the consumer
beherrera authored Dec 11, 2023
2 parents 1660a1a + b375b82 commit 8571732
Showing 5 changed files with 116 additions and 16 deletions.
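In short, the commit wires the Alpakka Kafka consumer up to Akka Cluster Sharding: each record value is used as an entity id and forwarded to a sharded UserActor. A condensed, self-contained sketch of that flow follows; ShardedConsumerSketch and the "localhost:9092" bootstrap server are placeholders, the cluster and Kafka settings are assumed to come from the application.conf shown below, and the actual wiring is in the ConsPOC.scala diff further down.

import akka.actor.typed.ActorSystem
import akka.actor.typed.scaladsl.Behaviors
import akka.cluster.sharding.typed.scaladsl.{ClusterSharding, Entity}
import akka.kafka.{ConsumerSettings, Subscriptions}
import akka.kafka.scaladsl.Consumer
import akka.stream.scaladsl.Sink
import com.xebia.useractorpoc.UserActor
import org.apache.kafka.common.serialization.StringDeserializer

// sketch only; the repository's real entry point is ConsPOC
object ShardedConsumerSketch extends App {
  implicit val system: ActorSystem[Nothing] = ActorSystem(Behaviors.empty, "Sharding")

  // register the UserActor entity type so ids can be resolved to (possibly remote) actors
  val sharding = ClusterSharding(system)
  sharding.init(Entity(UserActor.TypeKey)(ctx => UserActor(ctx.entityId)))

  val settings = ConsumerSettings(
      system.settings.config.getConfig("akka.kafka.consumer"),
      new StringDeserializer, new StringDeserializer)
    .withBootstrapServers("localhost:9092") // placeholder; ConsPOC reads this from config
    .withGroupId("group")

  Consumer
    .plainSource(settings, Subscriptions.topics("users"))
    .map { record =>
      // the Kafka record value doubles as the sharded entity id
      val ref = sharding.entityRefFor(UserActor.TypeKey, record.value())
      ref ! UserActor.Add(record.value(), system.deadLetters)
    }
    .runWith(Sink.ignore)
}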
3 changes: 3 additions & 0 deletions build.sbt
@@ -19,8 +19,11 @@ lazy val root = (project in file("."))
"com.typesafe.akka" %% "akka-stream" % AkkaVersion,
"com.typesafe.akka" %% "akka-persistence-typed" % AkkaVersion,
"com.typesafe.akka" %% "akka-serialization-jackson" % AkkaVersion,
"com.typesafe.akka" %% "akka-cluster-sharding-typed" % AkkaVersion,
"com.typesafe.akka" %% "akka-distributed-data" % AkkaVersion,
"com.lightbend.akka" %% "akka-diagnostics" % AkkaDiagnosticsVersion,
"ch.qos.logback" % "logback-classic" % LogbackClassicVersion,
"com.newrelic.agent.java" %% "newrelic-scala-api" % "8.7.0",
"com.typesafe.akka" %% "akka-actor-testkit-typed" % AkkaVersion % Test,
"org.scalatest" %% "scalatest" % ScalaTestVersion % Test
),
16 changes: 16 additions & 0 deletions src/main/resources/application.conf
@@ -8,11 +8,27 @@ akka {
log-config-on-start = off

actor {
provider = cluster
serialization-bindings {
"com.xebia.useractorpoc.CborSerializable" = jackson-cbor
}
}

remote {
artery {
canonical.hostname = "127.0.0.1"
canonical.port = 0
}
}

cluster {
seed-nodes = [
"akka://Sharding@127.0.0.1:2551",
"akka://Sharding@127.0.0.1:2552",
]
downing-provider-class = "akka.cluster.sbr.SplitBrainResolverProvider"
}

persistence {
journal.plugin = "akka.persistence.journal.inmem"
snapshot-store.plugin = "akka.persistence.snapshot-store.local"
81 changes: 70 additions & 11 deletions src/main/scala/com/xebia/conspoc/ConsPOC.scala
@@ -1,30 +1,84 @@
package com.xebia.conspoc

import akka.actor.typed.ActorSystem
import akka.actor.typed.{ActorRef, ActorSystem}
import akka.actor.typed.scaladsl.Behaviors
import akka.cluster.sharding.typed.ShardingEnvelope
import akka.kafka.{ConsumerSettings, Subscriptions}
import akka.kafka.scaladsl.Consumer
import akka.stream.scaladsl.{Keep, Sink}
import com.xebia.useractorpoc.UserActor
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import akka.cluster.sharding.typed.scaladsl.ClusterSharding
import akka.cluster.sharding.typed.scaladsl.Entity
import akka.pattern.StatusReply
import akka.util.Timeout
import com.typesafe.config.{Config, ConfigFactory}

import scala.concurrent.Future
import scala.io.StdIn

sealed trait FilterInt {
def apply(x: Int): Boolean
}

object FilterInt {
final case object Even extends FilterInt {
def apply(x: Int): Boolean = x % 2 == 0
}

final case object Odd extends FilterInt {
def apply(x: Int): Boolean = !Even(x)
}
}

object ConsPOC extends App {
implicit val system: ActorSystem[Nothing] = ActorSystem(Behaviors.empty, "consumer")

val config = system.settings.config.getConfig("akka.kafka.consumer")
val bootstrapServers = system.settings.config.getString("bootstrapServers")
val consumerSettings = ConsumerSettings(config, new StringDeserializer, new StringDeserializer)
private def configWithPort(port: Int): Config =
ConfigFactory.parseString(
s"""
akka.remote.artery.canonical.port = $port
""").withFallback(ConfigFactory.load())

val port: Int = args.lift(1) match {
case Some(portString) if portString.matches("""\d+""") => portString.toInt
case _ => throw new IllegalArgumentException("An akka cluster port argument is required")
}

val filter = args.lift(0) match {
case Some("even") => FilterInt.Even
case _ => FilterInt.Odd
}

val groupId = args.lift(0).getOrElse("group")

// implicit val system = ActorSystem(Behaviors.empty, "consumer", configWithPort(port))
implicit val consumerAS = ActorSystem(Behaviors.empty, "consumer")

val shardingAS = ActorSystem[Nothing](Behaviors.empty, "Sharding", configWithPort(port))

val sharding = ClusterSharding(shardingAS)

sharding.init(Entity(UserActor.TypeKey) { entityContext =>
UserActor(entityContext.entityId)
})

val consumerConfig = consumerAS.settings.config.getConfig("akka.kafka.consumer")
val bootstrapServers = consumerAS.settings.config.getString("bootstrapServers")
val consumerSettings = ConsumerSettings(consumerConfig, new StringDeserializer, new StringDeserializer)
.withBootstrapServers(bootstrapServers)
.withGroupId("group1")
.withGroupId(groupId)
.withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

val (control, streamCompletion): (Consumer.Control, Future[akka.Done]) =
Consumer
.plainSource(consumerSettings, Subscriptions.topics("users"))
.map(msg => println(s"Received: ${msg.value()}"))
.filter(record => filter(record.value().toInt))
.map { msg =>
val id = msg.value()
val entityRef = sharding.entityRefFor(UserActor.TypeKey, id)
entityRef.tell(UserActor.Add(id, shardingAS.deadLetters))
}
.toMat(Sink.ignore)(Keep.both)
.run()

@@ -33,7 +87,12 @@ object ConsPOC extends App {
StdIn.readLine()

// Shutdown logic
control.shutdown().onComplete { _ =>
system.terminate()
}(system.executionContext)
}
control
.shutdown()
.onComplete { _ =>
consumerAS.terminate()
}(consumerAS.executionContext)



}
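Not part of the commit, but worth noting: the new shutdown block only terminates the consumer system, so the Sharding system is never stopped. A minimal sketch of stopping both once the stream control has shut down, reusing the control, consumerAS and shardingAS values defined above:

// sketch only; assumes the vals defined in ConsPOC above
control
  .shutdown()
  .onComplete { _ =>
    consumerAS.terminate()
    shardingAS.terminate()
  }(consumerAS.executionContext)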
10 changes: 10 additions & 0 deletions src/main/scala/com/xebia/shardingpoc/Users.scala
@@ -0,0 +1,10 @@
package com.xebia.shardingpoc

object Users {

def main(args: Array[String]): Unit = {


}

}
22 changes: 17 additions & 5 deletions src/main/scala/com/xebia/useractorpoc/UserActor.scala
@@ -1,16 +1,28 @@
package com.xebia.useractorpoc

import akka.actor.typed.Behavior
import akka.actor.typed.ActorRef
import akka.actor.typed.{ActorRef, ActorSystem, Behavior, SupervisorStrategy}
import akka.actor.typed.scaladsl._
import akka.cluster.sharding.typed.ShardingEnvelope
import akka.persistence.typed.PersistenceId
import akka.persistence.typed.scaladsl.{Effect, EventSourcedBehavior, RetentionCriteria}
import akka.cluster.sharding.typed.scaladsl.ClusterSharding
import akka.cluster.sharding.typed.scaladsl.Entity
import akka.cluster.sharding.typed.scaladsl.EntityTypeKey

import scala.concurrent.duration._
import akka.actor.typed.SupervisorStrategy
import akka.pattern.StatusReply


object UserActor {

val TypeKey: EntityTypeKey[Command] =
EntityTypeKey[Command]("UserActor")

def initSharding(system: ActorSystem[_]): ActorRef[ShardingEnvelope[Command]] =
ClusterSharding(system).init(Entity(TypeKey) { entityContext =>
UserActor(entityContext.entityId)
})

sealed trait Command
final case class Add(number: String, replyTo: ActorRef[StatusReply[Summary]]) extends Command
final case class Get(replyTo: ActorRef[Summary]) extends Command
@@ -47,8 +59,8 @@ object UserActor {
replyTo ! state.toSummary
Effect.none
}
case Clear =>
Effect.persist(Cleared)
case Clear => Effect.persist(Cleared)
case Summary(_) => Effect.none
}
}

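The consumer above sends Add with shardingAS.deadLetters as the reply address, so the Summary replies are discarded. For reference, a minimal sketch (not part of the commit) of asking a sharded UserActor and receiving its Summary through StatusReply; UserActorAskExample and askUser are hypothetical names, and the sketch assumes Summary is defined inside the UserActor object as the diff suggests:

import akka.actor.typed.ActorSystem
import akka.cluster.sharding.typed.scaladsl.ClusterSharding
import akka.util.Timeout
import com.xebia.useractorpoc.UserActor

import scala.concurrent.Future
import scala.concurrent.duration._

// hypothetical example, not in the repository
object UserActorAskExample {
  def askUser(system: ActorSystem[_], id: String, number: String): Future[UserActor.Summary] = {
    implicit val timeout: Timeout = 3.seconds
    // resolve the entity ref through the ClusterSharding extension of the given system
    val entityRef = ClusterSharding(system).entityRefFor(UserActor.TypeKey, id)
    // askWithStatus unwraps StatusReply.Success(summary) and fails the Future on StatusReply.Error
    entityRef.askWithStatus(replyTo => UserActor.Add(number, replyTo))
  }
}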
