Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add an integration test to check dynamo for missing items #6083

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,9 @@ object ProcessSupporterRatePlanItemLambda extends SafeLogging {

def processItem(supporterRatePlanItem: SupporterRatePlanItem) = {
if (itemIsDiscount(supporterRatePlanItem)) {
logger.info(s"Supporter rate plan item ${supporterRatePlanItem.asJson.spaces2} is a discount")
logger.info(
s"Supporter rate plan item ${supporterRatePlanItem.asJson.spaces2} is a discount rate plan, not a product",
)
Future.successful(())
} else
addAmountIfContribution(supporterRatePlanItem).flatMap(writeItemToDynamo)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,10 @@ class QueryZuoraLambda extends Handler[QueryZuoraState, FetchResultsState] {
}

object QueryZuoraLambda extends StrictLogging {
val stage = StageConstructors.fromEnvironment
val config = ConfigService(stage).load
val service = new ZuoraQuerierService(config, configurableFutureRunner(60.seconds))

def queryZuora(stage: Stage, queryType: QueryType) = {
val config = ConfigService(stage).load
val service = new ZuoraQuerierService(config, configurableFutureRunner(60.seconds))
logger.info(s"Attempting to submit ${queryType.value} query to Zuora")

// Get the time we started the query. Docs for why we need to do this are here:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
package com.gu.lambdas

import com.gu.services.SqsService
import com.gu.supporterdata.model.{Stage, SupporterRatePlanItem}
import com.gu.supporterdata.services.SupporterDataDynamoService
import com.gu.test.tags.annotations.IntegrationTest
import com.typesafe.scalalogging.LazyLogging
import kantan.csv.ops.{toCsvInputOps, toCsvOutputOps}
import kantan.csv.rfc
import org.scalatest.flatspec.AsyncFlatSpec
import org.scalatest.matchers.should.Matchers

import java.io.File
import java.util.concurrent.Executors
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.io.Codec.fallbackSystemCodec
import scala.io.Source

@IntegrationTest
class CompareExportToDynamoSpec extends AsyncFlatSpec with Matchers with LazyLogging {

"This test compares the export in supporter-product-data/data-extracts to the data in DynamoDB. There " should
"not be any missing identity ids in Dynamo" in {
val stage = Stage.PROD

val results = loadQueryResults
val csvReader = results.asCsvReader[SupporterRatePlanItemWithStatus](rfc.withHeader)

val items = csvReader.collect { case Right(item) => item }.toList
println(s"found ${items.length} rows in CSV")
val dynamoService = SupporterDataDynamoService(stage)
val activeItems = items
.filter(_.subscriptionStatus == "Active")
.filter(_.termEndDate.isAfter(java.time.LocalDate.now().minusDays(1)))
println(s"found ${activeItems.length} active items to check")

val executorService = Executors.newFixedThreadPool(50)
implicit val ec = ExecutionContext.fromExecutor(executorService)

var missing = 0
var found = 0

val eventualTuples = activeItems
.map { item =>
Await.result(
dynamoService
.subscriptionExists(item.identityId, item.subscriptionName)
.map {
case Right(exists) =>
if (!exists) {
missing += 1
println(s"${item.identityId} ${item.subscriptionName} does not exist in Dynamo")
println(s"Missing: $missing, Found: $found")
} else {
found += 1
}
(exists, item)
case Left(error) =>
logger.error(s"Error checking ${item.identityId} ${item.subscriptionName} - $error")
fail
},
Duration(10, "seconds"),
)
}

val (exists, doesNotExist) = eventualTuples.partition(tuple => tuple._1)
println(s"Found ${exists.length} items in Dynamo")
saveResults("missing-from-dynamo.csv", doesNotExist.map(_._2))
saveResults("found-in-dynamo.csv", exists.map(_._2))
assert(condition = true)

}
"None of the missing items" should "exist in Dynamo" in {
val missing = Source.fromFile("./supporter-product-data/data-extracts/missing-from-dynamo.csv").mkString
val csvReader = missing.asCsvReader[SupporterRatePlanItemWithStatus](rfc.withHeader)
val items = csvReader.collect { case Right(item) => item }.toList
val dynamoService = SupporterDataDynamoService(Stage.PROD)
val executorService = Executors.newFixedThreadPool(1)
implicit val ec = ExecutionContext.fromExecutor(executorService)
items.foreach { item =>
println(s"Checking ${item.identityId} ${item.subscriptionName}")
Await.result(
dynamoService.subscriptionExists(item.identityId, item.subscriptionName).map { exists =>
assert(exists.contains(false), s"${item.identityId} ${item.subscriptionName} exists in Dynamo")
},
Duration(10, "seconds"),
)
}
assert(condition = true)
}
"Missing subscriptions" should "be added to Dynamo" in {
val sqsService = SqsService(Stage.PROD)
val missing = Source.fromFile("./supporter-product-data/data-extracts/missing-from-dynamo.csv").mkString
val csvReader = missing.asCsvReader[SupporterRatePlanItemWithStatus](rfc.withHeader)
val batches = csvReader
.collect { case Right(item) => item }
.toList
.map(item =>
SupporterRatePlanItem(
item.subscriptionName,
item.identityId,
None,
item.productRatePlanId,
item.productRatePlanName,
item.termEndDate,
item.termEndDate.minusYears(1),
None,
),
)
.zipWithIndex
.grouped(10)
batches.foreach(sqsService.sendBatch)
assert(condition = true)
}
"An individual subscription" should "exist in Dynamo" in {
val identityId = "100002649"
val subscriptionName = "A-S01060830"
val stage = Stage.PROD

val dynamoService = SupporterDataDynamoService(stage)
dynamoService.subscriptionExists(identityId, subscriptionName).map { exists =>
assert(exists.contains(true))
}
}

def loadQueryResults: String = Source.fromFile("./supporter-product-data/data-extracts/last-Full-PROD.csv").mkString
def saveResults(filename: String, items: List[SupporterRatePlanItemWithStatus]) = {
val out = new File(s"${System.getProperty("user.dir")}/supporter-product-data/data-extracts/$filename")
out.writeCsv(items, rfc.withHeader)
println(s"Saved ${items.length} items to ${out.getAbsolutePath}")
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package com.gu.lambdas

import com.gu.model.ZuoraFieldNames._
import kantan.csv.HeaderCodec
import kantan.csv.java8.{defaultLocalDateCellDecoder, defaultLocalDateCellEncoder}

import java.time.LocalDate

case class SupporterRatePlanItemWithStatus(
subscriptionName: String, // Unique identifier for the subscription
identityId: String, // Unique identifier for user
productRatePlanId: String, // Unique identifier for the product in this rate plan
productRatePlanName: String, // Name of the product in this rate plan
termEndDate: LocalDate,
subscriptionStatus: String,
)

object SupporterRatePlanItemWithStatus {
implicit val personCodec: HeaderCodec[SupporterRatePlanItemWithStatus] =
HeaderCodec.caseCodec(
subscriptionName,
identityId,
productRatePlanId,
productRatePlanName,
termEndDate,
"Subscription.Status",
)(
SupporterRatePlanItemWithStatus.apply,
)(
SupporterRatePlanItemWithStatus.unapply,
)
}
Loading