1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
module Suspended = Eio_utils.Suspended
module Zzz = Eio_utils.Zzz
module Lf_queue = Eio_utils.Lf_queue
module Fiber_context = Eio.Private.Fiber_context
module Trace = Eio.Private.Trace
module Rcfd = Eio_unix.Private.Rcfd
module Poll = Iomux.Poll
(* The value produced by the scheduler main loop ([next]) when it stops:
   no timer is pending, no operation is active and the run queue is empty. *)
type exit = [`Exit_scheduler]
(* A job stored on the scheduler run queue. *)
type runnable =
(* Marker job: when [next] pops it, the scheduler checks the sleep queue and
   polls the file descriptors. It is pushed back each time it is handled. *)
| IO : runnable
(* [Thread (k, v)]: resume the suspended fiber [k] with the value [v]. *)
| Thread : 'a Suspended.t * 'a -> runnable
(* [Failed_thread (k, ex)]: resume the suspended fiber [k] by discontinuing it with [ex]. *)
| Failed_thread : 'a Suspended.t * exn -> runnable
(* The fibers currently suspended on one file descriptor, split by direction.
   One such record is stored per descriptor in [t.fd_map]. *)
type fd_event_waiters = {
(* Fibers suspended by [await_readable]. *)
read : unit Suspended.t Lwt_dllist.t;
(* Fibers suspended by [await_writable]. *)
write : unit Suspended.t Lwt_dllist.t;
}
(* The state of one scheduler instance, created by [with_sched]. *)
type t = {
(* Jobs ready to run. It always holds one [IO] job while the scheduler is live. *)
run_q : runnable Lf_queue.t;
(* The poll set, indexed by the integer value of each file descriptor. *)
poll : Poll.t;
(* Highest index currently registered in [poll], or -1 if none.
   [next] polls the first [poll_maxi + 1] slots. *)
mutable poll_maxi : int;
(* Waiters for each file descriptor that has at least one suspended fiber. *)
fd_map : (Unix.file_descr, fd_event_waiters) Hashtbl.t;
(* Write end of the wake-up pipe, wrapped in an [Rcfd.t]. See [wakeup]. *)
eventfd : Rcfd.t;
(* Read end of the wake-up pipe. It is registered in [poll] for reading. *)
eventfd_r : Unix.file_descr;
(* Number of operations in progress (fd waits and [with_op] calls).
   The scheduler only exits when this is zero. *)
mutable active_ops : int;
(* [true] while the scheduler is (about to be) blocked in poll, so that
   whoever enqueues a job must call [wakeup]. *)
need_wakeup : bool Atomic.t;
(* Pending timers, consulted by [next] each time the [IO] job runs. *)
sleep_q: Zzz.t;
(* Pool used to serve [Run_in_systhread] requests in [run]. *)
thread_pool : Eio_unix.Private.Thread_pool.t;
}
(* The single byte written to the wake-up pipe by [wakeup]. *)
let wake_buffer = Bytes.make 1 '!'
(* [wakeup t] makes the read end of the wake-up pipe readable so that a
   scheduler blocked in poll returns. [need_wakeup] is cleared first, so
   later enqueuers do not send a second byte for the same sleep.
   If the write end has already been closed, nothing is done. *)
let wakeup t =
  let send_byte fd =
    match Unix.single_write fd wake_buffer 0 1 with
    | (_ : int) -> ()
    | exception Unix.Unix_error ((Unix.EAGAIN | Unix.EWOULDBLOCK), _, _) ->
      (* The non-blocking pipe is full, so unread wake-up bytes are already pending. *)
      ()
    | exception Unix.Unix_error (Unix.EPIPE, _, _) ->
      (* The read end is gone (the scheduler closes it during clean-up). *)
      ()
  in
  Atomic.set t.need_wakeup false;
  Rcfd.use t.eventfd ~if_closed:ignore send_byte
(* [enqueue_thread t k x] schedules fiber [k] to be resumed with [x], and
   wakes the scheduler if it has announced that it needs a wake-up. *)
let enqueue_thread t k x =
  let job = Thread (k, x) in
  Lf_queue.push t.run_q job;
  match Atomic.get t.need_wakeup with
  | true -> wakeup t
  | false -> ()
(* [enqueue_failed_thread t k ex] schedules fiber [k] to be resumed with the
   exception [ex], and wakes the scheduler if it has announced that it needs
   a wake-up. *)
let enqueue_failed_thread t k ex =
  let job = Failed_thread (k, ex) in
  Lf_queue.push t.run_q job;
  match Atomic.get t.need_wakeup with
  | true -> wakeup t
  | false -> ()
(* [enqueue_at_head t k] puts the unit fiber [k] at the front of the run
   queue. Unlike [enqueue_thread], it never sends a wake-up. *)
let enqueue_at_head t k =
  let job = Thread (k, ()) in
  Lf_queue.push_head t.run_q job
(* [get_waiters t fd] returns the waiter lists registered for [fd] in
   [t.fd_map], creating and registering an empty pair on first use. *)
let get_waiters t fd =
  try Hashtbl.find t.fd_map fd
  with Not_found ->
    let fresh = { read = Lwt_dllist.create (); write = Lwt_dllist.create () } in
    Hashtbl.add t.fd_map fd fresh;
    fresh
(* [clear_event_fd t] consumes pending wake-up bytes (at most 8 per call)
   from the read end of the wake-up pipe. It is only called once poll has
   reported that end as ready, hence the assertion that something was read. *)
let clear_event_fd t =
  let capacity = 8 in
  let scratch = Bytes.create capacity in
  let n_read = Unix.read t.eventfd_r scratch 0 capacity in
  assert (n_read > 0)
(* [update t waiters fd] re-synchronises the poll set with the current
   contents of [waiters] for [fd].

   - If some fiber is waiting, the slot of [fd] is (re)registered with the
     matching read/write interest, and [t.poll_maxi] is raised if needed.
   - If nobody is waiting, the slot is invalidated, [t.poll_maxi] is lowered
     to the highest slot still in use (or -1), and [fd] is dropped from
     [t.fd_map]. *)
let update t waiters fd =
  let slot = Iomux.Util.fd_of_unix fd in
  let interest =
    match Lwt_dllist.is_empty waiters.read, Lwt_dllist.is_empty waiters.write with
    | true, true -> Poll.Flags.empty
    | false, true -> Poll.Flags.pollin
    | true, false -> Poll.Flags.pollout
    | false, false -> Poll.Flags.(pollin + pollout)
  in
  if interest <> Poll.Flags.empty then begin
    Poll.set_index t.poll slot fd interest;
    if slot > t.poll_maxi then t.poll_maxi <- slot
  end else begin
    Poll.invalidate_index t.poll slot;
    (* Walk downwards from [i] to the first slot still holding a descriptor;
       -1 means that no slot is in use. *)
    let rec highest_used i =
      if i = -1 then -1
      else if Poll.get_fd t.poll i <> Poll.invalid_fd then i
      else highest_used (i - 1)
    in
    if slot = t.poll_maxi then t.poll_maxi <- highest_used (slot - 1);
    Hashtbl.remove t.fd_map fd
  end
(* [resume t node] completes the wait held in [node]: the operation is no
   longer counted as active, the cancel function of the fiber is cleared and
   the fiber is queued to continue with [()]. *)
let resume t node =
  let (waiter : unit Suspended.t) = Lwt_dllist.get node in
  t.active_ops <- pred t.active_ops;
  Fiber_context.clear_cancel_fn waiter.fiber;
  enqueue_thread t waiter ()
(* [ready t _index fd revents] is the callback run for each descriptor that
   poll reported as ready.

   For the wake-up pipe, the pending bytes are drained. For any other
   descriptor, the matching waiters are moved to a private list, the poll
   registration is refreshed, and each moved waiter is resumed. Hang-up and
   error conditions wake both readers and writers. *)
let ready t _index fd revents =
  assert (not Poll.Flags.(mem revents pollnval));
  if fd != t.eventfd_r then begin
    let waiters = Hashtbl.find t.fd_map fd in
    let woken = Lwt_dllist.create () in
    let take_if mask queue =
      if Poll.Flags.mem revents mask then Lwt_dllist.transfer_l queue woken
    in
    take_if Poll.Flags.(pollout + pollhup + pollerr) waiters.write;
    take_if Poll.Flags.(pollin + pollhup + pollerr) waiters.read;
    (* Something was moved out of [waiters], so the registered interest is stale. *)
    if not (Lwt_dllist.is_empty woken) then update t waiters fd;
    Lwt_dllist.iter_node_r (resume t) woken
  end else
    clear_event_fd t
(* [next t] is the scheduler main loop. It pops the next job from the run
   queue and transfers control to it. Fiber jobs are resumed directly; the
   [IO] job runs due timers or polls the file descriptors and then loops.
   It returns [`Exit_scheduler] only when no timer is pending, no operation
   is active and the run queue is empty. *)
