Skip to content

Commit

Permalink
Date nanos rollup fix (opensearch-project#772)
Browse files Browse the repository at this point in the history
* initial commit

Signed-off-by: Petar Dzepina <[email protected]>

* initial commit

Signed-off-by: Petar Dzepina <[email protected]>

* reverted pkgbuild.gradle

Signed-off-by: Petar Dzepina <[email protected]>

* ec

Signed-off-by: Petar Dzepina <[email protected]>

* rework

Signed-off-by: Petar Dzepina <[email protected]>

* ec

Signed-off-by: Petar Dzepina <[email protected]>

* ec

Signed-off-by: Petar Dzepina <[email protected]>

* ec

Signed-off-by: Petar Dzepina <[email protected]>

* fixed tests

Signed-off-by: Petar Dzepina <[email protected]>

* changed rollup id in test

Signed-off-by: Petar Dzepina <[email protected]>

---------

Signed-off-by: Petar Dzepina <[email protected]>
  • Loading branch information
petardz authored May 16, 2023
1 parent 4c60abd commit 7ed6299
Show file tree
Hide file tree
Showing 4 changed files with 80 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,14 @@ import org.opensearch.action.search.SearchRequest
import org.opensearch.action.search.SearchResponse
import org.opensearch.client.Client
import org.opensearch.common.Rounding
import org.opensearch.common.time.DateFormatter
import org.opensearch.common.time.DateFormatters
import org.opensearch.common.unit.TimeValue
import org.opensearch.common.xcontent.LoggingDeprecationHandler
import org.opensearch.core.xcontent.NamedXContentRegistry
import org.opensearch.common.xcontent.XContentFactory
import org.opensearch.common.xcontent.XContentHelper
import org.opensearch.common.xcontent.XContentType
import org.opensearch.core.xcontent.NamedXContentRegistry
import org.opensearch.index.query.MatchAllQueryBuilder
import org.opensearch.indexmanagement.IndexManagementPlugin
import org.opensearch.indexmanagement.common.model.dimension.DateHistogram
Expand All @@ -34,7 +36,7 @@ import org.opensearch.indexmanagement.rollup.model.ContinuousMetadata
import org.opensearch.indexmanagement.rollup.model.Rollup
import org.opensearch.indexmanagement.rollup.model.RollupMetadata
import org.opensearch.indexmanagement.rollup.model.RollupStats
import org.opensearch.indexmanagement.rollup.util.DATE_FIELD_EPOCH_MILLIS_FORMAT
import org.opensearch.indexmanagement.rollup.util.DATE_FIELD_STRICT_DATE_OPTIONAL_TIME_FORMAT
import org.opensearch.indexmanagement.util.NO_ID
import org.opensearch.search.aggregations.bucket.composite.InternalComposite
import org.opensearch.search.aggregations.bucket.histogram.DateHistogramAggregationBuilder
Expand Down Expand Up @@ -181,7 +183,7 @@ class RollupMetadataService(val client: Client, val xContentRegistry: NamedXCont
.sort(dateHistogram.sourceField, SortOrder.ASC) // TODO: figure out where nulls are sorted
.trackTotalHits(false)
.fetchSource(false)
.docValueField(dateHistogram.sourceField, DATE_FIELD_EPOCH_MILLIS_FORMAT)
.docValueField(dateHistogram.sourceField, DATE_FIELD_STRICT_DATE_OPTIONAL_TIME_FORMAT)
val searchRequest = SearchRequest(rollup.sourceIndex)
.source(searchSourceBuilder)
.allowPartialSearchResults(false)
Expand All @@ -194,10 +196,12 @@ class RollupMetadataService(val client: Client, val xContentRegistry: NamedXCont

// Get the doc value field of the dateHistogram.sourceField for the first search hit converted to epoch millis
// If the doc value is null or empty it will be treated the same as empty doc hits
val firstHitTimestamp = response.hits.hits.first().field(dateHistogram.sourceField).getValue<String>()?.toLong()
val firstHitTimestampAsString: String? = response.hits.hits.first().field(dateHistogram.sourceField).getValue<String>()
?: return StartingTimeResult.NoDocumentsFound

return StartingTimeResult.Success(getRoundedTime(firstHitTimestamp, dateHistogram))
// Parse date and extract epochMillis
val formatter = DateFormatter.forPattern(DATE_FIELD_STRICT_DATE_OPTIONAL_TIME_FORMAT)
val epochMillis = DateFormatters.from(formatter.parse(firstHitTimestampAsString), formatter.locale()).toInstant().toEpochMilli()
return StartingTimeResult.Success(getRoundedTime(epochMillis, dateHistogram))
} catch (e: RemoteTransportException) {
val unwrappedException = ExceptionsHelper.unwrapCause(e) as Exception
logger.debug("Error when getting initial start time for rollup [${rollup.id}]: $unwrappedException")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,11 @@ import org.opensearch.action.search.SearchRequest
import org.opensearch.cluster.ClusterState
import org.opensearch.cluster.metadata.IndexMetadata
import org.opensearch.common.xcontent.LoggingDeprecationHandler
import org.opensearch.core.xcontent.NamedXContentRegistry
import org.opensearch.common.xcontent.XContentHelper
import org.opensearch.core.xcontent.XContentParser.Token
import org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken
import org.opensearch.common.xcontent.XContentType
import org.opensearch.core.xcontent.NamedXContentRegistry
import org.opensearch.core.xcontent.XContentParser.Token
import org.opensearch.index.query.BoolQueryBuilder
import org.opensearch.index.query.BoostingQueryBuilder
import org.opensearch.index.query.ConstantScoreQueryBuilder
Expand Down Expand Up @@ -63,6 +63,7 @@ import org.opensearch.search.aggregations.metrics.SumAggregationBuilder
import org.opensearch.search.aggregations.metrics.ValueCountAggregationBuilder
import org.opensearch.search.builder.SearchSourceBuilder

/**
 * Name of the `strict_date_optional_time` date format.
 * Used by the rollup metadata service both as the doc-value format requested for the
 * date histogram source field and as the formatter pattern for parsing the returned value.
 */
const val DATE_FIELD_STRICT_DATE_OPTIONAL_TIME_FORMAT = "strict_date_optional_time"
/** Name of the `epoch_millis` date format. */
const val DATE_FIELD_EPOCH_MILLIS_FORMAT = "epoch_millis"

@Suppress("ReturnCount")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -735,7 +735,7 @@ class RollupMetadataServiceTests : OpenSearchTestCase() {

// TODO: Mockito 2 supposedly should be able to mock final classes but there were errors when trying to do so
// Will need to check if there is a workaround or a better way to mock getting hits.hits since this current approach is verbose
val docField = DocumentField(dateHistogram.sourceField, listOf(getInstant(timestamp).toEpochMilli().toString()))
val docField = DocumentField(dateHistogram.sourceField, listOf(timestamp))
val searchHit = SearchHit(0)
searchHit.setDocumentField(dateHistogram.sourceField, docField)
val searchHits = SearchHits(arrayOf(searchHit), null, 0.0F)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import org.opensearch.rest.RestRequest
import org.opensearch.rest.RestStatus
import java.time.Instant
import java.time.temporal.ChronoUnit
import java.util.Collections.emptyMap

class RollupRunnerIT : RollupRestTestCase() {

Expand Down Expand Up @@ -1253,6 +1254,71 @@ class RollupRunnerIT : RollupRestTestCase() {
assertEquals("Backing index [$backingIndex2] has to have owner rollup job with id:[${startedRollup1.id}]", rollupMetadata.failureReason)
}

/**
 * Verifies that a continuous rollup job can start when its date histogram dimension is
 * backed by a `date_nanos` field.
 *
 * The source index receives three documents whose `purchaseDate` is written in three
 * different shapes (fractional epoch number, integer epoch number, and a date-time string
 * with nanosecond digits). The test passes once the target index exists and the job's
 * metadata reports [RollupMetadata.Status.STARTED].
 */
fun `test rollup with date_nanos as date_histogram field`() {
    val sourceIndexName = "date-nanos-index"
    val targetIndexName = "date-nanos-index-rollup"

    // Mapping body is kept verbatim; only `purchaseDate` uses the date_nanos type.
    val sourceMapping = """"properties": {
"purchaseDate": {
"type": "date_nanos"
},
"itemName": {
"type": "keyword"
},
"itemPrice": {
"type": "float"
}
}"""
    createIndex(sourceIndexName, Settings.EMPTY, sourceMapping)

    // Document id -> JSON source, indexed in this order.
    val fixtureDocs = listOf(
        "1" to """{"purchaseDate": 1683149130000.6497, "itemName": "shoes", "itemPrice": 100.5}""",
        "2" to """{"purchaseDate": 1683494790000, "itemName": "shoes", "itemPrice": 30.0}""",
        "3" to """{"purchaseDate": "2023-05-08T18:57:33.743656789Z", "itemName": "shoes", "itemPrice": 60.592}""",
    )
    fixtureDocs.forEach { (docId, docSource) -> indexDoc(sourceIndexName, docId, docSource) }

    refreshAllIndices()

    // Continuous job: 5 day fixed-interval histogram on the date_nanos field plus a terms dimension.
    val rollupDefinition = Rollup(
        id = "rollup_with_alias_992434131",
        schemaVersion = 1L,
        enabled = true,
        jobSchedule = IntervalSchedule(Instant.now(), 1, ChronoUnit.DAYS),
        jobLastUpdatedTime = Instant.now(),
        jobEnabledTime = Instant.now(),
        description = "basic change of page size",
        sourceIndex = sourceIndexName,
        targetIndex = targetIndexName,
        metadataID = null,
        roles = emptyList(),
        pageSize = 1000,
        delay = 0,
        continuous = true,
        dimensions = listOf(
            DateHistogram(sourceField = "purchaseDate", fixedInterval = "5d"),
            Terms("itemName", "itemName"),
        ),
        metrics = listOf(
            RollupMetrics(
                sourceField = "itemPrice",
                targetField = "itemPrice",
                metrics = listOf(Sum(), Min(), Max(), ValueCount(), Average())
            )
        )
    )
    val job = createRollup(rollupDefinition, rollupDefinition.id)

    updateRollupStartTime(job)

    waitFor { assertTrue("Target rollup index was not created", indexExists(targetIndexName)) }

    waitFor {
        val currentJob = getRollup(rollupId = job.id)
        assertNotNull("Rollup job doesn't have metadata set", currentJob.metadataID)
        val currentMetadata = getRollupMetadata(currentJob.metadataID!!)
        assertEquals("Rollup is not started", RollupMetadata.Status.STARTED, currentMetadata.status)
    }
}

// TODO: Test scenarios:
// - Source index deleted after first execution
// * If this is with a source index pattern and the underlying indices are recreated but with different data
Expand Down

0 comments on commit 7ed6299

Please sign in to comment.