Skip to content

Commit

Permalink
Reduce MetadataSpec's flakiness (close #110)
Browse files Browse the repository at this point in the history
  • Loading branch information
Ian Streeter authored and istreeter committed Dec 23, 2020
1 parent 41798e1 commit 299e747
Show file tree
Hide file tree
Showing 6 changed files with 75 additions and 49 deletions.
3 changes: 1 addition & 2 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,7 @@ lazy val metadata = project
.settings(Seq(
name := "snowplow-scala-tracker-metadata",
libraryDependencies ++= Seq(
Dependencies.Libraries.scalajHttp,
Dependencies.Libraries.specs2Mock
Dependencies.Libraries.scalajHttp
)
))
.dependsOn(core % "test->test;compile->compile")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import scalaj.http.Http
*
* @see http://docs.aws.amazon.com/AWSEC2/latest/UserGuide/ec2-instance-metadata.html
*/
class Ec2Metadata[F[_]: Sync] {
class Ec2Metadata[F[_]: Sync](client: HttpClient = _.asString) {

val InstanceIdentitySchema =
SchemaKey("com.amazon.aws.ec2", "instance_identity_document", "jsonschema", SchemaVer.Full(1, 0, 0))
Expand Down Expand Up @@ -65,7 +65,7 @@ class Ec2Metadata[F[_]: Sync] {
parse(resp).toOption
.flatMap(_.asObject)
.map(jsonObject => prepareOrThrow(jsonObject))
.getOrElse(Sync[F].raiseError(new RuntimeException("Document can not be parsed")))
.getOrElse(Sync[F].raiseError(new RuntimeException(s"Document can not be parsed: $resp")))
}
}

Expand Down Expand Up @@ -122,7 +122,7 @@ class Ec2Metadata[F[_]: Sync] {
* @return value wrapped delayed inside F
*/
private[metadata] def getContent(url: String): F[String] =
Sync[F].delay(Http(url).asString.body)
Sync[F].delay(client(Http(url)).body)

/**
* Get content of node-link
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import com.snowplowanalytics.snowplow.scalatracker.SelfDescribingJson
* Unlike EC2 instance document, GCE does not provide an excerpt, but instead
* this module collect only meaningful properties
*/
class GceMetadata[F[_]: Sync] {
class GceMetadata[F[_]: Sync](client: HttpClient = _.asString) {

val InstanceMetadataSchema: SchemaKey =
SchemaKey("com.google.cloud.gce", "instance_metadata", "jsonschema", SchemaVer.Full(1, 0, 0))
Expand Down Expand Up @@ -87,7 +87,7 @@ class GceMetadata[F[_]: Sync] {
Http(InstanceMetadataUri + path).header("Metadata-Flavor", "Google")

private[metadata] def getString(path: String): F[String] =
Sync[F].delay(request(path).asString.body)
Sync[F].delay(client(request(path)).body)

private def getJson(path: String): F[Json] =
getString(path)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,12 @@ package com.snowplowanalytics.snowplow.scalatracker
import cats._
import cats.implicits._
import cats.effect.{Clock, Sync}
import scalaj.http.{HttpRequest, HttpResponse}

package object metadata {

private[metadata] type HttpClient = HttpRequest => HttpResponse[String]

implicit class TrackerMetadataOps[F[_]](val tracker: Tracker[F]) extends AnyVal {

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,23 +13,23 @@
package com.snowplowanalytics.snowplow.scalatracker.metadata

import java.net.{SocketTimeoutException, UnknownHostException}
import java.util.concurrent.Executors

import cats.Id
import cats.data.NonEmptyList
import cats.effect.{ContextShift, IO}
import com.snowplowanalytics.snowplow.scalatracker.{Emitter, Payload, Tracker}
import com.snowplowanalytics.snowplow.scalatracker.{Emitter, Payload}
import org.specs2.Specification
import org.specs2.mock.Mockito
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext
import scalaj.http.HttpResponse

class MetadataSpec extends Specification with Mockito {
import com.snowplowanalytics.snowplow.scalatracker.syntax.id._
class MetadataSpec extends Specification {

implicit def contextShift: ContextShift[IO] = IO.contextShift(ExecutionContext.global)
implicit val timer = IO.timer(ExecutionContext.global)
// Use enough threads to support the Thread.sleep calls, and still allow concurrent timeouts.
val ec: ExecutionContext = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(5))
implicit def contextShift: ContextShift[IO] = IO.contextShift(ec)
implicit val timer = IO.timer(ec)

val ec2Response = IO.pure("""
val ec2Response = """
|{
| "devpayProductCodes" : null,
| "marketplaceProductCodes" : [ "1abc2defghijklm3nopqrs4tu" ],
Expand All @@ -47,22 +47,20 @@ class MetadataSpec extends Specification with Mockito {
| "ramdiskId" : null,
| "region" : "us-west-2"
|}
""".stripMargin)
""".stripMargin

val gceResponse = IO.pure("""{ "foo": "bar" }""")

val ec2Spy =
spy(new Ec2Metadata[IO]).getContent(anyString).returns(ec2Response).getMock[Ec2Metadata[IO]]
val gceSpy =
spy(new GceMetadata[IO]).getString(anyString).returns(gceResponse).getMock[GceMetadata[IO]]
val gceResponse = """{ "foo": "bar" }"""

def is = s2"""

ec2 extension method should make a request and throw an exception $e1
gce extension method should make a request and throw an exception $e2
ec2 method should make a request and return json $e1
gce method should make a request and return json $e2

ec2 method should make a request and throw an exception $e3
gce method should make a request and throw an exception $e4

ec2 timeout method must work correctly $e3
gce timeout method must work correctly $e4
ec2 timeout method must work correctly $e5
gce timeout method must work correctly $e6

"""

Expand All @@ -71,36 +69,63 @@ class MetadataSpec extends Specification with Mockito {
override def flushBuffer(): Unit = ()
}

def e1 =
Tracker(NonEmptyList.of(emitter), "foo", "foo")
.enableEc2Context[IO]
.unsafeRunSync() must throwA[SocketTimeoutException]
def e1 = {
val client: HttpClient = { _ =>
new HttpResponse(ec2Response, 200, Map.empty)
}

def e2 =
Tracker(NonEmptyList.of(emitter), "foo", "foo")
.enableGceContext[IO]
.unsafeRunSync() must throwA[UnknownHostException]
new Ec2Metadata[IO](client).getInstanceContext
.unsafeRunSync()
.schema
.vendor must beEqualTo("com.amazon.aws.ec2")
}

def e3 = {
ec2Spy.getInstanceContextBlocking.unsafeRunSync() must beSome
def e2 = {
val client: HttpClient = { _ =>
new HttpResponse(gceResponse, 200, Map.empty)
}

val blockingInstance = spy(new Ec2Metadata[IO])
.getContent(anyString)
.returns(IO.sleep(5.seconds).map(_ => "foo"))
.getMock[Ec2Metadata[IO]]
new GceMetadata[IO](client).getInstanceContext
.unsafeRunSync()
.schema
.vendor must beEqualTo("com.google.cloud.gce")
}

blockingInstance.getInstanceContextBlocking.unsafeRunSync() must beNone
def e3 = {
val client: HttpClient = { _ =>
throw new SocketTimeoutException()
}
new Ec2Metadata[IO](client).getInstanceContext
.unsafeRunSync() must throwA[SocketTimeoutException]
}

def e4 = {
gceSpy.getInstanceContextBlocking.unsafeRunSync() must beSome
val client: HttpClient = { _ =>
throw new UnknownHostException()
}
new GceMetadata[IO](client).getInstanceContext
.unsafeRunSync() must throwA[UnknownHostException]
}

def e5 = {

val client: HttpClient = { _ =>
Thread.sleep(5000)
new HttpResponse(ec2Response, 200, Map.empty)
}

new Ec2Metadata[IO](client).getInstanceContextBlocking
.unsafeRunSync() must beNone
}

val blockingInstance = spy(new GceMetadata[IO])
.getString(anyString)
.returns(IO.sleep(5.seconds).map(_ => "foo"))
.getMock[GceMetadata[IO]]
def e6 = {
val client: HttpClient = { _ =>
Thread.sleep(5000)
new HttpResponse(gceResponse, 200, Map.empty)
}

blockingInstance.getInstanceContextBlocking.unsafeRunSync() must beNone
new GceMetadata[IO](client).getInstanceContextBlocking
.unsafeRunSync() must beNone
}

}
1 change: 0 additions & 1 deletion project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ object Dependencies {

// Scala (test only)
val specs2 = "org.specs2" %% "specs2-core" % V.specs2 % "test"
val specs2Mock = "org.specs2" %% "specs2-mock" % V.specs2 % "test"
val scalaCheck = "org.scalacheck" %% "scalacheck" % V.scalaCheck % "test"
val circeOptics = "io.circe" %% "circe-optics" % V.circeOptics % "test"
}
Expand Down

0 comments on commit 299e747

Please sign in to comment.