Skip to content

Commit

Permalink
Fixing issue about restting streams
Browse files Browse the repository at this point in the history
  • Loading branch information
erwin-kok committed Aug 30, 2023
1 parent e6c6406 commit 6f95b49
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import kotlinx.atomicfu.locks.withLock
import kotlinx.coroutines.CoroutineName
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
import kotlinx.coroutines.withTimeoutOrNull
import mu.KotlinLogging
Expand All @@ -18,6 +17,7 @@ import org.erwinkok.libp2p.core.network.Direction
import org.erwinkok.libp2p.core.network.InetMultiaddress
import org.erwinkok.libp2p.core.network.NetworkConnection
import org.erwinkok.libp2p.core.network.Stream
import org.erwinkok.libp2p.core.network.StreamResetException
import org.erwinkok.libp2p.core.network.streammuxer.MuxedStream
import org.erwinkok.libp2p.core.network.transport.TransportConnection
import org.erwinkok.libp2p.core.resourcemanager.ConnectionScope
Expand Down Expand Up @@ -80,14 +80,18 @@ class SwarmConnection(

init {
scope.launch(_context + CoroutineName("swarm-connection-$id")) {
while (isActive && !transportConnection.isClosed) {
transportConnection.acceptStream()
.flatMap { muxedStream ->
resourceManager.openStream(remoteIdentity.peerId, Direction.DirInbound)
.flatMap { streamScope -> addStream(muxedStream, Direction.DirInbound, streamScope) }
.onSuccess { stream -> handleStream(stream) }
.onFailure { muxedStream.reset() }
}
while (!transportConnection.isClosed) {
try {
transportConnection.acceptStream()
.flatMap { muxedStream ->
resourceManager.openStream(remoteIdentity.peerId, Direction.DirInbound)
.flatMap { streamScope -> addStream(muxedStream, Direction.DirInbound, streamScope) }
.onSuccess { stream -> handleStream(stream) }
}
} catch (e: StreamResetException) {
// The stream was reset. That's fine here.
// We just continue accepting and handling a new stream.
}
}
}.invokeOnCompletion {
close()
Expand Down Expand Up @@ -160,11 +164,13 @@ class SwarmConnection(
}
} else {
logger.warn { "no handler registered for $protocol" }
stream.reset()
}
}
}
if (result == null) {
logger.warn { "negotiation timeout in when determining protocol" }
stream.reset()
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,11 +140,6 @@ fun main() {
private suspend fun listener(scope: CoroutineScope, host: Host, redisClient: Jedis, testTimeout: Duration): Boolean {
logger.info { "Configured as listener..." }
val pingService = PingService(scope, host)
// Temporary implementation until the issue around the identify protocol is solved (next release).
// This temporary implementation accepts an identify stream, and immediately resets it.
host.setStreamHandler(ProtocolId.of("/ipfs/id/1.0.0")) { stream ->
stream.reset()
}
val hostAddress = host.addresses().firstOrNull()
if (hostAddress == null) {
logger.error { "Failed to get listen address" }
Expand Down

0 comments on commit 6f95b49

Please sign in to comment.