Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add setsockopt() to eio sockets #365

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -437,7 +437,7 @@ We can test it using a mock network:
```ocaml
# Eio_main.run @@ fun _env ->
let net = Eio_mock.Net.make "mocknet" in
let socket = Eio_mock.Flow.make "socket" in
let socket = Eio_mock.Net.stream_socket "socket" in
Eio_mock.Net.on_connect net [`Return socket];
run_client ~net ~addr:(`Tcp (Eio.Net.Ipaddr.V4.loopback, 8080));;
+Connecting to server...
Expand Down Expand Up @@ -473,7 +473,7 @@ This can also be tested on its own using a mock network:
# Eio_main.run @@ fun _env ->
let listening_socket = Eio_mock.Net.listening_socket "tcp/80" in
let mock_addr = `Tcp (Eio.Net.Ipaddr.V4.loopback, 37568) in
let connection = Eio_mock.Flow.make "connection" in
let connection = Eio_mock.Net.stream_socket "connection" in
Eio_mock.Net.on_accept listening_socket [`Return (connection, mock_addr)];
Eio_mock.Flow.on_read connection [
`Return "(packet 1)";
Expand Down
36 changes: 23 additions & 13 deletions lib_eio/mock/eio_mock.mli
Original file line number Diff line number Diff line change
Expand Up @@ -89,27 +89,27 @@ module Flow : sig
| `Read_source_buffer (** Use the {!Eio.Flow.Read_source_buffer} optimisation. *)
]

type t = <
Eio.Flow.two_way;
Eio.Flow.close;
on_read : string Handler.t;
on_copy_bytes : int Handler.t;
set_copy_method : copy_method -> unit;
attach_to_switch : Eio.Switch.t -> unit;
>
class virtual t : ?pp:string Fmt.t -> string -> object
inherit Eio.Flow.two_way
inherit Eio.Flow.close
method on_read : string Handler.t
method on_copy_bytes : int Handler.t
method set_copy_method : copy_method -> unit
method attach_to_switch : Eio.Switch.t -> unit
end

val make : ?pp:string Fmt.t -> string -> t
(** [make label] is a mock Eio flow.
It can be used as a source, sink, or two-way flow.
@param pp Printer to use to display the data. *)

val on_read : t -> string Handler.actions -> unit
val on_read : #t -> string Handler.actions -> unit
(** [on_read t actions] configures the values to return from the mock's [read] function. *)

val on_copy_bytes : t -> int Handler.actions -> unit
val on_copy_bytes : #t -> int Handler.actions -> unit
(** [on_copy_bytes t actions] configures the number of bytes to copy in each iteration. *)

val set_copy_method : t -> copy_method -> unit
val set_copy_method : #t -> copy_method -> unit
(** [set_copy_method t m] configures [t] to use the given method to read from
a source during a copy operation. *)
end
Expand All @@ -125,9 +125,16 @@ module Net : sig
on_getnameinfo : (string * string) Handler.t;
>

type stream_socket = <
Flow.t;
Eio.Net.stream_socket;
on_setsockopt: unit Handler.t;
>

type listening_socket = <
Eio.Net.listening_socket;
on_accept : (Flow.t * Eio.Net.Sockaddr.stream) Handler.t;
on_accept : (stream_socket * Eio.Net.Sockaddr.stream) Handler.t;
on_setsockopt: unit Handler.t;
>

val make : string -> t
Expand All @@ -149,9 +156,12 @@ module Net : sig
val listening_socket : string -> listening_socket
(** [listening_socket label] can be configured to provide mock connections. *)

val stream_socket : string -> stream_socket
(** [stream_socket label] is a mock [stream_socket]. *)

val on_accept :
listening_socket ->
(Flow.t * Eio.Net.Sockaddr.stream) Handler.actions ->
(stream_socket * Eio.Net.Sockaddr.stream) Handler.actions ->
unit
(** [on_accept socket actions] configures how to respond when the server calls "accept". *)
end
Expand Down
23 changes: 8 additions & 15 deletions lib_eio/mock/flow.ml
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,6 @@ type copy_method = [
| `Read_source_buffer
]

type t = <
Eio.Flow.two_way;
Eio.Flow.close;
on_read : string Handler.t;
on_copy_bytes : int Handler.t;
set_copy_method : copy_method -> unit;
attach_to_switch : Switch.t -> unit;
>

let pp_default f s =
let rec aux i =
let nl =
Expand All @@ -34,7 +25,7 @@ let rec takev len = function
| x :: _ when Cstruct.length x >= len -> [Cstruct.sub x 0 len]
| x :: xs -> x :: takev (len - Cstruct.length x) xs

let make ?(pp=pp_default) label =
class t ?(pp=pp_default) label =
let on_read = Handler.make (`Raise End_of_file) in
let on_copy_bytes = Handler.make (`Return 4096) in
let copy_method = ref `Read_into in
Expand Down Expand Up @@ -89,7 +80,7 @@ let make ?(pp=pp_default) label =
if not (List.exists try_rsb (Eio.Flow.read_methods src)) then
Fmt.failwith "Source does not offer Read_source_buffer optimisation"

method set_copy_method m =
method set_copy_method (m: copy_method) =
copy_method := m

method shutdown cmd =
Expand All @@ -110,7 +101,9 @@ let make ?(pp=pp_default) label =
traceln "%s: closed" label
end

let on_read (t:t) = Handler.seq t#on_read
let on_copy_bytes (t:t) = Handler.seq t#on_copy_bytes
let set_copy_method (t:t) = t#set_copy_method
let attach_to_switch (t:t) = t#attach_to_switch
let make ?(pp=pp_default) label = new t ~pp label

let on_read (t:#t) = Handler.seq t#on_read
let on_copy_bytes (t:#t) = Handler.seq t#on_copy_bytes
let set_copy_method (t:#t) = t#set_copy_method
let attach_to_switch (t:#t) = t#attach_to_switch
52 changes: 49 additions & 3 deletions lib_eio/mock/net.ml
Original file line number Diff line number Diff line change
Expand Up @@ -71,17 +71,51 @@ let on_getaddrinfo (t:t) actions = Handler.seq t#on_getaddrinfo actions

let on_getnameinfo (t:t) actions = Handler.seq t#on_getnameinfo actions

type stream_socket = <
Flow.t;
Eio.Net.stream_socket;
on_setsockopt: unit Handler.t;
>

type listening_socket = <
Eio.Net.listening_socket;
on_accept : (Flow.t * Eio.Net.Sockaddr.stream) Handler.t;
on_accept : (stream_socket * Eio.Net.Sockaddr.stream) Handler.t;
on_setsockopt: unit Handler.t;
>

let setsockopt (type a) label handler (opt: a Eio.Net.Sockopt.t) (x : a) : unit =
let traceln (pp : a Fmt.t) (x: a) s =
traceln "%s: setsockopt %s %a" label s pp x
in
(match opt with
| IPV6_ONLY -> traceln Fmt.bool x "IPV6_ONLY"
| ACCEPTCONN -> traceln Fmt.bool x "SO_ACCEPTCONN"
| BROADCAST -> traceln Fmt.bool x "SO_BROADCAST"
| DEBUG -> traceln Fmt.bool x "SO_DEBUG"
| DONTROUTE -> traceln Fmt.bool x "SO_DONTROUTE"
| KEEPALIVE -> traceln Fmt.bool x "SO_KEEPALIVE"
| LINGER -> traceln Fmt.(option int) x "SO_LINGER"
| OOBINLINE -> traceln Fmt.bool x "SO_OOBINLINE"
| RCVBUF -> traceln Fmt.int x "SO_RCVBUF"
| RCVLOWAT -> traceln Fmt.int x "SO_RCVLOWAT"
| REUSEADDR -> traceln Fmt.bool x "SO_REUSEADDR"
| REUSEPORT -> traceln Fmt.bool x "SO_REUSEPORT"
| SNDBUF -> traceln Fmt.int x "SO_SNDBUF"
| SNDLOWAT -> traceln Fmt.int x "SO_SNDLOWAT"
| RCVTIMEO -> traceln Fmt.float x "SO_RCVTIMEO"
| SNDTIMEO -> traceln Fmt.float x "SO_SNDTIMEO"
| TYPE -> traceln Fmt.int x "SO_TYPE"
| TCP_NODELAY -> traceln Fmt.bool x "TCP_NODELAY"
);
Handler.run handler

let listening_socket label =
let on_accept = Handler.make (`Raise (Failure "Mock accept handler not configured")) in
object
object (self)
inherit Eio.Net.listening_socket

