Skip to content

Commit

Permalink
Refactored
Browse files Browse the repository at this point in the history
  • Loading branch information
Nikita-Smirnov-Exactpro committed Nov 15, 2024
1 parent 2e8e379 commit 9e7abe0
Show file tree
Hide file tree
Showing 10 changed files with 18 additions and 20 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2020-2023 Exactpro (Exactpro Systems Limited)
* Copyright 2020-2024 Exactpro (Exactpro Systems Limited)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2023 Exactpro (Exactpro Systems Limited)
* Copyright 2023-2024 Exactpro (Exactpro Systems Limited)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2023 Exactpro (Exactpro Systems Limited)
* Copyright 2023-2024 Exactpro (Exactpro Systems Limited)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2020-2023 Exactpro (Exactpro Systems Limited)
* Copyright 2020-2024 Exactpro (Exactpro Systems Limited)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -17,7 +17,6 @@
package com.exactpro.th2.rptdataprovider.entities.responses

import com.exactpro.cradle.messages.StoredMessage
import com.exactpro.cradle.messages.StoredMessageId
import java.time.Instant

data class MessageBatchWrapper<RM>(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2021-2023 Exactpro (Exactpro Systems Limited)
* Copyright 2021-2024 Exactpro (Exactpro Systems Limited)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -26,7 +26,6 @@ import com.exactpro.th2.rptdataprovider.ProtoRawMessage
import com.exactpro.th2.rptdataprovider.TransportMessageGroup
import com.exactpro.th2.rptdataprovider.TransportRawMessage
import com.exactpro.th2.rptdataprovider.entities.internal.CommonStreamName
import com.exactpro.th2.rptdataprovider.entities.internal.StreamName
import com.exactpro.th2.rptdataprovider.entities.requests.SseMessageSearchRequest
import com.exactpro.th2.rptdataprovider.handlers.PipelineStatus
import kotlinx.coroutines.CoroutineScope
Expand All @@ -40,7 +39,7 @@ abstract class ChainBuilder<B, G, RM, PM>(
fun buildChain(): StreamMerger<B, G, RM, PM> {
val streamNames = request.stream.map { stream -> CommonStreamName(request.bookId, stream) }

pipelineStatus.addStreams(streamNames.map { it.toString() })
pipelineStatus.addStreams(streamNames.map(CommonStreamName::toString))

val dataStreams = streamNames.map { streamName ->

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ abstract class MessageBatchConverter<B, G, RM, PM>(

val timeStart = System.currentTimeMillis()

logger.trace { "received raw batch (stream=${commonStreamName.toString()} first-time=${pipelineMessage.storedBatchWrapper.batchFirstTime})" }
logger.trace { "received raw batch (stream=$commonStreamName first-time=${pipelineMessage.storedBatchWrapper.batchFirstTime})" }

val filteredMessages: List<MessageHolder<G, RM>> = pipelineMessage.storedBatchWrapper.trimmedMessages.convertAndFilter()

Expand All @@ -130,9 +130,9 @@ abstract class MessageBatchConverter<B, G, RM, PM>(

if (codecRequest.codecRequest.groupsCount > 0) {
sendToChannel(codecRequest)
logger.trace { "converted batch is sent downstream (stream=${commonStreamName.toString()} first-time=${codecRequest.storedBatchWrapper.batchFirstTime} requestHash=${codecRequest.codecRequest.requestHash})" }
logger.trace { "converted batch is sent downstream (stream=$commonStreamName first-time=${codecRequest.storedBatchWrapper.batchFirstTime} requestHash=${codecRequest.codecRequest.requestHash})" }
} else {
logger.trace { "converted batch is discarded because it has no messages (stream=${commonStreamName.toString()} first-time=${pipelineMessage.storedBatchWrapper.batchFirstTime})" }
logger.trace { "converted batch is discarded because it has no messages (stream=$commonStreamName first-time=${pipelineMessage.storedBatchWrapper.batchFirstTime})" }
}

pipelineStatus.convertSendDownstream(
Expand Down Expand Up @@ -188,7 +188,7 @@ class ProtoMessageBatchConverter(
((included.isNullOrEmpty() || included.contains(protocol))
&& (excluded.isNullOrEmpty() || !excluded.contains(protocol)))
.also {
logger.trace { "message ${message?.sequence} has protocol $protocol (matchesProtocolFilter=${it}) (stream=${commonStreamName.toString()})" }
logger.trace { "message ${message?.sequence} has protocol $protocol (matchesProtocolFilter=${it}) (stream=$commonStreamName)" }
}
}.toList()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ class MessageBatchDecoder<B, G, RM, PM>(
)
)
} else {
logger.trace { "received converted batch (stream=${commonStreamName.toString()} first-time=${pipelineMessage.storedBatchWrapper.batchFirstTime} requestHash=${pipelineMessage.codecRequest.requestHash})" }
logger.trace { "received converted batch (stream=$commonStreamName first-time=${pipelineMessage.storedBatchWrapper.batchFirstTime} requestHash=${pipelineMessage.codecRequest.requestHash})" }

pipelineStatus.decodeStart(
commonStreamName.toString(),
Expand All @@ -120,7 +120,7 @@ class MessageBatchDecoder<B, G, RM, PM>(
pipelineMessage.codecRequest.groupsCount.toLong()
)

logger.trace { "decoded batch is sent downstream (stream=${commonStreamName.toString()} first-time=${result.storedBatchWrapper.batchFirstTime} requestHash=${pipelineMessage.codecRequest.requestHash})" }
logger.trace { "decoded batch is sent downstream (stream=$commonStreamName first-time=${result.storedBatchWrapper.batchFirstTime} requestHash=${pipelineMessage.codecRequest.requestHash})" }
}

pipelineStatus.countParseRequested(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ abstract class MessageBatchUnpacker<B, G, RM, PM>(

val messages = pipelineMessage.storedBatchWrapper.trimmedMessages

logger.debug { "codec response unpacking took ${result.duration.toDouble(DurationUnit.MILLISECONDS)}ms (stream=${commonStreamName.toString()} firstId=${messages.first().id.sequence} lastId=${messages.last().id.sequence} messages=${messages.size})" }
logger.debug { "codec response unpacking took ${result.duration.toDouble(DurationUnit.MILLISECONDS)}ms (stream=$commonStreamName firstId=${messages.first().id.sequence} lastId=${messages.last().id.sequence} messages=${messages.size})" }

pipelineMessage.info.buildMessage = result.duration.toDouble(DurationUnit.MILLISECONDS).toLong()
StreamWriter.setBuildMessage(pipelineMessage.info)
Expand All @@ -165,7 +165,7 @@ abstract class MessageBatchUnpacker<B, G, RM, PM>(
pipelineMessage.storedBatchWrapper.trimmedMessages.size.toLong()
)

logger.debug { "unpacked responses are sent (stream=${commonStreamName.toString()} firstId=${messages.first().id.sequence} lastId=${messages.last().id.sequence} messages=${result.value.size})" }
logger.debug { "unpacked responses are sent (stream=$commonStreamName firstId=${messages.first().id.sequence} lastId=${messages.last().id.sequence} messages=${result.value.size})" }

} else {
sendToChannel(pipelineMessage)
Expand Down Expand Up @@ -232,7 +232,7 @@ class ProtoMessageBatchUnpacker(
val messages = pipelineMessage.storedBatchWrapper.trimmedMessages

throw CodecResponseException(
"""codec dont parsed all messages
"""codec don't parsed all messages
| (stream=${commonStreamName}
| firstRequestId=${messages.first().id.sequence}
| lastRequestId=${messages.last().id.sequence}
Expand Down Expand Up @@ -320,7 +320,7 @@ class TransportMessageBatchUnpacker(
val messages = pipelineMessage.storedBatchWrapper.trimmedMessages

throw CodecResponseException(
"""codec dont parsed all messages
"""codec don't parsed all messages
| (stream=${commonStreamName}
| firstRequestId=${messages.first().id.sequence}
| lastRequestId=${messages.last().id.sequence}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2021-2023 Exactpro (Exactpro Systems Limited)
* Copyright 2021-2024 Exactpro (Exactpro Systems Limited)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2022-2023 Exactpro (Exactpro Systems Limited)
* Copyright 2022-2024 Exactpro (Exactpro Systems Limited)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down

0 comments on commit 9e7abe0

Please sign in to comment.