1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
open Std
type shutdown_command = [ `Receive | `Send | `All ]
type 't read_method = ..
type 't read_method += Read_source_buffer of ('t -> (Cstruct.t list -> int) -> unit)
type source_ty = [`R | `Flow]
type 'a source = ([> source_ty] as 'a) r
type sink_ty = [`W | `Flow]
type 'a sink = ([> sink_ty] as 'a) r
type shutdown_ty = [`Shutdown]
type 'a shutdown = ([> shutdown_ty] as 'a) r
module Pi = struct
module type SOURCE = sig
type t
val read_methods : t read_method list
val single_read : t -> Cstruct.t -> int
end
module type SINK = sig
type t
val single_write : t -> Cstruct.t list -> int
val copy : t -> src:_ source -> unit
end
module type SHUTDOWN = sig
type t
val shutdown : t -> shutdown_command -> unit
end
type (_, _, _) Resource.pi +=
| Source : ('t, (module SOURCE with type t = 't), [> source_ty]) Resource.pi
| Sink : ('t, (module SINK with type t = 't), [> sink_ty]) Resource.pi
| Shutdown : ('t, (module SHUTDOWN with type t = 't), [> shutdown_ty]) Resource.pi
let source (type t) (module X : SOURCE with type t = t) =
Resource.handler [H (Source, (module X))]
let sink (type t) (module X : SINK with type t = t) =
Resource.handler [H (Sink, (module X))]
let shutdown (type t) (module X : SHUTDOWN with type t = t) =
Resource.handler [ H (Shutdown, (module X))]
module type TWO_WAY = sig
include SHUTDOWN
include SOURCE with type t := t
include SINK with type t := t
end
let two_way (type t) (module X : TWO_WAY with type t = t) =
Resource.handler [
H (Shutdown, (module X));
H (Source, (module X));
H (Sink, (module X));
]
let simple_copy ~single_write t ~src:(Resource.T (src, src_ops)) =
let rec write_all buf =
if not (Cstruct.is_empty buf) then (
let sent = single_write t [buf] in
write_all (Cstruct.shift buf sent)
)
in
let module Src = (val (Resource.get src_ops Source)) in
try
let buf = Cstruct.create 4096 in
while true do
let got = Src.single_read src buf in
write_all (Cstruct.sub buf 0 got)
done
with End_of_file -> ()
end
open Pi
let close = Resource.close
let single_read (Resource.T (t, ops)) buf =
let module X = (val (Resource.get ops Source)) in
let got = X.single_read t buf in
assert (got > 0 && got <= Cstruct.length buf);
got
let rec read_exact t buf =
if Cstruct.length buf > 0 then (
let got = single_read t buf in
read_exact t (Cstruct.shift buf got)
)
module Cstruct_source = struct
type t = Cstruct.t list ref
let create data = ref data
let read_source_buffer t fn =
let rec aux () =
match !t with
| [] -> raise End_of_file
| x :: xs when Cstruct.length x = 0 -> t := xs; aux ()
| xs ->
let n = fn xs in
t := Cstruct.shiftv xs n
in
aux ()
let read_methods =
[ Read_source_buffer read_source_buffer ]
let single_read t dst =
let avail, src = Cstruct.fillv ~dst ~src:!t in
if avail = 0 then raise End_of_file;
t := src;
avail
end
let cstruct_source =
let ops = Pi.source (module Cstruct_source) in
fun data -> Resource.T (Cstruct_source.create data, ops)
module String_source = struct
type t = {
s : string;
mutable offset : int;
}
let single_read t dst =
if t.offset = String.length t.s then raise End_of_file;
let len = min (Cstruct.length dst) (String.length t.s - t.offset) in
Cstruct.blit_from_string t.s t.offset dst 0 len;
t.offset <- t.offset + len;
len
let read_methods = []
let create s = { s; offset = 0 }
end
let string_source =
let ops = Pi.source (module String_source) in
fun s -> Resource.T (String_source.create s, ops)
let single_write (Resource.T (t, ops)) bufs =
let module X = (val (Resource.get ops Sink)) in
X.single_write t bufs
let write (Resource.T (t, ops)) bufs =
let module X = (val (Resource.get ops Sink)) in
let rec aux = function
| [] -> ()
| bufs ->
let wrote = X.single_write t bufs in
aux (Cstruct.shiftv bufs wrote)
in
aux bufs
let copy src (Resource.T (t, ops)) =
let module X = (val (Resource.get ops Sink)) in
X.copy t ~src
let copy_string s = copy (string_source s)
module Buffer_sink = struct
type t = Buffer.t
let single_write t bufs =
let old_length = Buffer.length t in
List.iter (fun buf -> Buffer.add_bytes t (Cstruct.to_bytes buf)) bufs;
Buffer.length t - old_length
let copy t ~src = Pi.simple_copy ~single_write t ~src
end
let buffer_sink =
let ops = Pi.sink (module Buffer_sink) in
fun b -> Resource.T (b, ops)
type two_way_ty = [source_ty | sink_ty | shutdown_ty]
type 'a two_way = ([> two_way_ty] as 'a) r
let shutdown (Resource.T (t, ops)) cmd =
let module X = (val (Resource.get ops Shutdown)) in
X.shutdown t cmd