method on_accept = on_accept
method on_setsockopt = Handler.make (`Raise (Failure "Mock setsockopt handler not configured"))

method accept ~sw =
let socket, addr = Handler.run on_accept in
Expand All @@ -91,8 +125,20 @@ let listening_socket label =

method close =
traceln "%s: closed" label

method setsockopt = (setsockopt label self#on_setsockopt)
end

let stream_socket label : stream_socket = object (self)
inherit Flow.t label

method on_setsockopt = Handler.make (`Raise (Failure "Mock setsockopt handler not configured"))
method setsockopt: type a. a Eio.Net.Sockopt.t -> a -> unit =
fun opt x -> setsockopt label self#on_setsockopt opt x
end

let on_accept (l:listening_socket) actions =
let as_accept_pair x = (x :> Flow.t * Eio.Net.Sockaddr.stream) in
let as_accept_pair x = (x :> stream_socket * Eio.Net.Sockaddr.stream) in
Handler.seq l#on_accept (List.map (Action.map as_accept_pair) actions)

let on_setsockopt (l:listening_socket) actions = Handler.seq l#on_setsockopt actions
30 changes: 28 additions & 2 deletions lib_eio/net.ml
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ module Ipaddr = struct
let elide = min elide zeros in
let parts = if zeros = 0 then acc else zeros :: acc in
((if elide < -1 then Some elide else None), List.rev parts)

