Skip to content

Commit

Permalink
#impl hierarchy for actor
Browse files Browse the repository at this point in the history
  • Loading branch information
phuvh committed Jun 30, 2022
1 parent 163b7a6 commit be2b22e
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 57 deletions.
4 changes: 2 additions & 2 deletions src/main/kotlin/org/magicghostvu/actor/Behavior.kt
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ class MActorRef<in Message>(private val internalChannel: SendChannel<Message>) {
open class Behavior<in T> {

}

abstract class AbstractBehaviour<T>(protected val scope: CoroutineScope) : Behavior<T>() {
@OptIn(ObsoleteCoroutinesApi::class)
abstract class AbstractBehaviour<T>(protected val scope: ActorScope<T>) : Behavior<T>() {
abstract suspend fun onReceive(message: T): Behavior<T>
}

Expand Down
20 changes: 18 additions & 2 deletions src/main/kotlin/org/magicghostvu/actor/Behaviors.kt
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ object Behaviors {
// nếu createNewScope = true thì sẽ create một scope mới cho actor kèm với supervisor
// actor crash sẽ không gây stop scope ban đầu
@OptIn(ObsoleteCoroutinesApi::class)
fun <T> CoroutineScope.spawn(
private fun <T> CoroutineScope.spawn(
debug: Boolean = false,
createNewScope: Boolean = false,
factory: suspend () -> Behavior<T>
Expand All @@ -66,7 +66,7 @@ object Behaviors {

suspend fun unwrapBehavior(behavior: Behavior<T>): Behavior<T> {
var tmp: Behavior<T> = behavior
while (tmp !is AbstractBehaviour<T>) {
while (tmp is TimerBehavior<T> || tmp is SetUpBehavior<T>) {
if (tmp is TimerBehavior<T>) {
tmp = tmp.timerFunc(timerMan)
} else if (tmp is SetUpBehavior<T>) {
Expand Down Expand Up @@ -176,4 +176,20 @@ object Behaviors {
return MActorRef(internalChannel as SendChannel<T>)
}


fun <T> CoroutineScope.spawnChild(
debug: Boolean = false,
factory: suspend () -> Behavior<T>
): MActorRef<T> {
return spawn(debug, createNewScope = false, factory)
}

fun <T> CoroutineScope.spawnNew(
debug: Boolean = false,
factory: suspend () -> Behavior<T>
): MActorRef<T> {
return spawn(debug, createNewScope = true, factory)
}


}
100 changes: 47 additions & 53 deletions src/main/kotlin/org/magicghostvu/run/RunActor.kt
Original file line number Diff line number Diff line change
@@ -1,22 +1,24 @@
package org.magicghostvu.run

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.ActorScope
import org.magicghostvu.actor.AbstractBehaviour
import org.magicghostvu.actor.Behavior
import org.magicghostvu.actor.Behaviors

import org.magicghostvu.actor.Behaviors.spawn
import org.magicghostvu.actor.Behaviors.spawnChild
import org.magicghostvu.actor.Behaviors.spawnNew
import org.magicghostvu.actor.MActorRef
import org.magicghostvu.mlogger.ActorLogger
import kotlin.math.log


sealed class Msg
class Msg1 : Msg()
class Msg2(val from: String) : Msg()
class Msg3(val repTo: CompletableDeferred<MActorRef<Msg>>) : Msg()
class Msg4() : Msg()

class State1(scope: CoroutineScope, var i: Int) : AbstractBehaviour<Msg>(scope) {
@OptIn(ObsoleteCoroutinesApi::class)
class State1(scope: ActorScope<Msg>, var i: Int) : AbstractBehaviour<Msg>(scope) {
private val logger = ActorLogger.logger

private lateinit var child: MActorRef<Msg>
Expand All @@ -28,8 +30,8 @@ class State1(scope: CoroutineScope, var i: Int) : AbstractBehaviour<Msg>(scope)
if (message is Msg1) {
i++
if (!setChild) {
child = scope.spawn() {
State2(scope, 0.0)
child = scope.spawnChild {
State2.setup()
}
logger.info("set child success")
setChild = true
Expand All @@ -40,15 +42,20 @@ class State1(scope: CoroutineScope, var i: Int) : AbstractBehaviour<Msg>(scope)
val result = message.repTo
result.complete(child)
return Behaviors.same()
} else if (message is Msg2) {
logger.info("msg 2 come to parent")
//logger.info("stop parent")
//throw IllegalArgumentException("crash parent")
return Behaviors.same()
} else {
logger.info("stop parent")
//throw IllegalArgumentException("crash parent")
return Behaviors.stopped()
}
}
}

class State2(scope: CoroutineScope, var d: Double) : AbstractBehaviour<Msg>(scope) {
@OptIn(ObsoleteCoroutinesApi::class)
class State2(scope: ActorScope<Msg>, var d: Double) : AbstractBehaviour<Msg>(scope) {
private val logger = ActorLogger.logger
override suspend fun onReceive(message: Msg): Behavior<Msg> {
return when (message) {
Expand All @@ -57,18 +64,34 @@ class State2(scope: CoroutineScope, var d: Double) : AbstractBehaviour<Msg>(scop
Behaviors.same()
}
is Msg2 -> {
logger.info("msg 2 come from {}", message.from)
logger.info("child received msg 2 come from {}", message.from)
d++
logger.info("d is {}", d)

throw IllegalArgumentException("crash child")
Behaviors.same()
//throw IllegalArgumentException("crash child")
//Behaviors.stopped()
}
is Msg3 -> {
logger.info("msg 3 come")
//Behaviors.stopped<Msg>()
throw IllegalArgumentException("crash child")
}
is Msg4 -> {
logger.info("stop child")
Behaviors.stopped()
}
}
}

companion object {
fun setup(): Behavior<Msg> {
return Behaviors.withTimer { timer ->
timer.startFixedRateTimer(Msg1(), 0, 1000)

timer.startSingleTimer(Msg4(), 5000)

Behaviors.setUp { State2(it, 0.0) }
}
}
}
}
Expand All @@ -78,18 +101,6 @@ class State2(scope: CoroutineScope, var d: Double) : AbstractBehaviour<Msg>(scop
fun main(arr: Array<String>) {
runBlocking {
val logger = ActorLogger.logger
/*val actorRef = spawn {
delay(1000)
State1(0)
}
actorRef.tell(Msg2("normal"))
actorRef.tell(Msg1())
delay(5000)
logger.info("done delay")
actorRef.tell(Msg1())
actorRef.tell(Msg2("normal"))*/


launch {
Expand All @@ -99,39 +110,22 @@ fun main(arr: Array<String>) {
}
}

// khi dùng supervisor thì nó nên được add vào sau context hiện tại

val parent = spawn {
Behaviors.setUp<Msg> {

}
}

val result = CompletableDeferred<MActorRef<Msg>>()
delay(500)
parent.tell(Msg3(result))


val child = result.await()
logger.info("received child")
launch(SupervisorJob()) {
while (true) {
child.tell(Msg1())
delay(1000)
val parent = spawnNew<Msg> {
Behaviors.withTimer { timer ->
timer.startFixedRateTimer(
Msg2("ádasd"),
0,
1000
)
Behaviors.setUp {
State1(it, 0)
}
}
}
parent.tell(Msg1())

launch {
delay(5000)
child.tell(Msg2("crash child"))
}

delay(5000)
parent.tell(Msg2("crash parent"))



/*delay(5000)
parent.tell(Msg4())*/

delay(1000000)
}
}

0 comments on commit be2b22e

Please sign in to comment.