-
Notifications
You must be signed in to change notification settings - Fork 3.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
supervisor: make rejection periods work with stopTasksCount #17442
base: master
Are you sure you want to change the base?
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This implementation of refreshing the min / max times every 10 min is a change in behavior, so I think the person submitting the supervisor spec needs to opt in to this new behavior which doesn't appear possible in the current approach.
An alternative approach to consider is to update the IOConfig to include a "refresh interval" field that defaults to infinity that would refresh the min / max times in the IOConfig.
The tasks could then update their min / max times every duration just by adding the refresh interval so we don't need to worry about clock skew across the peons.
Since this patch is meant to just deal with stopTaskCount - have you considered refreshing the min / max times every task duration to mimic the existing behavior if stopTaskCount was not set (aka every task duration the min/max times across all peons are updated)
@JsonProperty("lateMessageRejectionPeriod") Duration lateMessageRejectionPeriod, | ||
@JsonProperty("earlyMessageRejectionPeriod") Duration earlyMessageRejectionPeriod, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These new fields need to be in addition to the old fields to ensure a newer version of the overlord works with older versions of the indexing tasks. But more importantly, in the case of a rollback - an older overlord needs to work with a newer version of indexing services
e8bef0c
to
1377975
Compare
@suneet-s Thanks for the review, I've changed the patch to just update the min/max times once every task duration. So this also avoids all those version problems. |
...dexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
Fixed
Show fixed
Hide fixed
SeekableStreamStartSequenceNumbers<String, String> sequenceNumbers = Mockito.mock(SeekableStreamStartSequenceNumbers.class); | ||
SeekableStreamEndSequenceNumbers<String, String> endSequenceNumbers = Mockito.mock(SeekableStreamEndSequenceNumbers.class); | ||
|
||
DateTime now = DateTimes.nowUtc(); |
Check notice
Code scanning / CodeQL
Unread local variable Note test
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have not reviewed the tests yet.
For this change, we should also include docs for what this new parameter is in the IOConfig
...fka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaConsumerMonitor.java
Outdated
Show resolved
Hide resolved
.../test/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisorTest.java
Outdated
Show resolved
Hide resolved
@@ -51,7 +53,8 @@ public SeekableStreamIndexTaskIOConfig( | |||
final Boolean useTransaction, | |||
final DateTime minimumMessageTime, | |||
final DateTime maximumMessageTime, | |||
@Nullable final InputFormat inputFormat | |||
@Nullable final InputFormat inputFormat, | |||
@Nullable final Duration taskDuration // can be null for backward compabitility |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there an advantage to using Duration? Would it be simpler to pass this in as a long of minutes
@Nullable final Duration taskDuration // can be null for backward compabitility | |
@Nullable final Long refreshRejectionPeriodsInMinutes // can be null for backward compabitility |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think its good to have Duration and no need to convert back and forth and also confusion of adding mins
to the variable. You think there are any issues with the schema deserializing ? (I see at least Period
used, maybe we can switch to it if so )
Also, regarding variable name change, Do you think there would be a usecase of user editing it to a different value ? I would prefer to use taskDuration
so that no one touches and would also avoid the need for docs (Also, I dont find this documented anywhere, can you point me to the file ?)
.../src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskIOConfig.java
Show resolved
Hide resolved
...ce/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
Outdated
Show resolved
Hide resolved
private void addTaskDurationToMinMaxTimes() | ||
{ | ||
if (minMessageTime.isPresent()) { | ||
minMessageTime = Optional.of(minMessageTime.get() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it ok that intelliJ is warning that this is an unsafe update of a volatile variable?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah its ok.
Its complaining because we are using the same variable to update the value but we only do it once every taskDuration. If it was done more frequently we should have used atomic references.
...ce/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
Outdated
Show resolved
Hide resolved
...ce/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
Outdated
Show resolved
Hide resolved
6877bc7
to
bba9486
Compare
Description
In cases where
stopTasksCount
is configured which makes streaming tasks run much longer (in some cases) than the configured task duration, early/late rejection periods are not correctly implemented since they are configured during the task startup time in IOConfig. Instead, this PR propagates updates the min/max times once every task duration.Key changed/added classes in this PR
SeekableStreamIndexTaskRunner
SeekableStreamIndexTaskIOConfig
This PR has: