Source file rcfd.ml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
(* To prevent races closing FDs, we do some ref-counting.

   Logically, the states of the wrapper for an FD [x] are:

   - open : the FD is available for use.
   - closing/users : no further operations can start, but some are still in progress.
   - closing/no-users : all operations have finished.
   - closing/closed : we no longer own the FD.

   We start by dividing ownership of [x] into [max_int] shares
   (enough shares for every sys-thread to take one).
   Having a fractional share (not 0 or 1) means you can use [x] but not close it.

   In the [open] and [closing/users] states, [t] owns the fraction
   [(max_int - ops) / max_int] of [x].

   In [closing/no-users], [t] owns all of [x].

   Initially, [t] is in the [open] state and [ops = 0].

   If you increment [ops] in [open] or [closing/users], you get one share and can use the FD
   (though in [closing/users] you should immediately return it).
   When you decrement [ops] in these cases, you give back your share and must not use the FD further.

   If you increment/decrement when in the state [closing/no-users] or [closing/closed],
   you do not get/return a share.

   A fiber can call {!close} to move from [open] at any time.
   If there are operations in progress when it does this, it transitions to [closing/users].
   Otherwise, it goes directly to [closing/no-users].
   The fiber that does this is known as the "closing fiber"
   and is responsible for finishing the close.

   We move from [closing/users] to [closing/no-users] when [ops] becomes 0
   (and [t] therefore owns all shares of [x]).
   [ops] may continue to change after this, but we never return to [closing/users]
   or give any user a share of [x] after this.

   From [closing/no-users], we transition to [closing/closed],
   transferring the now-full ownership of [x] to the closing fiber.

   In reality, the three [closing/*] states are represented by the single constructor [Closing],
   and the code must work whatever the true state might be. *)

type state =
  | Open of Unix.file_descr
  | Closing of (unit -> unit)     (* Function is called when [ops] becomes 0. *)

type t = {
  ops : int Atomic.t;
  fd : state Atomic.t;
}

let fully_closed = Closing ignore      (* Used for [closing/closed] *)

let put t =
  let old = Atomic.fetch_and_add t.ops (-1) in
  if old = 1 then (
    (* We decremented [ops] from one to zero. We may need to notify the closer. *)
    match Atomic.get t.fd with
    | Open _ -> ()     (* The fast path. We're not closing. *)
    | Closing no_users as prev ->
      (* There are four possibilities for the state when we did the decrement:
         - open: But it got closed after that. There could be new active users by now.
         - closing/users: We were the last user and transitioned to closing/no-users.
                          We need to notify the closer, or make sure someone else will do it later.
         - closing/no-users: We might need to notify, since a previous thread that reached zero
                             might have then seen [ops > 0] and deferred it to us.
         - closing/closed: No need to do anything, but notifying is harmless.
       *)
      if Atomic.get t.ops > 0 then ()  (* Someone else will deal with it. *)
      else if Atomic.compare_and_set t.fd prev fully_closed then (
        (* We observed [t.ops = 0] after closing, so we were then at either [closing/no-users]
           or [closing/closed], and we're now certainly at [closing/closed].
           If it was [closing/no-users] then we now own the FD, which we pass to the closer.
           If it was [closing/closed] then we don't, but [no_users] is [ignore] anyway. *)
        no_users ()
      ) else (
        (* Someone else notified the closer first. We're now in [closing/closed]. *)
      )
  ) else (
    assert (old > 1)
  )

let get t =
  Atomic.incr t.ops;
  (* If the state was [open] or [closing/users] then we now own 1 share of the FD. *)
  match Atomic.get t.fd with
  | Open fd ->
    (* The state was [open]. Give the share that we took to our caller. *)
    Some fd
  | Closing _ ->
    (* We want to close [t], so don't start a new operation.
       If the state was [open] or [closing/users] when we incremented [ops] then
       we return the share we took to [t] (which cannot now be [closing/no-users] as we are a user).
       Otherwise, it was [closing/no-users] or [closing/closed], and still is one of those. *)
    put t;
    None

let close_fd fd =
  Eio.Private.Trace.with_span "close" (fun () -> Unix.close fd)

(* Note: we could simplify this a bit by incrementing [t.ops], as [remove] does.
   However, that makes dscheck too slow. *)
let close t =
  match Atomic.get t.fd with
  | Closing _ ->
    (* Another caller closed [t] before us. *)
    false
  | Open fd as prev ->
    let next = Closing (fun () -> close_fd fd) in
    if Atomic.compare_and_set t.fd prev next then (
      (* We just transitioned from [open] to [closing/users] or [closing/no-users].
         We are now the closer. *)
      if Atomic.get t.ops = 0 && Atomic.compare_and_set t.fd next fully_closed then (
        (* We were in [closing/no-users] and are now in [closing/closed].
           We own the FD (and our original callback will never be called). *)
        close_fd fd
      ) else (
        (* The [next] callback remained installed and there is nothing left for us to do:
           - If [t.ops] was non-zero, another thread will eventually return it to zero and call our callback.
           - If the CAS failed, then another thread is invoking our callback. *)
      );
      true
    ) else (
      (* Another domain became the closer first. *)
      false
    )

let remove t =
  Atomic.incr t.ops;
  match Atomic.get t.fd with
  | Closing _ ->
    (* Another domain is dealing with it. *)
    put t;
    None
  | Open fd as prev ->
    Eio.Private.Suspend.enter_unchecked "Rcfd.remove" (fun _ctx enqueue ->
        if Atomic.compare_and_set t.fd prev (Closing (fun () -> enqueue (Ok (Some fd)))) then (
          (* We transitioned from [open] to [closing/users]. We are the closer. *)
          put t
        ) else (
          (* Another domain is handling the close instead. *)
          put t;
          enqueue (Ok (None))
        )
      )

let make fd =
  {
    ops = Atomic.make 0;
    fd = Atomic.make (Open fd);
  }

let is_open t =
  match Atomic.get t.fd with
  | Open _ -> true
  | Closing _ -> false

let use ~if_closed t f =
  match get t with
  | None -> if_closed ()
  | Some fd ->
    match f fd with
    | r -> put t; r
    | exception ex ->
      let bt = Printexc.get_raw_backtrace () in
      put t;
      Printexc.raise_with_backtrace ex bt

let peek t =
  match Atomic.get t.fd with
  | Open fd -> fd
  | Closing _ -> failwith "FD already closed!"

let pp f t =
  match Atomic.get t.fd with
  | Closing _ -> Fmt.string f "(closed FD)"
  | Open fd ->
    match Sys.os_type with
    | "Unix" ->
      let id : int = Obj.magic (fd : Unix.file_descr) in
      Fmt.pf f "FD-%d" id
    | _ ->
      Fmt.string f "(FD)"