diff --git a/libp2p-core/src/main/kotlin/org/erwinkok/libp2p/core/network/swarm/SwarmConnection.kt b/libp2p-core/src/main/kotlin/org/erwinkok/libp2p/core/network/swarm/SwarmConnection.kt index 9cfc23e..07bb6ba 100644 --- a/libp2p-core/src/main/kotlin/org/erwinkok/libp2p/core/network/swarm/SwarmConnection.kt +++ b/libp2p-core/src/main/kotlin/org/erwinkok/libp2p/core/network/swarm/SwarmConnection.kt @@ -6,7 +6,6 @@ import kotlinx.atomicfu.locks.withLock import kotlinx.coroutines.CoroutineName import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Job -import kotlinx.coroutines.isActive import kotlinx.coroutines.launch import kotlinx.coroutines.withTimeoutOrNull import mu.KotlinLogging @@ -18,6 +17,7 @@ import org.erwinkok.libp2p.core.network.Direction import org.erwinkok.libp2p.core.network.InetMultiaddress import org.erwinkok.libp2p.core.network.NetworkConnection import org.erwinkok.libp2p.core.network.Stream +import org.erwinkok.libp2p.core.network.StreamResetException import org.erwinkok.libp2p.core.network.streammuxer.MuxedStream import org.erwinkok.libp2p.core.network.transport.TransportConnection import org.erwinkok.libp2p.core.resourcemanager.ConnectionScope @@ -80,14 +80,18 @@ class SwarmConnection( init { scope.launch(_context + CoroutineName("swarm-connection-$id")) { - while (isActive && !transportConnection.isClosed) { - transportConnection.acceptStream() - .flatMap { muxedStream -> - resourceManager.openStream(remoteIdentity.peerId, Direction.DirInbound) - .flatMap { streamScope -> addStream(muxedStream, Direction.DirInbound, streamScope) } - .onSuccess { stream -> handleStream(stream) } - .onFailure { muxedStream.reset() } - } + while (!transportConnection.isClosed) { + try { + transportConnection.acceptStream() + .flatMap { muxedStream -> + resourceManager.openStream(remoteIdentity.peerId, Direction.DirInbound) + .flatMap { streamScope -> addStream(muxedStream, Direction.DirInbound, streamScope) } + .onSuccess { stream -> handleStream(stream) } + } + } catch (e: StreamResetException) { + // The stream was reset. That's fine here. + // We just continue accepting and handling a new stream. + } } }.invokeOnCompletion { close() @@ -160,11 +164,13 @@ class SwarmConnection( } } else { logger.warn { "no handler registered for $protocol" } + stream.reset() } } } if (result == null) { logger.warn { "negotiation timeout in when determining protocol" } + stream.reset() } } diff --git a/test-plans/src/main/kotlin/org/erwinkok/libp2p/testplans/ping/Ping.kt b/test-plans/src/main/kotlin/org/erwinkok/libp2p/testplans/ping/Ping.kt index 58561e6..097c92d 100644 --- a/test-plans/src/main/kotlin/org/erwinkok/libp2p/testplans/ping/Ping.kt +++ b/test-plans/src/main/kotlin/org/erwinkok/libp2p/testplans/ping/Ping.kt @@ -140,11 +140,6 @@ fun main() { private suspend fun listener(scope: CoroutineScope, host: Host, redisClient: Jedis, testTimeout: Duration): Boolean { logger.info { "Configured as listener..." } val pingService = PingService(scope, host) - // Temporary implementation until the issue around the identify protocol is solved (next release). - // This temporary implementation accepts an identify stream, and immediately resets it. - host.setStreamHandler(ProtocolId.of("/ipfs/id/1.0.0")) { stream -> - stream.reset() - } val hostAddress = host.addresses().firstOrNull() if (hostAddress == null) { logger.error { "Failed to get listen address" }