Skip to content

Commit

Permalink
[TH2-5234] tmp
Browse files Browse the repository at this point in the history
  • Loading branch information
Nikita-Smirnov-Exactpro committed Sep 3, 2024
1 parent 05c852e commit 444113d
Show file tree
Hide file tree
Showing 20 changed files with 550 additions and 476 deletions.
13 changes: 1 addition & 12 deletions src/main/kotlin/com/exactpro/th2/rptdataprovider/Context.kt
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ import com.exactpro.th2.rptdataprovider.producers.MessageProducer
import com.exactpro.th2.rptdataprovider.serialization.InstantBackwardCompatibilitySerializer
import com.exactpro.th2.rptdataprovider.server.ServerType
import com.exactpro.th2.rptdataprovider.services.cradle.CradleService
import com.exactpro.th2.rptdataprovider.services.cradle.MessageGroupCradleService
import com.exactpro.th2.rptdataprovider.services.rabbitmq.RabbitMqService
import com.fasterxml.jackson.databind.DeserializationFeature
import com.fasterxml.jackson.databind.ObjectMapper
Expand Down Expand Up @@ -68,7 +67,7 @@ abstract class Context<B, G, RM, PM>(

val grpcConfig: GrpcConfiguration,

val cradleService: CradleService = createCradleService(configuration, cradleManager),
val cradleService: CradleService = CradleService(configuration, cradleManager),

val rabbitMqService: RabbitMqService<B, G, PM>,

Expand Down Expand Up @@ -143,15 +142,5 @@ abstract class Context<B, G, RM, PM>(
)
}
}

