Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Pekko support #2899

Merged
merged 8 commits into from
Aug 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/master.yml
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ jobs:
run: docker run -d -it -p 39227:9200 -p 39337:9300 -e "discovery.type=single-node" -v /home/runner/work/elastic4s/elastic4s/elastic4s-tests/src/test/resources/elasticsearch.yml:/usr/share/elasticsearch/config/elasticsearch.yml docker.elastic.co/elasticsearch/elasticsearch:8.5.3

- name: run tests
run: sbt ++3.2.0 elastic4s-scala3/test
run: sbt ++3.3.0 elastic4s-scala3/test

- name: Import GPG key
id: import_gpg
Expand All @@ -125,7 +125,7 @@ jobs:
echo "email: ${{ steps.import_gpg.outputs.email }}"

- name: publish snapshot
run: sbt ++3.2.0 elastic4s-scala3/publish
run: sbt ++3.3.0 elastic4s-scala3/publish
env:
OSSRH_USERNAME: ${{ secrets.OSSRH_USERNAME }}
OSSRH_PASSWORD: ${{ secrets.OSSRH_PASSWORD }}
2 changes: 1 addition & 1 deletion .github/workflows/pr.yml
Original file line number Diff line number Diff line change
Expand Up @@ -65,4 +65,4 @@ jobs:
run: docker run -d -it -p 39227:9200 -p 39337:9300 -e "discovery.type=single-node" -v /home/runner/work/elastic4s/elastic4s/elastic4s-tests/src/test/resources/elasticsearch.yml:/usr/share/elasticsearch/config/elasticsearch.yml docker.elastic.co/elasticsearch/elasticsearch:8.5.3

- name: run tests
run: sbt ++3.2.0 elastic4s-scala3/test
run: sbt ++3.3.0 elastic4s-scala3/test
2 changes: 1 addition & 1 deletion .github/workflows/release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ jobs:
OSSRH_PASSWORD: ${{ secrets.OSSRH_PASSWORD }}

- name: publish 3.0 release
run: sbt ++3.2.0 elastic4s-scala3/publishSigned
run: sbt ++3.3.0 elastic4s-scala3/publishSigned
env:
RELEASE_VERSION: ${{ github.event.inputs.version }}
OSSRH_USERNAME: ${{ secrets.OSSRH_USERNAME }}
Expand Down
24 changes: 19 additions & 5 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ def ossrhUsername = sys.env.getOrElse("OSSRH_USERNAME", "")
def ossrhPassword = sys.env.getOrElse("OSSRH_PASSWORD", "")

val scala2Versions = Seq("2.12.17", "2.13.11")
val scalaAllVersions = scala2Versions :+ "3.2.2"
val scalaAllVersions = scala2Versions :+ "3.3.0"
lazy val commonScalaVersionSettings = Seq(
scalaVersion := "2.12.17",
crossScalaVersions := Nil
Expand Down Expand Up @@ -139,7 +139,8 @@ lazy val scala3Projects: Seq[ProjectReference] = Seq(
ziojson,
clientsttp,
httpstreams,
akkastreams
akkastreams,
pekkostreams
)
lazy val scala3_root = Project("elastic4s-scala3", file("scala3"))
.settings(name := "elastic4s")
Expand All @@ -157,7 +158,7 @@ lazy val root = Project("elastic4s", file("."))
noPublishSettings
)
.aggregate(
Seq[ProjectReference](scalaz, sprayjson, ziojson_1, clientakka) ++ scala3Projects: _*
Seq[ProjectReference](scalaz, sprayjson, ziojson_1, clientakka, clientpekko) ++ scala3Projects: _*
)

lazy val domain = (project in file("elastic4s-domain"))
Expand Down Expand Up @@ -269,6 +270,12 @@ lazy val akkastreams = (project in file("elastic4s-streams-akka"))
.settings(scala3Settings)
.settings(libraryDependencies += Dependencies.akkaStream)

lazy val pekkostreams = (project in file("elastic4s-streams-pekko"))
.dependsOn(core, testkit % "test", jackson % "test")
.settings(name := "elastic4s-streams-pkko")
.settings(scala3Settings)
.settings(libraryDependencies += Dependencies.pekkoStream)

lazy val jackson = (project in file("elastic4s-json-jackson"))
.dependsOn(core)
.settings(name := "elastic4s-json-jackson")
Expand Down Expand Up @@ -324,8 +331,15 @@ lazy val clientsttp = (project in file("elastic4s-client-sttp"))
lazy val clientakka = (project in file("elastic4s-client-akka"))
.dependsOn(core, testkit % "test")
.settings(name := "elastic4s-client-akka")
.settings(scala2Settings) // tests need re-writing to not use scalaMock. We also need akka-http to be cross-published, which depends on an akka bump with restrictive licensing changes
.settings(libraryDependencies ++= Seq(akkaHTTP, akkaStream, scalaMock))
.settings(scala2Settings) // We need akka-http to be cross-published, which depends on an akka bump with restrictive licensing changes
.settings(libraryDependencies ++= Seq(akkaHTTP, akkaStream))

