Skip to content

Commit

Permalink
comments
Browse files Browse the repository at this point in the history
  • Loading branch information
adithyachakilam committed Nov 5, 2024
1 parent 0a67e03 commit 69ea344
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.druid.indexing.rabbitstream.RabbitStreamIndexTaskClientFactory;
import org.apache.druid.indexing.rabbitstream.RabbitStreamRecordSupplier;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskClient;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskIOConfig;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorReportPayload;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.parsers.JSONPathSpec;
Expand Down Expand Up @@ -382,7 +383,7 @@ public void testCreateTaskIOConfig()
tuningConfig
);

Assert.assertEquals(supervisor.createTaskIoConfig(
SeekableStreamIndexTaskIOConfig ioConfig = supervisor.createTaskIoConfig(
1,
ImmutableMap.of(),
ImmutableMap.of(),
Expand All @@ -409,6 +410,8 @@ public void testCreateTaskIOConfig()
null, // latemessagerejectionstartdatetime
1
)
).getTaskDuration(), Duration.standardMinutes(30));
);

Assert.assertEquals(ioConfig.getTaskDuration(), Duration.standardMinutes(30));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,8 @@ public InputFormat getInputFormat()
return inputFormat;
}

@Nullable
@JsonProperty
public Duration getTaskDuration()
{
return taskDuration;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.common.base.Throwables;
Expand Down Expand Up @@ -248,8 +247,8 @@ public enum Status

private final Map<PartitionIdType, Long> partitionsThroughput = new HashMap<>();

private volatile Optional<DateTime> minMessageTime;
private volatile Optional<DateTime> maxMessageTime;
private volatile DateTime minMessageTime;
private volatile DateTime maxMessageTime;

public SeekableStreamIndexTaskRunner(
final SeekableStreamIndexTask<PartitionIdType, SequenceOffsetType, RecordType> task,
Expand All @@ -272,8 +271,8 @@ public SeekableStreamIndexTaskRunner(
this.ingestionState = IngestionState.NOT_STARTED;
this.lockGranularityToUse = lockGranularityToUse;

minMessageTime = ioConfig.getMinimumMessageTime();
maxMessageTime = ioConfig.getMaximumMessageTime();
minMessageTime = ioConfig.getMinimumMessageTime().or(DateTimes.MIN);
maxMessageTime = ioConfig.getMaximumMessageTime().or(DateTimes.MAX);

if (ioConfig.getTaskDuration() != null) {
Execs.scheduledSingleThreaded("RejectionPeriodUpdater-Exec--%d")
Expand Down Expand Up @@ -2113,40 +2112,27 @@ protected abstract void possiblyResetDataSourceMetadata(

private void addTaskDurationToMinMaxTimes()
{
if (minMessageTime.isPresent()) {
minMessageTime = Optional.of(minMessageTime.get()
.plusMinutes(ioConfig.getTaskDuration()
.toStandardMinutes()
.getMinutes()));
}

if (maxMessageTime.isPresent()) {
maxMessageTime = Optional.of(maxMessageTime.get()
.plusMinutes(ioConfig.getTaskDuration()
.toStandardMinutes()
.getMinutes()));
}
minMessageTime = minMessageTime.plusMinutes(ioConfig.getTaskDuration().toStandardMinutes().getMinutes());
maxMessageTime = maxMessageTime.plusMinutes(ioConfig.getTaskDuration().toStandardMinutes().getMinutes());
}

public boolean withinMinMaxRecordTime(final InputRow row)
{
final boolean beforeMinimumMessageTime = minMessageTime.isPresent() &&
minMessageTime.get().isAfter(row.getTimestamp());
final boolean afterMaximumMessageTime = maxMessageTime.isPresent() &&
maxMessageTime.get().isBefore(row.getTimestamp());
final boolean beforeMinimumMessageTime = minMessageTime.isAfter(row.getTimestamp());
final boolean afterMaximumMessageTime = maxMessageTime.isBefore(row.getTimestamp());

if (log.isDebugEnabled()) {
if (beforeMinimumMessageTime) {
log.debug(
"CurrentTimeStamp[%s] is before MinimumMessageTime[%s]",
row.getTimestamp(),
ioConfig.getMinimumMessageTime().get()
minMessageTime
);
} else if (afterMaximumMessageTime) {
log.debug(
"CurrentTimeStamp[%s] is after MaximumMessageTime[%s]",
row.getTimestamp(),
ioConfig.getMaximumMessageTime().get()
maxMessageTime
);
}
}
Expand Down

0 comments on commit 69ea344

Please sign in to comment.