Skip to content

Commit

Permalink
wrap low-level client access in mapper
Browse files Browse the repository at this point in the history
  • Loading branch information
ianbotsf committed Oct 2, 2024
1 parent 1f2b867 commit d0a628d
Show file tree
Hide file tree
Showing 8 changed files with 87 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ public object MapperPkg {
public object Hl {
public val Base: String = "aws.sdk.kotlin.hll.dynamodbmapper"
public val Annotations: String = "$Base.annotations"
public val Internal: String = "$Base.internal"
public val Items: String = "$Base.items"
public val Model: String = "$Base.model"
public val Ops: String = "$Base.operations"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ public object MapperTypes {
public val ManualPagination: TypeRef = TypeRef(MapperPkg.Hl.Annotations, "ManualPagination")
}

public object Internal {
public val withWrappedClient: TypeRef = TypeRef(MapperPkg.Hl.Internal, "withWrappedClient")
}

public object Items {
public fun itemSchema(typeVar: String): TypeRef =
TypeRef(MapperPkg.Hl.Items, "ItemSchema", genericArgs = listOf(TypeVar(typeVar)))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,12 @@ internal class OperationRenderer(
}
write("schema) },")

write("lowLevelInvoke = spec.mapper.client::#L,", operation.methodName)
withBlock("lowLevelInvoke = { lowLevelReq ->", "},") {
withBlock("spec.mapper.client.#T { client ->", "}", MapperTypes.Internal.withWrappedClient) {
write("client.#L(lowLevelReq)", operation.methodName)
}
}

write("deserialize = #L::convert,", operation.response.lowLevelName)
write("interceptors = spec.mapper.config.interceptors,")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,4 +112,4 @@ public interface DynamoDbMapper {
public fun DynamoDbMapper(
client: DynamoDbClient,
config: DynamoDbMapper.Config.Builder.() -> Unit = { },
): DynamoDbMapper = DynamoDbMapperImpl.wrapping(client, DynamoDbMapper.Config(config))
): DynamoDbMapper = DynamoDbMapperImpl(client, DynamoDbMapper.Config(config))
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,6 @@ internal data class DynamoDbMapperImpl(
override val client: DynamoDbClient,
override val config: DynamoDbMapper.Config,
) : DynamoDbMapper {
internal companion object {
/**
* Wraps a low-level [DynamoDbClient] to add additional features before instantiating a new
* [DynamoDbMapperImpl].
*/
fun wrapping(client: DynamoDbClient, config: DynamoDbMapper.Config): DynamoDbMapperImpl {
val wrappedClient = client.withConfig { interceptors += DdbMapperMetricInterceptor }
return DynamoDbMapperImpl(wrappedClient, config)
}
}
override fun <T, PK> getTable(name: String, schema: ItemSchema.PartitionKey<T, PK>) =
tableImpl(this, name, schema)

Expand All @@ -54,9 +44,12 @@ internal class MapperConfigBuilderImpl : DynamoDbMapper.Config.Builder {
/**
* An interceptor that emits the DynamoDB Mapper business metric
*/
private object DdbMapperMetricInterceptor : HttpInterceptor {
private object BusinessMetricInterceptor : HttpInterceptor {
override suspend fun modifyBeforeSerialization(context: RequestInterceptorContext<Any>): Any {
context.executionContext.emitBusinessMetric(AwsBusinessMetric.DDB_MAPPER)
return context.request
}
}

internal inline fun <T> DynamoDbClient.withWrappedClient(block: (DynamoDbClient) -> T): T =
withConfig { interceptors += BusinessMetricInterceptor }.use(block)
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,11 @@ class DynamoDbMapperTest : DdbLocalTest() {
fun testBusinessMetricEmission() = runTest {
val interceptor = MetricCapturingInterceptor()

val ddb = super.ddb.withConfig { interceptors += interceptor }
val ddb = lowLevelAccess { withConfig { interceptors += interceptor } }
interceptor.assertEmpty()

// No metric for low-level client
ddb.scan { tableName = TABLE_NAME }
lowLevelAccess { scan { tableName = TABLE_NAME } }
interceptor.assertMetric(AwsBusinessMetric.DDB_MAPPER, exists = false)
interceptor.reset()

Expand All @@ -67,12 +67,12 @@ class DynamoDbMapperTest : DdbLocalTest() {
interceptor.reset()

// Still no metric for low-level client (i.e., LL wasn't modified by HL)
ddb.scan { tableName = TABLE_NAME }
lowLevelAccess { scan { tableName = TABLE_NAME } }
interceptor.assertMetric(AwsBusinessMetric.DDB_MAPPER, exists = false)
interceptor.reset()

// Original client can be closed, mapper is unaffected
ddb.close()
lowLevelAccess { close() }
table.scanPaginated { }.collect()
interceptor.assertMetric(AwsBusinessMetric.DDB_MAPPER)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ class PutItemTest : DdbLocalTest() {

table.putItem { item = Item(id = "foo", value = 42) }

val resp = ddb.getItem(TABLE_NAME, "id" to "foo")
val resp = lowLevelAccess { getItem(TABLE_NAME, "id" to "foo") }

val item = assertNotNull(resp.item)
assertEquals("foo", item["id"]?.asSOrNull())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,19 @@ import aws.sdk.kotlin.hll.dynamodbmapper.DynamoDbMapper
import aws.sdk.kotlin.hll.dynamodbmapper.items.ItemSchema
import aws.sdk.kotlin.hll.dynamodbmapper.model.Item
import aws.sdk.kotlin.runtime.auth.credentials.StaticCredentialsProvider
import aws.sdk.kotlin.runtime.http.interceptors.AwsBusinessMetric
import aws.sdk.kotlin.services.dynamodb.DynamoDbClient
import aws.sdk.kotlin.services.dynamodb.deleteTable
import aws.sdk.kotlin.services.dynamodb.waiters.waitUntilTableNotExists
import aws.smithy.kotlin.runtime.client.ProtocolRequestInterceptorContext
import aws.smithy.kotlin.runtime.http.interceptors.HttpInterceptor
import aws.smithy.kotlin.runtime.http.request.HttpRequest
import aws.smithy.kotlin.runtime.net.Host
import aws.smithy.kotlin.runtime.net.Scheme
import aws.smithy.kotlin.runtime.net.url.Url
import io.kotest.core.spec.style.AnnotationSpec
import kotlinx.coroutines.runBlocking
import kotlin.test.assertContains
import kotlin.test.assertEquals
import kotlin.test.assertNotNull

Expand All @@ -41,6 +46,9 @@ abstract class DdbLocalTest : AnnotationSpec() {
}
}

private val requests = mutableListOf<HttpRequest>()
private val requestInterceptor = RequestCapturingInterceptor(this@DdbLocalTest.requests)

private val ddbHolder = lazy {
DynamoDbClient {
endpointUrl = Url {
Expand All @@ -55,15 +63,20 @@ abstract class DdbLocalTest : AnnotationSpec() {
accessKeyId = "DUMMY"
secretAccessKey = "DUMMY"
}

interceptors += requestInterceptor
}
}

/**
* An instance of a low-level [DynamoDbClient] utilizing the DynamoDB Local instance which may be used for setting
* up or verifying various mapper tests. If this is the first time accessing the value, the client will be
* initialized.
*
* **Important**: This low-level client should only be accessed via [lowLevelAccess] to ensure that User-Agent
* header verification succeeds.
*/
val ddb by ddbHolder
private val ddb by ddbHolder

private val tempTables = mutableListOf<String>()

Expand Down Expand Up @@ -95,9 +108,11 @@ abstract class DdbLocalTest : AnnotationSpec() {
lsis: Map<String, ItemSchema<*>>,
items: List<Item>,
) {
ddb.createTable(name, schema, gsis, lsis)
tempTables += name
ddb.putItems(name, items)
lowLevelAccess {
createTable(name, schema, gsis, lsis)
tempTables += name
putItems(name, items)
}
}

/**
Expand All @@ -109,6 +124,43 @@ abstract class DdbLocalTest : AnnotationSpec() {
config: DynamoDbMapper.Config.Builder.() -> Unit = { },
) = DynamoDbMapper(ddb ?: this.ddb, config)

@BeforeEach
fun initializeTest() {
requestInterceptor.enabled = true
}

/**
* Executes requests on a low-level [DynamoDbClient] and _does not_ log any requests executed in [block]. (This
* skips verifying that low-level requests contain the [AwsBusinessMetric.DDB_MAPPER] metric.)
*/
protected suspend fun <T> lowLevelAccess(block: suspend DynamoDbClient.() -> T): T {
requestInterceptor.enabled = false
return block(ddb).also { requestInterceptor.enabled = true }
}

@AfterEach
fun postVerify() {
requests.forEach { req ->
val uaString = requireNotNull(req.headers["User-Agent"]) {
"Missing User-Agent header for request $req"
}

val components = uaString.split(" ")

val metricsComponent = requireNotNull(components.find { it.startsWith("m/") }) {
"""User-Agent header "$uaString" doesn't contain business metrics for request $req"""
}

val metrics = metricsComponent.removePrefix("m/").split(",")

assertContains(
metrics,
AwsBusinessMetric.DDB_MAPPER.identifier,
"""Mapper business metric not present in User-Agent header "$uaString" for request $req""",
)
}
}

@AfterAll
fun cleanUp() {
if (ddbHolder.isInitialized()) {
Expand All @@ -123,3 +175,13 @@ abstract class DdbLocalTest : AnnotationSpec() {
}
}
}

private class RequestCapturingInterceptor(val requests: MutableList<HttpRequest>) : HttpInterceptor {
var enabled = true

override fun readBeforeTransmit(context: ProtocolRequestInterceptorContext<Any, HttpRequest>) {
if (enabled) {
requests += context.protocolRequest
}
}
}

0 comments on commit d0a628d

Please sign in to comment.