1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
(* Internal state of a promise. It is stored in an [Atomic.t] inside the
   [promise] record and is replaced as a whole when the promise is resolved. *)
type 'a state =
(* The promise holds its final value. *)
| Resolved of 'a
(* No value yet; carries the [Broadcast.t] on which [await] registers and
   which [try_resolve] passes to [Broadcast.resume_all]. *)
| Unresolved of Broadcast.t
(* Concrete representation behind both abstract views ['a t] and ['a u].
   The [!] annotation declares the type parameter injective. *)
type !'a promise = {
(* Identifier handed to the [Trace] calls made for this promise. *)
id : Trace.id;
(* Current state; read with [Atomic.get] and updated with
   [Atomic.compare_and_set] in [try_resolve]. *)
state : 'a state Atomic.t;
}
(* Abstract reading view of a promise (covariant and injective in ['a]).
   Values are obtained from a ['a promise] through [to_public_promise]. *)
type +!'a t
(* Abstract resolving view of a promise (contravariant and injective in
   ['a]). Values are obtained from a ['a promise] through
   [to_public_resolver]. *)
type -!'a u
(* A promise whose result is either a value or an exception. *)
type 'a or_exn = ('a, exn) result t
(* Casts between the concrete ['a promise] record and its two abstract views
   ['a t] and ['a u]. Each one is implemented with [Obj.magic], so only the
   static type changes; the value itself is passed through untouched. *)
let to_public_promise (p : 'a promise) : 'a t = Obj.magic p
let to_public_resolver (p : 'a promise) : 'a u = Obj.magic p
let of_public_promise (p : 'a t) : 'a promise = Obj.magic p
let of_public_resolver (r : 'a u) : 'a promise = Obj.magic r
(* [create_with_id id] builds a new unresolved promise that uses [id] as its
   trace identifier and returns both views of it as a [(promise, resolver)]
   pair. Unlike [create], it does not call [Trace.create_obj] itself. *)
let create_with_id id =
  let state = Atomic.make (Unresolved (Broadcast.create ())) in
  let promise = { id; state } in
  (to_public_promise promise, to_public_resolver promise)
(* [create ?label ()] mints a fresh trace id, announces it to [Trace] as a
   [Promise] object (forwarding the optional [label]), and returns the
   [(promise, resolver)] pair produced by [create_with_id]. *)
let create ?label () =
  let fresh_id = Trace.mint_id () in
  let () = Trace.create_obj ?label fresh_id Promise in
  create_with_id fresh_id
(* [create_resolved x] returns a promise whose state is [Resolved x] from the
   start. A fresh trace id is minted and announced to [Trace] as a [Promise]
   object (without a label). No resolver is returned. *)
let create_resolved x =
  let fresh_id = Trace.mint_id () in
  let () = Trace.create_obj fresh_id Promise in
  let state = Atomic.make (Resolved x) in
  to_public_promise { id = fresh_id; state }
(* [await t] returns the value of promise [t].

   Fast path: if the state is already [Resolved x], record [Trace.get] and
   return [x] without suspending.

   Slow path: call [Suspend.enter], register a wake-up callback on the
   broadcast [b] carried by the [Unresolved] state, then read the state a
   second time to cover the window in which [try_resolve] may have run
   between the first read and the registration. After [Suspend.enter]
   returns, the state is read once more and is expected to be [Resolved].

   NOTE(review): when the cancel function installed below runs it calls
   [enqueue (Error ex)]; presumably [Suspend.enter] then raises [ex] in this
   fiber instead of returning -- confirm against [Suspend]. *)
let await t =
let t = of_public_promise t in
match Atomic.get t.state with
| Resolved x ->
Trace.get t.id;
x
| Unresolved b ->
Suspend.enter "Promise.await" (fun ctx enqueue ->
(* Register first, check the state afterwards: see the header comment. *)
match Broadcast.suspend b (fun () -> enqueue (Ok ())) with
(* No request handle came back, so there is nothing to cancel or track.
   NOTE(review): presumably the callback was already run (or scheduled) by
   [Broadcast.suspend] in this case -- confirm against [Broadcast]. *)
| None -> ()
| Some request ->
match Atomic.get t.state with
| Resolved _ ->
(* The promise was resolved while we were registering. If our request can
   still be cancelled we take over the wake-up ourselves.
   NOTE(review): assumes [Broadcast.cancel] returns [true] only when the
   request had not been resumed yet -- confirm. *)
if Broadcast.cancel request then enqueue (Ok ())
| Unresolved _ ->
(* Still unresolved after registering: stay suspended, and install a cancel
   function that withdraws the request and resumes with [Error ex]. If the
   request can no longer be cancelled, the cancel function does nothing. *)
Trace.try_get t.id;
Cancel.Fiber_context.set_cancel_fn ctx (fun ex ->
if Broadcast.cancel request then enqueue (Error ex)
)
);
match Atomic.get t.state with
| Resolved x ->
Trace.get t.id;
x
(* In the code visible here the state only ever moves from [Unresolved] to
   [Resolved] (see [try_resolve]), so resuming with [Ok ()] while still
   unresolved is treated as a bug. *)
| Unresolved _ -> assert false
(* [await_exn t] awaits a promise of a [result]: it returns [x] when the
   promise resolves to [Ok x] and raises [ex] when it resolves to
   [Error ex]. *)
let await_exn t =
  let outcome = await t in
  match outcome with
  | Error ex -> raise ex
  | Ok value -> value
(* [try_resolve t v] attempts to resolve the promise behind resolver [t] with
   value [v]. It returns [true] if this call performed the resolution and
   [false] if the promise was already resolved, in which case [v] is dropped.

   On success it records [Trace.put] and then calls [Broadcast.resume_all] on
   the broadcast that was stored in the replaced [Unresolved] state. *)
let try_resolve t v =
  let promise = of_public_resolver t in
  let rec attempt () =
    match Atomic.get promise.state with
    | Resolved _ -> false
    | Unresolved waiters as seen ->
      (* [Atomic.compare_and_set] compares physically, so the exact value that
         was just read ([seen]) must be passed, not a rebuilt copy. *)
      if not (Atomic.compare_and_set promise.state seen (Resolved v)) then
        (* The state changed between the read and the swap: read it again. *)
        attempt ()
      else begin
        Trace.put promise.id;
        Broadcast.resume_all waiters;
        true
      end
  in
  attempt ()
(* [resolve u x] resolves the promise behind [u] with [x].
   Raises [Invalid_argument] if [try_resolve] reports that the promise was
   already resolved. *)
let resolve u x =
  match try_resolve u x with
  | true -> ()
  | false -> invalid_arg "Can't resolve already-resolved promise"
(* [resolve_ok u x] is [resolve u (Ok x)]; it raises [Invalid_argument] under
   the same condition as [resolve]. *)
let resolve_ok u x =
  let outcome = Ok x in
  resolve u outcome
(* [resolve_error u x] is [resolve u (Error x)]; it raises [Invalid_argument]
   under the same condition as [resolve]. *)
let resolve_error u x =
  let outcome = Error x in
  resolve u outcome
(* [peek t] reads the current state of [t] once, without suspending:
   [Some x] if it is [Resolved x], [None] if it is still unresolved.
   No [Trace] call is made. *)
let peek t =
  match Atomic.get (of_public_promise t).state with
  | Resolved value -> Some value
  | Unresolved _ -> None
(* [id t] is the trace identifier stored in the promise behind [t]. *)
let id t = (of_public_promise t).id
(* [is_resolved t] is [true] exactly when [peek t] returns a value. *)
let is_resolved t =
  match peek t with
  | Some _ -> true
  | None -> false