Source file executor_pool.ml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
type job = Pack : {
fn : unit -> 'a;
w : ('a, exn) Result.t Promise.u;
weight : int;
} -> job
type t = {
queue : job Sync.t;
}
let max_capacity = 1_000_000
let max_capacity_f = float max_capacity
let run_worker { queue } =
Switch.run ~name:"run_worker" @@ fun sw ->
let capacity = ref 0 in
let condition = Condition.create () in
let rec loop () =
while !capacity >= max_capacity do Condition.await_no_mutex condition done;
match Sync.take queue with
| Error `Closed -> `Stop_daemon
| Ok (Pack { fn; w; weight }) ->
capacity := !capacity + weight;
Option.iter (Promise.resolve_error w) (Switch.get_error sw);
Fiber.fork ~sw (fun () ->
Promise.resolve w (try Ok (fn ()) with ex -> Error ex);
capacity := !capacity - weight;
Condition.broadcast condition
);
Fiber.yield ();
(loop [@tailcall]) ()
in
loop ()
let create ~sw ~domain_count domain_mgr =
let queue = Sync.create () in
let t = { queue } in
Switch.on_release sw (fun () -> Sync.close queue);
for _ = 1 to domain_count do
Fiber.fork_daemon ~sw (fun () ->
Domain_manager.run domain_mgr (fun () ->
run_worker t))
done;
t
let enqueue { queue } ~weight fn =
if not (weight >= 0. && weight <= 1.)
then Fmt.invalid_arg "Executor_pool: weight %g not >= 0.0 && <= 1.0" weight
else (
let weight = Float.to_int (weight *. max_capacity_f) in
let p, w = Promise.create () in
Sync.put queue (Pack { fn; w; weight });
p
)
let submit t ~weight fn =
enqueue t ~weight fn |> Promise.await
let submit_exn t ~weight fn =
enqueue t ~weight fn |> Promise.await_exn
let submit_fork ~sw t ~weight fn =
Fiber.fork_promise ~sw (fun () -> submit_exn t ~weight fn)