Source file lwt_process.ml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
# 1 "src/unix/lwt_process.cppo.ml"
(* This file is part of Lwt, released under the MIT license. See LICENSE.md for
   details, or visit https://github.com/ocsigen/lwt/blob/master/LICENSE.md. *)



open Lwt.Infix

type command = string * string array

let shell =
  if Sys.win32 then
    fun cmd -> ("", [|"cmd.exe"; "/c"; "\000" ^ cmd|])
  else
    fun cmd -> ("", [|"/bin/sh"; "-c"; cmd|])

type redirection =
  [ `Keep
  | `Dev_null
  | `Close
  | `FD_copy of Unix.file_descr
  | `FD_move of Unix.file_descr ]

(* +-----------------------------------------------------------------+
   | OS-dependent command spawning                                   |
   +-----------------------------------------------------------------+ *)

type proc = {
  id : int;
  (* The process id. *)
  fd : Unix.file_descr;
  (* A handle on windows, and a dummy value of Unix. *)
}

let win32_get_fd fd redirection =
  match redirection with
  | `Keep ->
    Some fd
  | `Dev_null ->
    Some (Unix.openfile "nul" [Unix.O_RDWR; Unix.O_KEEPEXEC] 0o666)
  | `Close ->
    None
  | `FD_copy fd' ->
    Some fd'
  | `FD_move fd' ->
    Some fd'

external win32_create_process :
  string option -> string -> string option -> string option ->
  (Unix.file_descr option * Unix.file_descr option * Unix.file_descr option) ->
    proc = "lwt_process_create_process"

let win32_quote arg =
  if String.length arg > 0 && arg.[0] = '\000' then
    String.sub arg 1 (String.length arg - 1)
  else
    Filename.quote arg

let win32_spawn
    ?cwd
    ?(stdin:redirection=`Keep)
    ?(stdout:redirection=`Keep)
    ?(stderr:redirection=`Keep)
    (prog, args) env
  =
  let cmdline = String.concat " " (List.map win32_quote (Array.to_list args)) in
  let env =
    match env with
    | None ->
      None
    | Some env ->
      let len =
        Array.fold_left (fun len str -> String.length str + len + 1) 1 env in
      let res = Bytes.create len in
      let ofs =
        Array.fold_left
          (fun ofs str ->
             let len = String.length str in
             String.blit str 0 res ofs len;
             Bytes.set res (ofs + len) '\000';
             ofs + len + 1)
          0 env
      in
      Bytes.set res ofs '\000';
      Some (Bytes.unsafe_to_string res)
  in
  let stdin_fd  = win32_get_fd Unix.stdin stdin
  and stdout_fd = win32_get_fd Unix.stdout stdout
  and stderr_fd = win32_get_fd Unix.stderr stderr in
  let proc =
    win32_create_process
      (if prog = "" then None else Some prog) cmdline env cwd
      (stdin_fd, stdout_fd, stderr_fd)
  in
  let close fd fd' =
    match fd with
    | `FD_move _ | `Dev_null ->
      Unix.close (match fd' with Some fd' -> fd' | _ -> assert false)
    | _ -> ()
  in
  close stdin stdin_fd;
  close stdout stdout_fd;
  close stderr stderr_fd;
  proc

external win32_wait_job : Unix.file_descr -> int Lwt_unix.job =
  "lwt_process_wait_job"

let win32_waitproc proc =
  Lwt_unix.run_job (win32_wait_job proc.fd) >>= fun code ->
  Lwt.return
    (proc.id,
     Lwt_unix.WEXITED code,
     {Lwt_unix.ru_utime = 0.; Lwt_unix.ru_stime = 0.})

external win32_terminate_process : Unix.file_descr -> int -> unit =
  "lwt_process_terminate_process"

let win32_terminate proc =
  win32_terminate_process proc.fd 1

let unix_redirect fd redirection = match redirection with
  | `Keep ->
    ()
  | `Dev_null ->
    let dev_null = Unix.openfile "/dev/null" [Unix.O_RDWR; Unix.O_KEEPEXEC] 0o666 in
    Unix.dup2 ~cloexec:false dev_null fd;
    Unix.close dev_null
  | `Close ->
    Unix.close fd
  | `FD_copy fd' ->
    Unix.dup2 ~cloexec:false fd' fd
  | `FD_move fd' ->
    Unix.dup2 ~cloexec:false fd' fd;
    Unix.close fd'

# 137 "src/unix/lwt_process.cppo.ml"
external unix_exit : int -> 'a = "caml_unix_exit"

# 142 "src/unix/lwt_process.cppo.ml"
let unix_spawn
    ?cwd
    ?(stdin:redirection=`Keep)
    ?(stdout:redirection=`Keep)
    ?(stderr:redirection=`Keep)
    (prog, args) env
  =
  let prog = if prog = "" && Array.length args > 0 then args.(0) else prog in
  match Lwt_unix.fork () with
  | 0 ->
    unix_redirect Unix.stdin stdin;
    unix_redirect Unix.stdout stdout;
    unix_redirect Unix.stderr stderr;
    begin
      try
        begin match cwd with
          | None -> ()
          | Some dir ->
            Sys.chdir dir
        end;
        match env with
        | None ->
          Unix.execvp prog args
        | Some env ->
          Unix.execvpe prog args env
      with _ ->
        (* Do not run at_exit hooks *)
        unix_exit 127
    end
  | id ->
    let close = function
      | `FD_move fd ->
        Unix.close fd
      | _ ->
        ()
    in
    close stdin;
    close stdout;
    close stderr;
    {id; fd = Unix.stdin}

let unix_waitproc proc = Lwt_unix.wait4 [] proc.id

let unix_terminate proc =
  Unix.kill proc.id Sys.sigkill

let spawn     = if Sys.win32 then win32_spawn     else unix_spawn
let waitproc  = if Sys.win32 then win32_waitproc  else unix_waitproc
let terminate = if Sys.win32 then win32_terminate else unix_terminate

(* +-----------------------------------------------------------------+
   | Objects                                                         |
   +-----------------------------------------------------------------+ *)

type state =
  | Running
  | Exited of Unix.process_status

let status (_pid, status, _rusage) = status
let rusage (_pid, _status, rusage) = rusage

external cast_chan : 'a Lwt_io.channel -> unit Lwt_io.channel = "%identity"
(* Transform a channel into a channel that only support closing. *)

let ignore_close chan = ignore (Lwt_io.close chan)

class virtual common timeout proc channels =
  let wait = waitproc proc in
  object(self)
    val mutable closed = false

    method pid = proc.id

    method state =
      match Lwt.poll wait with
      | None -> Running
      | Some (_pid, status, _rusage) -> Exited status

    method kill signum =
      if Lwt.state wait = Lwt.Sleep then
        Unix.kill proc.id signum

    method terminate =
      if Lwt.state wait = Lwt.Sleep then
        terminate proc

    method close =
      if closed then self#status
      else (
        closed <- true;
        Lwt.protected (Lwt.join (List.map Lwt_io.close channels))
        >>= fun () -> self#status
      )
    method status = Lwt.protected wait >|= status
    method rusage = Lwt.protected wait >|= rusage

    initializer
      (* Ensure channels are closed when no longer used. *)
      List.iter (Gc.finalise ignore_close) channels;
      (* Handle timeout. *)
      match timeout with
      | None ->
        ()
      | Some dt ->
        ignore (
          (* Ignore errors since they can be obtained by
             self#close. *)
          Lwt.try_bind
            (fun () ->
               Lwt.choose [(Lwt_unix.sleep dt >>= fun () -> Lwt.return_false);
                           (wait >>= fun _ -> Lwt.return_true)])
            (function
              | true ->
                Lwt.return_unit
              | false ->
                self#terminate;
                self#close >>= fun _ -> Lwt.return_unit)
            (fun _ ->
               (* The exception is dropped because it can be
                  obtained with self#close. *)
               Lwt.return_unit)
        )
  end

class process_none ?timeout ?env ?cwd ?stdin ?stdout ?stderr cmd =
  let proc = spawn cmd env ?cwd ?stdin ?stdout ?stderr in
  object
    inherit common timeout proc []
  end

class process_in ?timeout ?env ?cwd ?stdin ?stderr cmd =
  let stdout_r, stdout_w = Lwt_unix.pipe_in ~cloexec:true () in
  let proc = spawn cmd env ?cwd ?stdin ~stdout:(`FD_move stdout_w) ?stderr in
  let stdout = Lwt_io.of_fd ~mode:Lwt_io.input stdout_r in
  object
    inherit common timeout proc [cast_chan stdout]
    method stdout = stdout
  end

class process_out ?timeout ?env ?cwd ?stdout ?stderr cmd =
  let stdin_r, stdin_w = Lwt_unix.pipe_out ~cloexec:true () in
  let proc = spawn cmd env ?cwd ~stdin:(`FD_move stdin_r) ?stdout ?stderr in
  let stdin = Lwt_io.of_fd ~mode:Lwt_io.output stdin_w in
  object
    inherit common timeout proc [cast_chan stdin]
    method stdin = stdin
  end

class process ?timeout ?env ?cwd ?stderr cmd =
  let stdin_r, stdin_w = Lwt_unix.pipe_out ~cloexec:true ()
  and stdout_r, stdout_w = Lwt_unix.pipe_in ~cloexec:true () in
  let proc =
    spawn
      cmd env ?cwd ~stdin:(`FD_move stdin_r) ~stdout:(`FD_move stdout_w) ?stderr
  in
  let stdin = Lwt_io.of_fd ~mode:Lwt_io.output stdin_w
  and stdout = Lwt_io.of_fd ~mode:Lwt_io.input stdout_r in
  object
    inherit common timeout proc [cast_chan stdin; cast_chan stdout]
    method stdin = stdin
    method stdout = stdout
  end

class process_full ?timeout ?env ?cwd cmd =
  let stdin_r, stdin_w = Lwt_unix.pipe_out ~cloexec:true ()
  and stdout_r, stdout_w = Lwt_unix.pipe_in ~cloexec:true ()
  and stderr_r, stderr_w = Lwt_unix.pipe_in ~cloexec:true () in
  let proc =
    spawn
      cmd env ?cwd
      ~stdin:(`FD_move stdin_r)
      ~stdout:(`FD_move stdout_w)
      ~stderr:(`FD_move stderr_w)
  in
  let stdin = Lwt_io.of_fd ~mode:Lwt_io.output stdin_w
  and stdout = Lwt_io.of_fd ~mode:Lwt_io.input stdout_r
  and stderr = Lwt_io.of_fd ~mode:Lwt_io.input stderr_r in
  object
    inherit
      common timeout proc [cast_chan stdin; cast_chan stdout; cast_chan stderr]
    method stdin = stdin
    method stdout = stdout
    method stderr = stderr
  end

let open_process_none ?timeout ?env ?cwd ?stdin ?stdout ?stderr cmd =
  new process_none ?timeout ?env ?cwd ?stdin ?stdout ?stderr cmd

let open_process_in ?timeout ?env ?cwd ?stdin ?stderr cmd =
  new process_in ?timeout ?env ?cwd ?stdin ?stderr cmd

let open_process_out ?timeout ?env ?cwd ?stdout ?stderr cmd =
  new process_out ?timeout ?env ?cwd ?stdout ?stderr cmd

let open_process ?timeout ?env ?cwd ?stderr cmd =
  new process ?timeout ?env ?cwd ?stderr cmd

let open_process_full ?timeout ?env ?cwd cmd =
  new process_full ?timeout ?env ?cwd cmd

let make_with backend ?timeout ?env ?cwd cmd f =
  let process = backend ?timeout ?env ?cwd cmd in
  Lwt.finalize
    (fun () -> f process)
    (fun () ->
       process#close >>= fun _ ->
       Lwt.return_unit)

let with_process_none ?timeout ?env ?cwd ?stdin ?stdout ?stderr cmd f =
  make_with (open_process_none ?stdin ?stdout ?stderr) ?timeout ?env ?cwd cmd f

let with_process_in ?timeout ?env ?cwd ?stdin ?stderr cmd f =
  make_with (open_process_in ?stdin ?stderr) ?timeout ?env ?cwd cmd f

let with_process_out ?timeout ?env ?cwd ?stdout ?stderr cmd f =
  make_with (open_process_out ?stdout ?stderr) ?timeout ?env ?cwd cmd f

let with_process ?timeout ?env ?cwd ?stderr cmd f =
  make_with (open_process ?stderr) ?timeout ?env ?cwd cmd f

let with_process_full ?timeout ?env ?cwd cmd f =
  make_with open_process_full ?timeout ?env ?cwd cmd f

(* +-----------------------------------------------------------------+
   | High-level functions                                            |
   +-----------------------------------------------------------------+ *)

let exec ?timeout ?env ?cwd ?stdin ?stdout ?stderr cmd =
  (open_process_none ?timeout ?env ?cwd ?stdin ?stdout ?stderr cmd)#close

let ignore_close ch =
  ignore (Lwt_io.close ch)

let read_opt read ic =
  Lwt.catch
    (fun () -> read ic >|= fun x -> Some x)
    (function
      | Unix.Unix_error (Unix.EPIPE, _, _) | End_of_file ->
        Lwt.return_none
      | exn -> Lwt.reraise exn) [@ocaml.warning "-4"]

let recv_chars pr =
  let ic = pr#stdout in
  Gc.finalise ignore_close ic;
  Lwt_stream.from (fun _ ->
    read_opt Lwt_io.read_char ic >>= fun x ->
    if x = None then begin
      Lwt_io.close ic >>= fun () ->
      Lwt.return x
    end else
      Lwt.return x)

let recv_lines pr =
  let ic = pr#stdout in
  Gc.finalise ignore_close ic;
  Lwt_stream.from (fun _ ->
    read_opt Lwt_io.read_line ic >>= fun x ->
    if x = None then begin
      Lwt_io.close ic >>= fun () ->
      Lwt.return x
    end else
      Lwt.return x)

let recv pr =
  let ic = pr#stdout in
  Lwt.finalize
    (fun () -> Lwt_io.read ic)
    (fun () -> Lwt_io.close ic)

let recv_line pr =
  let ic = pr#stdout in
  Lwt.finalize
    (fun () -> Lwt_io.read_line ic)
    (fun () -> Lwt_io.close ic)

let send f pr data =
  let oc = pr#stdin in
  Lwt.finalize
    (fun () -> f oc data)
    (fun () -> Lwt_io.close oc)

(* Receiving *)

let pread ?timeout ?env ?cwd ?stdin ?stderr cmd =
  recv (open_process_in ?timeout ?env ?cwd ?stdin ?stderr cmd)

let pread_chars ?timeout ?env ?cwd ?stdin ?stderr cmd =
  recv_chars (open_process_in ?timeout ?env ?cwd ?stdin ?stderr cmd)

let pread_line ?timeout ?env ?cwd ?stdin ?stderr cmd =
  recv_line (open_process_in ?timeout ?env ?cwd ?stdin ?stderr cmd)

let pread_lines ?timeout ?env ?cwd ?stdin ?stderr cmd =
  recv_lines (open_process_in ?timeout ?env ?cwd ?stdin ?stderr cmd)

(* Sending *)

let pwrite ?timeout ?env ?cwd ?stdout ?stderr cmd text =
  send Lwt_io.write (open_process_out ?timeout ?env ?cwd ?stdout ?stderr cmd) text

let pwrite_chars ?timeout ?env ?cwd ?stdout ?stderr cmd chars =
  send
    Lwt_io.write_chars
    (open_process_out ?timeout ?env ?cwd ?stdout ?stderr cmd)
    chars

let pwrite_line ?timeout ?env ?cwd ?stdout ?stderr cmd line =
  send
    Lwt_io.write_line
    (open_process_out ?timeout ?env ?cwd ?stdout ?stderr cmd)
    line

let pwrite_lines ?timeout ?env ?cwd ?stdout ?stderr cmd lines =
  send
    Lwt_io.write_lines
    (open_process_out ?timeout ?env ?cwd ?stdout ?stderr cmd)
    lines

(* Mapping *)

type 'a map_state =
  | Init
  | Save of 'a option Lwt.t
  | Done

(* Monitor the thread [sender] in the stream [st] so write errors are
   reported. *)
let monitor sender st =
  let sender = sender >|= fun () -> None in
  let state = ref Init in
  Lwt_stream.from
    (fun () ->
       match !state with
       | Init ->
         let getter = Lwt.apply Lwt_stream.get st in
         let result _ =
           match Lwt.state sender with
           | Lwt.Sleep ->
             (* The sender is still sleeping, behave as the
                getter. *)
             getter
           | Lwt.Return _ ->
             (* The sender terminated successfully, we are
                done monitoring it. *)
             state := Done;
             getter
           | Lwt.Fail _ ->
             (* The sender failed, behave as the sender for
                this element and save current getter. *)
             state := Save getter;
             sender
         in
         Lwt.try_bind (fun () -> Lwt.choose [sender; getter]) result result
       | Save t ->
         state := Done;
         t
       | Done ->
         Lwt_stream.get st)

let pmap ?timeout ?env ?cwd ?stderr cmd text =
  let pr = open_process ?timeout ?env ?cwd ?stderr cmd in
  (* Start the sender and getter at the same time. *)
  let sender = send Lwt_io.write pr text in
  let getter = recv pr in
  Lwt.catch
    (fun () ->
       (* Wait for both to terminate, returning the result of the
          getter. *)
       sender >>= fun () -> getter)
    (function
      | Lwt.Canceled as exn ->
        (* Cancel the getter if the sender was canceled. *)
        Lwt.cancel getter;
        Lwt.reraise exn
      | exn -> Lwt.reraise exn)

let pmap_chars ?timeout ?env ?cwd ?stderr cmd chars =
  let pr = open_process ?timeout ?env ?cwd ?stderr cmd in
  let sender = send Lwt_io.write_chars pr chars in
  monitor sender (recv_chars pr)

let pmap_line ?timeout ?env ?cwd ?stderr cmd line =
  let pr = open_process ?timeout ?env ?cwd ?stderr cmd in
  (* Start the sender and getter at the same time. *)
  let sender = send Lwt_io.write_line pr line in
  let getter = recv_line pr in
  Lwt.catch
    (fun () ->
       (* Wait for both to terminate, returning the result of the
          getter. *)
       sender >>= fun () -> getter)
    (function
      | Lwt.Canceled as exn ->
        (* Cancel the getter if the sender was canceled. *)
        Lwt.cancel getter;
        Lwt.reraise exn
      | exn -> Lwt.reraise exn)

let pmap_lines ?timeout ?env ?cwd ?stderr cmd lines =
  let pr = open_process ?timeout ?env ?cwd ?stderr cmd in
  let sender = send Lwt_io.write_lines pr lines in
  monitor sender (recv_lines pr)