Skip to content

Commit

Permalink
fix(udp): Now throwing errors when trying to send from an OSCUDPSocke…
Browse files Browse the repository at this point in the history
…t that is only configured to receive. Send is now a suspending function.
  • Loading branch information
Burtan committed Sep 5, 2024
1 parent c772082 commit dd95ecc
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,5 @@ import de.frederikbertling.kosc.core.spec.OSCPacket
* The unit of transmission of OSC is an OSC Packet. Any application that sends OSC Packets is an OSC Client
*/
interface OSCClient {
fun send(packet: OSCPacket)
suspend fun send(packet: OSCPacket)
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,5 @@ import kotlinx.coroutines.flow.SharedFlow
*/
interface OSCServer {
val packetFlow: SharedFlow<OSCPacket>
val errorFlow: SharedFlow<Throwable>
val receivingErrorFlow: SharedFlow<Throwable>
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,12 @@ class OSCUDPSocket private constructor(
scope: CoroutineScope = CoroutineScope(Dispatchers.IO + SupervisorJob())
) : this(InetSocketAddress("localhost", portIn), null, null, scope)

private var clientSocket: ConnectedDatagramSocket? = null
private var serverSocket: BoundDatagramSocket? = null
private var clientSocket: (ConnectedDatagramSocket)? = null
private var serverSocket: (ASocket)? = null
private val _packetFlow = MutableSharedFlow<OSCPacket>()
private val _errorFlow = MutableSharedFlow<Throwable>()
private val _receivingErrorFlow = MutableSharedFlow<Throwable>()
override val packetFlow = _packetFlow.asSharedFlow()
override val errorFlow = _errorFlow.asSharedFlow()
override val receivingErrorFlow = _receivingErrorFlow.asSharedFlow()

init {
val selectorManager = SelectorManager(Dispatchers.IO)
Expand All @@ -87,47 +87,51 @@ class OSCUDPSocket private constructor(
.bind(localAddress)
}

if (socket is ConnectedDatagramSocket)
// Socket is always a ConnectedDatagramSocket, even when using "bound".
// So, also check for remoteAddress
if (remoteAddress != null && socket is ConnectedDatagramSocket)
clientSocket = socket
else if (socket is BoundDatagramSocket)

if (localAddress != null) {
serverSocket = socket

scope.launch {
while (!socket.isClosed) {
try {
val datagram = socket
.receive()
scope.launch {
while (!socket.isClosed) {
try {
val datagram = socket
.receive()

val oscPacket = datagram
.packet
.use {
val buffer = Buffer()
buffer.write(it.readBytes())
OSCSerializer.deserialize(buffer)
}
val oscPacket = datagram
.packet
.use {
val buffer = Buffer()
buffer.write(it.readBytes())
OSCSerializer.deserialize(buffer)
}

_packetFlow.emit(oscPacket)
} catch (e: Throwable) {
_errorFlow.emit(e)
_packetFlow.emit(oscPacket)
} catch (e: Throwable) {
_receivingErrorFlow.emit(e)
}
}
}
}
}

override fun send(packet: OSCPacket) {
scope.launch {
clientSocket?.let {
try {
val data = OSCSerializer.serialize(packet)
val datagram = Datagram(
packet = ByteReadPacket(data),
address = it.remoteAddress
)
it.send(datagram)
} catch (e: Throwable) {
_errorFlow.emit(e)
}
override suspend fun send(packet: OSCPacket) {
clientSocket?.let {
try {
val data = OSCSerializer.serialize(packet)
val datagram = Datagram(
packet = ByteReadPacket(data),
address = it.remoteAddress
)
it.send(datagram)
} catch (e: Throwable) {
_receivingErrorFlow.emit(e)
}
} ?: run {
throw IllegalStateException("${this@OSCUDPSocket} is not configured to send OSCPackets (No remote host).")
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,23 +4,22 @@ import de.frederikbertling.kosc.core.spec.OSCMessage
import de.frederikbertling.kosc.core.spec.OSCPacket
import de.frederikbertling.kosc.core.spec.args.OSCFloat32
import de.frederikbertling.kosc.udp.OSCUDPSocket
import io.kotest.assertions.throwables.shouldThrow
import io.kotest.core.spec.style.StringSpec
import io.kotest.matchers.shouldBe
import io.ktor.network.sockets.*
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toCollection
import kotlinx.coroutines.launch
import kotlin.random.Random
import kotlin.random.nextInt

class OSCUDPSocketTest : StringSpec() {

private val testPacket = OSCMessage("/test", OSCFloat32(1.93127f))

init {
"OSCUDPSocket test #1" {
val port = Random.nextInt(8080..8090)
val port = 8080
val listener = OSCUDPSocket(port)
val client = OSCUDPSocket("localhost", port)
testClient(client, listener)
Expand All @@ -29,7 +28,7 @@ class OSCUDPSocketTest : StringSpec() {
}

"OSCUDPSocket test #2" {
val port = Random.nextInt(8080..8090)
val port = 8081
val listener = OSCUDPSocket(port)
val client = OSCUDPSocket(InetSocketAddress("localhost", port))
testClient(client, listener)
Expand All @@ -38,21 +37,32 @@ class OSCUDPSocketTest : StringSpec() {
}

"OSCUDPSocket test #3" {
val port = Random.nextInt(8080..8090)
val port = 8082
val listenerClient = OSCUDPSocket("localhost", port, port)
testClient(listenerClient, listenerClient)
listenerClient.close()
}

"OSCUDPSocket test #4" {
val port = Random.nextInt(8080..8090)
val port = 8083
val listenerClient = OSCUDPSocket(
localAddress = InetSocketAddress("localhost", port),
remoteAddress = InetSocketAddress("localhost", port)
)
testClient(listenerClient, listenerClient)
listenerClient.close()
}

"OSCUDPSocket test fail on send when configured to receive only" {
val port = 8084
val listener = OSCUDPSocket(port)

shouldThrow<IllegalStateException> {
listener.send(testPacket)
}

listener.close()
}
}

private suspend fun testClient(client: OSCUDPSocket, listener: OSCUDPSocket) = coroutineScope {
Expand Down

0 comments on commit dd95ecc

Please sign in to comment.