(* Source file: thread_pool.ml *)
module Zzz = Eio_utils.Zzz
(* A message handed to a worker thread through its [Mailbox]. *)
type job =
(* Placeholder stored in a mailbox cell when the mailbox is created; a worker
   that reads it fails with [assert false]. *)
| New
(* Makes the receiving worker thread raise [Thread.Exit]. *)
| Exit
(* A unit of work. The type ['a] is existential: it ties the result of [fn]
   to the argument accepted by [enqueue] and is otherwise hidden. *)
| Job : {
(* Computation executed on the worker thread. *)
fn : unit -> 'a;
(* Callback given [Ok v] when [fn] returns [v], or [Error (exn, backtrace)]
   when [fn] raises. *)
enqueue : ('a, Eio.Exn.with_bt) result -> unit;
} -> job
(* One-slot mailbox: a mutable cell paired with a binary semaphore that
   signals when the cell has been filled. *)
module Mailbox = struct
  type t = {
    available : Semaphore.Binary.t;
    mutable cell : job;
  }

  (* A fresh mailbox: the semaphore starts in the "taken" state and the cell
     holds the [New] placeholder. *)
  let create () =
    let available = Semaphore.Binary.make false in
    { available; cell = New }

  (* Store [msg] in the cell, then release the semaphore. The write must
     come before the release so that [take] observes it. *)
  let put ({ available; _ } as mbox) msg =
    mbox.cell <- msg;
    Semaphore.Binary.release available

  (* Acquire the semaphore, then return whatever the cell holds. *)
  let take mbox =
    let () = Semaphore.Binary.acquire mbox.available in
    mbox.cell
end
(* The set of idle worker threads, kept as an immutable linked list of their
   mailboxes inside an [Atomic.t] and updated with compare-and-set. *)
module Free_pool = struct
  type list =
    | Empty
    | Closed
    | Free of Mailbox.t * list

  type t = list Atomic.t

  (* Send [Exit] to every mailbox in the given list. *)
  let rec close_list = function
    | Empty | Closed -> ()
    | Free (mbox, rest) ->
      Mailbox.put mbox Exit;
      close_list rest

  (* Mark the pool [Closed] and send [Exit] to every mailbox it contained. *)
  let close t = close_list (Atomic.exchange t Closed)

  (* Detach all mailboxes currently in the pool, leaving it [Empty], and send
     [Exit] to each. Does nothing when the pool is [Empty] or [Closed]. *)
  let rec drop t =
    match Atomic.get t with
    | Free _ as idle ->
      (* [idle] is the exact value read above, as compare-and-set requires. *)
      if Atomic.compare_and_set t idle Empty then close_list idle
      else drop t
    | Empty | Closed -> ()

  (* Push [mbox] onto the pool. The pool must not be [Closed]. *)
  let rec put t mbox =
    match Atomic.get t with
    | Closed -> assert false
    | (Empty | Free _) as seen ->
      if Atomic.compare_and_set t seen (Free (mbox, seen)) then ()
      else put t mbox (* lost a race with another update: retry *)

  (* Start a new worker thread serving a fresh mailbox and return that
     mailbox. The worker loops forever, handling one message at a time, and
     ends by raising [Thread.Exit] when it receives [Exit]. *)
  let make_thread t =
    let mbox = Mailbox.create () in
    (* Run [fn], turning an exception into [Error] with its backtrace. *)
    let attempt fn =
      try Ok (fn ())
      with exn -> Error (exn, Printexc.get_raw_backtrace ())
    in
    let worker () =
      while true do
        match Mailbox.take mbox with
        | Exit -> raise Thread.Exit
        | New -> assert false
        | Job { fn; enqueue } ->
          let result = attempt fn in
          (* Return the mailbox to the pool before reporting the result. *)
          put t mbox;
          enqueue result
      done
    in
    let _thread : Thread.t = Thread.create worker () in
    mbox

  (* Pop the mailbox of an idle worker, or start a new worker when the pool
     is [Empty].
     Raises [Invalid_argument] if the pool is [Closed]. *)
  let rec get_thread t =
    match Atomic.get t with
    | Empty -> make_thread t
    | Closed -> invalid_arg "Thread pool closed!"
    | Free (mbox, rest) as seen ->
      if Atomic.compare_and_set t seen rest then mbox
      else get_thread t
end
(* State of a thread pool. *)
type t = {
(* Mailboxes of the worker threads that are currently idle. *)
free : Free_pool.t;
(* Timer queue on which the idle-thread timeout is registered. *)
sleep_q : Zzz.t;
(* Key of the timeout currently registered in [sleep_q], or [None] when no
   timeout is pending. *)
mutable timeout : Zzz.Key.t option;
}
(* Effect performed by [run_in_systhread]. It carries the function to run and
   resumes with that function's outcome paired with a thread pool [t].
   NOTE(review): the handler is not in this file; presumably it calls
   [submit] on the pool it returns. *)
type _ Effect.t += Run_in_systhread : (unit -> 'a) -> (('a, Eio.Exn.with_bt) result * t) Effect.t
(* Close the free pool (which sends [Exit] to every idle worker) and cancel
   the pending idle timeout, if there is one. *)
let terminate t =
  Free_pool.close t.free;
  match t.timeout with
  | None -> ()
  | Some key ->
    Zzz.remove t.sleep_q key;
    t.timeout <- None
(* A new pool with no worker threads and no pending timeout. [sleep_q] is the
   timer queue used later to schedule the idle-thread timeout. *)
let create ~sleep_q =
  let free = Atomic.make Free_pool.Empty in
  { free; sleep_q; timeout = None }
(* [run t fn] calls [fn ()], then terminates the pool [t] whether [fn]
   returned or raised. The result of [fn] is returned; an exception from [fn]
   is re-raised with its original backtrace. *)
let run t fn =
  let outcome =
    try Ok (fn ())
    with ex -> Error (ex, Printexc.get_raw_backtrace ())
  in
  terminate t;
  match outcome with
  | Ok x -> x
  | Error (ex, bt) -> Printexc.raise_with_backtrace ex bt
(* Hand [fn] to a worker thread of pool [t]; its outcome is passed to
   [enqueue]. If the fiber context [ctx] already reports an error, [fn] is
   not run and [enqueue] is called immediately with that error and an empty
   backtrace. *)
let submit t ~ctx ~enqueue fn =
  match Eio.Private.Fiber_context.get_error ctx with
  | None ->
    let worker = Free_pool.get_thread t.free in
    let job = Job { fn; enqueue } in
    Mailbox.put worker job
  | Some err -> enqueue (Error (err, Eio.Exn.empty_backtrace))
(* Run [fn] through the [Run_in_systhread] effect and return its result,
   re-raising (with the original backtrace) any exception it reported.
   [label] is passed to the tracer when the fiber suspends.

   After the effect returns, if the pool has no pending timeout, one is
   registered 20 ms in the future; when it fires it drops the pool's idle
   threads and clears itself. *)
let run_in_systhread ?(label="systhread") fn =
  Eio.Private.Trace.suspend_fiber label;
  let outcome, pool = Effect.perform (Run_in_systhread fn) in
  let arm_idle_timer () =
    let deadline =
      (* Fall back to the largest timestamp if the addition overflows. *)
      match Mtime.add_span (Mtime_clock.now ()) Mtime.Span.(20 * ms) with
      | Some stamp -> stamp
      | None -> Mtime.max_stamp
    in
    let on_expiry () =
      Free_pool.drop pool.free;
      pool.timeout <- None
    in
    pool.timeout <- Some (Zzz.add pool.sleep_q deadline (Fn on_expiry))
  in
  if Option.is_none pool.timeout then arm_idle_timer ();
  match outcome with
  | Error (ex, bt) -> Printexc.raise_with_backtrace ex bt
  | Ok x -> x