diff --git a/src/main/kotlin/org/opensearch/indexmanagement/rollup/RollupMetadataService.kt b/src/main/kotlin/org/opensearch/indexmanagement/rollup/RollupMetadataService.kt index 34112c6e5..f6d3e8f50 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/rollup/RollupMetadataService.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/rollup/RollupMetadataService.kt @@ -19,12 +19,14 @@ import org.opensearch.action.search.SearchRequest import org.opensearch.action.search.SearchResponse import org.opensearch.client.Client import org.opensearch.common.Rounding +import org.opensearch.common.time.DateFormatter +import org.opensearch.common.time.DateFormatters import org.opensearch.common.unit.TimeValue import org.opensearch.common.xcontent.LoggingDeprecationHandler -import org.opensearch.core.xcontent.NamedXContentRegistry import org.opensearch.common.xcontent.XContentFactory import org.opensearch.common.xcontent.XContentHelper import org.opensearch.common.xcontent.XContentType +import org.opensearch.core.xcontent.NamedXContentRegistry import org.opensearch.index.query.MatchAllQueryBuilder import org.opensearch.indexmanagement.IndexManagementPlugin import org.opensearch.indexmanagement.common.model.dimension.DateHistogram @@ -34,7 +36,7 @@ import org.opensearch.indexmanagement.rollup.model.ContinuousMetadata import org.opensearch.indexmanagement.rollup.model.Rollup import org.opensearch.indexmanagement.rollup.model.RollupMetadata import org.opensearch.indexmanagement.rollup.model.RollupStats -import org.opensearch.indexmanagement.rollup.util.DATE_FIELD_EPOCH_MILLIS_FORMAT +import org.opensearch.indexmanagement.rollup.util.DATE_FIELD_STRICT_DATE_OPTIONAL_TIME_FORMAT import org.opensearch.indexmanagement.util.NO_ID import org.opensearch.search.aggregations.bucket.composite.InternalComposite import org.opensearch.search.aggregations.bucket.histogram.DateHistogramAggregationBuilder @@ -181,7 +183,7 @@ class RollupMetadataService(val client: Client, val xContentRegistry: NamedXCont .sort(dateHistogram.sourceField, SortOrder.ASC) // TODO: figure out where nulls are sorted .trackTotalHits(false) .fetchSource(false) - .docValueField(dateHistogram.sourceField, DATE_FIELD_EPOCH_MILLIS_FORMAT) + .docValueField(dateHistogram.sourceField, DATE_FIELD_STRICT_DATE_OPTIONAL_TIME_FORMAT) val searchRequest = SearchRequest(rollup.sourceIndex) .source(searchSourceBuilder) .allowPartialSearchResults(false) @@ -194,10 +196,12 @@ class RollupMetadataService(val client: Client, val xContentRegistry: NamedXCont // Get the doc value field of the dateHistogram.sourceField for the first search hit converted to epoch millis // If the doc value is null or empty it will be treated the same as empty doc hits - val firstHitTimestamp = response.hits.hits.first().field(dateHistogram.sourceField).getValue()?.toLong() + val firstHitTimestampAsString: String? = response.hits.hits.first().field(dateHistogram.sourceField).getValue() ?: return StartingTimeResult.NoDocumentsFound - - return StartingTimeResult.Success(getRoundedTime(firstHitTimestamp, dateHistogram)) + // Parse date and extract epochMillis + val formatter = DateFormatter.forPattern(DATE_FIELD_STRICT_DATE_OPTIONAL_TIME_FORMAT) + val epochMillis = DateFormatters.from(formatter.parse(firstHitTimestampAsString), formatter.locale()).toInstant().toEpochMilli() + return StartingTimeResult.Success(getRoundedTime(epochMillis, dateHistogram)) } catch (e: RemoteTransportException) { val unwrappedException = ExceptionsHelper.unwrapCause(e) as Exception logger.debug("Error when getting initial start time for rollup [${rollup.id}]: $unwrappedException") diff --git a/src/main/kotlin/org/opensearch/indexmanagement/rollup/util/RollupUtils.kt b/src/main/kotlin/org/opensearch/indexmanagement/rollup/util/RollupUtils.kt index ef5c05868..f6d749fbb 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/rollup/util/RollupUtils.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/rollup/util/RollupUtils.kt @@ -12,11 +12,11 @@ import org.opensearch.action.search.SearchRequest import org.opensearch.cluster.ClusterState import org.opensearch.cluster.metadata.IndexMetadata import org.opensearch.common.xcontent.LoggingDeprecationHandler -import org.opensearch.core.xcontent.NamedXContentRegistry import org.opensearch.common.xcontent.XContentHelper -import org.opensearch.core.xcontent.XContentParser.Token import org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken import org.opensearch.common.xcontent.XContentType +import org.opensearch.core.xcontent.NamedXContentRegistry +import org.opensearch.core.xcontent.XContentParser.Token import org.opensearch.index.query.BoolQueryBuilder import org.opensearch.index.query.BoostingQueryBuilder import org.opensearch.index.query.ConstantScoreQueryBuilder @@ -63,6 +63,7 @@ import org.opensearch.search.aggregations.metrics.SumAggregationBuilder import org.opensearch.search.aggregations.metrics.ValueCountAggregationBuilder import org.opensearch.search.builder.SearchSourceBuilder +const val DATE_FIELD_STRICT_DATE_OPTIONAL_TIME_FORMAT = "strict_date_optional_time" const val DATE_FIELD_EPOCH_MILLIS_FORMAT = "epoch_millis" @Suppress("ReturnCount") diff --git a/src/test/kotlin/org/opensearch/indexmanagement/rollup/RollupMetadataServiceTests.kt b/src/test/kotlin/org/opensearch/indexmanagement/rollup/RollupMetadataServiceTests.kt index d31ff9edb..46e62057d 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/rollup/RollupMetadataServiceTests.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/rollup/RollupMetadataServiceTests.kt @@ -735,7 +735,7 @@ class RollupMetadataServiceTests : OpenSearchTestCase() { // TODO: Mockito 2 supposedly should be able to mock final classes but there were errors when trying to do so // Will need to check if there is a workaround or a better way to mock getting hits.hits since this current approach is verbose - val docField = DocumentField(dateHistogram.sourceField, listOf(getInstant(timestamp).toEpochMilli().toString())) + val docField = DocumentField(dateHistogram.sourceField, listOf(timestamp)) val searchHit = SearchHit(0) searchHit.setDocumentField(dateHistogram.sourceField, docField) val searchHits = SearchHits(arrayOf(searchHit), null, 0.0F) diff --git a/src/test/kotlin/org/opensearch/indexmanagement/rollup/runner/RollupRunnerIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/rollup/runner/RollupRunnerIT.kt index 3c2f10067..168ea3a64 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/rollup/runner/RollupRunnerIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/rollup/runner/RollupRunnerIT.kt @@ -33,6 +33,7 @@ import org.opensearch.rest.RestRequest import org.opensearch.rest.RestStatus import java.time.Instant import java.time.temporal.ChronoUnit +import java.util.Collections.emptyMap class RollupRunnerIT : RollupRestTestCase() { @@ -1253,6 +1254,71 @@ class RollupRunnerIT : RollupRestTestCase() { assertEquals("Backing index [$backingIndex2] has to have owner rollup job with id:[${startedRollup1.id}]", rollupMetadata.failureReason) } + fun `test rollup with date_nanos as date_histogram field`() { + val index = "date-nanos-index" + val rollupIndex = "date-nanos-index-rollup" + createIndex( + index, + Settings.EMPTY, + """"properties": { + "purchaseDate": { + "type": "date_nanos" + }, + "itemName": { + "type": "keyword" + }, + "itemPrice": { + "type": "float" + } + }""" + ) + + indexDoc(index, "1", """{"purchaseDate": 1683149130000.6497, "itemName": "shoes", "itemPrice": 100.5}""".trimIndent()) + indexDoc(index, "2", """{"purchaseDate": 1683494790000, "itemName": "shoes", "itemPrice": 30.0}""".trimIndent()) + indexDoc(index, "3", """{"purchaseDate": "2023-05-08T18:57:33.743656789Z", "itemName": "shoes", "itemPrice": 60.592}""".trimIndent()) + + refreshAllIndices() + + val job = Rollup( + id = "rollup_with_alias_992434131", + schemaVersion = 1L, + enabled = true, + jobSchedule = IntervalSchedule(Instant.now(), 1, ChronoUnit.DAYS), + jobLastUpdatedTime = Instant.now(), + jobEnabledTime = Instant.now(), + description = "basic change of page size", + sourceIndex = index, + targetIndex = rollupIndex, + metadataID = null, + roles = emptyList(), + pageSize = 1000, + delay = 0, + continuous = true, + dimensions = listOf( + DateHistogram(sourceField = "purchaseDate", fixedInterval = "5d"), + Terms("itemName", "itemName"), + ), + metrics = listOf( + RollupMetrics( + sourceField = "itemPrice", + targetField = "itemPrice", + metrics = listOf(Sum(), Min(), Max(), ValueCount(), Average()) + ) + ) + ).let { createRollup(it, it.id) } + + updateRollupStartTime(job) + + waitFor { assertTrue("Target rollup index was not created", indexExists(rollupIndex)) } + + waitFor { + val rollupJob = getRollup(rollupId = job.id) + assertNotNull("Rollup job doesn't have metadata set", rollupJob.metadataID) + val rollupMetadata = getRollupMetadata(rollupJob.metadataID!!) + assertEquals("Rollup is not started", RollupMetadata.Status.STARTED, rollupMetadata.status) + } + } + // TODO: Test scenarios: // - Source index deleted after first execution // * If this is with a source index pattern and the underlying indices are recreated but with different data