@JvmStatic
protected fun createCradleService(
configuration: Configuration,
cradleManager: CradleManager
) = if (configuration.searchBySessionGroup.value.toBoolean()) {
MessageGroupCradleService(configuration, cradleManager)
} else {
CradleService(configuration, cradleManager)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ class ProtoContext(

grpcConfig: GrpcConfiguration,

cradleService: CradleService = createCradleService(configuration, cradleManager),
cradleService: CradleService = CradleService(configuration, cradleManager),

rabbitMqService: RabbitMqService<MessageGroupBatch, MessageGroup, Message> = ProtoRabbitMqService(
configuration,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ class TransportContext(

grpcConfig: GrpcConfiguration,

cradleService: CradleService = createCradleService(configuration, cradleManager),
cradleService: CradleService = CradleService(configuration, cradleManager),

rabbitMqService: RabbitMqService<GroupBatch, MessageGroup, ParsedMessage> = TransportRabbitMqService(
configuration,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,6 @@ class CustomConfigurationClass {

val useStrictMode = false

val searchBySessionGroup = true

val useTransportMode: Boolean = true

val serverType: ServerType = ServerType.HTTP
Expand Down Expand Up @@ -103,9 +101,6 @@ class Configuration(customConfiguration: CustomConfigurationClass) {
val messageCacheSize: Variable =
Variable("messageCacheSize", customConfiguration.messageCacheSize.toString(), "1")

val searchBySessionGroup: Variable =
Variable("searchBySessionGroup", customConfiguration.searchBySessionGroup.toString(), "true")

val aliasToGroupCacheSize: Variable = // TODO: added value check
Variable("aliasToGroupCacheSize", customConfiguration.aliasToGroupCacheSize.toString(), "1000")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,62 @@ import com.exactpro.cradle.BookId
import com.exactpro.cradle.Direction
import java.time.Instant

class CommonStreamName(
val bookId: BookId,
val name: String
) {
internal val fullName = "${bookId.name}:$name"

override fun toString(): String {
return fullName
}

override fun equals(other: Any?): Boolean {
if (this === other) return true
if (javaClass != other?.javaClass) return false

data class StreamName(
val name: String,
other as CommonStreamName

return fullName == other.fullName
}

override fun hashCode(): Int {
return fullName.hashCode()
}
}

class StreamName(
bookId: BookId,
name: String,
val direction: Direction,
val bookId: BookId
) {
private val fullName = "$name:$direction"
val common = CommonStreamName(bookId, name)
val bookId: BookId
get() = common.bookId
val name: String
get() = common.name

internal val fullName = "${common.fullName}:$direction"

override fun toString(): String {
return fullName
}

override fun equals(other: Any?): Boolean {
if (this === other) return true
if (javaClass != other?.javaClass) return false
if (!super.equals(other)) return false

other as StreamName

return fullName == other.fullName
}

override fun hashCode(): Int {
var result = super.hashCode()
result = 31 * result + fullName.hashCode()
return result
}
}


Expand All @@ -48,7 +93,7 @@ data class StreamPointer(
timestamp: Instant,
hasStarted: Boolean
) : this(
stream = StreamName(streamName, direction, bookId),
stream = StreamName(bookId, streamName, direction),
sequence = sequence,
timestamp = timestamp,
hasStarted = hasStarted
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,14 @@ package com.exactpro.th2.rptdataprovider.entities.responses

import com.exactpro.cradle.messages.StoredMessage
import com.exactpro.cradle.messages.StoredMessageId
import java.time.Instant

data class MessageBatchWrapper<RM>(
val batchId: StoredMessageId,
val batchFirstTime: Instant,
val trimmedMessages: Collection<MessageWrapper<RM>>
)

data class StoredMessageBatchWrapper(
val batchId: StoredMessageId,
val batchFirstTime: Instant,
val trimmedMessages: Collection<StoredMessage>
)
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,17 @@

package com.exactpro.th2.rptdataprovider.entities.responses

import com.exactpro.cradle.Direction
import com.exactpro.cradle.messages.StoredMessageId
import com.exactpro.th2.dataprovider.grpc.Stream
import com.exactpro.th2.rptdataprovider.convertToProto
import com.exactpro.th2.rptdataprovider.cradleDirectionToGrpc
import com.exactpro.th2.rptdataprovider.entities.internal.StreamName
import com.exactpro.th2.rptdataprovider.entities.internal.CommonStreamName
import com.fasterxml.jackson.annotation.JsonIgnore
import io.github.oshai.kotlinlogging.KotlinLogging
import java.time.Instant

data class StreamInfo(val streamPointer: StreamName, @JsonIgnore val messageId: StoredMessageId? = null) {
data class StreamInfo(val streamPointer: CommonStreamName, @JsonIgnore val messageId: StoredMessageId? = null) {

@JsonIgnore
private val lastMessage = messageId ?: let {
Expand All @@ -35,7 +36,7 @@ data class StreamInfo(val streamPointer: StreamName, @JsonIgnore val messageId:
StoredMessageId(
streamPointer.bookId,
streamPointer.name,
streamPointer.direction,
Direction.FIRST,
Instant.ofEpochMilli(0),
0
)
Expand All @@ -48,16 +49,13 @@ data class StreamInfo(val streamPointer: StreamName, @JsonIgnore val messageId:
}

fun convertToProto(): Stream {
return Stream.newBuilder()
.setDirection(cradleDirectionToGrpc(streamPointer.direction))
.setSession(streamPointer.name)
.setLastId(
lastMessage.let {
logger.trace { "stream $streamPointer - lastElement is ${it.sequence}" }
it.convertToProto()

}
)
.build()
return lastMessage.run {
logger.trace { "stream $streamPointer - lastElement is $sequence" }
Stream.newBuilder()
.setDirection(cradleDirectionToGrpc(direction))
.setSession(sessionAlias)
.setLastId(this.convertToProto())
.build()
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,19 @@
package com.exactpro.th2.rptdataprovider.handlers

import com.exactpro.th2.rptdataprovider.Context
import com.exactpro.th2.rptdataprovider.entities.internal.CommonStreamName
import com.exactpro.th2.rptdataprovider.entities.internal.PipelineStepObject
import com.exactpro.th2.rptdataprovider.entities.internal.StreamName
import com.exactpro.th2.rptdataprovider.entities.requests.SseMessageSearchRequest
import io.github.oshai.kotlinlogging.KotlinLogging
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.channels.Channel
import io.github.oshai.kotlinlogging.KotlinLogging


abstract class PipelineComponent<B, G, RM, PM>(
val context: Context<B, G, RM, PM>,
val searchRequest: SseMessageSearchRequest<RM, PM>,
val externalScope: CoroutineScope,
val streamName: StreamName? = null,
open val commonStreamName: CommonStreamName? = null,
val previousComponent: PipelineComponent<B, G, RM, PM>? = null,
messageFlowCapacity: Int
) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

package com.exactpro.th2.rptdataprovider.handlers

import com.exactpro.cradle.Direction
import com.exactpro.cradle.TimeRelation
import com.exactpro.cradle.TimeRelation.AFTER
import com.exactpro.cradle.TimeRelation.BEFORE
Expand All @@ -30,12 +29,12 @@ import com.exactpro.th2.rptdataprovider.ProtoMessageGroup
import com.exactpro.th2.rptdataprovider.ProtoRawMessage
import com.exactpro.th2.rptdataprovider.TransportMessageGroup
import com.exactpro.th2.rptdataprovider.TransportRawMessage
import com.exactpro.th2.rptdataprovider.entities.internal.CommonStreamName
import com.exactpro.th2.rptdataprovider.entities.internal.EmptyPipelineObject
import com.exactpro.th2.rptdataprovider.entities.internal.PipelineFilteredMessage
import com.exactpro.th2.rptdataprovider.entities.internal.PipelineKeepAlive
import com.exactpro.th2.rptdataprovider.entities.internal.PipelineRawBatch
import com.exactpro.th2.rptdataprovider.entities.internal.StreamEndObject
import com.exactpro.th2.rptdataprovider.entities.internal.StreamName
import com.exactpro.th2.rptdataprovider.entities.mappers.TimeRelationMapper
import com.exactpro.th2.rptdataprovider.entities.requests.SseMessageSearchRequest
import com.exactpro.th2.rptdataprovider.entities.responses.StreamInfo
Expand Down Expand Up @@ -183,9 +182,7 @@ abstract class SearchMessagesHandler<B, G, RM, PM>(

val pipelineStatus = PipelineStatus()

val streamNames = resultRequest.stream.flatMap { stream ->
Direction.values().map { StreamName(stream, it, request.bookId) }
}
val streamNames = resultRequest.stream.map { stream -> CommonStreamName(request.bookId, stream) }

val coroutineScope = CoroutineScope(coroutineContext + Job(coroutineContext[Job]))
pipelineStatus.addStreams(streamNames.map { it.toString() })
Expand Down Expand Up @@ -222,7 +219,7 @@ abstract class SearchMessagesHandler<B, G, RM, PM>(
} while (listPair.isEmpty())

listPair.first().let {
streamInfoList.add(StreamInfo(messageExtractor.streamName!!, it.first))
streamInfoList.add(StreamInfo(messageExtractor.commonStreamName, it.first))
}
}
return streamInfoList
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

package com.exactpro.th2.rptdataprovider.handlers.messages

import com.exactpro.cradle.Direction
import com.exactpro.th2.common.grpc.Message
import com.exactpro.th2.common.grpc.MessageGroupBatch
import com.exactpro.th2.common.schema.message.impl.rabbitmq.transport.GroupBatch
Expand All @@ -26,6 +25,7 @@ import com.exactpro.th2.rptdataprovider.ProtoMessageGroup
import com.exactpro.th2.rptdataprovider.ProtoRawMessage
import com.exactpro.th2.rptdataprovider.TransportMessageGroup
import com.exactpro.th2.rptdataprovider.TransportRawMessage
import com.exactpro.th2.rptdataprovider.entities.internal.CommonStreamName
import com.exactpro.th2.rptdataprovider.entities.internal.StreamName
import com.exactpro.th2.rptdataprovider.entities.requests.SseMessageSearchRequest
import com.exactpro.th2.rptdataprovider.handlers.PipelineStatus
Expand All @@ -38,9 +38,7 @@ abstract class ChainBuilder<B, G, RM, PM>(
protected val pipelineStatus: PipelineStatus
) {
fun buildChain(): StreamMerger<B, G, RM, PM> {
val streamNames =
request.stream.flatMap { stream -> Direction.values().map { StreamName(stream, it, request.bookId) } }

val streamNames = request.stream.map { stream -> CommonStreamName(request.bookId, stream) }

pipelineStatus.addStreams(streamNames.map { it.toString() })

Expand Down
Loading

0 comments on commit 444113d

Please sign in to comment.