lazy val clientpekko = (project in file("elastic4s-client-pekko"))
.dependsOn(core, testkit % "test")
.settings(name := "elastic4s-client-pekko")
.settings(scala3Settings)
.settings(libraryDependencies ++= Seq(pekkoHTTP, pekkoStream))


lazy val tests = (project in file("elastic4s-tests"))
.settings(name := "elastic4s-tests")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,32 +3,33 @@ package com.sksamuel.elastic4s.akka
import akka.actor.ActorSystem
import akka.http.scaladsl.model.{HttpRequest, HttpResponse, StatusCodes, Uri}
import com.sksamuel.elastic4s.{ElasticRequest, HttpEntity => ElasticEntity, HttpResponse => ElasticResponse}
import org.scalamock.function.MockFunction1
import org.scalamock.scalatest.MockFactory
import org.mockito.ArgumentMatchers._
import org.scalatest.BeforeAndAfterAll
import org.scalatest.concurrent._
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpec
import org.scalatestplus.mockito.MockitoSugar
import org.mockito.Mockito._

import scala.concurrent.duration._
import scala.util.{Failure, Success, Try}

class AkkaHttpClientMockTest
extends AnyWordSpec
with Matchers
with MockFactory
with MockitoSugar
with ScalaFutures
with IntegrationPatience
with BeforeAndAfterAll {

private implicit lazy val system: ActorSystem = ActorSystem()

override def afterAll: Unit = {
override def afterAll(): Unit = {
system.terminate()
}

def mockHttpPool(): (MockFunction1[HttpRequest, Try[HttpResponse]], TestHttpPoolFactory) = {
val sendRequest = mockFunction[HttpRequest, Try[HttpResponse]]
def mockHttpPool(): (Function[HttpRequest, Try[HttpResponse]], TestHttpPoolFactory) = {
val sendRequest = mock[Function[HttpRequest, Try[HttpResponse]]]
val poolFactory = new TestHttpPoolFactory(sendRequest)
(sendRequest, poolFactory)
}
Expand All @@ -49,22 +50,22 @@ class AkkaHttpClientMockTest
val client =
new AkkaHttpClient(AkkaHttpClientSettings(hosts), blacklist, httpPool)

(blacklist.contains _).expects("host1").returns(false)
(blacklist.contains _).expects("host2").returns(false)
(blacklist.add _).expects("host1").returns(true)
(blacklist.remove _).expects("host2").returns(false)
when(blacklist.contains("host1")).thenReturn(false)
when(blacklist.contains("host2")).thenReturn(false)
when(blacklist.add("host1")).thenReturn(true)
when(blacklist.remove("host2")).thenReturn(false)

sendRequest
.expects(argThat { (r: HttpRequest) =>
r.uri == Uri("http://host1/test")
})
.returns(Success(HttpResponse(StatusCodes.BadGateway)))
when(sendRequest
.apply(argThat { (r: HttpRequest) =>
r != null && r.uri == Uri("http://host1/test")
}))
.thenReturn(Success(HttpResponse(StatusCodes.BadGateway)))

sendRequest
.expects(argThat { (r: HttpRequest) =>
r.uri == Uri("http://host2/test")
})
.returns(Success(HttpResponse().withEntity("ok")))
when(sendRequest
.apply(argThat { (r: HttpRequest) =>
r != null && r.uri == Uri("http://host2/test")
}))
.thenReturn(Success(HttpResponse().withEntity("ok")))

client
.sendAsync(ElasticRequest("GET", "/test"))
Expand All @@ -91,14 +92,14 @@ class AkkaHttpClientMockTest
blacklist,
httpPool)

(blacklist.contains _).expects("host1").returns(false)
(blacklist.add _).expects("host1").returns(true)
when(blacklist.contains("host1")).thenReturn(false)
when(blacklist.add("host1")).thenReturn(true)

sendRequest
.expects(argThat { (r: HttpRequest) =>
when(sendRequest
.apply(argThat { (r: HttpRequest) =>
r.uri == Uri("http://host1/test")
})
.returns(Success(HttpResponse(StatusCodes.BadGateway)))
}))
.thenReturn(Success(HttpResponse(StatusCodes.BadGateway)))

client
.sendAsync(ElasticRequest("GET", "/test"))
Expand All @@ -122,22 +123,22 @@ class AkkaHttpClientMockTest
val client =
new AkkaHttpClient(AkkaHttpClientSettings(hosts), blacklist, httpPool)

(blacklist.contains _).expects("host1").returns(false)
(blacklist.contains _).expects("host2").returns(false)
(blacklist.add _).expects("host1").returns(true)
(blacklist.remove _).expects("host2").returns(false)
when(blacklist.contains("host1")).thenReturn(false)
when(blacklist.contains("host2")).thenReturn(false)
when(blacklist.add("host1")).thenReturn(true)
when(blacklist.remove("host2")).thenReturn(false)

sendRequest
.expects(argThat { (r: HttpRequest) =>
r.uri == Uri("http://host1/test")
})
.returns(Success(HttpResponse(StatusCodes.BadGateway)))
when(sendRequest
.apply(argThat { (r: HttpRequest) =>
r != null && r.uri == Uri("http://host1/test")
}))
.thenReturn(Success(HttpResponse(StatusCodes.BadGateway)))

sendRequest
.expects(argThat { (r: HttpRequest) =>
when(sendRequest
.apply(argThat { (r: HttpRequest) =>
r.uri == Uri("http://host2/test")
})
.returns(Success(HttpResponse().withEntity("host2")))
}))
.thenReturn(Success(HttpResponse().withEntity("host2")))

client
.sendAsync(ElasticRequest("GET", "/test"))
Expand All @@ -158,22 +159,22 @@ class AkkaHttpClientMockTest
val client =
new AkkaHttpClient(AkkaHttpClientSettings(hosts), blacklist, httpPool)

(blacklist.contains _).expects("host1").returns(false)
(blacklist.contains _).expects("host2").returns(false)
(blacklist.add _).expects("host1").returns(true)
(blacklist.remove _).expects("host2").returns(false)
when(blacklist.contains("host1")).thenReturn(false)
when(blacklist.contains("host2")).thenReturn(false)
when(blacklist.add("host1")).thenReturn(true)
when(blacklist.remove("host2")).thenReturn(false)

sendRequest
.expects(argThat { (r: HttpRequest) =>
r.uri == Uri("http://host1/test")
})
.returns(Failure(new Exception("Some exception")))
when(sendRequest
.apply(argThat { (r: HttpRequest) =>
r != null && r.uri == Uri("http://host1/test")
}))
.thenReturn(Failure(new Exception("Some exception")))

sendRequest
.expects(argThat { (r: HttpRequest) =>
r.uri == Uri("http://host2/test")
})
.returns(Success(HttpResponse().withEntity("host2")))
when(sendRequest
.apply(argThat { (r: HttpRequest) =>
r != null && r.uri == Uri("http://host2/test")
}))
.thenReturn(Success(HttpResponse().withEntity("host2")))

client
.sendAsync(ElasticRequest("GET", "/test"))
Expand All @@ -194,16 +195,16 @@ class AkkaHttpClientMockTest
val client =
new AkkaHttpClient(AkkaHttpClientSettings(hosts), blacklist, httpPool)

(blacklist.contains _).expects("host1").returns(true)
(blacklist.size _).expects().returns(1)
(blacklist.contains _).expects("host2").returns(false)
(blacklist.remove _).expects("host2").returns(false)
when(blacklist.contains("host1")).thenReturn(true)
when(blacklist.size).thenReturn(1)
when(blacklist.contains("host2")).thenReturn(false)
when(blacklist.remove("host2")).thenReturn(false)

sendRequest
.expects(argThat { (r: HttpRequest) =>
when(sendRequest
.apply(argThat { (r: HttpRequest) =>
r.uri == Uri("http://host2/test")
})
.returns(Success(HttpResponse().withEntity("host2")))
}))
.thenReturn(Success(HttpResponse().withEntity("host2")))

client
.sendAsync(ElasticRequest("GET", "/test"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,15 @@ class AkkaHttpClientTest extends AnyFlatSpec with Matchers with DockerTests with

private implicit lazy val system: ActorSystem = ActorSystem()

override def beforeAll: Unit = {
override def beforeAll(): Unit = {
Try {
client.execute {
deleteIndex("testindex")
}.await
}
}

override def afterAll: Unit = {
override def afterAll(): Unit = {
Try {
client.execute {
deleteIndex("testindex")
Expand Down
18 changes: 18 additions & 0 deletions elastic4s-client-pekko/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
com.sksamuel.elastic4s.pekko {
hosts = []
https = false
verify-ssl-certificate = true
// optionally provide credentials
// username = ...
// password = ...
queue-size = 1000
blacklist {
min-duration = 1m
max-duration = 30m
}
max-retry-timeout = 30s
pekko.http {
// pekko-http settings specific for elastic4s
// can be overwritten in this section
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package com.sksamuel.elastic4s.pekko

/**
* List of 'bad' hosts.
* Implementation must have expiration logic backed-in.
*/
private[pekko] trait Blacklist {

/**
* Adds a host to the blacklist.
*
* @param host host
* @return true if record is blacklisted for the first time
*/
def add(host: String): Boolean

/**
* Removes a host from the blacklist.
*
* @param host host
* @return true if host was blacklisted
*/
def remove(host: String): Boolean

/**
* Checks if a host can be used.
*
* @param host host
* @return true if host is not in a blacklist or temporary removed from it
*/
def contains(host: String): Boolean

/**
* Number of hosts in blacklist
*/
def size: Int

/**
* List all hosts in the blacklist
*
* @return
*/
def list: List[String]
}

Loading
Loading