in
loop 0 0 [] t

Expand Down Expand Up @@ -156,12 +156,36 @@ module Sockaddr = struct
Format.fprintf f "udp:%a:%d" Ipaddr.pp_for_uri addr port
end

module Sockopt = struct
type 'a t =
| IPV6_ONLY : bool t
| ACCEPTCONN : bool t
| BROADCAST : bool t
| DEBUG : bool t
| DONTROUTE : bool t
| KEEPALIVE : bool t
| LINGER : int option t
| OOBINLINE : bool t
| RCVBUF : int t
| RCVLOWAT : int t
| RCVTIMEO : float t
| REUSEADDR : bool t
| REUSEPORT : bool t
| SNDBUF : int t
| SNDLOWAT : int t
| SNDTIMEO : float t
| TYPE : int t
| TCP_NODELAY : bool t
end

class virtual socket = object (_ : #Generic.t)
method probe _ = None
method virtual setsockopt : 'a. 'a Sockopt.t -> 'a -> unit
end

class virtual stream_socket = object
inherit Flow.two_way
method virtual setsockopt : 'a. 'a Sockopt.t -> 'a -> unit
end

class virtual listening_socket = object
Expand Down Expand Up @@ -221,7 +245,7 @@ let connect ~sw (t:#t) addr =
Exn.reraise_with_context ex bt "connecting to %a" Sockaddr.pp addr

let datagram_socket ?(reuse_addr=false) ?(reuse_port=false) ~sw (t:#t) addr =
let addr = (addr :> [Sockaddr.datagram | `UdpV4 | `UdpV6]) in
let addr = (addr :> [Sockaddr.datagram | `UdpV4 | `UdpV6]) in
t#datagram_socket ~reuse_addr ~reuse_port ~sw addr

let getaddrinfo ?(service="") (t:#t) hostname = t#getaddrinfo ~service hostname
Expand Down Expand Up @@ -267,3 +291,5 @@ let with_tcp_connect ?(timeout=Time.Timeout.none) ~host ~service t f =
| exception (Exn.Io _ as ex) ->
let bt = Printexc.get_raw_backtrace () in
Exn.reraise_with_context ex bt "connecting to %S:%s" host service

let setsockopt (s:#socket) = s#setsockopt
33 changes: 33 additions & 0 deletions lib_eio/net.mli
Original file line number Diff line number Diff line change
Expand Up @@ -91,10 +91,37 @@ module Sockaddr : sig
val pp : Format.formatter -> [< t] -> unit
end

(** {2 Socket Options} *)

module Sockopt : sig
type 'a t =
| IPV6_ONLY : bool t (** Forbid binding an IPv6 socket to an IPv4 address *)
| ACCEPTCONN : bool t (** Report whether socket listening is enabled *)
| BROADCAST : bool t (** Permit sending of broadcast messages *)
| DEBUG : bool t (** Record debugging information *)
| DONTROUTE : bool t (** Bypass the standard routing algorithms *)
| KEEPALIVE : bool t (** Keep connection active *)
| LINGER : int option t (** Whether to linger on closed connections
that have data present, and for how long
(in seconds) *)
| OOBINLINE : bool t (** Leave out-of-band data in line *)
| RCVBUF : int t (** Size of received buffer *)
| RCVLOWAT : int t (** Minimum number of bytes to process for input operations *)
| RCVTIMEO : float t (** Timeout for input operations *)
| REUSEADDR : bool t (** Allow reuse of local addresses for bind *)
| REUSEPORT : bool t (** Allow reuse of address and port bindings *)
| SNDBUF : int t (** Size of send buffer *)
| SNDLOWAT : int t (** Minimum number of bytes to process for output operations *)
| SNDTIMEO : float t (** Timeout for output operations *)
| TYPE : int t (** Report the socket type *)
| TCP_NODELAY : bool t (** Control the Nagle algorithm for TCP sockets *)
end

(** {2 Provider Interfaces} *)

class virtual socket : object
inherit Generic.t
method virtual setsockopt : 'a Sockopt.t -> 'a -> unit
end

class virtual stream_socket : object
Expand Down Expand Up @@ -253,6 +280,12 @@ val getnameinfo : #t -> Sockaddr.t -> (string * string)
registered domain name represented by [sockaddr]. [service] is the IANA specified textual name of the
port specified in [sockaddr], e.g. 'ftp', 'http', 'https', etc. *)

(** {2 Socket} *)

val setsockopt : #socket -> 'a Sockopt.t -> 'a -> unit
(** [setsockopt s opt v] configures socket [s] with socket option and value [opt]
and [v] respectively. *)

(** {2 Closing} *)
val close : <close: unit; ..> -> unit
(** [close t] marks the socket as closed. It can no longer be used after this. *)
23 changes: 22 additions & 1 deletion lib_eio/unix/eio_unix.ml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ type socket = <
module Private = struct
type _ Eio.Generic.ty += Unix_file_descr : [`Peek | `Take] -> Unix.file_descr Eio.Generic.ty

type _ Effect.t +=
type _ Effect.t +=
| Await_readable : Unix.file_descr -> unit Effect.t
| Await_writable : Unix.file_descr -> unit Effect.t
| Get_monotonic_clock : Eio.Time.Mono.t Effect.t
Expand Down Expand Up @@ -75,3 +75,24 @@ let getnameinfo (sockaddr : Eio.Net.Sockaddr.t) =
run_in_systhread (fun () ->
let Unix.{ni_hostname; ni_service} = Unix.getnameinfo sockaddr options in
(ni_hostname, ni_service))

let setsockopt (type a) fd (opt: a Eio.Net.Sockopt.t) (v: a) =
match opt with
| IPV6_ONLY -> Unix.(setsockopt fd IPV6_ONLY v)
| ACCEPTCONN -> Unix.(setsockopt fd SO_ACCEPTCONN v)
| BROADCAST -> Unix.(setsockopt fd SO_BROADCAST v)
| DEBUG -> Unix.(setsockopt fd SO_DEBUG v)
| DONTROUTE -> Unix.(setsockopt fd SO_DONTROUTE v)
| KEEPALIVE -> Unix.(setsockopt fd SO_KEEPALIVE v)
| LINGER -> Unix.(setsockopt_optint fd SO_LINGER v)
| OOBINLINE -> Unix.(setsockopt fd SO_OOBINLINE v)
| RCVBUF -> Unix.(setsockopt_int fd SO_RCVBUF v)
| RCVLOWAT -> Unix.(setsockopt_int fd SO_RCVLOWAT v)
| RCVTIMEO -> Unix.(setsockopt_float fd SO_RCVTIMEO v)
| REUSEADDR -> Unix.(setsockopt fd SO_REUSEADDR v)
| REUSEPORT -> Unix.(setsockopt fd SO_REUSEPORT v)
| SNDBUF -> Unix.(setsockopt_int fd SO_SNDBUF v)
| SNDLOWAT -> Unix.(setsockopt_int fd SO_SNDLOWAT v)
| SNDTIMEO -> Unix.(setsockopt_float fd SO_SNDTIMEO v)
| TYPE -> Unix.(setsockopt_int fd SO_TYPE v)
| TCP_NODELAY -> Unix.(setsockopt fd TCP_NODELAY v)
2 changes: 2 additions & 0 deletions lib_eio/unix/eio_unix.mli
Original file line number Diff line number Diff line change
Expand Up @@ -106,3 +106,5 @@ module Ctf = Ctf_unix

val getnameinfo : Eio.Net.Sockaddr.t -> (string * string)
(** [getnameinfo sockaddr] returns domain name and service for [sockaddr]. *)

val setsockopt : Unix.file_descr -> 'a Eio.Net.Sockopt.t -> 'a -> unit
Loading