Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CORE-20347: Integrate new delivery tracker #6061

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -85,17 +85,9 @@ class LinkManager(
commonComponents,
)
private val outboundLinkManager = OutboundLinkManager(
lifecycleCoordinatorFactory = lifecycleCoordinatorFactory,
commonComponents = commonComponents,
sessionComponents = sessionManagerCommonComponents,
linkManagerHostingMap = linkManagerHostingMap,
groupPolicyProvider = groupPolicyProvider,
membershipGroupReaderProvider = membershipGroupReaderProvider,
configurationReaderService = configurationReaderService,
subscriptionFactory = subscriptionFactory,
publisherFactory = publisherFactory,
messagingConfiguration = messagingConfiguration,
clock = clock,
sessionComponents = sessionManagerCommonComponents,
)
private val inboundLinkManager = InboundLinkManager(
lifecycleCoordinatorFactory = lifecycleCoordinatorFactory,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ internal class CommonComponents(
membershipQueryClient: MembershipQueryClient,
groupParametersReaderService: GroupParametersReaderService,
internal val clock: Clock,
internal val schemaRegistry: AvroSchemaRegistry,
internal val stateManager: StateManager,
internal val schemaRegistry: AvroSchemaRegistry,
sessionEncryptionOpsClient: SessionEncryptionOpsClient,
) : LifecycleWithDominoTile {
private companion object {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package net.corda.p2p.linkmanager.delivery

import net.corda.configuration.read.ConfigurationReadService
import net.corda.crypto.client.CryptoOpsClient
import net.corda.data.p2p.AuthenticatedMessageAndKey
import net.corda.data.p2p.AuthenticatedMessageDeliveryState
Expand All @@ -25,57 +24,49 @@ import net.corda.messaging.api.publisher.config.PublisherConfig
import net.corda.messaging.api.publisher.factory.PublisherFactory
import net.corda.messaging.api.records.Record
import net.corda.messaging.api.subscription.config.SubscriptionConfig
import net.corda.messaging.api.subscription.factory.SubscriptionFactory
import net.corda.messaging.api.subscription.listener.StateAndEventListener
import net.corda.metrics.CordaMetrics
import net.corda.p2p.linkmanager.common.CommonComponents
import net.corda.p2p.linkmanager.sessions.SessionManager
import net.corda.schema.Schemas.P2P.P2P_OUT_MARKERS
import net.corda.utilities.debug
import net.corda.utilities.time.Clock
import net.corda.virtualnode.toCorda
import org.slf4j.LoggerFactory
import java.time.Duration
import java.time.Instant
import java.util.concurrent.ConcurrentHashMap

@Suppress("LongParameterList")
internal class DeliveryTracker(
coordinatorFactory: LifecycleCoordinatorFactory,
configReadService: ConfigurationReadService,
publisherFactory: PublisherFactory,
commonComponents: CommonComponents,
messagingConfiguration: SmartConfig,
subscriptionFactory: SubscriptionFactory,
sessionManager: SessionManager,
clock: Clock,
processAuthenticatedMessage: (message: AuthenticatedMessageAndKey) -> List<Record<String, *>>,
): LifecycleWithDominoTile {
): LifecycleWithDominoTile {

private val appMessageReplayer = AppMessageReplayer(
coordinatorFactory,
publisherFactory,
commonComponents.lifecycleCoordinatorFactory,
commonComponents.publisherFactory,
messagingConfiguration,
processAuthenticatedMessage
)
private val replayScheduler = ReplayScheduler<SessionManager.Counterparties, AuthenticatedMessageAndKey>(
coordinatorFactory,
configReadService,
commonComponents,
true,
appMessageReplayer::replayMessage,
clock = clock
)

private val messageTracker = MessageTracker(replayScheduler)
private val subscriptionConfig = SubscriptionConfig("message-tracker-group", P2P_OUT_MARKERS)
private val messageTrackerSubscription = {
subscriptionFactory.createStateAndEventSubscription(
commonComponents.subscriptionFactory.createStateAndEventSubscription(
subscriptionConfig,
messageTracker.processor,
messagingConfiguration,
messageTracker.listener
)
}
private val messageTrackerSubscriptionTile = StateAndEventSubscriptionDominoTile(
coordinatorFactory,
commonComponents.lifecycleCoordinatorFactory,
messageTrackerSubscription,
subscriptionConfig,
setOf(
Expand All @@ -92,7 +83,9 @@ internal class DeliveryTracker(
)
)

override val dominoTile = ComplexDominoTile(this::class.java.simpleName, coordinatorFactory,
override val dominoTile = ComplexDominoTile(
this::class.java.simpleName,
commonComponents.lifecycleCoordinatorFactory,
dependentChildren = setOf(messageTrackerSubscriptionTile.coordinatorName),
managedChildren = setOf(messageTrackerSubscriptionTile.toNamedLifecycle())
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,20 @@ package net.corda.p2p.linkmanager.delivery

import com.typesafe.config.Config
import com.typesafe.config.ConfigException
import net.corda.configuration.read.ConfigurationReadService
import net.corda.libs.configuration.schema.p2p.LinkManagerConfiguration
import net.corda.libs.configuration.schema.p2p.LinkManagerConfiguration.Companion.BASE_REPLAY_PERIOD_KEY
import net.corda.libs.configuration.schema.p2p.LinkManagerConfiguration.Companion.MAX_REPLAYING_MESSAGES_PER_PEER
import net.corda.libs.configuration.schema.p2p.LinkManagerConfiguration.Companion.MESSAGE_REPLAY_PERIOD_KEY
import net.corda.libs.configuration.schema.p2p.LinkManagerConfiguration.Companion.REPLAY_ALGORITHM_KEY
import net.corda.libs.configuration.schema.p2p.LinkManagerConfiguration.Companion.REPLAY_PERIOD_CUTOFF_KEY
import net.corda.lifecycle.LifecycleCoordinatorFactory
import net.corda.lifecycle.domino.logic.ComplexDominoTile
import net.corda.lifecycle.domino.logic.ConfigurationChangeHandler
import net.corda.lifecycle.domino.logic.LifecycleWithDominoTile
import net.corda.lifecycle.domino.logic.util.ResourcesHolder
import net.corda.p2p.linkmanager.common.CommonComponents
import net.corda.p2p.linkmanager.sessions.SessionManager
import net.corda.schema.configuration.ConfigKeys
import net.corda.utilities.VisibleForTesting
import net.corda.utilities.time.Clock
import org.slf4j.LoggerFactory
import java.time.Duration
import java.util.concurrent.CompletableFuture
Expand All @@ -31,19 +29,16 @@ import java.util.concurrent.atomic.AtomicReference
/**
* This class keeps track of messages which may need to be replayed.
*/
@Suppress("LongParameterList")
internal class ReplayScheduler<K: SessionManager.BaseCounterparties, M>(
coordinatorFactory: LifecycleCoordinatorFactory,
private val configReadService: ConfigurationReadService,
private val commonComponents: CommonComponents,
private val limitTotalReplays: Boolean,
private val replayMessage: (message: M, messageId: MessageId) -> Unit,
executorServiceFactory: () -> ScheduledExecutorService = { Executors.newSingleThreadScheduledExecutor() },
private val clock: Clock
) : LifecycleWithDominoTile {
) : LifecycleWithDominoTile {

override val dominoTile = ComplexDominoTile(
this::class.java.simpleName,
coordinatorFactory,
commonComponents.lifecycleCoordinatorFactory,
onClose = { executorService.shutdownNow() },
configurationChangeHandler = ReplaySchedulerConfigurationChangeHandler()
)
Expand Down Expand Up @@ -132,7 +127,8 @@ internal class ReplayScheduler<K: SessionManager.BaseCounterparties, M>(
}
}

inner class ReplaySchedulerConfigurationChangeHandler: ConfigurationChangeHandler<ReplaySchedulerConfig>(configReadService,
inner class ReplaySchedulerConfigurationChangeHandler: ConfigurationChangeHandler<ReplaySchedulerConfig>(
commonComponents.configurationReaderService,
ConfigKeys.P2P_LINK_MANAGER_CONFIG,
::fromConfig) {
override fun applyNewConfiguration(
Expand Down Expand Up @@ -242,7 +238,7 @@ internal class ReplayScheduler<K: SessionManager.BaseCounterparties, M>(
replayInfoPerMessageId.compute(messageId) { _, replayInfo ->
replayInfo?.future?.cancel(false)
val firstReplayPeriod = replayCalculator.get().calculateReplayInterval()
val delay = firstReplayPeriod.toMillis() + originalAttemptTimestamp - clock.instant().toEpochMilli()
val delay = firstReplayPeriod.toMillis() + originalAttemptTimestamp - commonComponents.clock.instant().toEpochMilli()
val future = executorService.schedule({ replay(message, messageId) }, delay, TimeUnit.MILLISECONDS)
ReplayInfo(firstReplayPeriod, future)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import net.corda.p2p.linkmanager.metrics.recordInboundSessionMessagesMetric
import net.corda.p2p.linkmanager.metrics.recordOutboundSessionMessagesMetric
import net.corda.p2p.linkmanager.sessions.StatefulSessionManagerImpl.Companion.LINK_MANAGER_SUBSYSTEM
import net.corda.schema.Schemas
import net.corda.schema.Schemas.P2P.LINK_ACK_IN_TOPIC
import net.corda.tracing.traceEventProcessing
import net.corda.utilities.Either
import net.corda.utilities.debug
Expand Down Expand Up @@ -209,7 +210,7 @@ internal class InboundMessageProcessor(
is SessionManager.SessionDirection.Outbound -> processOutboundDataMessage(sessionIdAndMessage, sessionDirection)?.let {
ItemWithSource(
InboundResponse(
listOf(it),
it,
),
sessionIdAndMessage.message.source,
)
Expand Down Expand Up @@ -243,7 +244,7 @@ internal class InboundMessageProcessor(
private fun <T: InboundMessage> processOutboundDataMessage(
sessionIdAndMessage: SessionIdAndMessage<T>,
sessionDirection: SessionManager.SessionDirection.Outbound
): Record<*, *>? {
): List<Record<*, *>>? {
return if (isCommunicationAllowed(sessionDirection.counterparties)) {
MessageConverter.extractPayload(
sessionDirection.session,
Expand All @@ -255,8 +256,7 @@ internal class InboundMessageProcessor(
is AuthenticatedMessageAck -> {
logger.debug { "Processing ack for message ${ack.messageId} from session $sessionIdAndMessage." }
sessionManager.messageAcknowledged(sessionIdAndMessage.sessionId)
val record = makeMarkerForAckMessage(ack)
record
makeMarkerForAckMessage(ack)
}
else -> {
logger.warn("Received an inbound message with unexpected type for SessionId = $sessionIdAndMessage.")
Expand Down Expand Up @@ -369,12 +369,25 @@ internal class InboundMessageProcessor(
}
}

private fun makeMarkerForAckMessage(message: AuthenticatedMessageAck): Record<String, AppMessageMarker> {
return Record(
Schemas.P2P.P2P_OUT_MARKERS,
message.messageId,
AppMessageMarker(LinkManagerReceivedMarker(), clock.instant().toEpochMilli())
)
private fun makeMarkerForAckMessage(message: AuthenticatedMessageAck):
List<Record<*, *>> {
return listOf(
Record(
Schemas.P2P.P2P_OUT_MARKERS,
message.messageId,
AppMessageMarker(LinkManagerReceivedMarker(), clock.instant().toEpochMilli()),
)
) + if (features.enableP2PStatefulDeliveryTracker) {
listOf(
Record(
LINK_ACK_IN_TOPIC,
message.messageId,
null,
)
)
} else {
emptyList()
}
}

private fun isCommunicationAllowed(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,74 +1,37 @@
package net.corda.p2p.linkmanager.outbound

import net.corda.configuration.read.ConfigurationReadService
import net.corda.libs.configuration.SmartConfig
import net.corda.lifecycle.LifecycleCoordinatorFactory
import net.corda.lifecycle.domino.logic.ComplexDominoTile
import net.corda.lifecycle.domino.logic.LifecycleWithDominoTile
import net.corda.lifecycle.domino.logic.util.PublisherWithDominoLogic
import net.corda.lifecycle.domino.logic.util.SubscriptionDominoTile
import net.corda.membership.grouppolicy.GroupPolicyProvider
import net.corda.membership.read.MembershipGroupReaderProvider
import net.corda.messaging.api.publisher.config.PublisherConfig
import net.corda.messaging.api.publisher.factory.PublisherFactory
import net.corda.messaging.api.subscription.config.SubscriptionConfig
import net.corda.messaging.api.subscription.factory.SubscriptionFactory
import net.corda.p2p.linkmanager.common.CommonComponents
import net.corda.p2p.linkmanager.delivery.DeliveryTracker
import net.corda.p2p.linkmanager.hosting.LinkManagerHostingMap
import net.corda.p2p.linkmanager.sessions.SessionManagerCommonComponents
import net.corda.p2p.linkmanager.tracker.StatefulDeliveryTracker
import net.corda.schema.Schemas
import net.corda.utilities.flags.Features
import net.corda.utilities.time.Clock

@Suppress("LongParameterList")
internal class OutboundLinkManager(
lifecycleCoordinatorFactory: LifecycleCoordinatorFactory,
commonComponents: CommonComponents,
sessionComponents: SessionManagerCommonComponents,
linkManagerHostingMap: LinkManagerHostingMap,
groupPolicyProvider: GroupPolicyProvider,
membershipGroupReaderProvider: MembershipGroupReaderProvider,
configurationReaderService: ConfigurationReadService,
subscriptionFactory: SubscriptionFactory,
publisherFactory: PublisherFactory,
messagingConfiguration: SmartConfig,
clock: Clock,
features: Features = Features()
) : LifecycleWithDominoTile {
companion object {
private const val OUTBOUND_MESSAGE_PROCESSOR_GROUP = "outbound_message_processor_group"
}
private val outboundMessageProcessor = OutboundMessageProcessor(
sessionComponents.sessionManager,
linkManagerHostingMap,
groupPolicyProvider,
membershipGroupReaderProvider,
commonComponents.linkManagerHostingMap,
commonComponents.groupPolicyProvider,
commonComponents.membershipGroupReaderProvider,
commonComponents.messagesPendingSession,
clock,
commonComponents.clock,
commonComponents.messageConverter,
)
private val deliveryTracker = DeliveryTracker(
lifecycleCoordinatorFactory,
configurationReaderService,
publisherFactory,
messagingConfiguration,
subscriptionFactory,
sessionComponents.sessionManager,
clock = clock
) { outboundMessageProcessor.processReplayedAuthenticatedMessage(it) }

private val subscriptionConfig = SubscriptionConfig(OUTBOUND_MESSAGE_PROCESSOR_GROUP, Schemas.P2P.P2P_OUT_TOPIC)

private val outboundMessageSubscription = {
subscriptionFactory.createEventLogSubscription(
subscriptionConfig,
outboundMessageProcessor,
messagingConfiguration,
partitionAssignmentListener = null
)
}

override val dominoTile = if (features.enableP2PStatefulDeliveryTracker) {
val publisher = PublisherWithDominoLogic(
Expand All @@ -88,7 +51,7 @@ internal class OutboundLinkManager(
)
ComplexDominoTile(
OUTBOUND_MESSAGE_PROCESSOR_GROUP,
coordinatorFactory = lifecycleCoordinatorFactory,
coordinatorFactory = commonComponents.lifecycleCoordinatorFactory,
dependentChildren = listOf(
statefulDeliveryTracker.dominoTile.coordinatorName,
publisher.dominoTile.coordinatorName,
Expand All @@ -99,8 +62,25 @@ internal class OutboundLinkManager(
),
)
} else {
val deliveryTracker = DeliveryTracker(
commonComponents,
messagingConfiguration,
sessionComponents.sessionManager,
) { outboundMessageProcessor.processReplayedAuthenticatedMessage(it) }

val subscriptionConfig = SubscriptionConfig(OUTBOUND_MESSAGE_PROCESSOR_GROUP, Schemas.P2P.P2P_OUT_TOPIC)

val outboundMessageSubscription = {
commonComponents.subscriptionFactory.createEventLogSubscription(
subscriptionConfig,
outboundMessageProcessor,
messagingConfiguration,
partitionAssignmentListener = null
)
}

SubscriptionDominoTile(
lifecycleCoordinatorFactory,
commonComponents.lifecycleCoordinatorFactory,
outboundMessageSubscription,
subscriptionConfig,
dependentChildren = listOf(
Expand Down
Loading