Source file flow.ml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
open Std

type shutdown_command = [ `Receive | `Send | `All ]

type 't read_method = ..
type 't read_method += Read_source_buffer of ('t -> (Cstruct.t list -> int) -> unit)

type source_ty = [`R | `Flow]
type 'a source = ([> source_ty] as 'a) r

type sink_ty = [`W | `Flow]
type 'a sink = ([> sink_ty] as 'a) r

type shutdown_ty = [`Shutdown]
type 'a shutdown = ([> shutdown_ty] as 'a) r

module Pi = struct
  module type SOURCE = sig
    type t
    val read_methods : t read_method list
    val single_read : t -> Cstruct.t -> int
  end

  module type SINK = sig
    type t
    val single_write : t -> Cstruct.t list -> int
    val copy : t -> src:_ source -> unit
  end

  module type SHUTDOWN = sig
    type t
    val shutdown : t -> shutdown_command -> unit
  end

  type (_, _, _) Resource.pi +=
    | Source : ('t, (module SOURCE with type t = 't), [> source_ty]) Resource.pi
    | Sink : ('t, (module SINK with type t = 't), [> sink_ty]) Resource.pi
    | Shutdown : ('t, (module SHUTDOWN with type t = 't), [> shutdown_ty]) Resource.pi

  let source (type t) (module X : SOURCE with type t = t) =
    Resource.handler [H (Source, (module X))]

  let sink (type t) (module X : SINK with type t = t) =
    Resource.handler [H (Sink, (module X))]

  let shutdown (type t) (module X : SHUTDOWN with type t = t) =
    Resource.handler [ H (Shutdown, (module X))]

  module type TWO_WAY = sig
    include SHUTDOWN
    include SOURCE with type t := t
    include SINK with type t := t
  end

  let two_way (type t) (module X : TWO_WAY with type t = t) =
    Resource.handler [
      H (Shutdown, (module X));
      H (Source, (module X));
      H (Sink, (module X));
    ]

  let simple_copy ~single_write t ~src:(Resource.T (src, src_ops)) =
    let rec write_all buf =
      if not (Cstruct.is_empty buf) then (
        let sent = single_write t [buf] in
        write_all (Cstruct.shift buf sent)
      )
    in
    let module Src = (val (Resource.get src_ops Source)) in
    try
      let buf = Cstruct.create 4096 in
      while true do
        let got = Src.single_read src buf in
        write_all (Cstruct.sub buf 0 got)
      done
    with End_of_file -> ()
end

open Pi

let close = Resource.close

let single_read (Resource.T (t, ops)) buf =
  let module X = (val (Resource.get ops Source)) in
  let got = X.single_read t buf in
  assert (got > 0 && got <= Cstruct.length buf);
  got

let rec read_exact t buf =
  if Cstruct.length buf > 0 then (
    let got = single_read t buf in
    read_exact t (Cstruct.shift buf got)
  )

module Cstruct_source = struct
  type t = Cstruct.t list ref

  let create data = ref data

  let read_source_buffer t fn =
    let rec aux () =
      match !t with
      | [] -> raise End_of_file
      | x :: xs when Cstruct.length x = 0 -> t := xs; aux ()
      | xs ->
        let n = fn xs in
        t := Cstruct.shiftv xs n
    in
    aux ()

  let read_methods =
    [ Read_source_buffer read_source_buffer ]

  let single_read t dst =
    let avail, src = Cstruct.fillv ~dst ~src:!t in
    if avail = 0 then raise End_of_file;
    t := src;
    avail

end

let cstruct_source =
  let ops = Pi.source (module Cstruct_source) in
  fun data -> Resource.T (Cstruct_source.create data, ops)

module String_source = struct
  type t = {
    s : string;
    mutable offset : int;
  }

  let single_read t dst =
    if t.offset = String.length t.s then raise End_of_file;
    let len = min (Cstruct.length dst) (String.length t.s - t.offset) in
    Cstruct.blit_from_string t.s t.offset dst 0 len;
    t.offset <- t.offset + len;
    len

  let read_methods = []

  let create s = { s; offset = 0 }
end

let string_source =
  let ops = Pi.source (module String_source) in
  fun s -> Resource.T (String_source.create s, ops)

let single_write (Resource.T (t, ops)) bufs =
  let module X = (val (Resource.get ops Sink)) in
  X.single_write t bufs

let write (Resource.T (t, ops)) bufs =
  let module X = (val (Resource.get ops Sink)) in
  let rec aux = function
    | [] -> ()
    | bufs ->
      let wrote = X.single_write t bufs in
      aux (Cstruct.shiftv bufs wrote)
  in
  aux bufs

let copy src (Resource.T (t, ops)) =
  let module X = (val (Resource.get ops Sink)) in
  X.copy t ~src

let copy_string s = copy (string_source s)

module Buffer_sink = struct
  type t = Buffer.t

  let single_write t bufs =
    let old_length = Buffer.length t in
    List.iter (fun buf -> Buffer.add_bytes t (Cstruct.to_bytes buf)) bufs;
    Buffer.length t - old_length

  let copy t ~src = Pi.simple_copy ~single_write t ~src
end

let buffer_sink =
  let ops = Pi.sink (module Buffer_sink) in
  fun b -> Resource.T (b, ops)

type two_way_ty = [source_ty | sink_ty | shutdown_ty]
type 'a two_way = ([> two_way_ty] as 'a) r

let shutdown (Resource.T (t, ops)) cmd =
  let module X = (val (Resource.get ops Shutdown)) in
  X.shutdown t cmd