let rec next t : [`Exit_scheduler] =
match Lf_queue.pop t.run_q with
(* The [IO] job is pushed back every time it is handled, so the queue is
   never expected to be empty here. *)
| None -> assert false
| Some Thread (k, v) ->
Fiber_context.clear_cancel_fn k.fiber;
Suspended.continue k v
| Some Failed_thread (k, ex) ->
Fiber_context.clear_cancel_fn k.fiber;
Suspended.discontinue k ex
| Some IO ->
(* Timers are examined before any file descriptor is polled. *)
let now = Mtime_clock.now () in
match Zzz.pop ~now t.sleep_q with
| `Due k ->
(* A timer has expired. Put the [IO] job back before running it. *)
Lf_queue.push t.run_q IO;
begin match k with
| Fiber k -> Suspended.continue k ()
| Fn fn -> fn (); next t
end
| `Wait_until _ | `Nothing as next_due ->
(* How long poll may block: until the next timer, or forever if there is none. *)
let timeout =
match next_due with
| `Wait_until time ->
let time = Mtime.to_uint64_ns time in
let now = Mtime.to_uint64_ns now in
let diff_ns = Int64.sub time now in
Poll.Nanoseconds diff_ns
| `Nothing -> Poll.Infinite
in
if timeout = Infinite && t.active_ops = 0 && Lf_queue.is_empty t.run_q then (
(* No timers, no active operations and nothing runnable: stop. The [IO] job
   is not pushed back and the queue is closed. *)
Lf_queue.close t.run_q;
`Exit_scheduler
) else (
(* Announce the upcoming sleep first, so that [enqueue_thread] and
   [enqueue_failed_thread] call [wakeup] from now on. *)
Atomic.set t.need_wakeup true;
let timeout =
if Lf_queue.is_empty t.run_q then timeout
else (
(* There is runnable work (possibly added just before the flag was set):
   check for IO without blocking. *)
Poll.Nowait
)
in
Trace.suspend_domain Begin;
let nready =
try Poll.ppoll_or_poll t.poll (t.poll_maxi + 1) timeout
(* An interrupted poll is treated as if no descriptor were ready. *)
with Unix.Unix_error (Unix.EINTR, _, "") -> 0
in
Trace.suspend_domain End;
Atomic.set t.need_wakeup false;
(* Put the [IO] job back before the ready callbacks enqueue resumed fibers. *)
Lf_queue.push t.run_q IO;
Poll.iter_ready t.poll nready (ready t);
next t
)
(* [with_sched fn] builds a fresh scheduler state, passes it to [fn] and
   returns the result of [fn].

   The state starts with a single [IO] job on the run queue and with a
   non-blocking, close-on-exec wake-up pipe whose read end is registered in
   the poll set. Both ends of the pipe are closed when [fn] returns or
   raises; an exception is re-raised with its original backtrace. *)
let with_sched fn =
  let run_q = Lf_queue.create () in
  Lf_queue.push run_q IO;
  let sleep_q = Zzz.create () in
  let eventfd_r, pipe_w = Unix.pipe ~cloexec:true () in
  Unix.set_nonblock eventfd_r;
  Unix.set_nonblock pipe_w;
  let eventfd = Rcfd.make pipe_w in
  let cleanup () =
    Unix.close eventfd_r;
    let was_open = Rcfd.close eventfd in
    assert was_open
  in
  let poll = Poll.create () in
  let fd_map = Hashtbl.create 10 in
  let thread_pool = Eio_unix.Private.Thread_pool.create ~sleep_q in
  let t =
    {
      run_q;
      poll;
      poll_maxi = (-1);
      fd_map;
      eventfd;
      eventfd_r;
      active_ops = 0;
      need_wakeup = Atomic.make false;
      sleep_q;
      thread_pool;
    }
  in
  (* Watch the read end of the wake-up pipe so that [wakeup] interrupts poll. *)
  let wake_slot = Iomux.Util.fd_of_unix eventfd_r in
  Poll.set_index t.poll wake_slot eventfd_r Poll.Flags.pollin;
  if wake_slot > t.poll_maxi then t.poll_maxi <- wake_slot;
  match fn t with
  | exception ex ->
    let bt = Printexc.get_raw_backtrace () in
    cleanup ();
    Printexc.raise_with_backtrace ex bt
  | result ->
    cleanup ();
    result
(* [await_readable t k fd] suspends fiber [k] until [fd] is reported ready
   for reading, then switches to the next runnable job.

   If the fiber already carries a cancellation error, it is discontinued at
   once. Otherwise the wait counts as an active operation until it is
   resumed or cancelled; on cancellation the fiber is removed from the
   waiters and resumed with the cancellation exception. *)
let await_readable t (k : unit Suspended.t) fd =
  match Fiber_context.get_error k.fiber with
  | None ->
    t.active_ops <- succ t.active_ops;
    let waiters = get_waiters t fd in
    let readers = waiters.read in
    let first_reader = Lwt_dllist.is_empty readers in
    let node = Lwt_dllist.add_l k readers in
    (* Only the first reader changes the interest registered with poll. *)
    if first_reader then update t waiters fd;
    let cancel ex =
      Lwt_dllist.remove node;
      if Lwt_dllist.is_empty readers then update t waiters fd;
      t.active_ops <- pred t.active_ops;
      enqueue_failed_thread t k ex
    in
    Fiber_context.set_cancel_fn k.fiber cancel;
    next t
  | Some e -> Suspended.discontinue k e
(* [await_writable t k fd] suspends fiber [k] until [fd] is reported ready
   for writing, then switches to the next runnable job.

   If the fiber already carries a cancellation error, it is discontinued at
   once. Otherwise the wait counts as an active operation until it is
   resumed or cancelled; on cancellation the fiber is removed from the
   waiters and resumed with the cancellation exception. *)
let await_writable t (k : unit Suspended.t) fd =
  match Fiber_context.get_error k.fiber with
  | None ->
    t.active_ops <- succ t.active_ops;
    let waiters = get_waiters t fd in
    let writers = waiters.write in
    let first_writer = Lwt_dllist.is_empty writers in
    let node = Lwt_dllist.add_l k writers in
    (* Only the first writer changes the interest registered with poll. *)
    if first_writer then update t waiters fd;
    let cancel ex =
      Lwt_dllist.remove node;
      if Lwt_dllist.is_empty writers then update t waiters fd;
      t.active_ops <- pred t.active_ops;
      enqueue_failed_thread t k ex
    in
    Fiber_context.set_cancel_fn k.fiber cancel;
    next t
  | Some e -> Suspended.discontinue k e
(* [get_enqueue t k] is the resumption callback for fiber [k]: given [Ok v]
   it schedules [k] to continue with [v], and given [Error ex] it schedules
   [k] to be discontinued with [ex]. *)
let get_enqueue t k outcome =
  match outcome with
  | Error ex -> enqueue_failed_thread t k ex
  | Ok v -> enqueue_thread t k v
(* [await_timeout t k time] suspends fiber [k] on the sleep queue until
   [time], then switches to the next runnable job.

   If the fiber already carries a cancellation error, it is discontinued at
   once. On cancellation the timer is removed and the fiber is resumed with
   the cancellation exception. A timer is not counted in [t.active_ops]. *)
let await_timeout t (k : unit Suspended.t) time =
  match Fiber_context.get_error k.fiber with
  | None ->
    let timer = Zzz.add t.sleep_q time (Fiber k) in
    let cancel ex =
      Zzz.remove t.sleep_q timer;
      enqueue_failed_thread t k ex
    in
    Fiber_context.set_cancel_fn k.fiber cancel;
    next t
  | Some e -> Suspended.discontinue k e
(* [with_op t fn x] evaluates [fn x] while counting it as one active
   operation, which stops the scheduler from exiting in the meantime.
   The count is restored whether [fn x] returns or raises; an exception is
   re-raised. *)
let with_op t fn x =
  let finish () = t.active_ops <- t.active_ops - 1 in
  t.active_ops <- t.active_ops + 1;
  match fn x with
  | exception ex ->
    finish ();
    raise ex
  | r ->
    finish ();
    r
[@@@alert "-unstable"]
(* [Enter fn] is the effect performed by [enter]. The handler installed by
   [run] calls [fn] with the scheduler and the suspended current fiber,
   unless that fiber has already been cancelled. *)
type _ Effect.t += Enter : (t -> 'a Eio_utils.Suspended.t -> [`Exit_scheduler]) -> 'a Effect.t
(* [enter op fn] records in the trace that the current fiber is suspending
   for [op], then performs [Enter fn] to hand the fiber over to [fn]. *)
