Skip to content

Commit

Permalink
feat: implement handle_cast and handle_continue (#88)
Browse files Browse the repository at this point in the history
* Implement handling of continue-style requests
* Add handle_cast
  • Loading branch information
pjlast authored Jul 3, 2024
1 parent 8b70e2a commit ac9e22f
Show file tree
Hide file tree
Showing 4 changed files with 81 additions and 14 deletions.
35 changes: 30 additions & 5 deletions riot/lib/gen_server.ml
Original file line number Diff line number Diff line change
@@ -1,19 +1,33 @@
open Global

type 'res req = ..
type cast_req = ..
type cont_req = ..

type Message.t +=
| Call : Pid.t * 'res Ref.t * 'res req -> Message.t
| Cast : cast_req -> Message.t
| Reply : 'res Ref.t * 'res -> Message.t

type 'state init_result = Ok of 'state | Error | Ignore

type ('res, 'state) call_result =
| Reply of ('res * 'state)
| Reply_continue of ('res * 'state * cont_req)

type 'state cast_result = No_reply of 'state

module type Impl = sig
type args
type state

val init : args -> state init_result
val handle_call : 'res. 'res req -> Pid.t -> state -> 'res * state

val handle_call :
'res. 'res req -> Pid.t -> state -> ('res, state) call_result

val handle_cast : cast_req -> state -> state cast_result
val handle_continue : cont_req -> state -> state
val handle_info : Message.t -> state -> unit
end

Expand All @@ -35,14 +49,23 @@ let call : type res. Pid.t -> res req -> res =
in
receive ~selector ()

let cast pid req = send pid (Cast req)

let rec loop : type args state. (args, state) impl -> state -> unit =
fun impl state ->
let (module I : Impl with type args = args and type state = state) = impl in
match receive_any () with
| Call (pid, ref, req) ->
let res, state = I.handle_call req pid state in
send pid (Reply (ref, res));
loop impl state
| Call (pid, ref, req) -> (
match I.handle_call req pid state with
| Reply (res, state) ->
send pid (Reply (ref, res));
loop impl state
| Reply_continue (res, state, cont_req) ->
send pid (Reply (ref, res));
let state = I.handle_continue cont_req state in
loop impl state)
| Cast req -> (
match I.handle_cast req state with No_reply state -> loop impl state)
| msg ->
let _res = I.handle_info msg state in
loop impl state
Expand All @@ -65,5 +88,7 @@ let start_link :
module Default = struct
let init _args = Ignore
let handle_call _req _from _state = failwith "unimplemented"
let handle_cast _req _state = failwith "unimplemented"
let handle_continue _req _state = failwith "unimplemented"
let handle_info _msg _state = failwith "unimplemented"
end
10 changes: 7 additions & 3 deletions riot/lib/key_value_store.ml
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,15 @@ module MakeServer (B : Base) = struct
let init () = Gen_server.Ok { tbl = Hashtbl.create 0 }

let handle_call :
type res. res Gen_server.req -> Pid.t -> state -> res * state =
type res.
res Gen_server.req ->
Pid.t ->
state ->
(res, state) Gen_server.call_result =
fun req _from state ->
match req with
| Get k -> (Hashtbl.find_opt state.tbl k, state)
| Put (k, v) -> (Hashtbl.replace state.tbl k v, state)
| Get k -> Gen_server.Reply (Hashtbl.find_opt state.tbl k, state)
| Put (k, v) -> Gen_server.Reply (Hashtbl.replace state.tbl k v, state)
| _ -> failwith "invalid call"
end

Expand Down
21 changes: 20 additions & 1 deletion riot/riot.mli
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,9 @@ module Gen_server : sig
]}
*)

type cast_req = ..
type cont_req = ..

(** [state init_result] is used to initialize a new generic server. *)
type 'state init_result =
| Ok of 'state
Expand All @@ -360,6 +363,12 @@ module Gen_server : sig
(** use this value to crash the process and notify a supervisor of it *)
| Ignore (** use this value to exit the process normally *)

type ('res, 'state) call_result =
| Reply of ('res * 'state)
| Reply_continue of ('res * 'state * cont_req)

type 'state cast_result = No_reply of 'state

(** [Impl] is the module type of the generic server base implementations. You
can use this type when defining new gen servers like this:
Expand All @@ -380,7 +389,12 @@ module Gen_server : sig
type state

val init : args -> state init_result
val handle_call : 'res. 'res req -> Pid.t -> state -> 'res * state

val handle_call :
'res. 'res req -> Pid.t -> state -> ('res, state) call_result

val handle_cast : cast_req -> state -> state cast_result
val handle_continue : cont_req -> state -> state
val handle_info : Message.t -> state -> unit
end

Expand All @@ -396,6 +410,11 @@ module Gen_server : sig
TODO(leostera): add ?timeout param
*)

val cast : Pid.t -> cast_req -> unit
(** [cast pid req] will send a type-safe request [req] to the generic server behind [pid]
and does not wait for a response.
*)

val start_link :
('args, 'state) impl -> 'args -> (Pid.t, [> `Exn of exn ]) result
(** [start_link (module S) args] will spawn and link a new process that will
Expand Down
29 changes: 24 additions & 5 deletions test/gen-servers/main.ml
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@ module Twitch = struct

type _ Gen_server.req +=
| Is_connected : bool Gen_server.req
| Status_value : int Gen_server.req
| Profile :
profile_req
-> (user, [ `Twitch_error of error ]) result Gen_server.req

type Gen_server.cont_req += Update_status : int -> Gen_server.cont_req
type args = { verbose : bool }

module Server : Gen_server.Impl with type args = args = struct
Expand All @@ -31,29 +33,46 @@ module Twitch = struct
let init _args = Gen_server.Ok { status = 1 }

let handle_call :
type res. res Gen_server.req -> Pid.t -> state -> res * state =
type res.
res Gen_server.req ->
Pid.t ->
state ->
(res, state) Gen_server.call_result =
fun req _from state ->
match req with
| Is_connected -> (true, state)
| Is_connected -> Gen_server.Reply (true, state)
| Status_value -> Gen_server.Reply (state.status, state)
| Profile _ ->
( Ok { name = "Jonathan Archer"; email = "[email protected]" },
state )
Gen_server.Reply_continue
( Ok { name = "Jonathan Archer"; email = "[email protected]" },
state,
Update_status 2 )

let handle_info _msg _state = ()

let handle_continue cont_req _state =
match cont_req with Update_status n -> { status = n }

let handle_cast _cast_req _state = failwith "unimplemented"
end

let start_link ?(verbose = false) () =
Gen_server.start_link (module Server) { verbose }

let is_connected pid = Gen_server.call pid Is_connected
let profile pid ~id = Gen_server.call pid (Profile { id })
let status pid = Gen_server.call pid Status_value
end

let main () =
let (Ok _) = Logger.start () in
let (Ok pid) = Twitch.start_link () in
if Twitch.is_connected pid then Logger.info (fun f -> f "connected to twitch");
let status = Twitch.status pid in
Logger.info (fun f -> f "Status is %d" status);
let (Ok user) = Twitch.profile pid ~id:1 in
Logger.info (fun f -> f "Welcome, %s!" user.name)
Logger.info (fun f -> f "Welcome, %s!" user.name);
let status = Twitch.status pid in
Logger.info (fun f -> f "Status is %d" status)

let () = Riot.run @@ main

0 comments on commit ac9e22f

Please sign in to comment.