Skip to content

Commit

Permalink
simplify channels build by making TCP native only
Browse files Browse the repository at this point in the history
  • Loading branch information
rmgk committed Dec 12, 2024
1 parent 57e9378 commit 7b136a1
Show file tree
Hide file tree
Showing 7 changed files with 38 additions and 47 deletions.
8 changes: 6 additions & 2 deletions Modules/Channels/jvm/src/main/scala/channels/NioTCP.scala
Original file line number Diff line number Diff line change
Expand Up @@ -136,8 +136,12 @@ class NioTCP {
): LatentConnection[MessageBuffer] =
new LatentConnection {
override def prepare(incoming: Receive[MessageBuffer]): Async[Any, Connection[MessageBuffer]] =
TCP.syncAttempt {
handleConnection(bindsocket(), incoming)
Async.fromCallback {
try
Async.handler.succeed {
handleConnection(bindsocket(), incoming)
}
catch case NonFatal(exception) => Async.handler.fail(exception)
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package channels

import java.net.{InetAddress, InetSocketAddress, ServerSocket, SocketException}

class EchoServerTestTCP extends EchoCommunicationTest(
{ ec =>
val socket = new ServerSocket

try socket.setReuseAddress(true)
catch {
case _: SocketException =>
// some implementations may not allow SO_REUSEADDR to be set
}

socket.bind(new InetSocketAddress(InetAddress.getByName("localhost"), 0))

val port = socket.getLocalPort
(port, TCP.listen(() => socket, ec))
},
ec => port => TCP.connect(TCP.defaultSocket(new InetSocketAddress("localhost", port)), ec)
)
Original file line number Diff line number Diff line change
Expand Up @@ -9,24 +9,6 @@ import java.util.concurrent.{Executors, Semaphore}
import scala.concurrent.ExecutionContext
import scala.util.{Failure, Success}

class EchoServerTestTCP extends EchoCommunicationTest(
{ ec =>
val socket = new ServerSocket

try socket.setReuseAddress(true)
catch {
case _: SocketException =>
// some implementations may not allow SO_REUSEADDR to be set
}

socket.bind(new InetSocketAddress(InetAddress.getByName("localhost"), 0))

val port = socket.getLocalPort
(port, TCP.listen(() => socket, ec))
},
ec => port => TCP.connect(TCP.defaultSocket(new InetSocketAddress("localhost", port)), ec)
)

def printErrors[T](cb: T => Unit): Callback[T] =
case Success(mb) => cb(mb)
case Failure(ex) => ex match
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
package ex2021encfixtodo.sync

import channels.TCP
import channels.NioTCP
import com.github.plokhotnyuk.jsoniter_scala.core.JsonValueCodec
import com.github.plokhotnyuk.jsoniter_scala.macros.JsonCodecMaker
import com.google.crypto.tink.aead.AeadConfig
import com.google.crypto.tink.{Aead, CleartextKeysetHandle, JsonKeysetReader, JsonKeysetWriter, KeyTemplates, KeysetHandle, LegacyKeysetSerialization}
import rdts.base.{Bottom, Lattice, LocalUid}
import rdts.dotted.{Dotted, HasDots, Obrem}
import replication.DeltaDissemination
import replication.JsoniterCodecs.given

import java.net.{InetSocketAddress, Socket, URI}
import java.nio.file.{Files, Path}
Expand All @@ -16,8 +17,6 @@ import scala.concurrent.ExecutionContext
import scala.util.chaining.scalaUtilChainingOps
import scala.util.{Random, Try}

import replication.JsoniterCodecs.given

class AeadTranslation(aead: com.google.crypto.tink.Aead) extends replication.Aead {
override def encrypt(data: Array[Byte], associated: Array[Byte]): Array[Byte] = aead.encrypt(data, associated)

Expand Down Expand Up @@ -59,7 +58,11 @@ class DataManagerConnectionManager[State: { JsonValueCodec, Lattice, Bottom, Has
)
val ec: ExecutionContext = ExecutionContext.fromExecutor(executor)

dataManager.addLatentConnection(TCP.listen(TCP.defaultServerSocket(new InetSocketAddress("127.0.0.1", port)), ec))
val niotcp = new NioTCP {}

dataManager.addLatentConnection(niotcp.listen(
niotcp.defaultServerSocketChannel(new InetSocketAddress("127.0.0.1", port))
))

override val localReplicaId: String = replicaId.toString

Expand All @@ -68,7 +71,7 @@ class DataManagerConnectionManager[State: { JsonValueCodec, Lattice, Bottom, Has
}

override def connectToReplica(remoteReplicaId: String, uri: URI): Unit = {
dataManager.addLatentConnection(TCP.connect(() => new Socket(uri.getHost, uri.getPort), ec))
dataManager.addLatentConnection(niotcp.connect(niotcp.defaultSocketChannel(new InetSocketAddress(uri.getHost, uri.getPort))))
}

override def stop(): Unit = {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package probench

import channels.{Abort, NioTCP, TCP, UDP}
import channels.{Abort, NioTCP, UDP}
import com.github.plokhotnyuk.jsoniter_scala.core.JsonValueCodec
import com.github.plokhotnyuk.jsoniter_scala.macros.{CodecMakerConfig, JsonCodecMaker}
import de.rmgk.options.*
Expand All @@ -9,7 +9,7 @@ import probench.clients.{ClientCLI, EtcdClient, ProBenchClient}
import probench.data.{ClientNodeState, ClusterData, KVOperation}
import rdts.base.Uid
import rdts.datatypes.experiments.protocols.Membership
import rdts.datatypes.experiments.protocols.simplified.{GeneralizedPaxos, Paxos}
import rdts.datatypes.experiments.protocols.simplified.Paxos
import replication.{FileConnection, ProtocolMessage}

import java.net.{DatagramSocket, InetSocketAddress}
Expand Down Expand Up @@ -113,18 +113,6 @@ object cli {
subcommand("node", "starts a cluster node") {
val node = KeyValueReplica(name.value, initialClusterIds.value.toSet)

node.addClientConnection(TCP.listen(TCP.defaultServerSocket(socketPath("localhost", clientPort.value)), ec))
node.addClusterConnection(TCP.listen(TCP.defaultServerSocket(socketPath("localhost", peerPort.value)), ec))

Timer().schedule(() => node.clusterDataManager.pingAll(), 1000, 1000)

cluster.value.foreach { (ip, port) =>
node.addClusterConnection(TCP.connect(TCP.defaultSocket(socketPath(ip, port)), ec))
}
},
subcommand("nio-node", "starts a cluster node") {
val node = KeyValueReplica(name.value, initialClusterIds.value.toSet)

val nioTCP = NioTCP()
ec.execute(() => nioTCP.loopSelection(Abort()))

Expand Down Expand Up @@ -158,14 +146,6 @@ object cli {
subcommand("client", "starts a client to interact with a node") {
val client = ProBenchClient(name.value)

val (ip, port) = clientNode.value
client.addLatentConnection(TCP.connect(TCP.defaultSocket(socketPath(ip, port)), ec))

ClientCLI(name.value, client).startCLI()
},
subcommand("nio-client", "starts a client to interact with a node") {
val client = ProBenchClient(name.value)

val (ip, port) = clientNode.value

val nioTCP = NioTCP()
Expand Down
1 change: 1 addition & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,7 @@ lazy val replication = crossProject(JVMPlatform, JSPlatform, NativePlatform).in(
).jvmSettings(
Dependencies.tink,
Dependencies.bouncyCastle,
Test / fork := true,
)

lazy val todolist = project.in(file("Modules/Examples/TodoMVC"))
Expand Down

0 comments on commit 7b136a1

Please sign in to comment.