Skip to content

Commit

Permalink
Merge pull request #205 from ZettaScaleLabs/fix/scouting
Browse files Browse the repository at this point in the history
fix(scout): error when closing scout + API rework
  • Loading branch information
milyin authored Sep 10, 2024
2 parents 0b80e79 + c1437f3 commit e1972de
Show file tree
Hide file tree
Showing 5 changed files with 80 additions and 8 deletions.
11 changes: 11 additions & 0 deletions zenoh-jni/src/scouting.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,3 +97,14 @@ pub unsafe extern "C" fn Java_io_zenoh_jni_JNIScout_00024Companion_scoutViaJNI(
null()
})
}

/// Frees the scout.
#[no_mangle]
#[allow(non_snake_case)]
pub(crate) unsafe extern "C" fn Java_io_zenoh_jni_JNIScout_00024Companion_freePtrViaJNI(
_env: JNIEnv,
_: JClass,
scout_ptr: *const Scout<()>,
) {
Arc::from_raw(scout_ptr);
}
12 changes: 6 additions & 6 deletions zenoh-kotlin/src/commonMain/kotlin/io/zenoh/Zenoh.kt
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,13 @@ object Zenoh {
* @param callback [Callback] to be run when receiving a [Hello] message.
* @param whatAmI [WhatAmI] configuration: it indicates the role of the zenoh node sending the HELLO message.
* @param config Optional [Config] for the scout.
* @return A [Scout] object.
* @return A result with the [Scout] object.
*/
fun scout(
callback: Callback<Hello>,
whatAmI: Set<WhatAmI> = setOf(Peer, Router),
config: Config? = null
): Scout<Unit> {
): Result<Scout<Unit>> {
ZenohLoad
return JNIScout.scout(whatAmI = whatAmI, callback = callback, receiver = Unit, config = config)
}
Expand All @@ -55,13 +55,13 @@ object Zenoh {
* @param handler [Handler] to handle incoming [Hello] messages.
* @param whatAmI [WhatAmI] configuration: it indicates the role of the zenoh node sending the HELLO message.
* @param config Optional [Config] for the scout.
* @return A [Scout] object.
* @return A result with the [Scout] object.
*/
fun <R> scout(
handler: Handler<Hello, R>,
whatAmI: Set<WhatAmI> = setOf(Peer, Router),
config: Config? = null
): Scout<R> {
): Result<Scout<R>> {
ZenohLoad
return JNIScout.scout(
whatAmI = whatAmI,
Expand All @@ -80,13 +80,13 @@ object Zenoh {
* @param channel [Channel] upon which the incoming [Hello] messages will be piped.
* @param whatAmI [WhatAmI] configuration: it indicates the role of the zenoh node sending the HELLO message.
* @param config Optional [Config] for the scout.
* @return A [Scout] object.
* @return A result with the [Scout] object.
*/
fun scout(
channel: Channel<Hello>,
whatAmI: Set<WhatAmI> = setOf(Peer, Router),
config: Config? = null
): Scout<Channel<Hello>> {
): Result<Scout<Channel<Hello>>> {
ZenohLoad
val handler = ChannelHandler(channel)
return JNIScout.scout(
Expand Down
4 changes: 2 additions & 2 deletions zenoh-kotlin/src/commonMain/kotlin/io/zenoh/jni/JNIScout.kt
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,13 @@ class JNIScout(private val ptr: Long) {
callback: Callback<Hello>,
config: Config?,
receiver: R
): Scout<R> {
): Result<Scout<R>> = runCatching {
val scoutCallback = JNIScoutCallback { whatAmI2: Int, id: String, locators: List<String> ->
callback.run(Hello(WhatAmI.fromInt(whatAmI2), ZenohID(id), locators))
}
val binaryWhatAmI: Int = whatAmI.map { it.value }.reduce { acc, it -> acc or it }
val ptr = scoutViaJNI(binaryWhatAmI, scoutCallback, config?.jniConfig?.ptr)
return Scout(receiver, JNIScout(ptr))
Scout(receiver, JNIScout(ptr))
}

@Throws(Exception::class)
Expand Down
7 changes: 7 additions & 0 deletions zenoh-kotlin/src/commonMain/kotlin/io/zenoh/scouting/Scout.kt
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,13 @@ import io.zenoh.jni.JNIScout
* Scout spawns a task that periodically sends scout messages and waits for Hello replies.
* Drop the returned Scout to stop the scouting task.
*
* To launch a scout, use [io.zenoh.Zenoh.scout]:
* ```kotlin
* Zenoh.scout(callback = { hello ->
* println(hello)
* }).getOrThrow()
* ```
*
* @param R The receiver type.
* @param receiver Receiver to handle incoming hello messages.
*/
Expand Down
54 changes: 54 additions & 0 deletions zenoh-kotlin/src/commonTest/kotlin/io/zenoh/ScoutTest.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
//
// Copyright (c) 2023 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>
//

package io.zenoh

import io.zenoh.scouting.Hello
import kotlinx.coroutines.channels.Channel
import kotlin.test.Test
import kotlin.test.assertNotNull

class ScoutTest {

@Test
fun `scouting is declared and undeclared properly test`() {
val scout = Zenoh.scout(Channel()).getOrThrow()
scout.close()
}

@Test
fun `scouting detects session test`() {

val config = Config.fromJson("""
{
"mode": "peer",
"connect": {
"endpoints": ["tcp/localhost:7450"]
},
}
""".trimIndent()).getOrThrow()

val session = Session.open(config).getOrThrow()

var hello: Hello? = null
Zenoh.scout(callback = {
hello = it
}).getOrThrow()

Thread.sleep(1000)

assertNotNull(hello)
session.close()
}
}

0 comments on commit e1972de

Please sign in to comment.