set reset offset to earliest on manual retrieval
cipriansofronia committed Sep 1, 2023
1 parent 2b42dec commit 7a5face
Showing 2 changed files with 16 additions and 11 deletions.
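
In short: when messages are consumed with the manual offset strategy (which computes per-partition start offsets from the latest offset and `request.maxResults`), the consumer settings now also pin the Kafka `auto.offset.reset` property to `earliest`. With `earliest`, a requested offset that turns out to be out of range (for example because retention already removed those records) makes the consumer fall back to the oldest available offset instead of the broker default `latest`. A minimal sketch of the resulting settings, reusing names from the diff below (`settingsFor` and `bootstrapServers` are placeholders; zio-kafka 1.x-era import paths assumed):

```scala
import org.apache.kafka.clients.consumer.ConsumerConfig
import zio.kafka.consumer.Consumer.OffsetRetrieval
import zio.kafka.consumer.ConsumerSettings

def settingsFor(bootstrapServers: List[String], retrieval: OffsetRetrieval): ConsumerSettings = {
  // Only manual retrieval pins auto.offset.reset; automatic retrieval keeps the broker default.
  val manualResetOffset = retrieval match {
    case _: OffsetRetrieval.Manual => Map(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "earliest")
    case _                         => Map.empty[String, AnyRef]
  }
  ConsumerSettings(bootstrapServers)
    .withOffsetRetrieval(retrieval)
    .withProperties(manualResetOffset)
}
```
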
23 changes: 14 additions & 9 deletions service/src/main/scala/io/kafkamate/kafka/KafkaConsumer.scala
@@ -11,6 +11,7 @@ import io.kafkamate.config.ClustersConfig._
import io.kafkamate.config._
import io.kafkamate.kafka.KafkaExplorer._
import io.kafkamate.messages._
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.TopicPartition
import zio._
import zio.blocking.Blocking
@@ -57,29 +58,28 @@ import zio.stream.ZStream
.getLatestOffset(request.clusterId, tps)
.map(_.view.mapValues(safeOffset(request.maxResults)).toMap)
.provide(dep)
r = OffsetRetrieval.Manual(f)
} yield r
} yield OffsetRetrieval.Manual(f)
case v => ZIO.fail(new IllegalArgumentException(s"Unrecognized OffsetStrategy: $v"))
}
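
`getLatestOffset` comes from `KafkaExplorer` and `safeOffset` is defined elsewhere in this file; neither appears in this diff. Read together with `request.maxResults`, a plausible shape of `safeOffset`, shown here purely as an assumption, is "start at most `maxResults` records before the latest offset":

```scala
// Assumed helper (not shown in this diff): begin at most `maxResults` records before
// the latest offset, clamped to 0 for partitions holding fewer records than that.
def safeOffset(maxResults: Long)(latestOffset: Long): Long =
  math.max(latestOffset - maxResults, 0L)
```
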

private def protobufDeserializer(settings: ProtoSerdeSettings): Task[Deserializer[Any, Try[Message]]] =
Deserializer
.fromKafkaDeserializer(new KafkaProtobufDeserializer[Message](), settings.configs, false)
.map(_.asTry)

private def consumerSettings(
config: ClusterSettings,
request: ConsumeRequest
): RIO[Env, ConsumerSettings] =
for {
offsetRetrieval <- extractOffsetStrategy(request)
manualResetOffset = offsetRetrieval match {
case _: OffsetRetrieval.Manual => Map(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "earliest")
case _ => Map.empty[String, AnyRef]
}
} yield {
val uuid = UUID.randomUUID().toString
ConsumerSettings(config.kafkaHosts)
.withGroupId(s"group-kafkamate-$uuid")
.withClientId(s"client-kafkamate-$uuid")
.withProperties(config.protoSerdeSettings.map(_.configs).getOrElse(Map.empty))
.withOffsetRetrieval(offsetRetrieval)
.withProperties(manualResetOffset)
.withCloseTimeout(10.seconds)
}
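
Worth noting: `withProperties` is called twice on the same builder, once with the proto serde configs and once with `manualResetOffset`. This works because, as used here, `ConsumerSettings.withProperties` merges the given map into the properties accumulated so far rather than replacing them, so both sets of entries reach the underlying consumer. A small sketch of that accumulation (placeholder broker and schema-registry URLs; merge behaviour assumed from the usage above):

```scala
import org.apache.kafka.clients.consumer.ConsumerConfig
import zio.kafka.consumer.ConsumerSettings

// Placeholder values; only the shape of the builder calls matters here.
val settings: ConsumerSettings =
  ConsumerSettings(List("localhost:9092"))
    .withProperties(Map[String, AnyRef]("schema.registry.url" -> "http://localhost:8081")) // e.g. proto serde configs
    .withProperties(Map(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "earliest"))            // manual-retrieval reset
// Assuming the merge behaviour above, `settings` now carries both the serde config
// and auto.offset.reset=earliest.
```
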

@@ -95,9 +95,14 @@ import zio.stream.ZStream
} yield consumer
}

private def protobufDeserializer(settings: ProtoSerdeSettings): Task[Deserializer[Any, Try[Message]]] =
Deserializer
.fromKafkaDeserializer(new KafkaProtobufDeserializer[Message](), settings.configs, false)
.map(_.asTry)

private def deserializeAuto(
messageFormat: MessageFormat,
cachedDeserializer: Task[Deserializer[Any, Try[Message]]],
cachedDeserializer: RIO[Logging, Deserializer[Any, Try[Message]]],
record: CommittableRecord[String, Array[Byte]],
fallback: Throwable => Task[LogicMessage]
) =
@@ -139,7 +144,7 @@ import zio.stream.ZStream
.orElseFail(new RuntimeException("SchemaRegistry url was not provided!")))
.flatMap(protobufDeserializer)
.zipLeft(log.debug(s"Created proto deserializer"))
.cached(Duration.Infinity)
.memoize
)
response <- Consumer
.subscribeAnd(Subscription.topics(request.topicName))
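
The caching change above swaps `.cached(Duration.Infinity)` for `.memoize`: the memoized effect builds the proto deserializer on its first evaluation and every later evaluation reuses that result. A minimal sketch of the semantics relied on here (ZIO 1.x style, with a placeholder effect standing in for the deserializer construction):

```scala
import zio._

object MemoizeSketch extends zio.App {
  // Placeholder for the "build a deserializer" effect; the println stands in for the
  // expensive construction plus the log.debug call in the diff above.
  val buildDeserializer: Task[String] =
    Task(println("building proto deserializer...")).as("deserializer")

  val program: Task[(String, String)] =
    for {
      cached <- buildDeserializer.memoize // UIO[Task[String]]: nothing has run yet
      a      <- cached                    // first evaluation builds and caches
      b      <- cached                    // reuses the cached result, no second println
    } yield (a, b)

  def run(args: List[String]) = program.exitCode
}
```
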
4 changes: 2 additions & 2 deletions site/src/main/scala/io/kafkamate/messages/ListMessages.scala
@@ -204,8 +204,8 @@ import slinky.web.html._
th("Offset"),
th("Partition"),
th("Timestamp"),
th("Key"),
th("Value Format"),
th("Key"),
th("Value")
)
),
@@ -216,8 +216,8 @@ import slinky.web.html._
td(item.offset.toString),
td(item.partition.toString),
td(new Date(item.timestamp).toUTCString()),
td(item.key),
td(item.valueFormat.toString),
td(item.key),
td(item.value)
)
}
