Skip to content

Commit

Permalink
fix(udp): Now throwing errors when trying to send from an OSCUDPSocke…
Browse files Browse the repository at this point in the history
…t that is only configured to receive. Send is now a suspending function. Updated dependencies.
  • Loading branch information
Burtan committed Sep 5, 2024
1 parent c772082 commit defbf5b
Show file tree
Hide file tree
Showing 7 changed files with 80 additions and 53 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -97,3 +97,4 @@ gradle-app.setting
.project
# JDT-specific (Eclipse Java Development Tools)
.classpath
.kotlin
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,5 @@ import de.frederikbertling.kosc.core.spec.OSCPacket
* The unit of transmission of OSC is an OSC Packet. Any application that sends OSC Packets is an OSC Client
*/
interface OSCClient {
fun send(packet: OSCPacket)
suspend fun send(packet: OSCPacket)
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,5 @@ import kotlinx.coroutines.flow.SharedFlow
*/
interface OSCServer {
val packetFlow: SharedFlow<OSCPacket>
val errorFlow: SharedFlow<Throwable>
val receivingErrorFlow: SharedFlow<Throwable>
}
12 changes: 6 additions & 6 deletions gradle/libs.versions.toml
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
[versions]
agp = "8.2.2"
coroutines = "1.8.0"
kotlin = "1.9.24"
kotest = "5.9.0"
ktor = "3.0.0-beta-1"
kx-io = "0.3.4"
agp = "8.6.0"
coroutines = "1.8.1"
kotlin = "2.0.20"
kotest = "5.9.1"
ktor = "3.0.0-beta-2"
kx-io = "0.5.3"
maven-publish = "0.28.0"

[libraries]
Expand Down
2 changes: 1 addition & 1 deletion gradle/wrapper/gradle-wrapper.properties
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-8.7-bin.zip
distributionUrl=https\://services.gradle.org/distributions/gradle-8.9-bin.zip
zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,24 @@ import de.frederikbertling.kosc.core.serialization.OSCSerializer
import de.frederikbertling.kosc.core.spec.OSCPacket
import de.frederikbertling.kosc.core.transport.OSCClient
import de.frederikbertling.kosc.core.transport.OSCServer
import io.ktor.network.selector.*
import io.ktor.network.sockets.*
import io.ktor.utils.io.core.*
import kotlinx.coroutines.*
import io.ktor.network.selector.SelectorManager
import io.ktor.network.sockets.ASocket
import io.ktor.network.sockets.ConnectedDatagramSocket
import io.ktor.network.sockets.Datagram
import io.ktor.network.sockets.InetSocketAddress
import io.ktor.network.sockets.SocketAddress
import io.ktor.network.sockets.aSocket
import io.ktor.network.sockets.isClosed
import io.ktor.utils.io.core.ByteReadPacket
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.IO
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.asSharedFlow
import kotlinx.coroutines.launch
import kotlinx.io.Buffer
import kotlinx.io.readByteArray

@Suppress("unused")
class OSCUDPSocket private constructor(
Expand Down Expand Up @@ -56,12 +67,12 @@ class OSCUDPSocket private constructor(
scope: CoroutineScope = CoroutineScope(Dispatchers.IO + SupervisorJob())
) : this(InetSocketAddress("localhost", portIn), null, null, scope)

private var clientSocket: ConnectedDatagramSocket? = null
private var serverSocket: BoundDatagramSocket? = null
private var clientSocket: (ConnectedDatagramSocket)? = null
private var serverSocket: (ASocket)? = null
private val _packetFlow = MutableSharedFlow<OSCPacket>()
private val _errorFlow = MutableSharedFlow<Throwable>()
private val _receivingErrorFlow = MutableSharedFlow<Throwable>()
override val packetFlow = _packetFlow.asSharedFlow()
override val errorFlow = _errorFlow.asSharedFlow()
override val receivingErrorFlow = _receivingErrorFlow.asSharedFlow()

init {
val selectorManager = SelectorManager(Dispatchers.IO)
Expand All @@ -87,47 +98,52 @@ class OSCUDPSocket private constructor(
.bind(localAddress)
}

if (socket is ConnectedDatagramSocket)
// Socket is always a ConnectedDatagramSocket, even when using "bound".
// So, also check for remoteAddress
if (remoteAddress != null && socket is ConnectedDatagramSocket)
clientSocket = socket
else if (socket is BoundDatagramSocket)

if (localAddress != null) {
serverSocket = socket

scope.launch {
while (!socket.isClosed) {
try {
val datagram = socket
.receive()
scope.launch {
while (!socket.isClosed) {
try {
val datagram = socket
.receive()

val oscPacket = datagram
.packet
.use {
val buffer = Buffer()
buffer.write(it.readBytes())
OSCSerializer.deserialize(buffer)
}
val oscPacket = datagram
.packet
.readByteArray()
.let {
val buffer = Buffer()
buffer.write(it)
OSCSerializer.deserialize(buffer)
}

_packetFlow.emit(oscPacket)
} catch (e: Throwable) {
_errorFlow.emit(e)
_packetFlow.emit(oscPacket)
} catch (e: Throwable) {
_receivingErrorFlow.emit(e)
}
}
}
}
}

override fun send(packet: OSCPacket) {
scope.launch {
clientSocket?.let {
try {
val data = OSCSerializer.serialize(packet)
val datagram = Datagram(
packet = ByteReadPacket(data),
address = it.remoteAddress
)
it.send(datagram)
} catch (e: Throwable) {
_errorFlow.emit(e)
}
override suspend fun send(packet: OSCPacket) {
clientSocket?.let {
try {
val data = OSCSerializer.serialize(packet)
val datagram = Datagram(
packet = ByteReadPacket(data),
address = it.remoteAddress
)
it.send(datagram)
} catch (e: Throwable) {
_receivingErrorFlow.emit(e)
}
} ?: run {
throw IllegalStateException("${this@OSCUDPSocket} is not configured to send OSCPackets (No remote host).")
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,23 +4,22 @@ import de.frederikbertling.kosc.core.spec.OSCMessage
import de.frederikbertling.kosc.core.spec.OSCPacket
import de.frederikbertling.kosc.core.spec.args.OSCFloat32
import de.frederikbertling.kosc.udp.OSCUDPSocket
import io.kotest.assertions.throwables.shouldThrow
import io.kotest.core.spec.style.StringSpec
import io.kotest.matchers.shouldBe
import io.ktor.network.sockets.*
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toCollection
import kotlinx.coroutines.launch
import kotlin.random.Random
import kotlin.random.nextInt

class OSCUDPSocketTest : StringSpec() {

private val testPacket = OSCMessage("/test", OSCFloat32(1.93127f))

init {
"OSCUDPSocket test #1" {
val port = Random.nextInt(8080..8090)
val port = 8080
val listener = OSCUDPSocket(port)
val client = OSCUDPSocket("localhost", port)
testClient(client, listener)
Expand All @@ -29,7 +28,7 @@ class OSCUDPSocketTest : StringSpec() {
}

"OSCUDPSocket test #2" {
val port = Random.nextInt(8080..8090)
val port = 8081
val listener = OSCUDPSocket(port)
val client = OSCUDPSocket(InetSocketAddress("localhost", port))
testClient(client, listener)
Expand All @@ -38,21 +37,32 @@ class OSCUDPSocketTest : StringSpec() {
}

"OSCUDPSocket test #3" {
val port = Random.nextInt(8080..8090)
val port = 8082
val listenerClient = OSCUDPSocket("localhost", port, port)
testClient(listenerClient, listenerClient)
listenerClient.close()
}

"OSCUDPSocket test #4" {
val port = Random.nextInt(8080..8090)
val port = 8083
val listenerClient = OSCUDPSocket(
localAddress = InetSocketAddress("localhost", port),
remoteAddress = InetSocketAddress("localhost", port)
)
testClient(listenerClient, listenerClient)
listenerClient.close()
}

"OSCUDPSocket test fail on send when configured to receive only" {
val port = 8084
val listener = OSCUDPSocket(port)

shouldThrow<IllegalStateException> {
listener.send(testPacket)
}

listener.close()
}
}

private suspend fun testClient(client: OSCUDPSocket, listener: OSCUDPSocket) = coroutineScope {
Expand Down

0 comments on commit defbf5b

Please sign in to comment.