forked from ygrek/ocurl
-
Notifications
You must be signed in to change notification settings - Fork 0
/
curl_lwt.ml
88 lines (81 loc) · 2.44 KB
/
curl_lwt.ml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
(** Lwt support for Curl *)
module M = Curl.Multi
(** [log fmt args...] formats a message printf-style and writes it, followed
    by a newline, to standard error. *)
let log fmt = Printf.ksprintf (fun line -> prerr_endline line) fmt
(** State shared by every transfer driven through one curl multi handle. *)
type multi = {
  mt : Curl.Multi.mt;
  (** The underlying curl multi handle. *)
  all_events : (Unix.file_descr, Lwt_engine.event list) Hashtbl.t;
  (** Lwt engine events currently registered for each socket. *)
  wakeners : (Curl.t, Curl.curlCode Lwt.u) Hashtbl.t;
  (** Resolver to wake, keyed by the easy handle whose transfer is pending. *)
}
(** [create ()] allocates a curl multi handle and connects it to the Lwt
    engine.

    It installs a timer callback and a socket callback on the multi handle.
    Both callbacks register [Lwt_engine] events which, when they fire, call
    back into the multi handle and then resolve the promises of every
    transfer reported as finished.

    @return a record holding the multi handle, the table of per-socket engine
    events and the table of pending wakeners (both tables start empty). *)
let create () =
  let mt = M.create () in
  (* The timer event currently armed on behalf of libcurl; starts as a dummy. *)
  let timer_event = ref Lwt_engine.fake_event in
  let all_events = Hashtbl.create 32 in
  let wakeners = Hashtbl.create 32 in
  (* Drain every transfer the multi handle reports as finished and wake the
     resolver registered for its easy handle. *)
  let rec finished () =
    match M.remove_finished mt with
    | None -> ()
    | Some (h,code) ->
      (* Only the lookup is guarded, so a Not_found coming from anywhere else
         is never mistaken for a missing wakener. *)
      let wakener = try Some (Hashtbl.find wakeners h) with Not_found -> None in
      begin match wakener with
      | Some w ->
        Hashtbl.remove wakeners h;
        Lwt.wakeup w code
      | None ->
        prerr_endline "curl_lwt: orphan handle, how come?"
      end;
      finished ()
  in
  (* Engine callbacks: report the socket activity to the multi handle (the
     integer result is ignored), then collect completed transfers. *)
  let on_readable fd _ =
    let (_:int) = M.action mt fd M.EV_IN in
    finished ()
  in
  let on_writable fd _ =
    let (_:int) = M.action mt fd M.EV_OUT in
    finished ()
  in
  let on_timer _ =
    Lwt_engine.stop_event !timer_event;
    M.action_timeout mt;
    finished ()
  in
  M.set_timer_function mt begin fun timeout ->
    Lwt_engine.stop_event !timer_event; (* duplicate stop_event is ok *)
    (* The timeout is converted from milliseconds to seconds for the engine.
       libcurl documents a timeout of -1 as a request to delete the timer, so
       in that case the old timer is only stopped and no new one is armed
       (arming one with a negative delay would cause a spurious timeout
       action). *)
    if timeout >= 0 then
      (timer_event := Lwt_engine.on_timer (float_of_int timeout /. 1000.) false on_timer)
  end;
  M.set_socket_function mt begin fun fd what ->
    (* Drop whatever was registered for this socket before; on the first
       notification for a socket there is no association yet. *)
    let previous = try Hashtbl.find all_events fd with Not_found -> [] in
    List.iter Lwt_engine.stop_event previous;
    Hashtbl.remove all_events fd;
    let events = match what with
      | M.POLL_REMOVE | M.POLL_NONE -> []
      | M.POLL_IN -> [Lwt_engine.on_readable fd (on_readable fd)]
      | M.POLL_OUT -> [Lwt_engine.on_writable fd (on_writable fd)]
      | M.POLL_INOUT -> [Lwt_engine.on_readable fd (on_readable fd); Lwt_engine.on_writable fd (on_writable fd)]
    in
    (* Sockets with nothing to watch get no table entry at all. *)
    match events with
    | [] -> ()
    | _ -> Hashtbl.add all_events fd events
  end;
  { mt; all_events; wakeners; }
(** The single multi state used by [setopt] and [perform], created on first
    use. Lwt may not run in parallel, so one global is OK'ish. *)
let global : multi Lazy.t = lazy (create ())
(** [setopt opt] applies the multi option [opt] to the global multi handle,
    forcing its creation if it does not exist yet. *)
let setopt opt =
  let { mt; _ } = Lazy.force global in
  M.setopt mt opt
(** [perform h] hands the easy handle [h] to the global multi handle and
    returns a promise for the transfer's [Curl.curlCode].

    The promise is resolved through the wakener stored in the global
    [wakeners] table under [h]. Cancelling the returned promise removes [h]
    from the multi handle and drops its wakener.

    If adding [h] to the multi handle raises, the wakener registered for this
    call is removed again and the exception is re-raised. This keeps the
    table free of stale entries; in particular a binding that already existed
    for [h] is not left shadowed by a wakener nobody waits on. *)
let perform h =
  let t = Lazy.force global in
  let (waiter,wakener) = Lwt.wait () in
  (* Callers only ever see the protected promise. *)
  let waiter = Lwt.protected waiter in
  Lwt.on_cancel waiter (fun () ->
    Curl.Multi.remove t.mt h;
    Hashtbl.remove t.wakeners h;
  );
  (* Register the wakener before adding the handle, as the original did, so
     it is in place by the time the transfer can be reported as finished. *)
  Hashtbl.add t.wakeners h wakener;
  begin try
    M.add t.mt h
  with exn ->
    (* Roll back: removes only the binding added just above. *)
    Hashtbl.remove t.wakeners h;
    raise exn
  end;
  waiter