diff --git a/service/src/main/scala/io/kafkamate/kafka/KafkaConsumer.scala b/service/src/main/scala/io/kafkamate/kafka/KafkaConsumer.scala index 81b40a2..bb254c7 100644 --- a/service/src/main/scala/io/kafkamate/kafka/KafkaConsumer.scala +++ b/service/src/main/scala/io/kafkamate/kafka/KafkaConsumer.scala @@ -11,6 +11,7 @@ import io.kafkamate.config.ClustersConfig._ import io.kafkamate.config._ import io.kafkamate.kafka.KafkaExplorer._ import io.kafkamate.messages._ +import org.apache.kafka.clients.consumer.ConsumerConfig import org.apache.kafka.common.TopicPartition import zio._ import zio.blocking.Blocking @@ -57,22 +58,20 @@ import zio.stream.ZStream .getLatestOffset(request.clusterId, tps) .map(_.view.mapValues(safeOffset(request.maxResults)).toMap) .provide(dep) - r = OffsetRetrieval.Manual(f) - } yield r + } yield OffsetRetrieval.Manual(f) case v => ZIO.fail(new IllegalArgumentException(s"Unrecognized OffsetStrategy: $v")) } - private def protobufDeserializer(settings: ProtoSerdeSettings): Task[Deserializer[Any, Try[Message]]] = - Deserializer - .fromKafkaDeserializer(new KafkaProtobufDeserializer[Message](), settings.configs, false) - .map(_.asTry) - private def consumerSettings( config: ClusterSettings, request: ConsumeRequest ): RIO[Env, ConsumerSettings] = for { offsetRetrieval <- extractOffsetStrategy(request) + manualResetOffset = offsetRetrieval match { + case _: OffsetRetrieval.Manual => Map(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "earliest") + case _ => Map.empty[String, AnyRef] + } } yield { val uuid = UUID.randomUUID().toString ConsumerSettings(config.kafkaHosts) @@ -80,6 +79,7 @@ import zio.stream.ZStream .withClientId(s"client-kafkamate-$uuid") .withProperties(config.protoSerdeSettings.map(_.configs).getOrElse(Map.empty)) .withOffsetRetrieval(offsetRetrieval) + .withProperties(manualResetOffset) .withCloseTimeout(10.seconds) } @@ -95,9 +95,14 @@ import zio.stream.ZStream } yield consumer } + private def protobufDeserializer(settings: ProtoSerdeSettings): Task[Deserializer[Any, Try[Message]]] = + Deserializer + .fromKafkaDeserializer(new KafkaProtobufDeserializer[Message](), settings.configs, false) + .map(_.asTry) + private def deserializeAuto( messageFormat: MessageFormat, - cachedDeserializer: Task[Deserializer[Any, Try[Message]]], + cachedDeserializer: RIO[Logging, Deserializer[Any, Try[Message]]], record: CommittableRecord[String, Array[Byte]], fallback: Throwable => Task[LogicMessage] ) = @@ -139,7 +144,7 @@ import zio.stream.ZStream .orElseFail(new RuntimeException("SchemaRegistry url was not provided!"))) .flatMap(protobufDeserializer) .zipLeft(log.debug(s"Created proto deserializer")) - .cached(Duration.Infinity) + .memoize ) response <- Consumer .subscribeAnd(Subscription.topics(request.topicName)) diff --git a/site/src/main/scala/io/kafkamate/messages/ListMessages.scala b/site/src/main/scala/io/kafkamate/messages/ListMessages.scala index 0f15a00..a164611 100644 --- a/site/src/main/scala/io/kafkamate/messages/ListMessages.scala +++ b/site/src/main/scala/io/kafkamate/messages/ListMessages.scala @@ -204,8 +204,8 @@ import slinky.web.html._ th("Offset"), th("Partition"), th("Timestamp"), - th("Key"), th("Value Format"), + th("Key"), th("Value") ) ), @@ -216,8 +216,8 @@ import slinky.web.html._ td(item.offset.toString), td(item.partition.toString), td(new Date(item.timestamp).toUTCString()), - td(item.key), td(item.valueFormat.toString), + td(item.key), td(item.value) ) }