Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add persistent tentant registry #555

Draft
wants to merge 3 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 10 additions & 5 deletions greyhoundsidecar/build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,13 @@ Compile / PB.targets := Seq(
scalapb.zio_grpc.ZioCodeGenerator -> (Compile / sourceManaged).value / "scalapb"
)

val zioVersion = "2.0.10"

lazy val root = (project in file("."))
.settings(
name := "greyhound-sidecar",
libraryDependencies ++= Seq(
"dev.zio" %% "zio" % "2.0.3",
"dev.zio" %% "zio" % zioVersion,
"io.grpc" % "grpc-netty" % "1.51.0",
"com.wix" %% "greyhound-core" % "0.3.0",
"com.thesamet.scalapb" %% "scalapb-runtime-grpc" % scalapb.compiler.Version.scalapbVersion,
Expand All @@ -21,14 +23,17 @@ lazy val root = (project in file("."))
"org.slf4j" % "slf4j-api" % "1.7.25",
"ch.qos.logback" % "logback-classic" % "1.1.3",
"dev.zio" %% "zio-logging-slf4j" % "2.1.3",
"dev.zio" %% "zio-redis" % "0.2.0",

// -- test -- //
"dev.zio" %% "zio-test" % "2.0.3" % Test,
"dev.zio" %% "zio-test-junit" % "2.0.3" % Test,
"dev.zio" %% "zio-test-sbt" % "2.0.3" % Test,
"dev.zio" %% "zio-test" % zioVersion % Test,
"dev.zio" %% "zio-test-junit" % zioVersion % Test,
"dev.zio" %% "zio-test-sbt" % zioVersion % Test,
"org.specs2" %% "specs2-core" % "4.14.1" % Test,
"org.apache.curator" % "curator-test" % "5.3.0" % Test,
"com.wix" %% "greyhound-testkit" % "0.3.0" % Test
"com.wix" %% "greyhound-testkit" % "0.3.0" % Test,
"dev.zio" %% "zio-redis-embedded" % "0.2.0" % Test,
"dev.zio" %% "zio-streams" % zioVersion % Test,
),
packageName := "greyhound-sidecar",
version := "1.0",
Expand Down
15 changes: 15 additions & 0 deletions greyhoundsidecar/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,19 @@ services:
networks:
- greyhound-network

greyhound-sidecar-cache:
image: redis:7.2-rc1-alpine
ports:
- 6379:6379
volumes:
- redis_data:/data
networks:
- greyhound-network

greyhound-sidecar:
depends_on:
- kafka
- greyhound-sidecar-cache
image: greyhound-sidecar:1.0
ports:
- 9000:9000
Expand All @@ -61,3 +73,6 @@ services:
KAFKA_ADDRESS: kafka:29092
networks:
- greyhound-network

volumes:
redis_data:
13 changes: 9 additions & 4 deletions greyhoundsidecar/src/main/scala/greyhound/Registry.scala
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,10 @@ trait Registry {
def isUniqueConsumer(topic: String, consumerGroup: String, tenantId: String): UIO[Boolean]
}

case class TenantRegistry(ref: Synchronized[Map[String, TenantInfo]]) extends Registry {
case class TenantRegistry(
ref: Synchronized[Map[String, TenantInfo]],
registryRepo: RegistryRepo
) extends Registry {
override def addTenant(tenantId: String, host: String, port: Int): Task[Unit] = {
ref.updateZIO(tenants => if (tenants.values.map(_.hostDetails).exists(hostDetails => hostDetails.host == host && hostDetails.port == port))
ZIO.fail(new StatusRuntimeException(Status.ALREADY_EXISTS))
Expand Down Expand Up @@ -80,9 +83,11 @@ case class TenantRegistry(ref: Synchronized[Map[String, TenantInfo]]) extends Re
}

object TenantRegistry {
val layer = ZLayer.fromZIO {
Synchronized.make(Map.empty[String, TenantInfo])
.map(TenantRegistry(_))
val layer: ZLayer[RegistryRepo, Nothing, TenantRegistry] = ZLayer.fromZIO {
for {
ref <- Synchronized.make(Map.empty[String, TenantInfo])
registryRepo <- ZIO.service[RegistryRepo]
} yield TenantRegistry(ref, registryRepo)
}
}

Expand Down
32 changes: 32 additions & 0 deletions greyhoundsidecar/src/main/scala/greyhound/RegistryRepo.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package greyhound

import zio.Ref.Synchronized
import zio.{Task, UIO, ZIO, ZLayer}
import zio.redis.{CodecSupplier, Redis, RedisConfig, RedisError, RedisExecutor}

import scala.util.Try

trait RegistryRepo {
def addTenant(tenantId: String, host: String, port: Int): Task[Unit]
}

case class RedisRegistryRepo(redis: Redis) extends RegistryRepo {
override def addTenant(tenantId: String, host: String, port: Int): Task[Unit] = ???
}

//TODO separate into two files
case class InMemoryRegistryRepo(ref: Synchronized[Map[String, TenantInfo]]) extends RegistryRepo {
override def addTenant(tenantId: String, host: String, port: Int): Task[Unit] = ???
}

object InMemoryRegistryRepo {
lazy val layer: ZLayer[Any, Throwable, RegistryRepo] = ZLayer.fromZIO {
for {
ref <- Synchronized.make(Map.empty[String, TenantInfo])
} yield InMemoryRegistryRepo(ref)
}
}

object RedisRegistryRepo {
lazy val layer: ZLayer[Redis, RedisError.IOError, RegistryRepo] = ZLayer.fromFunction(RedisRegistryRepo(_))
}
51 changes: 43 additions & 8 deletions greyhoundsidecar/src/main/scala/greyhound/SidecarServerMain.scala
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
package greyhound

import com.wixpress.dst.greyhound.sidecar.api.v1.greyhoundsidecar.ZioGreyhoundsidecar
import io.grpc.Status
import scalapb.zio_grpc.{RequestContext, ServerMain, ServiceList, ZTransform}
import zio.logging.backend.SLF4J
import zio.redis._
import zio.stream.ZStream
import zio.{Cause, Runtime, URIO, ZIO}
import zio.{Cause, Runtime, UIO, URIO, ZIO, ZLayer}

object SidecarServerMain extends ServerMain {

Expand Down Expand Up @@ -46,18 +48,51 @@ object SidecarServerMain extends ServerMain {
.onError(logCause)
}

val sidecarService = for {
private def getRedisConfig: UIO[RedisConfig] = {
val maybeConfig = for {
redisHost <- ZIO.fromOption(scala.util.Properties.envOrNone("REDIS_HOST"))
redisPortEnv <- ZIO.fromOption(scala.util.Properties.envOrNone("REDIS_PORT"))
redisPort <- ZIO.attempt(redisPortEnv.toInt)
} yield RedisConfig(redisHost, redisPort)

maybeConfig.catchAll(_ => ZIO.succeed(RedisConfig.Default))
}

private val redisConfigLayer = ZLayer.fromZIO(getRedisConfig)

// Currently supports only single node
private lazy val redisLayer: ZLayer[Any, RedisError.IOError, Redis] = ZLayer.make[Redis](
Redis.layer,
RedisExecutor.layer,
redisConfigLayer,
ZLayer.succeed(CodecSupplier.utf8string),
)

private lazy val redisRegistryRepo = ZLayer.make[RedisRegistryRepo](
ZLayer.fromFunction(RedisRegistryRepo(_)),
redisLayer,
)

private val sidecarService = for {
service <- ZIO.service[SidecarService]
kafkaAddress <- ZIO.service[KafkaInfo].map(_.address)
_ <- zio.Console.printLine(s"~~~ INIT Sidecar Service with kafka address $kafkaAddress").orDie
} yield service.transform[RequestContext](new LoggingTransform)

val initSidecarService = sidecarService.provide(
SidecarService.layer,
TenantRegistry.layer,
KafkaInfoLive.layer,
ConsumerCreatorImpl.layer)

private def getStandaloneMode: UIO[Boolean] = ZIO.attempt(
scala.util.Properties.envOrNone("STANDALONE_MODE").exists(_.toBoolean)
).catchAll(_ => ZIO.succeed(false))

private def initSidecarService: ZIO[Any, Throwable, ZioGreyhoundsidecar.ZGreyhoundSidecar[RequestContext]] = for {
isStandalone <- getStandaloneMode
partialService = sidecarService.provideSome[RegistryRepo](
SidecarService.layer,
TenantRegistry.layer,
KafkaInfoLive.layer,
ConsumerCreatorImpl.layer,
)
service <- if(isStandalone) partialService.provide(redisRegistryRepo) else partialService.provide(InMemoryRegistryRepo.layer)
} yield service
}

class LoggingTransform[R] extends ZTransform[R, Status, R with RequestContext] {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,12 +142,8 @@ class SidecarService(tenantRegistry: Registry,
}

object SidecarService {
val layer: ZLayer[Registry with ConsumerCreator with KafkaInfo, Nothing, SidecarService] = ZLayer.fromZIO {
for {
kafkaAddress <- ZIO.service[KafkaInfo].map(_.address)
tenantRegistry <- ZIO.service[Registry]
consumerCreator <- ZIO.service[ConsumerCreator]
} yield new SidecarService(tenantRegistry = tenantRegistry, consumerCreator = consumerCreator, kafkaAddress = kafkaAddress)
val layer = ZLayer.fromFunction { (registry: Registry, consumerCreator: ConsumerCreator, kafkaInfo: KafkaInfo) =>
new SidecarService(registry, consumerCreator, _ => ZIO.unit, kafkaInfo.address)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ object MultiTenantTest extends JUnitRunnableSpec with KafkaTestSupport with Conn
override val kafkaPort: Int = 6667
override val zooKeeperPort: Int = 2187
override val sideCarUserGrpcPort: Int = 9107
override val isStandaloneMode: Boolean = false
val sideCarUser1GrpcPort = 9105
val sideCarUser2GrpcPort = 9106

Expand Down Expand Up @@ -238,7 +239,7 @@ object MultiTenantTest extends JUnitRunnableSpec with KafkaTestSupport with Conn
(testSidecarUser1Layer >>> sidecarUserServer1Layer) ++
testSidecarUser2Layer ++
(testSidecarUser2Layer >>> sidecarUserServer2Layer) ++
((TenantRegistry.layer ++ TestKafkaInfo.layer ++ (TenantRegistry.layer >>> ConsumerCreatorImpl.layer)) >>> SidecarService.layer)) @@
((TestTenantRegistry.layer ++ TestKafkaInfo.layer ++ (TestTenantRegistry.layer >>> ConsumerCreatorImpl.layer)) >>> SidecarService.layer)) @@
TestAspect.withLiveClock @@
runKafka(kafkaPort, zooKeeperPort) @@
sequential
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ object RetrySidecarServiceTest extends JUnitRunnableSpec with KafkaTestSupport w
override val kafkaPort: Int = 6669
override val zooKeeperPort: Int = 2189
override val sideCarUserGrpcPort: Int = 9109
override val isStandaloneMode: Boolean = false

val sidecarUserServerLayer = ZLayer.fromZIO(for {
user <- ZIO.service[FailOnceTestSidecarUser]
Expand Down Expand Up @@ -118,7 +119,7 @@ object RetrySidecarServiceTest extends JUnitRunnableSpec with KafkaTestSupport w
ZLayer.succeed(zio.Scope.global) ++
FailOnceTestSidecarUser.layer ++
(FailOnceTestSidecarUser.layer >>> sidecarUserServerLayer) ++
((TenantRegistry.layer ++ TestKafkaInfo.layer ++ (TenantRegistry.layer >>> ConsumerCreatorImpl.layer)) >>> SidecarService.layer)) @@
((TestTenantRegistry.layer ++ TestKafkaInfo.layer ++ (TestTenantRegistry.layer >>> ConsumerCreatorImpl.layer)) >>> SidecarService.layer)) @@
TestAspect.withLiveClock @@
runKafka(kafkaPort, zooKeeperPort) @@
sequential
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ object SidecarServiceTest extends JUnitRunnableSpec with KafkaTestSupport with C
override val kafkaPort: Int = 6668
override val zooKeeperPort: Int = 2188
override val sideCarUserGrpcPort: Int = 9108
override val isStandaloneMode: Boolean = false

val sidecarUserServerLayer = ZLayer.fromZIO(for {
user <- ZIO.service[TestSidecarUser]
Expand Down Expand Up @@ -155,7 +156,7 @@ object SidecarServiceTest extends JUnitRunnableSpec with KafkaTestSupport with C
ZLayer.succeed(zio.Scope.global) ++
TestSidecarUser.layer ++
(TestSidecarUser.layer >>> sidecarUserServerLayer) ++
((TenantRegistry.layer ++ TestKafkaInfo.layer ++ (TenantRegistry.layer >>> ConsumerCreatorImpl.layer)) >>> SidecarService.layer)) @@
((TestTenantRegistry.layer ++ TestKafkaInfo.layer ++ (TestTenantRegistry.layer >>> ConsumerCreatorImpl.layer)) >>> SidecarService.layer)) @@
TestAspect.withLiveClock @@
runKafka(kafkaPort, zooKeeperPort) @@
sequential
Expand Down
Original file line number Diff line number Diff line change
@@ -1,20 +1,47 @@
package greyhound.support

import greyhound.KafkaInfo
import greyhound.{InMemoryRegistryRepo, KafkaInfo, RedisRegistryRepo, RegistryRepo, TenantRegistry}
import zio.ZLayer
import zio.redis.{CodecSupplier, Redis, RedisExecutor}
import zio.redis.embedded.EmbeddedRedis

trait ConnectionSettings {
val kafkaPort: Int
val zooKeeperPort: Int
val sideCarUserGrpcPort: Int
val isStandaloneMode: Boolean

final val localhost: String = "localhost"

case class TestKafkaInfo() extends KafkaInfo {
override val address: String = s"$localhost:$kafkaPort"
}

//TODO move to test context?
object TestKafkaInfo {
val layer = ZLayer.succeed(TestKafkaInfo())
}

object TestRegistryRepo {
def layer = if (isStandaloneMode) redisTestLayer else inMemoryTestLayer

val redisTestLayer = ZLayer.make[RegistryRepo](
RedisRegistryRepo.layer,
Redis.layer,
RedisExecutor.layer,
EmbeddedRedis.layer,
ZLayer.succeed(CodecSupplier.utf8string),
)

val inMemoryTestLayer = InMemoryRegistryRepo.layer
}

object TestTenantRegistry {
val layer = ZLayer.make[TenantRegistry](
TenantRegistry.layer,
TestRegistryRepo.layer
)
}

//TODO also add sidecar service layer?
}