diff --git a/core/src/main/kotlin/org/vorpal/research/kex/trace/symbolic/protocol/MasterProtocolHandler.kt b/core/src/main/kotlin/org/vorpal/research/kex/trace/symbolic/protocol/MasterProtocolHandler.kt index cd663306e..0912a446c 100644 --- a/core/src/main/kotlin/org/vorpal/research/kex/trace/symbolic/protocol/MasterProtocolHandler.kt +++ b/core/src/main/kotlin/org/vorpal/research/kex/trace/symbolic/protocol/MasterProtocolHandler.kt @@ -50,16 +50,13 @@ interface Worker2MasterConnection : AutoCloseable { // Impl class MasterProtocolSocketHandler( - clientPort: Int + override val clientPort: Int ) : MasterProtocolHandler { - private val clientListener = ServerSocket(clientPort) private val workerListener = ServerSocket(0) - - override val clientPort get() = clientListener.localPort override val workerPort get() = workerListener.localPort override fun receiveClientConnection(): Master2ClientConnection { - val socket = clientListener.accept() + val socket = Socket("localhost", clientPort) return Master2ClientSocketConnection(socket) } @@ -79,6 +76,9 @@ class Master2ClientSocketConnection(private val socket: Socket) : Master2ClientC } override fun receive(): String { + while (!reader.ready()) { + log.debug("Waiting, reader is not ready yet") + } return reader.readLine().also { log.debug("Master received a request $it") } @@ -88,6 +88,7 @@ class Master2ClientSocketConnection(private val socket: Socket) : Master2ClientC log.debug("Master sends a response") writer.write(result) writer.newLine() + writer.flush() } override fun close() { @@ -127,21 +128,12 @@ class Master2WorkerSocketConnection(private val socket: Socket) : Master2WorkerC @InternalSerializationApi class Client2MasterSocketConnection( val serializer: KexSerializer, - private val port: Int + private val socket: Socket ) : Client2MasterConnection { - private lateinit var socket: Socket private lateinit var writer: BufferedWriter private lateinit var reader: BufferedReader override fun connect(timeout: Duration): Boolean { - log.debug("Trying to connect to master at port $port") - socket = Socket() - try { - socket.connect(InetSocketAddress("localhost", port), timeout.inWholeMilliseconds.toInt()) - } catch (_: SocketTimeoutException) { - log.error("Could not connect to master, timeout exception") - return false - } writer = socket.getOutputStream().bufferedWriter() reader = socket.getInputStream().bufferedReader() log.debug("Connected to master") diff --git a/kex-executor/src/main/kotlin/org/vorpal/research/kex/worker/ExecutorMaster.kt b/kex-executor/src/main/kotlin/org/vorpal/research/kex/worker/ExecutorMaster.kt index 4194aa1bb..ba9f3aba3 100644 --- a/kex-executor/src/main/kotlin/org/vorpal/research/kex/worker/ExecutorMaster.kt +++ b/kex-executor/src/main/kotlin/org/vorpal/research/kex/worker/ExecutorMaster.kt @@ -101,7 +101,7 @@ class ExecutorMaster( reInit() val request = clientConnection.receive() - log.debug("Worker $id receiver request $request") + log.debug("Worker $id received request $request") workerConnection.send(request) val result = try { workerConnection.receive() diff --git a/kex-runner/src/main/kotlin/org/vorpal/research/kex/trace/runner/SymbolicExternalTracingRunner.kt b/kex-runner/src/main/kotlin/org/vorpal/research/kex/trace/runner/SymbolicExternalTracingRunner.kt index a0efe864e..25f392128 100644 --- a/kex-runner/src/main/kotlin/org/vorpal/research/kex/trace/runner/SymbolicExternalTracingRunner.kt +++ b/kex-runner/src/main/kotlin/org/vorpal/research/kex/trace/runner/SymbolicExternalTracingRunner.kt @@ -28,17 +28,12 @@ import kotlin.time.Duration.Companion.seconds @InternalSerializationApi internal object ExecutorMasterController : AutoCloseable { private lateinit var process: Process - private val masterPort: Int + private val controllerSocket = ServerSocket(0) private val serializers = mutableMapOf() init { - val tempSocket = ServerSocket(0) - masterPort = tempSocket.localPort - tempSocket.close() - // this is fucked up - Thread.sleep(2000) Runtime.getRuntime().addShutdownHook(thread(start = false) { - if (process.isAlive) process.destroy() + if (::process.isInitialized && process.isAlive) process.destroy() }) } @@ -81,14 +76,13 @@ internal object ExecutorMasterController : AutoCloseable { executorKlass, "--output", "${outputDir.toAbsolutePath()}", "--config", "$executorConfigPath", - "--port", "$masterPort", + "--port", "${controllerSocket.localPort}", "--kfgClassPath", kfgClassPath.joinToString(getPathSeparator()), "--workerClassPath", workerClassPath.joinToString(getPathSeparator()), "--numOfWorkers", "$numberOfWorkers" ) log.debug("Starting executor master process with command: '${pb.command().joinToString(" ")}'") process = pb.start() - Thread.sleep(1000) } fun getClientConnection(ctx: ExecutionContext): Client2MasterConnection { @@ -97,7 +91,7 @@ internal object ExecutorMasterController : AutoCloseable { ctx.cm, prettyPrint = false ) - }, masterPort) + }, controllerSocket.accept()) } override fun close() {