diff --git a/src/main/kotlin/org/opensearch/indexmanagement/transform/model/Transform.kt b/src/main/kotlin/org/opensearch/indexmanagement/transform/model/Transform.kt index dd9acfc2d..fc382c630 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/transform/model/Transform.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/transform/model/Transform.kt @@ -170,10 +170,12 @@ data class Transform( return if (includeId) { mutableMapOf( TRANSFORM_DOC_ID_FIELD to this.id, + _DOC_COUNT to docCount, TRANSFORM_DOC_COUNT_FIELD to docCount ) } else { mutableMapOf( + _DOC_COUNT to docCount, TRANSFORM_DOC_COUNT_FIELD to docCount ) } @@ -286,6 +288,8 @@ data class Transform( const val MAXIMUM_PAGE_SIZE_CONTINUOUS = 1_000 const val MINIMUM_JOB_INTERVAL = 1 const val TRANSFORM_DOC_ID_FIELD = "$TRANSFORM_TYPE._id" + const val _DOC_COUNT = "_doc_count" + // Keeping the field in order to be backward compatible const val TRANSFORM_DOC_COUNT_FIELD = "$TRANSFORM_TYPE._doc_count" const val CONTINUOUS_FIELD = "continuous" const val USER_FIELD = "user" diff --git a/src/test/kotlin/org/opensearch/indexmanagement/transform/TransformRunnerIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/transform/TransformRunnerIT.kt index 2a08d75e1..264e86ffe 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/transform/TransformRunnerIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/transform/TransformRunnerIT.kt @@ -19,6 +19,7 @@ import org.opensearch.indexmanagement.transform.model.Transform import org.opensearch.indexmanagement.transform.model.TransformMetadata import org.opensearch.indexmanagement.waitFor import org.opensearch.jobscheduler.spi.schedule.IntervalSchedule +import org.opensearch.rest.RestRequest import org.opensearch.rest.RestStatus import org.opensearch.script.Script import org.opensearch.script.ScriptType @@ -204,6 +205,82 @@ class TransformRunnerIT : TransformRestTestCase() { assertTrue("Didn't capture search time", metadata.stats.searchTimeInMillis > 0) } + fun `test transform target index _doc_count against the source index _doc_count`() { + val sourceIdxTestName = "source_idx_test" + val targetIdxTestName = "target_idx_test" + + val storeAndForwardTerm = "store_and_fwd_flag" + val fareAmount = "fare_amount" + val avgAmountPerFlag = "avg_amount_per_store_flag" + + validateSourceIndex(sourceIdxTestName) + + val transform = Transform( + id = "id_13", + schemaVersion = 1L, + enabled = true, + enabledAt = Instant.now(), + updatedAt = Instant.now(), + jobSchedule = IntervalSchedule(Instant.now(), 1, ChronoUnit.MINUTES), + description = "test transform doc values must be the same", + metadataId = null, + sourceIndex = sourceIdxTestName, + targetIndex = targetIdxTestName, + roles = emptyList(), + pageSize = 1, + groups = listOf( + Terms(sourceField = storeAndForwardTerm, targetField = storeAndForwardTerm) + ), + aggregations = AggregatorFactories.builder().addAggregator(AggregationBuilders.avg(fareAmount).field(fareAmount)) + ).let { createTransform(it, it.id) } + + updateTransformStartTime(transform) + + waitFor { + assertTrue("Target transform index was not created", indexExists(transform.targetIndex)) + } + + waitFor { + val transformJob = getTransform(transformId = transform.id) + assertNotNull("Transform job doesn't have metadata set", transformJob.metadataId) + val transformMetadata = getTransformMetadata(transformJob.metadataId!!) + assertEquals("Transform is not finished", TransformMetadata.Status.FINISHED, transformMetadata.status) + + var req = """ + { + "size": 0, + "aggs": { + "$avgAmountPerFlag": { + "terms": { + "field": "$storeAndForwardTerm", "order": { "_key": "asc" } + }, + "aggs": { + "avg": { "avg": { "field": "$fareAmount" } } } + } + } + } + """.trimIndent() + + var rawRes = client().makeRequest(RestRequest.Method.POST.name, "/$sourceIdxTestName/_search", emptyMap(), StringEntity(req, ContentType.APPLICATION_JSON)) + assertTrue(rawRes.restStatus() == RestStatus.OK) + + var transformRes = client().makeRequest(RestRequest.Method.POST.name, "/$targetIdxTestName/_search", emptyMap(), StringEntity(req, ContentType.APPLICATION_JSON)) + assertTrue(transformRes.restStatus() == RestStatus.OK) + + val rawAggBuckets = (rawRes.asMap()["aggregations"] as Map>>>>)[avgAmountPerFlag]!!["buckets"]!! + val transformAggBuckets = (transformRes.asMap()["aggregations"] as Map>>>>)[avgAmountPerFlag]!!["buckets"]!! + + assertEquals("Different bucket sizes", rawAggBuckets.size, transformAggBuckets.size) + rawAggBuckets.forEachIndexed { idx, rawAggBucket -> + val transformAggBucket = transformAggBuckets[idx] + assertEquals( + "The doc_count had a different value raw[$rawAggBucket] transform[$transformAggBucket]", + rawAggBucket["doc_count"]!!, transformAggBucket["doc_count"]!! + ) + } + } + } + fun `test transform with failure during indexing`() { validateSourceIndex("transform-source-index") @@ -497,7 +574,7 @@ class TransformRunnerIT : TransformRestTestCase() { assertEquals("Request failed", RestStatus.OK, response.restStatus()) val responseHits = response.asMap().getValue("hits") as Map<*, *> val totalDocs = (responseHits["hits"] as ArrayList<*>).fold(0) { sum, bucket -> - val docCount = ((bucket as Map<*, *>)["_source"] as Map<*, *>)["transform._doc_count"] as Int + val docCount = ((bucket as Map<*, *>)["_source"] as Map<*, *>)["_doc_count"] as Int sum + docCount } assertEquals("Not all documents included in the transform target index", 5000, totalDocs) @@ -548,7 +625,7 @@ class TransformRunnerIT : TransformRestTestCase() { assertEquals("Request failed", RestStatus.OK, response.restStatus()) val responseHits = response.asMap().getValue("hits") as Map<*, *> val totalDocs = (responseHits["hits"] as ArrayList<*>).fold(0) { sum, bucket -> - val docCount = ((bucket as Map<*, *>)["_source"] as Map<*, *>)["transform._doc_count"] as Int + val docCount = ((bucket as Map<*, *>)["_source"] as Map<*, *>)["_doc_count"] as Int sum + docCount } assertEquals("Not all documents included in the transform target index", 10000, totalDocs) @@ -626,7 +703,7 @@ class TransformRunnerIT : TransformRestTestCase() { assertEquals("Request failed", RestStatus.OK, response.restStatus()) val responseHits = response.asMap().getValue("hits") as Map<*, *> val totalDocs = (responseHits["hits"] as ArrayList<*>).fold(0) { sum, bucket -> - val docCount = ((bucket as Map<*, *>)["_source"] as Map<*, *>)["transform._doc_count"] as Int + val docCount = ((bucket as Map<*, *>)["_source"] as Map<*, *>)["_doc_count"] as Int sum + docCount } assertEquals("Not all documents included in the transform target index", 48, totalDocs) @@ -686,7 +763,7 @@ class TransformRunnerIT : TransformRestTestCase() { assertEquals("Request failed", RestStatus.OK, response.restStatus()) val responseHits = response.asMap().getValue("hits") as Map<*, *> val totalDocs = (responseHits["hits"] as ArrayList<*>).fold(0) { sum, bucket -> - val docCount = ((bucket as Map<*, *>)["_source"] as Map<*, *>)["transform._doc_count"] as Int + val docCount = ((bucket as Map<*, *>)["_source"] as Map<*, *>)["_doc_count"] as Int sum + docCount } assertEquals("Not all documents included in the transform target index", 100, totalDocs) @@ -843,7 +920,7 @@ class TransformRunnerIT : TransformRestTestCase() { assertEquals("Request failed", RestStatus.OK, response.restStatus()) val responseHits = response.asMap().getValue("hits") as Map<*, *> val totalDocs = (responseHits["hits"] as ArrayList<*>).fold(0) { sum, bucket -> - val docCount = ((bucket as Map<*, *>)["_source"] as Map<*, *>)["transform._doc_count"] as Int + val docCount = ((bucket as Map<*, *>)["_source"] as Map<*, *>)["_doc_count"] as Int sum + docCount } assertEquals("Not all documents included in the transform target index", 52, totalDocs) @@ -917,7 +994,7 @@ class TransformRunnerIT : TransformRestTestCase() { assertEquals("Request failed", RestStatus.OK, response.restStatus()) val responseHits = response.asMap().getValue("hits") as Map<*, *> val totalDocs = (responseHits["hits"] as ArrayList<*>).fold(0) { sum, bucket -> - val docCount = ((bucket as Map<*, *>)["_source"] as Map<*, *>)["transform._doc_count"] as Int + val docCount = ((bucket as Map<*, *>)["_source"] as Map<*, *>)["_doc_count"] as Int sum + docCount } assertEquals("Not all documents included in the transform target index", 88, totalDocs) diff --git a/src/test/kotlin/org/opensearch/indexmanagement/transform/resthandler/RestPreviewTransformActionIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/transform/resthandler/RestPreviewTransformActionIT.kt index dc2447711..30fa45c1f 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/transform/resthandler/RestPreviewTransformActionIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/transform/resthandler/RestPreviewTransformActionIT.kt @@ -62,7 +62,7 @@ class RestPreviewTransformActionIT : TransformRestTestCase() { emptyMap(), transform.toHttpEntity() ) - val expectedKeys = setOf("revenue", "passengerCount", "flag", "transform._doc_count") + val expectedKeys = setOf("revenue", "passengerCount", "flag", "transform._doc_count", "_doc_count") assertEquals("Preview transform failed", RestStatus.OK, response.restStatus()) val transformedDocs = response.asMap()["documents"] as List> assertEquals("Transformed docs have unexpected schema", expectedKeys, transformedDocs.first().keys)