1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
open Eio.Std
type copy_method = [
| `Read_into
| `Read_source_buffer
]
module Mock_flow = struct
type tag = [`Generic | `Mock]
type t = {
label : string;
pp : string Fmt.t;
on_close : (unit -> unit) Queue.t;
on_read : string Handler.t;
on_copy_bytes : int Handler.t;
mutable copy_method : copy_method;
}
let pp_default f s =
let rec aux i =
let nl =
match String.index_from_opt s i '\n' with
| None -> String.length s
| Some x -> x + 1
in
Fmt.Dump.string f (String.sub s i (nl - i));
if nl < String.length s then (
Fmt.cut f ();
aux nl
)
in
aux 0
let rec takev len = function
| [] -> []
| x :: _ when Cstruct.length x >= len -> [Cstruct.sub x 0 len]
| x :: xs -> x :: takev (len - Cstruct.length x) xs
let write ~pp t bufs =
let size = Handler.run t.on_copy_bytes in
let len = min (Cstruct.lenv bufs) size in
let bufs = takev len bufs in
traceln "%s: wrote %a" t.label pp bufs;
len
let single_write t bufs =
let pp f = function
| [buf] -> Fmt.pf f "@[<v>%a@]" t.pp (Cstruct.to_string buf)
| bufs -> Fmt.pf f "@[<v>%a@]" (Fmt.Dump.list (Fmt.using Cstruct.to_string t.pp)) bufs
in
write ~pp t bufs
let copy_rsb_iovec t bufs =
let pp f bufs = Fmt.pf f "(rsb) @[<v>%a@]" (Fmt.Dump.list (Fmt.using Cstruct.to_string t.pp)) bufs in
write ~pp t bufs
let copy_rsb t rsb =
try while true do rsb (copy_rsb_iovec t) done
with End_of_file -> ()
let copy_via_buffer t src =
try
while true do
let size = Handler.run t.on_copy_bytes in
let buf = Cstruct.create size in
let n = Eio.Flow.single_read src buf in
traceln "%s: wrote @[<v>%a@]" t.label t.pp (Cstruct.to_string (Cstruct.sub buf 0 n))
done
with End_of_file -> ()
let read_methods = []
let single_read t buf =
let data = Handler.run t.on_read in
let len = String.length data in
if Cstruct.length buf < len then
Fmt.failwith "%s: read buffer too short for %a!" t.label t.pp data;
Cstruct.blit_from_string data 0 buf 0 len;
traceln "%s: read @[<v>%a@]" t.label t.pp data;
len
let copy t ~src =
match t.copy_method with
| `Read_into -> copy_via_buffer t src
| `Read_source_buffer ->
let Eio.Resource.T (src, ops) = src in
let module Src = (val (Eio.Resource.get ops Eio.Flow.Pi.Source)) in
let try_rsb = function
| Eio.Flow.Read_source_buffer rsb -> copy_rsb t (rsb src); true
| _ -> false
in
if not (List.exists try_rsb Src.read_methods) then
Fmt.failwith "Source does not offer Read_source_buffer optimisation"
let shutdown t cmd =
traceln "%s: shutdown %s" t.label @@
match cmd with
| `Receive -> "receive"
| `Send -> "send"
| `All -> "all"
let close t =
while not (Queue.is_empty t.on_close) do
Queue.take t.on_close ()
done;
traceln "%s: closed" t.label
let make ?(pp=pp_default) label =
{
pp;
label;
on_close = Queue.create ();
on_read = Handler.make (`Raise End_of_file);
on_copy_bytes = Handler.make (`Return 4096);
copy_method = `Read_into;
}
end
type ty = [`Generic | `Mock] Eio.Net.stream_socket_ty
type t = ty r
type (_, _, _) Eio.Resource.pi += Type : ('t, 't -> Mock_flow.t, ty) Eio.Resource.pi
let raw (Eio.Resource.T (t, ops)) = Eio.Resource.get ops Type t
let attach_to_switch t sw =
let t = raw t in
let hook = Switch.on_release_cancellable sw (fun () -> Mock_flow.close t) in
Queue.add (fun () -> Eio.Switch.remove_hook hook) t.on_close
let on_read t = Handler.seq (raw t).on_read
let on_copy_bytes t = Handler.seq (raw t).on_copy_bytes
let set_copy_method t v = (raw t).copy_method <- v
let handler = Eio.Resource.handler (
H (Type, Fun.id) ::
Eio.Resource.bindings (Eio.Net.Pi.stream_socket (module Mock_flow))
)
let make ?pp label : t =
Eio.Resource.T (Mock_flow.make ?pp label, handler)