Skip to content

Commit

Permalink
change in server
Browse files Browse the repository at this point in the history
  • Loading branch information
AbdullinAM committed Aug 17, 2023
1 parent 448fbbf commit 9be6be1
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,16 +50,13 @@ interface Worker2MasterConnection : AutoCloseable {
// Impl

class MasterProtocolSocketHandler(
clientPort: Int
override val clientPort: Int
) : MasterProtocolHandler {
private val clientListener = ServerSocket(clientPort)
private val workerListener = ServerSocket(0)

override val clientPort get() = clientListener.localPort
override val workerPort get() = workerListener.localPort

override fun receiveClientConnection(): Master2ClientConnection {
val socket = clientListener.accept()
val socket = Socket("localhost", clientPort)
return Master2ClientSocketConnection(socket)
}

Expand All @@ -79,6 +76,9 @@ class Master2ClientSocketConnection(private val socket: Socket) : Master2ClientC
}

override fun receive(): String {
while (!reader.ready()) {
log.debug("Waiting, reader is not ready yet")
}
return reader.readLine().also {
log.debug("Master received a request $it")
}
Expand All @@ -88,6 +88,7 @@ class Master2ClientSocketConnection(private val socket: Socket) : Master2ClientC
log.debug("Master sends a response")
writer.write(result)
writer.newLine()
writer.flush()
}

override fun close() {
Expand Down Expand Up @@ -127,21 +128,12 @@ class Master2WorkerSocketConnection(private val socket: Socket) : Master2WorkerC
@InternalSerializationApi
class Client2MasterSocketConnection(
val serializer: KexSerializer,
private val port: Int
private val socket: Socket
) : Client2MasterConnection {
private lateinit var socket: Socket
private lateinit var writer: BufferedWriter
private lateinit var reader: BufferedReader

override fun connect(timeout: Duration): Boolean {
log.debug("Trying to connect to master at port $port")
socket = Socket()
try {
socket.connect(InetSocketAddress("localhost", port), timeout.inWholeMilliseconds.toInt())
} catch (_: SocketTimeoutException) {
log.error("Could not connect to master, timeout exception")
return false
}
writer = socket.getOutputStream().bufferedWriter()
reader = socket.getInputStream().bufferedReader()
log.debug("Connected to master")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ class ExecutorMaster(
reInit()

val request = clientConnection.receive()
log.debug("Worker $id receiver request $request")
log.debug("Worker $id received request $request")
workerConnection.send(request)
val result = try {
workerConnection.receive()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,17 +28,12 @@ import kotlin.time.Duration.Companion.seconds
@InternalSerializationApi
internal object ExecutorMasterController : AutoCloseable {
private lateinit var process: Process
private val masterPort: Int
private val controllerSocket = ServerSocket(0)
private val serializers = mutableMapOf<ClassManager, KexSerializer>()

init {
val tempSocket = ServerSocket(0)
masterPort = tempSocket.localPort
tempSocket.close()
// this is fucked up
Thread.sleep(2000)
Runtime.getRuntime().addShutdownHook(thread(start = false) {
if (process.isAlive) process.destroy()
if (::process.isInitialized && process.isAlive) process.destroy()
})
}

Expand Down Expand Up @@ -81,14 +76,13 @@ internal object ExecutorMasterController : AutoCloseable {
executorKlass,
"--output", "${outputDir.toAbsolutePath()}",
"--config", "$executorConfigPath",
"--port", "$masterPort",
"--port", "${controllerSocket.localPort}",
"--kfgClassPath", kfgClassPath.joinToString(getPathSeparator()),
"--workerClassPath", workerClassPath.joinToString(getPathSeparator()),
"--numOfWorkers", "$numberOfWorkers"
)
log.debug("Starting executor master process with command: '${pb.command().joinToString(" ")}'")
process = pb.start()
Thread.sleep(1000)
}

fun getClientConnection(ctx: ExecutionContext): Client2MasterConnection {
Expand All @@ -97,7 +91,7 @@ internal object ExecutorMasterController : AutoCloseable {
ctx.cm,
prettyPrint = false
)
}, masterPort)
}, controllerSocket.accept())
}

override fun close() {
Expand Down

0 comments on commit 9be6be1

Please sign in to comment.