Skip to content

Commit

Permalink
commit
Browse files Browse the repository at this point in the history
  • Loading branch information
adithyachakilam committed Nov 3, 2024
1 parent 9988d8f commit 1377975
Show file tree
Hide file tree
Showing 19 changed files with 483 additions and 110 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskIOConfig;
import org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers;
import org.joda.time.DateTime;
import org.joda.time.Duration;

import javax.annotation.Nullable;
import java.util.Map;
Expand All @@ -53,7 +54,9 @@ public RabbitStreamIndexTaskIOConfig(
@JsonProperty("minimumMessageTime") DateTime minimumMessageTime,
@JsonProperty("maximumMessageTime") DateTime maximumMessageTime,
@JsonProperty("inputFormat") @Nullable InputFormat inputFormat,
@JsonProperty("uri") String uri)
@JsonProperty("uri") String uri,
@JsonProperty("taskDuration") Duration taskDuration
)
{
super(
taskGroupId,
Expand All @@ -63,7 +66,9 @@ public RabbitStreamIndexTaskIOConfig(
useTransaction,
minimumMessageTime,
maximumMessageTime,
inputFormat);
inputFormat,
taskDuration
);

this.pollTimeout = pollTimeout != null ? pollTimeout : RabbitStreamSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS;
this.uri = uri;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@
import org.joda.time.DateTime;

import javax.annotation.Nullable;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -202,7 +201,9 @@ protected SeekableStreamIndexTaskIOConfig createTaskIoConfig(
minimumMessageTime,
maximumMessageTime,
ioConfig.getInputFormat(),
rabbitConfig.getUri());
rabbitConfig.getUri(),
ioConfig.getTaskDuration()
);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers;
import org.apache.druid.indexing.seekablestream.extension.KafkaConfigOverrides;
import org.joda.time.DateTime;
import org.joda.time.Duration;

import javax.annotation.Nullable;
import java.util.Map;
Expand Down Expand Up @@ -63,7 +64,8 @@ public KafkaIndexTaskIOConfig(
@JsonProperty("maximumMessageTime") DateTime maximumMessageTime,
@JsonProperty("inputFormat") @Nullable InputFormat inputFormat,
@JsonProperty("configOverrides") @Nullable KafkaConfigOverrides configOverrides,
@JsonProperty("multiTopic") @Nullable Boolean multiTopic
@JsonProperty("multiTopic") @Nullable Boolean multiTopic,
@JsonProperty("taskDuration") Duration taskDuration
)
{
super(
Expand All @@ -76,7 +78,8 @@ public KafkaIndexTaskIOConfig(
useTransaction,
minimumMessageTime,
maximumMessageTime,
inputFormat
inputFormat,
taskDuration
);

this.consumerProperties = Preconditions.checkNotNull(consumerProperties, "consumerProperties");
Expand Down Expand Up @@ -107,7 +110,8 @@ public KafkaIndexTaskIOConfig(
DateTime minimumMessageTime,
DateTime maximumMessageTime,
InputFormat inputFormat,
KafkaConfigOverrides configOverrides
KafkaConfigOverrides configOverrides,
Duration taskDuration
)
{
this(
Expand All @@ -124,7 +128,8 @@ public KafkaIndexTaskIOConfig(
maximumMessageTime,
inputFormat,
configOverrides,
KafkaSupervisorIOConfig.DEFAULT_IS_MULTI_TOPIC
KafkaSupervisorIOConfig.DEFAULT_IS_MULTI_TOPIC,
taskDuration
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,8 @@ protected SeekableStreamIndexTaskIOConfig createTaskIoConfig(
maximumMessageTime,
ioConfig.getInputFormat(),
kafkaIoConfig.getConfigOverrides(),
kafkaIoConfig.isMultiTopic()
kafkaIoConfig.isMultiTopic(),
ioConfig.getTaskDuration()
);
}

Expand Down
Loading

0 comments on commit 1377975

Please sign in to comment.