Skip to content

Commit

Permalink
API-2532: Added atomicSave to SubscriptionFieldsRepository to ensure … (
Browse files Browse the repository at this point in the history
#36)

* API-2532: Added atomicSave to SubscriptionFieldsRepository to ensure save and retrieval of data is atomic.

* API-2532: Fix WRT review feedback.

* API-2532: Extended scope of type alias code improvement.

* API-2532: code cleaning

* API-2532: Extracted saveAtomic to MongoCrudHelper. Fixed logging bug.

* API-2532: some cleaning
  • Loading branch information
googley42 authored and gokyo committed Nov 14, 2017
1 parent 1e17654 commit 9163bbd
Show file tree
Hide file tree
Showing 10 changed files with 83 additions and 68 deletions.
2 changes: 2 additions & 0 deletions app/uk/gov/hmrc/apisubscriptionfields/model/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,6 @@ package object model {

type Fields = Map[String, String]

type IsInsert = Boolean

}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import play.api.libs.json._
import reactivemongo.api.indexes.IndexType
import reactivemongo.bson.BSONObjectID
import reactivemongo.play.json.collection.JSONCollection
import uk.gov.hmrc.apisubscriptionfields.model.{ApiContext, ApiVersion}
import uk.gov.hmrc.apisubscriptionfields.model.{ApiContext, ApiVersion, IsInsert}
import uk.gov.hmrc.mongo.ReactiveRepository
import uk.gov.hmrc.mongo.json.ReactiveMongoFormats

Expand All @@ -32,7 +32,7 @@ import scala.concurrent.Future
@ImplementedBy(classOf[FieldsDefinitionMongoRepository])
trait FieldsDefinitionRepository {

def save(fieldsDefinition: FieldsDefinition): Future[(FieldsDefinition, Boolean)]
def save(fieldsDefinition: FieldsDefinition): Future[(FieldsDefinition, IsInsert)]

def fetch(apiContext: ApiContext, apiVersion: ApiVersion): Future[Option[FieldsDefinition]]
def fetchAll(): Future[List[FieldsDefinition]]
Expand Down Expand Up @@ -61,7 +61,7 @@ class FieldsDefinitionMongoRepository @Inject()(mongoDbProvider: MongoDbProvider
)
)

override def save(fieldsDefinition: FieldsDefinition): Future[(FieldsDefinition, Boolean)] = {
override def save(fieldsDefinition: FieldsDefinition): Future[(FieldsDefinition, IsInsert)] = {
save(fieldsDefinition, selectorForFieldsDefinition(fieldsDefinition))
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,11 @@
package uk.gov.hmrc.apisubscriptionfields.repository

import play.api.Logger
import play.api.libs.json.{JsObject, OWrites, Reads}
import play.api.libs.json.{JsObject, OFormat, OWrites, Reads}
import reactivemongo.api.Cursor
import reactivemongo.play.json.ImplicitBSONHandlers._
import reactivemongo.play.json.collection.JSONCollection
import uk.gov.hmrc.apisubscriptionfields.model.IsInsert

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
Expand All @@ -29,7 +30,34 @@ trait MongoCrudHelper[T] extends MongoIndexCreator with MongoErrorHandler {

protected val mongoCollection: JSONCollection

def save(entity: T, selector: JsObject)(implicit w: OWrites[T]): Future[(T, Boolean)] = {
def saveAtomic(selector: JsObject, updateOperations: JsObject)(implicit w: OFormat[T]): Future[(T, IsInsert)] = {
Logger.debug(s"[saveAtomic] selector: $selector updateOperations: $updateOperations")

val updateOp = mongoCollection.updateModifier(
update = updateOperations,
fetchNewObject = true,
upsert = true
)

mongoCollection.findAndModify(selector, updateOp).map { findAndModifyResult =>
val maybeTuple: Option[(T, IsInsert)] = for {
value <- findAndModifyResult.value
updateLastError <- findAndModifyResult.lastError
} yield (value.as[T], !updateLastError.updatedExisting)

maybeTuple.fold[(T, IsInsert)] {
handleError(selector, findAndModifyResult)
} (tuple => tuple)
}
}

private def handleError(selector: JsObject, findAndModifyResult: mongoCollection.BatchCommands.FindAndModifyCommand.FindAndModifyResult) = {
val error = s"Error upserting database for $selector."
Logger.error(s"$error lastError: ${findAndModifyResult.lastError}")
throw new RuntimeException(error)
}

def save(entity: T, selector: JsObject)(implicit w: OWrites[T]): Future[(T, IsInsert)] = {
Logger.debug(s"[save] entity: $entity selector: $selector")
mongoCollection.update(selector, entity, upsert = true).map {
updateWriteResult => (entity, handleSaveError(updateWriteResult, s"Could not save entity: $entity"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,15 @@ package uk.gov.hmrc.apisubscriptionfields.repository

import play.api.Logger
import reactivemongo.api.commands.{UpdateWriteResult, WriteResult}
import uk.gov.hmrc.apisubscriptionfields.model.IsInsert

trait MongoErrorHandler {

def handleDeleteError(result: WriteResult, exceptionMsg: => String): Boolean = {
handleError(result, databaseAltered, exceptionMsg)
}

def handleSaveError(updateWriteResult: UpdateWriteResult, exceptionMsg: => String): Boolean = {
def handleSaveError(updateWriteResult: UpdateWriteResult, exceptionMsg: => String): IsInsert = {

def handleUpsertError(result: WriteResult) =
if (databaseAltered(result))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@ package uk.gov.hmrc.apisubscriptionfields.repository
import javax.inject.{Inject, Singleton}

import com.google.inject.ImplementedBy
import play.api.Logger
import play.api.libs.json._
import reactivemongo.api.indexes.IndexType
import reactivemongo.bson.BSONObjectID
import reactivemongo.play.json._
import reactivemongo.play.json.collection.JSONCollection
import uk.gov.hmrc.apisubscriptionfields.model._
import uk.gov.hmrc.mongo.ReactiveRepository
Expand All @@ -32,7 +34,7 @@ import scala.concurrent.Future
@ImplementedBy(classOf[SubscriptionFieldsMongoRepository])
trait SubscriptionFieldsRepository {

def save(subscription: SubscriptionFields): Future[(SubscriptionFields, Boolean)]
def saveAtomic(subscription: SubscriptionFields): Future[(SubscriptionFields, IsInsert)]

def fetch(clientId: ClientId, apiContext: ApiContext, apiVersion: ApiVersion): Future[Option[SubscriptionFields]]
def fetchByFieldsId(fieldsId: SubscriptionFieldsId): Future[Option[SubscriptionFields]]
Expand Down Expand Up @@ -75,8 +77,18 @@ class SubscriptionFieldsMongoRepository @Inject()(mongoDbProvider: MongoDbProvid
)
)

override def save(subscription: SubscriptionFields): Future[(SubscriptionFields, Boolean)] = {
save(subscription, selectorForSubscriptionFields(subscription))
override def saveAtomic(subscription: SubscriptionFields): Future[(SubscriptionFields, IsInsert)] = {

Logger.debug(s"[saveAtomic] entity: $subscription")

// Note that $setOnInsert operation will happen for inserts only, it is ignored for updates.
saveAtomic(
selector = selectorForSubscriptionFields(subscription),
updateOperations = Json.obj(
"$setOnInsert" -> Json.obj("fieldsId" -> subscription.fieldsId.toString),
"$set" -> Json.obj("fields" -> Json.toJson(subscription.fields))
)
)
}

override def fetch(clientId: ClientId, apiContext: ApiContext, apiVersion: ApiVersion): Future[Option[SubscriptionFields]] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,11 @@ import scala.concurrent.Future
@Singleton
class FieldsDefinitionService @Inject() (repository: FieldsDefinitionRepository) {

def upsert(apiContext: ApiContext, apiVersion: ApiVersion, fieldDefinitions: Seq[FieldDefinition]): Future[(FieldsDefinitionResponse, Boolean)] = {
def upsert(apiContext: ApiContext, apiVersion: ApiVersion, fieldDefinitions: Seq[FieldDefinition]): Future[(FieldsDefinitionResponse, IsInsert)] = {
Logger.debug(s"[upsert fields definition] apiContext: $apiContext, apiVersion: $apiVersion, fieldDefinitions: $fieldDefinitions")
val fieldsDefinition = FieldsDefinition(apiContext.value, apiVersion.value, fieldDefinitions)
repository.save(fieldsDefinition).map {
case (fd: FieldsDefinition, inserted: Boolean) => (asResponse(fd), inserted)
case (fd: FieldsDefinition, inserted: IsInsert) => (asResponse(fd), inserted)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,27 +34,9 @@ class UUIDCreator {
@Singleton
class SubscriptionFieldsService @Inject()(repository: SubscriptionFieldsRepository, uuidCreator: UUIDCreator) {

def upsert(clientId: ClientId, apiContext: ApiContext, apiVersion: ApiVersion, subscriptionFields: Fields): Future[(SubscriptionFieldsResponse, Boolean)] = {

def update(existingFieldsId: UUID): Future[SubscriptionFieldsResponse] =
save(SubscriptionFields(clientId.value, apiContext.value, apiVersion.value, existingFieldsId, subscriptionFields))

def create(): Future[SubscriptionFieldsResponse] =
save(SubscriptionFields(clientId.value, apiContext.value, apiVersion.value, uuidCreator.uuid(), subscriptionFields))

Logger.debug(s"[upsert subscription fields] clientId: $clientId, apiContext: $apiVersion, apiVersion: $apiVersion")
// TODO: we need to change the method `upsert` and make it atomic.
// At the moment we call `save` after `fetch`, this might lead to issues in case of concurrent upserts.
// This can be fixed by calling `collection.findAndModify()` (instead of `collection.update()`) in the `MongoCrudHelper.save()` implementation.
// https://stackoverflow.com/questions/10778493/whats-the-difference-between-findandmodify-and-update-in-mongodb
repository.fetch(clientId, apiContext, apiVersion) flatMap { o =>
o.fold(
create().map((_, true))
)(
existing => update(existing.fieldsId).map((_, false))
)
}

def upsert(clientId: ClientId, apiContext: ApiContext, apiVersion: ApiVersion, subscriptionFields: Fields): Future[(SubscriptionFieldsResponse, IsInsert)] = {
val fields = SubscriptionFields(clientId.value, apiContext.value, apiVersion.value, uuidCreator.uuid(), subscriptionFields)
repository.saveAtomic(fields).map(tuple => (asResponse(tuple._1), tuple._2))
}

def delete(clientId: ClientId, apiContext: ApiContext, apiVersion: ApiVersion): Future[Boolean] = {
Expand Down Expand Up @@ -93,11 +75,6 @@ class SubscriptionFieldsService @Inject()(repository: SubscriptionFieldsReposito
} yield fields.map(asResponse)) map (BulkSubscriptionFieldsResponse(_))
}

private def save(apiSubscriptionFields: SubscriptionFields): Future[SubscriptionFieldsResponse] = {
Logger.debug(s"[save subscription fields] subscriptionFields: $apiSubscriptionFields")
repository.save(apiSubscriptionFields).map(_ => asResponse(apiSubscriptionFields))
}

private def asResponse(apiSubscription: SubscriptionFields): SubscriptionFieldsResponse = {
SubscriptionFieldsResponse(
clientId = apiSubscription.clientId,
Expand Down
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ lazy val acceptanceTestSettings =

lazy val scoverageSettings: Seq[Setting[_]] = Seq(
coverageExcludedPackages := "<empty>;Reverse.*;model.*;.*config.*;.*(AuthService|BuildInfo|Routes).*",
coverageMinimum := 94,
coverageMinimum := 93,
coverageFailOnMinimum := true,
coverageHighlighting := true,
parallelExecution in Test := false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ class SubscriptionFieldsRepositorySpec extends UnitSpec

import play.api.libs.json._

def saveByFieldsId(subscription: SubscriptionFields): Future[(SubscriptionFields, Boolean)] = {
def saveByFieldsId(subscription: SubscriptionFields): Future[(SubscriptionFields, IsInsert)] = {
save(subscription, Json.obj("fieldsId" -> subscription.fieldsId))
}

Expand Down Expand Up @@ -77,27 +77,27 @@ class SubscriptionFieldsRepositorySpec extends UnitSpec
BSONDocument("clientId" -> s.clientId, "apiContext" -> s.apiContext, "apiVersion" -> s.apiVersion)
}

"save" should {
"saveAtomic" should {
val apiSubscriptionFields = createApiSubscriptionFields()

import reactivemongo.play.json._

"insert the record in the collection" in {
collectionSize shouldBe 0

await(repository.save(apiSubscriptionFields)) shouldBe ((apiSubscriptionFields, true))
await(repository.saveAtomic(apiSubscriptionFields)) shouldBe ((apiSubscriptionFields, true))
collectionSize shouldBe 1
await(repository.collection.find(selector(apiSubscriptionFields)).one[SubscriptionFields]) shouldBe Some(apiSubscriptionFields)
}

"update the record in the collection" in {
collectionSize shouldBe 0

await(repository.save(apiSubscriptionFields)) shouldBe ((apiSubscriptionFields, true))
await(repository.saveAtomic(apiSubscriptionFields)) shouldBe ((apiSubscriptionFields, true))
collectionSize shouldBe 1

val edited = apiSubscriptionFields.copy(fields = Map("field4" -> "value_4"))
await(repository.save(edited)) shouldBe ((edited, false))
await(repository.saveAtomic(edited)) shouldBe ((edited, false))
collectionSize shouldBe 1
await(repository.collection.find(selector(edited)).one[SubscriptionFields]) shouldBe Some(edited)
}
Expand All @@ -109,9 +109,9 @@ class SubscriptionFieldsRepositorySpec extends UnitSpec
val apiSubForApp1Context2 = createSubscriptionFieldsWithApiContext(rawContext = fakeRawContext2)
val apiSubForApp2Context1 = createSubscriptionFieldsWithApiContext(clientId = fakeRawClientId2)

await(repository.save(apiSubForApp1Context1))
await(repository.save(apiSubForApp1Context2))
await(repository.save(apiSubForApp2Context1))
await(repository.saveAtomic(apiSubForApp1Context1))
await(repository.saveAtomic(apiSubForApp1Context2))
await(repository.saveAtomic(apiSubForApp2Context1))
collectionSize shouldBe 3

await(repository.fetchByClientId(FakeClientId)) shouldBe List(apiSubForApp1Context1, apiSubForApp1Context2)
Expand All @@ -127,7 +127,7 @@ class SubscriptionFieldsRepositorySpec extends UnitSpec
"fetch using clientId, apiContext, apiVersion" should {
"retrieve the correct record" in {
val apiSubscription = createApiSubscriptionFields()
await(repository.save(apiSubscription))
await(repository.saveAtomic(apiSubscription))
collectionSize shouldBe 1

await(repository.fetch(FakeClientId, FakeContext, FakeVersion)) shouldBe Some(apiSubscription)
Expand All @@ -136,7 +136,7 @@ class SubscriptionFieldsRepositorySpec extends UnitSpec
"return None when no subscription fields are found in the collection" in {
for (i <- 1 to 3) {
val apiSubscription = createApiSubscriptionFields(clientId = uniqueClientId)
await(repository.save(apiSubscription))
await(repository.saveAtomic(apiSubscription))
}
collectionSize shouldBe 3

Expand All @@ -148,15 +148,15 @@ class SubscriptionFieldsRepositorySpec extends UnitSpec
"fetchByFieldsId" should {
"retrieve the correct record from the `fieldsId` " in {
val apiSubscription = createApiSubscriptionFields()
await(repository.save(apiSubscription))
await(repository.saveAtomic(apiSubscription))
collectionSize shouldBe 1

await(repository.fetchByFieldsId(SubscriptionFieldsId(apiSubscription.fieldsId))) shouldBe Some(apiSubscription)
}

"return `None` when the `fieldsId` doesn't match any record in the collection" in {
for (i <- 1 to 3) {
await(repository.save(createApiSubscriptionFields(clientId = uniqueClientId)))
await(repository.saveAtomic(createApiSubscriptionFields(clientId = uniqueClientId)))
}
collectionSize shouldBe 3

Expand All @@ -169,9 +169,9 @@ class SubscriptionFieldsRepositorySpec extends UnitSpec
val subscriptionFields1 = createApiSubscriptionFields(clientId = uniqueClientId)
val subscriptionFields2 = createApiSubscriptionFields(clientId = uniqueClientId)
val subscriptionFields3 = createApiSubscriptionFields(clientId = uniqueClientId)
await(repository.save(subscriptionFields1))
await(repository.save(subscriptionFields2))
await(repository.save(subscriptionFields3))
await(repository.saveAtomic(subscriptionFields1))
await(repository.saveAtomic(subscriptionFields2))
await(repository.saveAtomic(subscriptionFields3))
collectionSize shouldBe 3

await(repository.fetchAll()) shouldBe List(subscriptionFields1, subscriptionFields2, subscriptionFields3)
Expand All @@ -186,7 +186,7 @@ class SubscriptionFieldsRepositorySpec extends UnitSpec
"remove the record with a specific subscription field" in {
val apiSubscription: SubscriptionFields = createApiSubscriptionFields()

await(repository.save(apiSubscription))
await(repository.saveAtomic(apiSubscription))
collectionSize shouldBe 1

await(repository.delete(ClientId(apiSubscription.clientId), ApiContext(apiSubscription.apiContext), ApiVersion(apiSubscription.apiVersion))) shouldBe true
Expand All @@ -195,7 +195,7 @@ class SubscriptionFieldsRepositorySpec extends UnitSpec

"not alter the collection for unknown subscription fields" in {
for (i <- 1 to 3) {
await(repository.save(createApiSubscriptionFields(clientId = uniqueClientId)))
await(repository.saveAtomic(createApiSubscriptionFields(clientId = uniqueClientId)))
}
collectionSize shouldBe 3

Expand All @@ -208,26 +208,26 @@ class SubscriptionFieldsRepositorySpec extends UnitSpec
val apiSubscription = createApiSubscriptionFields("A_FIXED_CLIENTID")

"have a unique compound index based on `clientId`, `apiContext` and `apiVersion`" in {
await(repository.save(apiSubscription))
await(repository.saveAtomic(apiSubscription))
collectionSize shouldBe 1

await(repository.save(apiSubscription.copy(fieldsId = UUID.randomUUID())))
await(repository.saveAtomic(apiSubscription.copy(fieldsId = UUID.randomUUID())))
collectionSize shouldBe 1
}

"have a unique index based on `fieldsId`" in {
await(repository.save(apiSubscription))
await(repository.saveAtomic(apiSubscription))
collectionSize shouldBe 1

await(repository.saveByFieldsId(apiSubscription.copy(apiVersion = "2.2")))
collectionSize shouldBe 1
}

"have a non-unique index based on `clientId`" in {
await(repository.save(apiSubscription))
await(repository.saveAtomic(apiSubscription))
collectionSize shouldBe 1

await(repository.save(apiSubscription.copy(apiContext = fakeRawContext2, fieldsId = UUID.randomUUID())))
await(repository.saveAtomic(apiSubscription.copy(apiContext = fakeRawContext2, fieldsId = UUID.randomUUID())))
collectionSize shouldBe 2
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,28 +112,23 @@ class SubscriptionFieldsServiceSpec extends UnitSpec with SubscriptionFieldsTest

"upsert" should {
"return false when updating an existing api subscription fields" in {
(mockSubscriptionFieldsIdRepository save _) expects FakeApiSubscription returns ((FakeApiSubscription, false))
(mockSubscriptionFieldsIdRepository fetch(_: ClientId, _: ApiContext, _: ApiVersion))
.expects(FakeClientId, FakeContext, FakeVersion).returns(Some(FakeApiSubscription))
(mockSubscriptionFieldsIdRepository saveAtomic _) expects FakeApiSubscription returns ((FakeApiSubscription, false))

val result = await(service.upsert(FakeClientId, FakeContext, FakeVersion, subscriptionFields))

result shouldBe ((SubscriptionFieldsResponse(fakeRawClientId, fakeRawContext, fakeRawVersion, FakeFieldsId, subscriptionFields), false))
}

"return true when creating a new api subscription fields" in {
(mockSubscriptionFieldsIdRepository save _) expects FakeApiSubscription returns ((FakeApiSubscription, true))
(mockSubscriptionFieldsIdRepository fetch(_: ClientId, _: ApiContext, _: ApiVersion))
.expects(FakeClientId, FakeContext, FakeVersion).returns(None)
(mockSubscriptionFieldsIdRepository saveAtomic _) expects FakeApiSubscription returns ((FakeApiSubscription, true))

val result = await(service.upsert(FakeClientId, FakeContext, FakeVersion, subscriptionFields))

result shouldBe ((SubscriptionFieldsResponse(fakeRawClientId, fakeRawContext, fakeRawVersion, FakeFieldsId, subscriptionFields), true))
}

"propagate the error" in {
(mockSubscriptionFieldsIdRepository fetch(_: ClientId, _: ApiContext, _: ApiVersion))
.expects(*, *, *).returns(Future.failed(emulatedFailure))
(mockSubscriptionFieldsIdRepository saveAtomic _) expects FakeApiSubscription returns Future.failed(emulatedFailure)

val caught = intercept[EmulatedFailure] {
await(service.upsert(FakeClientId, FakeContext, FakeVersion, subscriptionFields))
Expand Down

0 comments on commit 9163bbd

Please sign in to comment.