Skip to content

Commit

Permalink
Fix time series data stream timestamp parsing. (elastic#83196)
Browse files Browse the repository at this point in the history
The timestamp parsing is used to select the backing index a document
should get ingested into.

The `Instant.of(...)` usage isn't able to handle timestamp provided
without timezone or time. This commit changes that by replacing that
usage with `DateFormatters.from(...)`.

(non issue, because this a bug in unreleased code)
  • Loading branch information
martijnvg authored Jan 31, 2022
1 parent 6d14895 commit 0146808
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.common.ParsingException;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.time.DateFormatters;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexMode;
Expand Down Expand Up @@ -451,6 +452,7 @@ public Index getWriteIndex(IndexRequest request, Metadata metadata) {
}

Instant timestamp;
final var formatter = DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER;
XContent xContent = request.getContentType().xContent();
try (XContentParser parser = xContent.createParser(TS_EXTRACT_CONFIG, request.source().streamInput())) {
ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser);
Expand All @@ -460,7 +462,7 @@ public Index getWriteIndex(IndexRequest request, Metadata metadata) {
// TODO: deal with nanos too here.
// (the index hasn't been resolved yet, keep track of timestamp field metadata at data stream level,
// so we can use it here)
timestamp = Instant.from(DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.parse(parser.text()));
timestamp = DateFormatters.from(formatter.parse(parser.text()), formatter.locale()).toInstant();
break;
case VALUE_NUMBER:
timestamp = Instant.ofEpochMilli(parser.longValue());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.time.temporal.ChronoUnit;
import java.util.Arrays;
import java.util.HashSet;
Expand Down Expand Up @@ -284,7 +286,7 @@ public void testRejectsEmptyStringPipeline() {
}

public void testGetConcreteWriteIndex() {
Instant currentTime = Instant.now();
Instant currentTime = ZonedDateTime.of(2022, 12, 12, 6, 0, 0, 0, ZoneOffset.UTC).toInstant();
Instant start1 = currentTime.minus(6, ChronoUnit.HOURS);
Instant end1 = currentTime.minus(2, ChronoUnit.HOURS);
Instant start2 = currentTime.minus(2, ChronoUnit.HOURS);
Expand Down Expand Up @@ -360,6 +362,17 @@ public void testGetConcreteWriteIndex() {
var result = request.getConcreteWriteIndex(metadata.getIndicesLookup().get(tsdbDataStream), metadata);
assertThat(result, equalTo(metadata.dataStreams().get(tsdbDataStream).getIndices().get(0)));
}
{
IndexRequest request = new IndexRequest(tsdbDataStream);
request.opType(DocWriteRequest.OpType.CREATE);
request.source(
source.replace("$time", "\"" + DateFormatter.forPattern(FormatNames.STRICT_DATE.getName()).format(start1) + "\""),
XContentType.JSON
);

var result = request.getConcreteWriteIndex(metadata.getIndicesLookup().get(tsdbDataStream), metadata);
assertThat(result, equalTo(metadata.dataStreams().get(tsdbDataStream).getIndices().get(0)));
}
{
// provided timestamp resolves to the latest backing index
IndexRequest request = new IndexRequest(tsdbDataStream);
Expand Down

0 comments on commit 0146808

Please sign in to comment.