let enter op fn =
  let () = Trace.suspend_fiber op in
  Effect.perform (Enter fn)
(* [run ~extra_effects t main x] runs [main x] as the root fiber of
   scheduler [t] and returns its result once the scheduler has nothing left
   to do.

   - [extra_effects] handles every effect this scheduler does not handle
     itself.
   - [main x] is counted as an active operation and runs with the thread
     pool of [t] started.

   Raises [Failure] if the scheduler exits before [main] has produced a
   result. *)
let run ~extra_effects t main x =
  (* [fork ~new_fiber fn] runs [fn] as a new fiber under the scheduler
     effect handler. When the fiber finishes, control moves to the next job. *)
  let rec fork ~new_fiber:fiber fn =
    let open Effect.Deep in
    Trace.fiber (Fiber_context.tid fiber);
    match_with fn ()
      { retc = (fun () -> Fiber_context.destroy fiber; next t);
        exnc = (fun ex ->
            (* Capture the backtrace before the clean-up can disturb it. *)
            let bt = Printexc.get_raw_backtrace () in
            Fiber_context.destroy fiber;
            Printexc.raise_with_backtrace ex bt
          );
        effc = fun (type a) (e : a Effect.t) ->
          match e with
          | Enter fn -> Some (fun k ->
              match Fiber_context.get_error fiber with
              | Some e -> discontinue k e
              | None -> fn t { Suspended.k; fiber }
            )
          | Eio.Private.Effects.Get_context -> Some (fun k -> continue k fiber)
          | Eio.Private.Effects.Suspend f -> Some (fun k ->
              let k = { Suspended.k; fiber } in
              let enqueue = get_enqueue t k in
              f fiber enqueue;
              next t
            )
          | Eio.Private.Effects.Fork (new_fiber, f) -> Some (fun k ->
              (* The parent goes to the head of the queue; the child runs now. *)
              let k = { Suspended.k; fiber } in
              enqueue_at_head t k;
              fork ~new_fiber f
            )
          | Eio_unix.Private.Await_readable fd -> Some (fun k ->
              await_readable t { Suspended.k; fiber } fd
            )
          | Eio_unix.Private.Await_writable fd -> Some (fun k ->
              await_writable t { Suspended.k; fiber } fd
            )
          | Eio_unix.Private.Thread_pool.Run_in_systhread fn -> Some (fun k ->
              let k = { Suspended.k; fiber } in
              let enqueue x = enqueue_thread t k (x, t.thread_pool) in
              Eio_unix.Private.Thread_pool.submit t.thread_pool ~ctx:fiber ~enqueue fn;
              next t
            )
          | e -> extra_effects.Effect.Deep.effc e
      }
  in
  let result = ref None in
  let `Exit_scheduler =
    let new_fiber = Fiber_context.make_root () in
    Domain_local_await.using
      ~prepare_for_await:Eio_utils.Dla.prepare_for_await
      ~while_running:(fun () ->
          fork ~new_fiber (fun () ->
              Eio_unix.Private.Thread_pool.run t.thread_pool @@ fun () ->
              result := Some (with_op t main x);
            )
        )
  in
  (* The scheduler has stopped; [main] must have stored its result by now. *)
  match !result with
  | Some x -> x
  | None -> failwith "BUG in scheduler: deadlock detected"