diff --git a/.github/workflows/multi-node-test-workflow.yml b/.github/workflows/multi-node-test-workflow.yml index 2c9738a98..baa186175 100644 --- a/.github/workflows/multi-node-test-workflow.yml +++ b/.github/workflows/multi-node-test-workflow.yml @@ -22,7 +22,7 @@ jobs: - name: Checkout Branch uses: actions/checkout@v2 - name: Run integration tests with multi node config - run: ./gradlew integTest -Dtests.class="org.opensearch.indexmanagement.rollup.interceptor.ResponseInterceptorIT" -Dtests.method="test search multiple live data indices and a rollup data index with overlap" + run: ./gradlew integTest -Dtests.class="org.opensearch.indexmanagement.rollup.interceptor.ResponseInterceptorIT" - name: Upload failed logs uses: actions/upload-artifact@v2 if: failure() diff --git a/src/test/kotlin/org/opensearch/indexmanagement/rollup/interceptor/ResponseInterceptorIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/rollup/interceptor/ResponseInterceptorIT.kt index f87780443..25671ee9f 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/rollup/interceptor/ResponseInterceptorIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/rollup/interceptor/ResponseInterceptorIT.kt @@ -452,304 +452,304 @@ class ResponseInterceptorIT : RollupRestTestCase() { responseAggs.getValue("avg_passenger_count")["value"] ) } - fun `test search multiple live data indices and a rollup data index with overlap`() { - generateNYCTaxiData("source_rollup_search_multi_index_case") - val rollup = Rollup( - id = "case3_rollup_search", - enabled = true, - schemaVersion = 1L, - jobSchedule = IntervalSchedule(Instant.now(), 1, ChronoUnit.MINUTES), - jobLastUpdatedTime = Instant.now(), - jobEnabledTime = Instant.now(), - description = "basic search test", - sourceIndex = "source_rollup_search_multi_index_case", - targetIndex = "target_rollup_search_multi_index_case", - metadataID = null, - roles = emptyList(), - pageSize = 10, - delay = 0, - continuous = false, - dimensions = listOf( - DateHistogram(sourceField = "tpep_pickup_datetime", fixedInterval = "1h"), - Terms("passenger_count", "passenger_count") - ), - metrics = listOf( - RollupMetrics( - sourceField = "passenger_count", targetField = "passenger_count", - metrics = listOf( - Sum(), Min(), Max(), - ValueCount(), Average() - ) - ) - ) - ).let { createRollup(it, it.id) } - - updateRollupStartTime(rollup) - - waitFor { - val rollupJob = getRollup(rollupId = rollup.id) - assertNotNull("Rollup job doesn't have metadata set", rollupJob.metadataID) - val rollupMetadata = getRollupMetadata(rollupJob.metadataID!!) - assertEquals("Rollup is not finished", RollupMetadata.Status.FINISHED, rollupMetadata.status) - } - - refreshAllIndices() - // Split data at 1546304400000 or Jan 01 2019 01:00:00 - // Delete half the values from live data simulating an ism job deleting old data - var r = """ - { - "query": { - "range": { - "tpep_pickup_datetime": { - "lt": 1546304400000, - "format": "epoch_millis", - "time_zone": "+00:00" - } - } - } - } - """.trimIndent() - var deleteLiveResponse = client().makeRequest( - "POST", - "source_rollup_search_multi_index_case/_delete_by_query", - mapOf("refresh" to "true"), - StringEntity(r, ContentType.APPLICATION_JSON) - ) - - assertTrue("Could not delete live data", deleteLiveResponse.restStatus() == RestStatus.OK) - - // Insert more live data - generateNYCTaxiData("source_rollup_search_multi_index_case2") - // Expected values would discard the overlapping rollup index completely - var aggReq = """ - { - "size": 0, - "query": { - "match_all": {} - }, - "aggs": { - "sum_passenger_count": { - "sum": { - "field": "passenger_count" - } - }, - "max_passenger_count": { - "max": { - "field": "passenger_count" - } - }, - "min_passenger_count": { - "min": { - "field": "passenger_count" - } - }, - "avg_passenger_count": { - "avg": { - "field": "passenger_count" - } - }, - "count_passenger_count": { - "value_count": { - "field": "passenger_count" - } - } - } - } - """.trimIndent() - var searchResponse = client().makeRequest("POST", "/source_rollup_search_multi_index_case,source_rollup_search_multi_index_case2/_search", emptyMap(), StringEntity(aggReq, ContentType.APPLICATION_JSON)) - assertTrue("Could not search initial data for expected values", searchResponse.restStatus() == RestStatus.OK) - var expectedAggs = searchResponse.asMap()["aggregations"] as Map> - val expectedSum = expectedAggs.getValue("sum_passenger_count")["value"] - val expectedMax = expectedAggs.getValue("max_passenger_count")["value"] - val expectedMin = expectedAggs.getValue("min_passenger_count")["value"] - val expectedCount = expectedAggs.getValue("count_passenger_count")["value"] - val expectedAvg = expectedAggs.getValue("avg_passenger_count")["value"] - refreshAllIndices() - - // Search all 3 indices to check if overlap was removed - var searchAllResponse = client().makeRequest("POST", "/target_rollup_search_multi_index_case,source_rollup_search_multi_index_case,source_rollup_search_multi_index_case2/_search", emptyMap(), StringEntity(aggReq, ContentType.APPLICATION_JSON)) - assertTrue(searchAllResponse.restStatus() == RestStatus.OK) - var responseAggs = searchAllResponse.asMap()["aggregations"] as Map> - assertEquals( - "sum agg is wrong", - expectedSum, - responseAggs.getValue("sum_passenger_count")["value"] - ) - assertEquals( - "max agg is wrong", - expectedMax, - responseAggs.getValue("max_passenger_count")["value"] - ) - assertEquals( - "min agg is wrong", - expectedMin, - responseAggs.getValue("min_passenger_count")["value"] - ) - assertEquals( - "value_count is wrong", - expectedCount, - responseAggs.getValue("count_passenger_count")["value"] - ) - assertEquals( - "avg is wrong", - expectedAvg, - responseAggs.getValue("avg_passenger_count")["value"] - ) - } - fun `test search aliased live indices data and rollup data`() { - /* add later */ - // Create 3 indices with 5,000 docs each nyc-taxi-data-1, nyc-taxi-data-2, nyc-taxi-data-3 - generateNYCTaxiData("nyc-taxi-data-1") - generateNYCTaxiData("nyc-taxi-data-2") - generateNYCTaxiData("nyc-taxi-data-3") - // Add them to alias nyc-taxi-data - val createAliasReq = """ - { - "actions": [ - { - "add": { - "index": "nyc-taxi-data-1", - "alias": "nyc-taxi-data" - } - }, - { - "add": { - "index": "nyc-taxi-data-2", - "alias": "nyc-taxi-data" - } - }, - { - "add": { - "index": "nyc-taxi-data-3", - "alias": "nyc-taxi-data" - } - } - ] - } - """.trimIndent() - val createAliasRes = client().makeRequest( - "POST", - "_aliases", - mapOf(), - StringEntity(createAliasReq, ContentType.APPLICATION_JSON) - ) - assertTrue("Could not create alias", createAliasRes.restStatus() == RestStatus.OK) - // Rollup alias into rollup-nyc-taxi-data - val rollup = Rollup( - id = "alias_rollup_search", - enabled = true, - schemaVersion = 1L, - jobSchedule = IntervalSchedule(Instant.now(), 1, ChronoUnit.MINUTES), - jobLastUpdatedTime = Instant.now(), - jobEnabledTime = Instant.now(), - description = "basic search test", - sourceIndex = "nyc-taxi-data", - targetIndex = "rollup-nyc-taxi-data", - metadataID = null, - roles = emptyList(), - pageSize = 10, - delay = 0, - continuous = false, - dimensions = listOf( - DateHistogram(sourceField = "tpep_pickup_datetime", fixedInterval = "1h"), - Terms("passenger_count", "passenger_count") - ), - metrics = listOf( - RollupMetrics( - sourceField = "passenger_count", targetField = "passenger_count", - metrics = listOf( - Sum(), Min(), Max(), - ValueCount(), Average() - ) - ) - ) - ).let { createRollup(it, it.id) } - - updateRollupStartTime(rollup) - - waitFor { - val rollupJob = getRollup(rollupId = rollup.id) - assertNotNull("Rollup job doesn't have metadata set", rollupJob.metadataID) - val rollupMetadata = getRollupMetadata(rollupJob.metadataID!!) - assertEquals("Rollup is not finished", RollupMetadata.Status.FINISHED, rollupMetadata.status) - } - refreshAllIndices() - // Find expected values by searching nyc-taxi-data - var aggReq = """ - { - "size": 0, - "query": { - "match_all": {} - }, - "aggs": { - "sum_passenger_count": { - "sum": { - "field": "passenger_count" - } - }, - "max_passenger_count": { - "max": { - "field": "passenger_count" - } - }, - "min_passenger_count": { - "min": { - "field": "passenger_count" - } - }, - "avg_passenger_count": { - "avg": { - "field": "passenger_count" - } - }, - "count_passenger_count": { - "value_count": { - "field": "passenger_count" - } - } - } - } - """.trimIndent() - var searchResponse = client().makeRequest("POST", "/nyc-taxi-data/_search", emptyMap(), StringEntity(aggReq, ContentType.APPLICATION_JSON)) - assertTrue("Could not search initial data for expected values", searchResponse.restStatus() == RestStatus.OK) - var expectedAggs = searchResponse.asMap()["aggregations"] as Map> - val expectedSum = expectedAggs.getValue("sum_passenger_count")["value"] - val expectedMax = expectedAggs.getValue("max_passenger_count")["value"] - val expectedMin = expectedAggs.getValue("min_passenger_count")["value"] - val expectedCount = expectedAggs.getValue("count_passenger_count")["value"] - val expectedAvg = expectedAggs.getValue("avg_passenger_count")["value"] - refreshAllIndices() - // Validate result from searching rollup-nyc-taxi-data, searching nyc-taxi-data - val start = System.currentTimeMillis() - var searchAllResponse = client().makeRequest("POST", "/rollup-nyc-taxi-data,nyc-taxi-data/_search", emptyMap(), StringEntity(aggReq, ContentType.APPLICATION_JSON)) - assertTrue(searchAllResponse.restStatus() == RestStatus.OK) - var responseAggs = searchAllResponse.asMap()["aggregations"] as Map> - assertEquals( - "sum agg is wrong", - expectedSum, - responseAggs.getValue("sum_passenger_count")["value"] - ) - assertEquals( - "max agg is wrong", - expectedMax, - responseAggs.getValue("max_passenger_count")["value"] - ) - assertEquals( - "min agg is wrong", - expectedMin, - responseAggs.getValue("min_passenger_count")["value"] - ) - assertEquals( - "value_count is wrong", - expectedCount, - responseAggs.getValue("count_passenger_count")["value"] - ) - assertEquals( - "avg is wrong", - expectedAvg, - responseAggs.getValue("avg_passenger_count")["value"] - ) - val elapsedTimeMs = System.currentTimeMillis() - start - assertEquals("ronsax search reqeust took $elapsedTimeMs ms", true, false) - } +// fun `test search multiple live data indices and a rollup data index with overlap`() { +// generateNYCTaxiData("source_rollup_search_multi_index_case") +// val rollup = Rollup( +// id = "case3_rollup_search", +// enabled = true, +// schemaVersion = 1L, +// jobSchedule = IntervalSchedule(Instant.now(), 1, ChronoUnit.MINUTES), +// jobLastUpdatedTime = Instant.now(), +// jobEnabledTime = Instant.now(), +// description = "basic search test", +// sourceIndex = "source_rollup_search_multi_index_case", +// targetIndex = "target_rollup_search_multi_index_case", +// metadataID = null, +// roles = emptyList(), +// pageSize = 10, +// delay = 0, +// continuous = false, +// dimensions = listOf( +// DateHistogram(sourceField = "tpep_pickup_datetime", fixedInterval = "1h"), +// Terms("passenger_count", "passenger_count") +// ), +// metrics = listOf( +// RollupMetrics( +// sourceField = "passenger_count", targetField = "passenger_count", +// metrics = listOf( +// Sum(), Min(), Max(), +// ValueCount(), Average() +// ) +// ) +// ) +// ).let { createRollup(it, it.id) } +// +// updateRollupStartTime(rollup) +// +// waitFor { +// val rollupJob = getRollup(rollupId = rollup.id) +// assertNotNull("Rollup job doesn't have metadata set", rollupJob.metadataID) +// val rollupMetadata = getRollupMetadata(rollupJob.metadataID!!) +// assertEquals("Rollup is not finished", RollupMetadata.Status.FINISHED, rollupMetadata.status) +// } +// +// refreshAllIndices() +// // Split data at 1546304400000 or Jan 01 2019 01:00:00 +// // Delete half the values from live data simulating an ism job deleting old data +// var r = """ +// { +// "query": { +// "range": { +// "tpep_pickup_datetime": { +// "lt": 1546304400000, +// "format": "epoch_millis", +// "time_zone": "+00:00" +// } +// } +// } +// } +// """.trimIndent() +// var deleteLiveResponse = client().makeRequest( +// "POST", +// "source_rollup_search_multi_index_case/_delete_by_query", +// mapOf("refresh" to "true"), +// StringEntity(r, ContentType.APPLICATION_JSON) +// ) +// +// assertTrue("Could not delete live data", deleteLiveResponse.restStatus() == RestStatus.OK) +// +// // Insert more live data +// generateNYCTaxiData("source_rollup_search_multi_index_case2") +// // Expected values would discard the overlapping rollup index completely +// var aggReq = """ +// { +// "size": 0, +// "query": { +// "match_all": {} +// }, +// "aggs": { +// "sum_passenger_count": { +// "sum": { +// "field": "passenger_count" +// } +// }, +// "max_passenger_count": { +// "max": { +// "field": "passenger_count" +// } +// }, +// "min_passenger_count": { +// "min": { +// "field": "passenger_count" +// } +// }, +// "avg_passenger_count": { +// "avg": { +// "field": "passenger_count" +// } +// }, +// "count_passenger_count": { +// "value_count": { +// "field": "passenger_count" +// } +// } +// } +// } +// """.trimIndent() +// var searchResponse = client().makeRequest("POST", "/source_rollup_search_multi_index_case,source_rollup_search_multi_index_case2/_search", emptyMap(), StringEntity(aggReq, ContentType.APPLICATION_JSON)) +// assertTrue("Could not search initial data for expected values", searchResponse.restStatus() == RestStatus.OK) +// var expectedAggs = searchResponse.asMap()["aggregations"] as Map> +// val expectedSum = expectedAggs.getValue("sum_passenger_count")["value"] +// val expectedMax = expectedAggs.getValue("max_passenger_count")["value"] +// val expectedMin = expectedAggs.getValue("min_passenger_count")["value"] +// val expectedCount = expectedAggs.getValue("count_passenger_count")["value"] +// val expectedAvg = expectedAggs.getValue("avg_passenger_count")["value"] +// refreshAllIndices() +// +// // Search all 3 indices to check if overlap was removed +// var searchAllResponse = client().makeRequest("POST", "/target_rollup_search_multi_index_case,source_rollup_search_multi_index_case,source_rollup_search_multi_index_case2/_search", emptyMap(), StringEntity(aggReq, ContentType.APPLICATION_JSON)) +// assertTrue(searchAllResponse.restStatus() == RestStatus.OK) +// var responseAggs = searchAllResponse.asMap()["aggregations"] as Map> +// assertEquals( +// "sum agg is wrong", +// expectedSum, +// responseAggs.getValue("sum_passenger_count")["value"] +// ) +// assertEquals( +// "max agg is wrong", +// expectedMax, +// responseAggs.getValue("max_passenger_count")["value"] +// ) +// assertEquals( +// "min agg is wrong", +// expectedMin, +// responseAggs.getValue("min_passenger_count")["value"] +// ) +// assertEquals( +// "value_count is wrong", +// expectedCount, +// responseAggs.getValue("count_passenger_count")["value"] +// ) +// assertEquals( +// "avg is wrong", +// expectedAvg, +// responseAggs.getValue("avg_passenger_count")["value"] +// ) +// } +// fun `test search aliased live indices data and rollup data`() { +// /* add later */ +// // Create 3 indices with 5,000 docs each nyc-taxi-data-1, nyc-taxi-data-2, nyc-taxi-data-3 +// generateNYCTaxiData("nyc-taxi-data-1") +// generateNYCTaxiData("nyc-taxi-data-2") +// generateNYCTaxiData("nyc-taxi-data-3") +// // Add them to alias nyc-taxi-data +// val createAliasReq = """ +// { +// "actions": [ +// { +// "add": { +// "index": "nyc-taxi-data-1", +// "alias": "nyc-taxi-data" +// } +// }, +// { +// "add": { +// "index": "nyc-taxi-data-2", +// "alias": "nyc-taxi-data" +// } +// }, +// { +// "add": { +// "index": "nyc-taxi-data-3", +// "alias": "nyc-taxi-data" +// } +// } +// ] +// } +// """.trimIndent() +// val createAliasRes = client().makeRequest( +// "POST", +// "_aliases", +// mapOf(), +// StringEntity(createAliasReq, ContentType.APPLICATION_JSON) +// ) +// assertTrue("Could not create alias", createAliasRes.restStatus() == RestStatus.OK) +// // Rollup alias into rollup-nyc-taxi-data +// val rollup = Rollup( +// id = "alias_rollup_search", +// enabled = true, +// schemaVersion = 1L, +// jobSchedule = IntervalSchedule(Instant.now(), 1, ChronoUnit.MINUTES), +// jobLastUpdatedTime = Instant.now(), +// jobEnabledTime = Instant.now(), +// description = "basic search test", +// sourceIndex = "nyc-taxi-data", +// targetIndex = "rollup-nyc-taxi-data", +// metadataID = null, +// roles = emptyList(), +// pageSize = 10, +// delay = 0, +// continuous = false, +// dimensions = listOf( +// DateHistogram(sourceField = "tpep_pickup_datetime", fixedInterval = "1h"), +// Terms("passenger_count", "passenger_count") +// ), +// metrics = listOf( +// RollupMetrics( +// sourceField = "passenger_count", targetField = "passenger_count", +// metrics = listOf( +// Sum(), Min(), Max(), +// ValueCount(), Average() +// ) +// ) +// ) +// ).let { createRollup(it, it.id) } +// +// updateRollupStartTime(rollup) +// +// waitFor { +// val rollupJob = getRollup(rollupId = rollup.id) +// assertNotNull("Rollup job doesn't have metadata set", rollupJob.metadataID) +// val rollupMetadata = getRollupMetadata(rollupJob.metadataID!!) +// assertEquals("Rollup is not finished", RollupMetadata.Status.FINISHED, rollupMetadata.status) +// } +// refreshAllIndices() +// // Find expected values by searching nyc-taxi-data +// var aggReq = """ +// { +// "size": 0, +// "query": { +// "match_all": {} +// }, +// "aggs": { +// "sum_passenger_count": { +// "sum": { +// "field": "passenger_count" +// } +// }, +// "max_passenger_count": { +// "max": { +// "field": "passenger_count" +// } +// }, +// "min_passenger_count": { +// "min": { +// "field": "passenger_count" +// } +// }, +// "avg_passenger_count": { +// "avg": { +// "field": "passenger_count" +// } +// }, +// "count_passenger_count": { +// "value_count": { +// "field": "passenger_count" +// } +// } +// } +// } +// """.trimIndent() +// var searchResponse = client().makeRequest("POST", "/nyc-taxi-data/_search", emptyMap(), StringEntity(aggReq, ContentType.APPLICATION_JSON)) +// assertTrue("Could not search initial data for expected values", searchResponse.restStatus() == RestStatus.OK) +// var expectedAggs = searchResponse.asMap()["aggregations"] as Map> +// val expectedSum = expectedAggs.getValue("sum_passenger_count")["value"] +// val expectedMax = expectedAggs.getValue("max_passenger_count")["value"] +// val expectedMin = expectedAggs.getValue("min_passenger_count")["value"] +// val expectedCount = expectedAggs.getValue("count_passenger_count")["value"] +// val expectedAvg = expectedAggs.getValue("avg_passenger_count")["value"] +// refreshAllIndices() +// // Validate result from searching rollup-nyc-taxi-data, searching nyc-taxi-data +// val start = System.currentTimeMillis() +// var searchAllResponse = client().makeRequest("POST", "/rollup-nyc-taxi-data,nyc-taxi-data/_search", emptyMap(), StringEntity(aggReq, ContentType.APPLICATION_JSON)) +// assertTrue(searchAllResponse.restStatus() == RestStatus.OK) +// var responseAggs = searchAllResponse.asMap()["aggregations"] as Map> +// assertEquals( +// "sum agg is wrong", +// expectedSum, +// responseAggs.getValue("sum_passenger_count")["value"] +// ) +// assertEquals( +// "max agg is wrong", +// expectedMax, +// responseAggs.getValue("max_passenger_count")["value"] +// ) +// assertEquals( +// "min agg is wrong", +// expectedMin, +// responseAggs.getValue("min_passenger_count")["value"] +// ) +// assertEquals( +// "value_count is wrong", +// expectedCount, +// responseAggs.getValue("count_passenger_count")["value"] +// ) +// assertEquals( +// "avg is wrong", +// expectedAvg, +// responseAggs.getValue("avg_passenger_count")["value"] +// ) +// val elapsedTimeMs = System.currentTimeMillis() - start +// assertEquals("ronsax search reqeust took $elapsedTimeMs ms", true, false) +// } }