Source file lwt_throttle.ml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
open Lwt.Infix
(** Signature of a key-indexed rate limiter, as produced by [Make]. *)
module type S = sig
  (** Keys identifying the channels that are throttled independently of
      one another. *)
  type key

  (** A rate limiter. *)
  type t

  (** [create ~rate ~max ~n] builds a limiter.

      The descriptions below reflect the [Make] implementation in this
      file:
      - [rate] is the number of [wait] calls per key that are let through
        between two cleaning passes (passes are scheduled one second
        apart);
      - [max] is the maximum number of callers that may be blocked at the
        same time, all keys together;
      - [n] is the initial size of the internal hash table.

      @raise Invalid_argument if [rate < 1], [max < 1] or [n < 0]. *)
  val create : rate:int -> max:int -> n:int -> t

  (** [wait t key] returns a promise that resolves to [true] once the
      caller is allowed to proceed for [key]. In the [Make]
      implementation it resolves to [false] immediately when the key is
      over its rate and [max] callers are already blocked. *)
  val wait : t -> key -> bool Lwt.t
end
(** [Make (H)] builds a rate limiter whose channels are indexed by keys of
    type [H.t].

    Each key may go through [wait] at most [rate] times between two
    cleaning passes; passes are scheduled one second apart for as long as
    there is per-key state left to reset. Callers over the rate are queued
    and released by later passes. *)
module Make (H : Hashtbl.HashedType) : (S with type key = H.t) = struct
  module MH = Hashtbl.Make(H)

  type key = H.t

  (* Per-key state. *)
  type elt = {
    mutable consumed : int;
    (* Number of calls let through for this key since the last cleaning
       pass, including the waiters released by that pass. *)
    queue : bool Lwt.u Queue.t;
    (* Resolvers of the callers currently blocked on this key, oldest
       first. *)
  }

  type t = {
    rate : int;
    (* Calls allowed per key between two cleaning passes. *)
    max : int;
    (* Upper bound on the number of blocked callers, all keys together. *)
    mutable waiting : int;
    (* Current number of blocked callers, all keys together. *)
    table : elt MH.t;
    mutable cleaning : unit Lwt.t option;
    (* [Some _] while a cleaning pass is scheduled. *)
  }

  (** [create ~rate ~max ~n] returns a fresh limiter whose table has
      initial size [n].
      @raise Invalid_argument if [rate < 1], [max < 1] or [n < 0]. *)
  let create ~rate ~max ~n =
    if rate < 1 || max < 1 || n < 0 then
      invalid_arg "Lwt_throttle.S.create"
    else
      { rate; max; waiting = 0; table = MH.create n; cleaning = None }

  (* Fold step of a cleaning pass for one key.

     Pops up to [t.rate] resolvers from [elt.queue] and pushes them on
     [to_run] (so [to_run] is in reverse release order). The number popped
     becomes the key's new [consumed] count. A key with nothing to release
     is recorded in [stale] instead of being removed here: the table must
     not be modified while [MH.fold] is traversing it.

     The accumulator is (waiters still blocked, resolvers to wake, keys to
     remove). *)
  let update_key t key elt (still_waiting, to_run, stale) =
    let rec pop released to_run =
      if released >= t.rate || Queue.is_empty elt.queue then
        released, to_run
      else
        pop (released + 1) (Queue.take elt.queue :: to_run)
    in
    let released, to_run = pop 0 to_run in
    let still_waiting = still_waiting + Queue.length elt.queue in
    if released = 0 then
      (* Nobody was blocked on this key: its entry can be dropped. *)
      (still_waiting, to_run, key :: stale)
    else begin
      elt.consumed <- released;
      (still_waiting, to_run, stale)
    end

  (* One cleaning pass: resets every key's budget, releases queued callers
     and decides whether another pass is needed. *)
  let rec clean_table t =
    let still_waiting, to_run, stale =
      MH.fold (update_key t) t.table (0, [], [])
    in
    (* Safe now that the traversal is over. *)
    List.iter (MH.remove t.table) stale;
    t.waiting <- still_waiting;
    (match still_waiting, to_run with
     | 0, [] ->
       (* Every entry was stale, so the table is empty: stop cleaning
          until the next [wait]. *)
       t.cleaning <- None
     | _ -> launch_cleaning t);
    (* [to_run] was built by consing; reverse it so that callers are
       released in the order they were queued. *)
    List.iter (fun u -> Lwt.wakeup u true) (List.rev to_run)

  (* Schedules a cleaning pass one second from now and records it in
     [t.cleaning]. Exceptions raised by the pass are reported on stderr
     and otherwise ignored. *)
  and launch_cleaning t =
    let cleaner =
      Lwt_unix.sleep 1. >>= fun () ->
      Lwt.catch
        (fun () ->
           clean_table t;
           Lwt.return_unit)
        (fun exn ->
           prerr_endline "internal error";
           prerr_endline (Printexc.to_string exn);
           Printexc.print_backtrace stderr;
           Lwt.return_unit)
    in
    t.cleaning <- Some cleaner

  (* Blocks the caller on [elt]'s queue, or returns [false] right away
     when [t.max] callers are already blocked. The task is only allocated
     when the caller is actually queued. *)
  let really_wait t elt =
    if t.max > t.waiting then begin
      let waiter, resolver = Lwt.task () in
      Queue.add resolver elt.queue;
      t.waiting <- succ t.waiting;
      waiter
    end
    else Lwt.return_false

  (** [wait t key] resolves to [true] as soon as [key] is within its rate.
      It resolves to [false] immediately when the key is over its rate and
      [t.max] callers are already blocked. Starts the periodic cleaning if
      it is not already scheduled. *)
  let wait t key =
    let res =
      match MH.find t.table key with
      | elt ->
        if elt.consumed >= t.rate then really_wait t elt
        else begin
          elt.consumed <- succ elt.consumed;
          Lwt.return_true
        end
      | exception Not_found ->
        (* First call for this key since its entry was last dropped. *)
        MH.add t.table key { consumed = 1; queue = Queue.create () };
        Lwt.return_true
    in
    (match t.cleaning with
     | None -> launch_cleaning t
     | Some _ -> ());
    res
end