Source file promise.ml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
(* The two states a promise can be in. In this file the only transition is
   [Unresolved] -> [Resolved], performed by the compare-and-set in [try_resolve]. *)
type 'a state =
  (* The promise holds its final value. *)
  | Resolved of 'a
  (* No value yet. The [Broadcast.t] is where [await] registers waiters and
     what [try_resolve] passes to [Broadcast.resume_all]. *)
  | Unresolved of Broadcast.t

(* Internal representation shared by the public promise ['a t] and resolver ['a u].
   The [!] marks the type parameter as injective. *)
type !'a promise = {
  (* Identifier handed to the [Trace] calls made for this promise
     ([Trace.get], [Trace.try_get], [Trace.put]). *)
  id : Trace.id;
  (* Current state, read and updated atomically. *)
  state : 'a state Atomic.t;    (* Note: we always switch to Resolved before broadcasting *)
}

(* Public, read-side handle of a promise. It has no definition of its own here:
   values are obtained from an ['a promise] via [to_public_promise].
   Declared covariant ([+]) and injective ([!]) in ['a]. *)
type +!'a t
(* Public, write-side handle (resolver) of a promise, obtained from an
   ['a promise] via [to_public_resolver].
   Declared contravariant ([-]) and injective ([!]) in ['a]. *)
type -!'a u

(* A promise whose value is either [Ok x] or [Error ex] with [ex : exn];
   [await_exn], [resolve_ok] and [resolve_error] work with promises of this shape. *)
type 'a or_exn = ('a, exn) result t

(* Unchecked casts between the internal record ['a promise] and the abstract
   public types ['a t] / ['a u]. [Obj.magic] performs no runtime check, so the
   type annotations are the only thing tying the ['a] on each side together.
   Within this file, values of ['a t] and ['a u] are only ever produced by the
   two [to_public_*] casts, which is what makes the [of_public_*] casts sound. *)
let to_public_promise : 'a promise -> 'a t = Obj.magic
let to_public_resolver : 'a promise -> 'a u = Obj.magic
let of_public_promise : 'a t -> 'a promise = Obj.magic
let of_public_resolver : 'a u -> 'a promise = Obj.magic

(* [create_with_id id] builds a new unresolved promise carrying the given trace
   [id] and returns it as a (promise, resolver) pair. Both components are views
   of the same underlying record. *)
let create_with_id id =
  let waiters = Broadcast.create () in
  let p = { id; state = Atomic.make (Unresolved waiters) } in
  (to_public_promise p, to_public_resolver p)

(* [create ?label ()] mints a fresh trace id, registers it with [Trace] as a
   [Promise] object (forwarding the optional [label]), and returns a new
   unresolved (promise, resolver) pair for that id. *)
let create ?label () =
  let fresh_id = Trace.mint_id () in
  Trace.create_obj ?label fresh_id Promise;
  create_with_id fresh_id

(* [create_resolved x] returns a promise that already holds [x].
   A fresh trace id is minted and registered as a [Promise] object first.
   No resolver is returned, and no [Broadcast.t] is allocated. *)
let create_resolved x =
  let fresh_id = Trace.mint_id () in
  Trace.create_obj fresh_id Promise;
  let state = Atomic.make (Resolved x) in
  to_public_promise { id = fresh_id; state }

(* [await t] returns the value [t] has been resolved with.

   Fast path: if the state is already [Resolved], the value is returned
   without calling [Suspend.enter].

   Slow path: [Suspend.enter] is called with a callback that registers a waiter
   on the promise's [Broadcast.t]. When [Suspend.enter] returns, the state is
   read again and is required to be [Resolved].

   If the cancel function installed below runs before the promise is resolved,
   it enqueues [Error ex].
   NOTE(review): presumably [Suspend.enter] then raises [ex] in the caller, so
   the final [match] is not reached in that case - confirm against [Suspend]. *)
let await t =
  let t = of_public_promise t in
  match Atomic.get t.state with
  | Resolved x ->
    Trace.get t.id;
    x
  | Unresolved b ->
    Suspend.enter "Promise.await" (fun ctx enqueue ->
        (* Register first, then re-check the state: this ordering is what
           closes the race with a concurrent [try_resolve]. *)
        match Broadcast.suspend b (fun () -> enqueue (Ok ())) with
        | None -> ()  (* We got resumed immediately *)
        | Some request ->
          match Atomic.get t.state with
          | Resolved _ ->
            (* The promise was resolved as we were suspending.
               Resume now if we haven't already done so. *)
            (* [Broadcast.cancel] returning [true] is used here to mean that our
               registered callback has not run, so we must enqueue ourselves. *)
            if Broadcast.cancel request then enqueue (Ok ())
          | Unresolved _ ->
            (* We observed the promise to be still unresolved after registering a waiter.
               Therefore any resolution must happen after we were registered and we will be notified. *)
            Trace.try_get t.id;
            (* On cancellation, resume with the error only if we can still
               withdraw the waiter; otherwise the resume has already been enqueued. *)
            Cancel.Fiber_context.set_cancel_fn ctx (fun ex ->
                if Broadcast.cancel request then enqueue (Error ex)
                (* else already resumed *)
              )
      );
    (* We were resumed with [Ok ()], which only happens once the state has been
       switched to [Resolved] (see the note on the [state] field). *)
    match Atomic.get t.state with
    | Resolved x ->
      Trace.get t.id;
      x
    | Unresolved _ -> assert false

(* [await_exn t] waits for the result-valued promise [t] using [await].
   It returns the payload of [Ok], or raises the exception carried by [Error]. *)
let await_exn t =
  let outcome = await t in
  match outcome with
  | Error exn -> raise exn
  | Ok value -> value

(* [try_resolve t v] attempts to resolve the promise behind resolver [t] with [v].
   Returns [true] if this call performed the resolution, or [false] if the
   promise was already resolved (in which case [v] is dropped).

   On success the state is switched to [Resolved v] first, and only then are
   the waiters on the old [Broadcast.t] resumed. *)
let try_resolve t v =
  let p = of_public_resolver t in
  let rec attempt () =
    match Atomic.get p.state with
    | Resolved _ -> false
    | Unresolved waiters as seen ->
      if Atomic.compare_and_set p.state seen (Resolved v) then begin
        Trace.put p.id;
        Broadcast.resume_all waiters;
        true
      end else
        (* The compare-and-set lost a race, so the state changed under us.
           Try again; the next read is expected to see [Resolved] and
           report [false]. *)
        attempt ()
  in
  attempt ()

(* [resolve u x] resolves the promise behind [u] with [x].
   Raises [Invalid_argument] if the promise was already resolved. *)
let resolve u x =
  match try_resolve u x with
  | true -> ()
  | false -> invalid_arg "Can't resolve already-resolved promise"

(* [resolve_ok u x] resolves a result-valued promise with [Ok x]; see [resolve]. *)
let resolve_ok u x =
  let outcome = Ok x in
  resolve u outcome
(* [resolve_error u x] resolves a result-valued promise with [Error x]; see [resolve]. *)
let resolve_error u x =
  let outcome = Error x in
  resolve u outcome

(* [peek t] reads the promise's current state without suspending.
   Returns [Some v] if it has been resolved with [v], and [None] otherwise. *)
let peek t =
  let p = of_public_promise t in
  match Atomic.get p.state with
  | Resolved value -> Some value
  | Unresolved _ -> None

(* [id t] is the trace identifier stored in promise [t]. *)
let id t = (of_public_promise t).id

(* [is_resolved t] is [true] iff [peek t] currently returns a value. *)
let is_resolved t =
  match peek t with
  | Some _ -> true
  | None -> false