1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
[@@@alert "-unstable"]
type _ Effect.t += Fork : Cancel.fiber_context * (unit -> unit) -> unit Effect.t
let yield () =
let fiber = Suspend.enter "" (fun fiber enqueue -> enqueue (Ok fiber)) in
Cancel.check fiber.cancel_context
let fork_raw new_fiber f =
Effect.perform (Fork (new_fiber, f))
let fork ~sw f =
Switch.check_our_domain sw;
if Cancel.is_on sw.cancel then (
let vars = Cancel.Fiber_context.get_vars () in
let new_fiber = Cancel.Fiber_context.make ~cc:sw.cancel ~vars in
fork_raw new_fiber @@ fun () ->
Switch.with_op sw @@ fun () ->
try
f ()
with ex ->
let bt = Printexc.get_raw_backtrace () in
Switch.fail ~bt sw ex;
)
let fork_daemon ~sw f =
Switch.check_our_domain sw;
if Cancel.is_on sw.cancel then (
let vars = Cancel.Fiber_context.get_vars () in
let new_fiber = Cancel.Fiber_context.make ~cc:sw.cancel ~vars in
fork_raw new_fiber @@ fun () ->
Switch.with_daemon sw @@ fun () ->
match f () with
| `Stop_daemon ->
()
| exception Cancel.Cancelled Exit when not (Cancel.is_on sw.cancel) ->
()
| exception ex ->
let bt = Printexc.get_raw_backtrace () in
Switch.fail ~bt sw ex;
)
let fork_promise ~sw f =
Switch.check_our_domain sw;
let vars = Cancel.Fiber_context.get_vars () in
let new_fiber = Cancel.Fiber_context.make ~cc:sw.Switch.cancel ~vars in
let p, r = Promise.create_with_id (Cancel.Fiber_context.tid new_fiber) in
fork_raw new_fiber (fun () ->
match Switch.with_op sw f with
| x -> Promise.resolve_ok r x
| exception ex -> Promise.resolve_error r ex
);
p
let fork_promise_exn ~sw f =
Switch.check_our_domain sw;
let vars = Cancel.Fiber_context.get_vars () in
let new_fiber = Cancel.Fiber_context.make ~cc:sw.Switch.cancel ~vars in
let p, r = Promise.create_with_id (Cancel.Fiber_context.tid new_fiber) in
fork_raw new_fiber (fun () ->
match Switch.with_op sw f with
| x -> Promise.resolve r x
| exception ex ->
let bt = Printexc.get_raw_backtrace () in
Switch.fail ~bt sw ex
);
p
let rec forks ~sw = function
| [] -> ()
| [x] -> Switch.check sw; x ()
| x :: xs ->
fork ~sw x;
forks ~sw xs
let all xs =
Switch.run ~name:"all" @@ fun sw ->
forks ~sw xs
let both f g =
Switch.run ~name:"both" @@ fun sw ->
forks ~sw [f; g]
let pair f g =
Switch.run ~name:"pair" @@ fun sw ->
let x = fork_promise ~sw f in
let y = g () in
(Promise.await_exn x, y)
exception Not_first
let await_cancel () =
Suspend.enter "await_cancel" @@ fun fiber enqueue ->
Cancel.Fiber_context.set_cancel_fn fiber (fun ex -> enqueue (Error ex))
type 'a any_status =
| New
| Ex of (exn * Printexc.raw_backtrace)
| OK of 'a
let any_gen ~return ~combine fs =
let r = ref New in
let parent_c =
Cancel.sub_unchecked Any (fun cc ->
let wrap h =
match h () with
| x ->
begin match !r with
| New -> r := OK (return x); Cancel.cancel cc Not_first
| OK prev -> r := OK (combine prev x)
| Ex _ -> ()
end
| exception Cancel.Cancelled _ when not (Cancel.is_on cc) ->
()
| exception ex ->
begin match !r with
| New -> r := Ex (ex, Printexc.get_raw_backtrace ()); Cancel.cancel cc ex
| OK _ -> r := Ex (ex, Printexc.get_raw_backtrace ())
| Ex prev ->
let bt = Printexc.get_raw_backtrace () in
r := Ex (Exn.combine prev (ex, bt))
end
in
let vars = Cancel.Fiber_context.get_vars () in
let rec aux = function
| [] -> await_cancel ()
| [f] -> wrap f; []
| f :: fs ->
let new_fiber = Cancel.Fiber_context.make ~cc ~vars in
let p, r = Promise.create_with_id (Cancel.Fiber_context.tid new_fiber) in
fork_raw new_fiber (fun () ->
match wrap f with
| () -> Promise.resolve_ok r ()
| exception ex -> Promise.resolve_error r ex
);
p :: aux fs
in
let ps = aux fs in
Cancel.protect (fun () -> List.iter Promise.await_exn ps)
)
in
match !r, Cancel.get_error parent_c with
| OK r, None -> r
| (OK _ | New), Some ex -> raise ex
| Ex (ex, bt), None -> Printexc.raise_with_backtrace ex bt
| Ex ex1, Some ex2 ->
let bt2 = Printexc.get_raw_backtrace () in
let ex, bt = Exn.combine ex1 (ex2, bt2) in
Printexc.raise_with_backtrace ex bt
| New, None -> assert false
let n_any fs =
List.rev (any_gen fs ~return:(fun x -> [x]) ~combine:(fun xs x -> x :: xs))
let any ?(combine=(fun x _ -> x)) fs = any_gen fs ~return:Fun.id ~combine
let first ?combine f g = any ?combine [f; g]
let is_cancelled () =
let ctx = Effect.perform Cancel.Get_context in
not (Cancel.is_on ctx.cancel_context)
let check () =
let ctx = Effect.perform Cancel.Get_context in
Cancel.check ctx.cancel_context
module List = struct
let opt_cons x xs =
match x with
| None -> xs
| Some x -> x :: xs
module Limiter : sig
(** This is a bit like using a semaphore, but it assumes that there is only a
single fiber using it. e.g. you must not call {!use}, {!fork}, etc from
two different fibers. *)
type t
val create : sw:Switch.t -> int -> t
(** [create ~sw n] is a limiter that allows running up to [n] jobs at once. *)
val use : t -> ('a -> 'b) -> 'a -> 'b
(** [use t fn x] runs [fn x] in this fiber, counting it as one use of [t]. *)
val fork : t -> ('a -> unit) -> 'a -> unit
(** [fork t fn x] runs [fn x] in a new fibre, once a fiber is free. *)
val fork_promise_exn : t -> ('a -> 'b) -> 'a -> 'b Promise.t
(** [fork_promise_exn t fn x] runs [fn x] in a new fibre, once a fiber is free,
and returns a promise for the result. *)
end = struct
type t = {
mutable free_fibers : int;
cond : unit Single_waiter.t;
sw : Switch.t;
}
let max_fibers_err n =
Fmt.failwith "max_fibers must be positive (got %d)" n
let create ~sw max_fibers =
if max_fibers <= 0 then max_fibers_err max_fibers;
{
free_fibers = max_fibers;
cond = Single_waiter.create ();
sw;
}
let await_free t =
if t.free_fibers = 0 then Single_waiter.await t.cond "Limiter.await_free" t.sw.cancel.id;
assert (t.free_fibers > 0);
t.free_fibers <- t.free_fibers - 1
let release t =
t.free_fibers <- t.free_fibers + 1;
if t.free_fibers = 1 then Single_waiter.wake_if_sleeping t.cond
let use t fn x =
await_free t;
let r = fn x in
release t;
r
let fork_promise_exn t fn x =
await_free t;
fork_promise_exn ~sw:t.sw (fun () -> let r = fn x in release t; r)
let fork t fn x =
await_free t;
fork ~sw:t.sw (fun () -> fn x; release t)
end
let filter_map ?(max_fibers=max_int) fn items =
match items with
| [] -> []
| items ->
Switch.run ~name:"filter_map" @@ fun sw ->
let limiter = Limiter.create ~sw max_fibers in
let rec aux = function
| [] -> []
| [x] -> Option.to_list (Limiter.use limiter fn x)
| x :: xs ->
let x = Limiter.fork_promise_exn limiter fn x in
let xs = aux xs in
opt_cons (Promise.await x) xs
in
aux items
let map ?max_fibers fn = filter_map ?max_fibers (fun x -> Some (fn x))
let filter ?max_fibers fn = filter_map ?max_fibers (fun x -> if fn x then Some x else None)
let iter ?(max_fibers=max_int) fn items =
match items with
| [] -> ()
| items ->
Switch.run ~name:"iter" @@ fun sw ->
let limiter = Limiter.create ~sw max_fibers in
let rec aux = function
| [] -> ()
| [x] -> Limiter.use limiter fn x
| x :: xs ->
Limiter.fork limiter fn x;
aux xs
in
aux items
end
type 'a key = 'a Hmap.key
let create_key () = Hmap.Key.create ()
let get key = Hmap.find key (Cancel.Fiber_context.get_vars ())
let with_binding var value fn =
let ctx = Effect.perform Cancel.Get_context in
Cancel.Fiber_context.with_vars ctx (Hmap.add var value ctx.vars) fn
let without_binding var fn =
let ctx = Effect.perform Cancel.Get_context in
Cancel.Fiber_context.with_vars ctx (Hmap.rem var ctx.vars) fn
type 'out coroutine =
[ `Init
| `Ready of [`Running of 'out Suspend.enqueue] Suspend.enqueue
| `Running of 'out Suspend.enqueue
| `Finished
| `Client_cancelled of exn
| `Failed of exn ]
let unwrap_cancelled state =
match Atomic.get state with
| `Client_cancelled ex -> ex
| `Finished | `Failed _ -> Invalid_argument "Coroutine has already stopped!"
| `Ready _ -> Invalid_argument "Coroutine has already yielded!"
| `Init | `Running _ -> Invalid_argument "Coroutine in unexpected state!"
let run_coroutine ~state fn =
let await_request ~prev ~on_suspend =
Suspend.enter "await-consumer" (fun ctx enqueue ->
let ready = `Ready enqueue in
if Atomic.compare_and_set state prev ready then (
Cancel.Fiber_context.set_cancel_fn ctx (fun ex ->
if Atomic.compare_and_set state ready (`Failed ex) then
enqueue (Error ex);
);
on_suspend ()
) else (
enqueue (Error (unwrap_cancelled state))
)
)
in
let current_state = ref (await_request ~prev:`Init ~on_suspend:ignore) in
fn (fun v ->
let `Running enqueue as prev = !current_state in
current_state := await_request ~prev ~on_suspend:(fun () -> enqueue (Ok (Some v)))
);
if Atomic.compare_and_set state (!current_state :> _ coroutine) `Finished then (
let `Running enqueue = !current_state in
enqueue (Ok None)
) else (
raise (unwrap_cancelled state)
)
let fork_coroutine ~sw fn =
let state = Atomic.make `Init in
fork_daemon ~sw (fun () ->
try
run_coroutine ~state fn;
`Stop_daemon
with ex ->
match ex, Atomic.exchange state (`Failed ex) with
| _, `Running enqueue ->
enqueue (Error ex);
`Stop_daemon
| Cancel.Cancelled _, _ ->
`Stop_daemon
| _ ->
raise ex
);
fun () ->
Suspend.enter "await-producer" (fun ctx enqueue ->
let rec aux () =
match Atomic.get state with
| `Ready resume as prev ->
let running = `Running enqueue in
if Atomic.compare_and_set state prev running then (
resume (Ok running);
Cancel.Fiber_context.set_cancel_fn ctx (fun ex ->
if Atomic.compare_and_set state running (`Client_cancelled ex) then
enqueue (Error ex)
)
) else aux ()
| `Finished -> enqueue (Error (Invalid_argument "Coroutine has already finished!"))
| `Failed ex | `Client_cancelled ex -> enqueue (Error (Invalid_argument ("Coroutine has already failed: " ^ Printexc.to_string ex)))
| `Running _ -> enqueue (Error (Invalid_argument "Coroutine is still running!"))
| `Init -> assert false
in
aux ()
)
let fork_seq ~sw fn =
Seq.of_dispenser (fork_coroutine ~sw fn)