Skip to content

Commit

Permalink
556: Moved _doc_count from transform._doc_count to root of document (o…
Browse files Browse the repository at this point in the history
…pensearch-project#558)

Signed-off-by: Stevan Buzejic <[email protected]>

Signed-off-by: Stevan Buzejic <[email protected]>
  • Loading branch information
stevanbz authored Oct 7, 2022
1 parent fe8bf44 commit c908871
Show file tree
Hide file tree
Showing 3 changed files with 88 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -170,10 +170,12 @@ data class Transform(
return if (includeId) {
mutableMapOf(
TRANSFORM_DOC_ID_FIELD to this.id,
_DOC_COUNT to docCount,
TRANSFORM_DOC_COUNT_FIELD to docCount
)
} else {
mutableMapOf(
_DOC_COUNT to docCount,
TRANSFORM_DOC_COUNT_FIELD to docCount
)
}
Expand Down Expand Up @@ -286,6 +288,8 @@ data class Transform(
const val MAXIMUM_PAGE_SIZE_CONTINUOUS = 1_000
const val MINIMUM_JOB_INTERVAL = 1
const val TRANSFORM_DOC_ID_FIELD = "$TRANSFORM_TYPE._id"
const val _DOC_COUNT = "_doc_count"
// Keeping the field in order to be backward compatible
const val TRANSFORM_DOC_COUNT_FIELD = "$TRANSFORM_TYPE._doc_count"
const val CONTINUOUS_FIELD = "continuous"
const val USER_FIELD = "user"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import org.opensearch.indexmanagement.transform.model.Transform
import org.opensearch.indexmanagement.transform.model.TransformMetadata
import org.opensearch.indexmanagement.waitFor
import org.opensearch.jobscheduler.spi.schedule.IntervalSchedule
import org.opensearch.rest.RestRequest
import org.opensearch.rest.RestStatus
import org.opensearch.script.Script
import org.opensearch.script.ScriptType
Expand Down Expand Up @@ -204,6 +205,82 @@ class TransformRunnerIT : TransformRestTestCase() {
assertTrue("Didn't capture search time", metadata.stats.searchTimeInMillis > 0)
}

fun `test transform target index _doc_count against the source index _doc_count`() {
val sourceIdxTestName = "source_idx_test"
val targetIdxTestName = "target_idx_test"

val storeAndForwardTerm = "store_and_fwd_flag"
val fareAmount = "fare_amount"
val avgAmountPerFlag = "avg_amount_per_store_flag"

validateSourceIndex(sourceIdxTestName)

val transform = Transform(
id = "id_13",
schemaVersion = 1L,
enabled = true,
enabledAt = Instant.now(),
updatedAt = Instant.now(),
jobSchedule = IntervalSchedule(Instant.now(), 1, ChronoUnit.MINUTES),
description = "test transform doc values must be the same",
metadataId = null,
sourceIndex = sourceIdxTestName,
targetIndex = targetIdxTestName,
roles = emptyList(),
pageSize = 1,
groups = listOf(
Terms(sourceField = storeAndForwardTerm, targetField = storeAndForwardTerm)
),
aggregations = AggregatorFactories.builder().addAggregator(AggregationBuilders.avg(fareAmount).field(fareAmount))
).let { createTransform(it, it.id) }

updateTransformStartTime(transform)

waitFor {
assertTrue("Target transform index was not created", indexExists(transform.targetIndex))
}

waitFor {
val transformJob = getTransform(transformId = transform.id)
assertNotNull("Transform job doesn't have metadata set", transformJob.metadataId)
val transformMetadata = getTransformMetadata(transformJob.metadataId!!)
assertEquals("Transform is not finished", TransformMetadata.Status.FINISHED, transformMetadata.status)

var req = """
{
"size": 0,
"aggs": {
"$avgAmountPerFlag": {
"terms": {
"field": "$storeAndForwardTerm", "order": { "_key": "asc" }
},
"aggs": {
"avg": { "avg": { "field": "$fareAmount" } } }
}
}
}
""".trimIndent()

var rawRes = client().makeRequest(RestRequest.Method.POST.name, "/$sourceIdxTestName/_search", emptyMap(), StringEntity(req, ContentType.APPLICATION_JSON))
assertTrue(rawRes.restStatus() == RestStatus.OK)

var transformRes = client().makeRequest(RestRequest.Method.POST.name, "/$targetIdxTestName/_search", emptyMap(), StringEntity(req, ContentType.APPLICATION_JSON))
assertTrue(transformRes.restStatus() == RestStatus.OK)

val rawAggBuckets = (rawRes.asMap()["aggregations"] as Map<String, Map<String, List<Map<String, Map<String, Any>>>>>)[avgAmountPerFlag]!!["buckets"]!!
val transformAggBuckets = (transformRes.asMap()["aggregations"] as Map<String, Map<String, List<Map<String, Map<String, Any>>>>>)[avgAmountPerFlag]!!["buckets"]!!

assertEquals("Different bucket sizes", rawAggBuckets.size, transformAggBuckets.size)
rawAggBuckets.forEachIndexed { idx, rawAggBucket ->
val transformAggBucket = transformAggBuckets[idx]
assertEquals(
"The doc_count had a different value raw[$rawAggBucket] transform[$transformAggBucket]",
rawAggBucket["doc_count"]!!, transformAggBucket["doc_count"]!!
)
}
}
}

fun `test transform with failure during indexing`() {
validateSourceIndex("transform-source-index")

Expand Down Expand Up @@ -497,7 +574,7 @@ class TransformRunnerIT : TransformRestTestCase() {
assertEquals("Request failed", RestStatus.OK, response.restStatus())
val responseHits = response.asMap().getValue("hits") as Map<*, *>
val totalDocs = (responseHits["hits"] as ArrayList<*>).fold(0) { sum, bucket ->
val docCount = ((bucket as Map<*, *>)["_source"] as Map<*, *>)["transform._doc_count"] as Int
val docCount = ((bucket as Map<*, *>)["_source"] as Map<*, *>)["_doc_count"] as Int
sum + docCount
}
assertEquals("Not all documents included in the transform target index", 5000, totalDocs)
Expand Down Expand Up @@ -548,7 +625,7 @@ class TransformRunnerIT : TransformRestTestCase() {
assertEquals("Request failed", RestStatus.OK, response.restStatus())
val responseHits = response.asMap().getValue("hits") as Map<*, *>
val totalDocs = (responseHits["hits"] as ArrayList<*>).fold(0) { sum, bucket ->
val docCount = ((bucket as Map<*, *>)["_source"] as Map<*, *>)["transform._doc_count"] as Int
val docCount = ((bucket as Map<*, *>)["_source"] as Map<*, *>)["_doc_count"] as Int
sum + docCount
}
assertEquals("Not all documents included in the transform target index", 10000, totalDocs)
Expand Down Expand Up @@ -626,7 +703,7 @@ class TransformRunnerIT : TransformRestTestCase() {
assertEquals("Request failed", RestStatus.OK, response.restStatus())
val responseHits = response.asMap().getValue("hits") as Map<*, *>
val totalDocs = (responseHits["hits"] as ArrayList<*>).fold(0) { sum, bucket ->
val docCount = ((bucket as Map<*, *>)["_source"] as Map<*, *>)["transform._doc_count"] as Int
val docCount = ((bucket as Map<*, *>)["_source"] as Map<*, *>)["_doc_count"] as Int
sum + docCount
}
assertEquals("Not all documents included in the transform target index", 48, totalDocs)
Expand Down Expand Up @@ -686,7 +763,7 @@ class TransformRunnerIT : TransformRestTestCase() {
assertEquals("Request failed", RestStatus.OK, response.restStatus())
val responseHits = response.asMap().getValue("hits") as Map<*, *>
val totalDocs = (responseHits["hits"] as ArrayList<*>).fold(0) { sum, bucket ->
val docCount = ((bucket as Map<*, *>)["_source"] as Map<*, *>)["transform._doc_count"] as Int
val docCount = ((bucket as Map<*, *>)["_source"] as Map<*, *>)["_doc_count"] as Int
sum + docCount
}
assertEquals("Not all documents included in the transform target index", 100, totalDocs)
Expand Down Expand Up @@ -843,7 +920,7 @@ class TransformRunnerIT : TransformRestTestCase() {
assertEquals("Request failed", RestStatus.OK, response.restStatus())
val responseHits = response.asMap().getValue("hits") as Map<*, *>
val totalDocs = (responseHits["hits"] as ArrayList<*>).fold(0) { sum, bucket ->
val docCount = ((bucket as Map<*, *>)["_source"] as Map<*, *>)["transform._doc_count"] as Int
val docCount = ((bucket as Map<*, *>)["_source"] as Map<*, *>)["_doc_count"] as Int
sum + docCount
}
assertEquals("Not all documents included in the transform target index", 52, totalDocs)
Expand Down Expand Up @@ -917,7 +994,7 @@ class TransformRunnerIT : TransformRestTestCase() {
assertEquals("Request failed", RestStatus.OK, response.restStatus())
val responseHits = response.asMap().getValue("hits") as Map<*, *>
val totalDocs = (responseHits["hits"] as ArrayList<*>).fold(0) { sum, bucket ->
val docCount = ((bucket as Map<*, *>)["_source"] as Map<*, *>)["transform._doc_count"] as Int
val docCount = ((bucket as Map<*, *>)["_source"] as Map<*, *>)["_doc_count"] as Int
sum + docCount
}
assertEquals("Not all documents included in the transform target index", 88, totalDocs)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ class RestPreviewTransformActionIT : TransformRestTestCase() {
emptyMap(),
transform.toHttpEntity()
)
val expectedKeys = setOf("revenue", "passengerCount", "flag", "transform._doc_count")
val expectedKeys = setOf("revenue", "passengerCount", "flag", "transform._doc_count", "_doc_count")
assertEquals("Preview transform failed", RestStatus.OK, response.restStatus())
val transformedDocs = response.asMap()["documents"] as List<Map<String, Any>>
assertEquals("Transformed docs have unexpected schema", expectedKeys, transformedDocs.first().keys)
Expand Down

0 comments on commit c908871

Please sign in to comment.