Source file pool.ml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
module Fiber_context = Eio__core.Private.Fiber_context
module Suspend = Eio__core.Private.Suspend
type 'a slot = 'a option ref
module Cell = struct
type 'a t =
| In_transition
| Request of ('a slot -> unit)
| Resource of 'a slot
| Finished
let init = In_transition
let segment_order = 2
let dump f = function
| In_transition -> Fmt.string f "In_transition"
| Request _ -> Fmt.string f "Request"
| Resource _ -> Fmt.string f "Resource"
| Finished -> Fmt.string f "Finished"
end
module Q = Cells.Make(Cell)
type 'a t = {
slots : int Atomic.t;
max_slots : int;
alloc : unit -> 'a;
validate : 'a -> bool;
dispose : 'a -> unit;
q : 'a Q.t;
}
let create ?(validate=Fun.const true) ?(dispose=ignore) max_size alloc =
if max_size <= 0 then invalid_arg "Pool.create: max_size is <= 0";
{
slots = Atomic.make 0;
max_slots = max_size;
alloc;
validate;
dispose;
q = Q.make ();
}
let rec add t x =
let cell = Q.next_resume t.q in
let rec aux () =
match Atomic.get cell with
| In_transition -> if not (Atomic.compare_and_set cell In_transition (Resource x)) then aux ()
| Finished -> add t x
| Request r as prev ->
if Atomic.compare_and_set cell prev Finished then (
r x
) else add t x
| Resource _ -> assert false
in
aux ()
let cancel segment cell =
match Atomic.exchange cell Cell.Finished with
| Request _ -> Q.cancel_cell segment; true
| Finished -> false
| In_transition | Resource _ -> assert false
let rec maybe_add_slot t current =
if current < t.max_slots then (
if Atomic.compare_and_set t.slots current (current + 1) then add t (ref None)
else maybe_add_slot t (Atomic.get t.slots)
)
let run_with t f slot =
match
begin match !slot with
| Some x when t.validate x -> f x
| Some x ->
slot := None;
t.dispose x;
let x = t.alloc () in
slot := Some x;
f x
| None ->
let x = t.alloc () in
slot := Some x;
f x
end
with
| r ->
add t slot;
r
| exception ex ->
let bt = Printexc.get_raw_backtrace () in
add t slot;
Printexc.raise_with_backtrace ex bt
let run_new_and_dispose t f =
let x = t.alloc () in
match f x with
| r ->
t.dispose x;
r
| exception ex ->
let bt = Printexc.get_raw_backtrace () in
t.dispose x;
Printexc.raise_with_backtrace ex bt
let use t ?(never_block=false) f =
let segment, cell = Q.next_suspend t.q in
match Atomic.get cell with
| Finished | Request _ -> assert false
| Resource slot ->
Atomic.set cell Finished;
run_with t f slot
| In_transition ->
let current = Atomic.get t.slots in
match current < t.max_slots with
| false when never_block -> (
match Atomic.exchange cell Finished with
| Resource slot -> run_with t f slot
| _ -> run_new_and_dispose t f
)
| can_add ->
if can_add then maybe_add_slot t current;
let slot =
Suspend.enter_unchecked "Pool.acquire" (fun ctx enqueue ->
let r x = enqueue (Ok x) in
if Atomic.compare_and_set cell In_transition (Request r) then (
match Fiber_context.get_error ctx with
| Some ex ->
if cancel segment cell then enqueue (Error ex);
| None ->
Fiber_context.set_cancel_fn ctx (fun ex ->
if cancel segment cell then enqueue (Error ex)
)
) else (
match Atomic.exchange cell Finished with
| Resource x -> enqueue (Ok x)
| _ -> assert false
);
)
in
run_with t f slot