diff --git a/extensions-contrib/rabbit-stream-indexing-service/src/test/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisorTest.java b/extensions-contrib/rabbit-stream-indexing-service/src/test/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisorTest.java index 381cab2289b3..817c46fa7b1f 100644 --- a/extensions-contrib/rabbit-stream-indexing-service/src/test/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisorTest.java +++ b/extensions-contrib/rabbit-stream-indexing-service/src/test/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisorTest.java @@ -38,6 +38,7 @@ import org.apache.druid.indexing.rabbitstream.RabbitStreamIndexTaskClientFactory; import org.apache.druid.indexing.rabbitstream.RabbitStreamRecordSupplier; import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskClient; +import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskIOConfig; import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorReportPayload; import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.java.util.common.parsers.JSONPathSpec; @@ -382,7 +383,7 @@ public void testCreateTaskIOConfig() tuningConfig ); - Assert.assertEquals(supervisor.createTaskIoConfig( + SeekableStreamIndexTaskIOConfig ioConfig = supervisor.createTaskIoConfig( 1, ImmutableMap.of(), ImmutableMap.of(), @@ -409,6 +410,8 @@ public void testCreateTaskIOConfig() null, // latemessagerejectionstartdatetime 1 ) - ).getTaskDuration(), Duration.standardMinutes(30)); + ); + + Assert.assertEquals(ioConfig.getTaskDuration(), Duration.standardMinutes(30)); } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskIOConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskIOConfig.java index 2318b0c81e81..d3da738ef7b8 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskIOConfig.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskIOConfig.java @@ -139,6 +139,8 @@ public InputFormat getInputFormat() return inputFormat; } + @Nullable + @JsonProperty public Duration getTaskDuration() { return taskDuration; diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java index 744afcfd5038..fbe52d87b83f 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java @@ -25,7 +25,6 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Function; -import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.base.Supplier; import com.google.common.base.Throwables; @@ -248,8 +247,8 @@ public enum Status private final Map partitionsThroughput = new HashMap<>(); - private volatile Optional minMessageTime; - private volatile Optional maxMessageTime; + private volatile DateTime minMessageTime; + private volatile DateTime maxMessageTime; public SeekableStreamIndexTaskRunner( final SeekableStreamIndexTask task, @@ -272,8 +271,8 @@ public SeekableStreamIndexTaskRunner( this.ingestionState = IngestionState.NOT_STARTED; this.lockGranularityToUse = lockGranularityToUse; - minMessageTime = ioConfig.getMinimumMessageTime(); - maxMessageTime = ioConfig.getMaximumMessageTime(); + minMessageTime = ioConfig.getMinimumMessageTime().or(DateTimes.MIN); + maxMessageTime = ioConfig.getMaximumMessageTime().or(DateTimes.MAX); if (ioConfig.getTaskDuration() != null) { Execs.scheduledSingleThreaded("RejectionPeriodUpdater-Exec--%d") @@ -2113,40 +2112,27 @@ protected abstract void possiblyResetDataSourceMetadata( private void addTaskDurationToMinMaxTimes() { - if (minMessageTime.isPresent()) { - minMessageTime = Optional.of(minMessageTime.get() - .plusMinutes(ioConfig.getTaskDuration() - .toStandardMinutes() - .getMinutes())); - } - - if (maxMessageTime.isPresent()) { - maxMessageTime = Optional.of(maxMessageTime.get() - .plusMinutes(ioConfig.getTaskDuration() - .toStandardMinutes() - .getMinutes())); - } + minMessageTime = minMessageTime.plusMinutes(ioConfig.getTaskDuration().toStandardMinutes().getMinutes()); + maxMessageTime = maxMessageTime.plusMinutes(ioConfig.getTaskDuration().toStandardMinutes().getMinutes()); } public boolean withinMinMaxRecordTime(final InputRow row) { - final boolean beforeMinimumMessageTime = minMessageTime.isPresent() && - minMessageTime.get().isAfter(row.getTimestamp()); - final boolean afterMaximumMessageTime = maxMessageTime.isPresent() && - maxMessageTime.get().isBefore(row.getTimestamp()); + final boolean beforeMinimumMessageTime = minMessageTime.isAfter(row.getTimestamp()); + final boolean afterMaximumMessageTime = maxMessageTime.isBefore(row.getTimestamp()); if (log.isDebugEnabled()) { if (beforeMinimumMessageTime) { log.debug( "CurrentTimeStamp[%s] is before MinimumMessageTime[%s]", row.getTimestamp(), - ioConfig.getMinimumMessageTime().get() + minMessageTime ); } else if (afterMaximumMessageTime) { log.debug( "CurrentTimeStamp[%s] is after MaximumMessageTime[%s]", row.getTimestamp(), - ioConfig.getMaximumMessageTime().get() + maxMessageTime ); } }