unison

Fork of Unison, a bi-directional file synchronization tool
git clone git://git.laack.co/unison.git
Log | Files | Refs | README | LICENSE

lwt_unix_impl.ml (8885B)


      1 (*
      2 Non-blocking I/O and select does not (fully) work under Windows.
      3 The library therefore does not use them under Windows, and will
      4 therefore have the following limitations:
      5 - No read will be performed while there are some threads ready to run
      6   or waiting to write;
      7 - When a read is pending, everything else will be blocked: [sleep]
      8   will not terminate and other reads will not be performed before
      9   this read terminates;
     10 - A write on a socket or a pipe can block the execution of the program
     11   if the data are never consumed at the other end of the connection.
     12   In particular, if both ends use this library and write at the same
     13   time, this could result in a dead-lock.
     14 - [connect] is blocking
     15 *)
     16 let windows_hack = Sys.win32
     17 
     18 module SleepQueue =
     19   Pqueue.Make (struct
     20     type t = float * int * unit Lwt.t
     21     let compare (t, i, _) (t', i', _) =
     22       let c = compare t t' in
     23       if c = 0 then i - i' else c
     24   end)
     25 let sleep_queue = ref SleepQueue.empty
     26 
     27 let event_counter = ref 0
     28 
     29 let sleep d =
     30   let res = Lwt.wait () in
     31   incr event_counter;
     32   let t = if d <= 0. then 0. else Unix.gettimeofday () +. d in
     33   sleep_queue :=
     34     SleepQueue.add (t, !event_counter, res) !sleep_queue;
     35   res
     36 
     37 let yield () = sleep 0.
     38 
     39 let get_time t =
     40   if !t = -1. then t := Unix.gettimeofday ();
     41   !t
     42 
     43 let in_the_past now t =
     44   t = 0. || t <= get_time now
     45 
     46 let rec restart_threads imax now =
     47   match
     48     try Some (SleepQueue.find_min !sleep_queue) with Not_found -> None
     49   with
     50     Some (time, i, thr) when in_the_past now time && i - imax <= 0 ->
     51       sleep_queue := SleepQueue.remove_min !sleep_queue;
     52       Lwt.wakeup thr ();
     53       restart_threads imax now
     54   | _ ->
     55       ()
     56 
     57 type file_descr = Unix.file_descr
     58 
     59 let of_unix_file_descr fd = if not windows_hack then Unix.set_nonblock fd; fd
     60 
     61 let inputs = ref []
     62 let outputs = ref []
     63 
     64 let bad_fd fd =
     65   try ignore (Unix.LargeFile.fstat fd); false with
     66     Unix.Unix_error (_, _, _) ->
     67       true
     68 
     69 let wrap_syscall queue fd cont syscall =
     70   let res =
     71     try
     72       Some (syscall ())
     73     with
     74       Exit
     75     | Unix.Unix_error ((Unix.EAGAIN | Unix.EWOULDBLOCK | Unix.EINTR), _, _) ->
     76         None
     77     | e ->
     78         queue := List.remove_assoc fd !queue;
     79         Lwt.wakeup_exn cont e;
     80         None
     81   in
     82   match res with
     83     Some v ->
     84       queue := List.remove_assoc fd !queue;
     85       Lwt.wakeup cont v
     86   | None ->
     87       ()
     88 
     89 let rec run thread =
     90   match Lwt.poll thread with
     91     Some v ->
     92       v
     93   | None ->
     94       let next_event =
     95         try
     96           let (time, _, _) = SleepQueue.find_min !sleep_queue in Some time
     97         with Not_found ->
     98           None
     99       in
    100       let now = ref (-1.) in
    101       let delay =
    102         match next_event with
    103           None      -> -1.
    104         | Some 0.   -> 0.
    105         | Some time -> max 0. (time -. get_time now)
    106       in
    107       let infds = List.map fst !inputs in
    108       let outfds = List.map fst !outputs in
    109       let (readers, writers, _) =
    110         if infds = [] && outfds = [] && delay = 0. then
    111           ([], [], [])
    112         else
    113           try
    114             let (readers, writers, _) as res =
    115               Unix.select infds outfds [] delay in
    116             if delay > 0. && !now <> -1. && readers = [] && writers = [] then
    117               now := !now +. delay;
    118             res
    119           with
    120             Unix.Unix_error (Unix.EINTR, _, _) ->
    121               ([], [], [])
    122           | Unix.Unix_error (Unix.EBADF, _, _) ->
    123               (List.filter bad_fd infds, List.filter bad_fd outfds, [])
    124       in
    125       restart_threads !event_counter now;
    126       List.iter
    127         (fun fd ->
    128            try
    129              match List.assoc fd !inputs with
    130                `Read (buf, pos, len, res) ->
    131                   wrap_syscall inputs fd res
    132                     (fun () -> Unix.read fd buf pos len)
    133              | `Accept res ->
    134                   wrap_syscall inputs fd res
    135                     (fun () ->
    136                        let (s, _) as v = Unix.accept ~cloexec:true fd in
    137                        if not windows_hack then Unix.set_nonblock s;
    138                        v)
    139              | `Wait res ->
    140                   wrap_syscall inputs fd res (fun () -> ())
    141            with Not_found ->
    142              ())
    143         readers;
    144       List.iter
    145         (fun fd ->
    146            try
    147              match List.assoc fd !outputs with
    148                `Write (buf, pos, len, res) ->
    149                   wrap_syscall outputs fd res
    150                     (fun () -> Unix.write fd buf pos len)
    151              | `WriteSubstring (buf, pos, len, res) ->
    152                   wrap_syscall outputs fd res
    153                     (fun () -> Unix.write_substring fd buf pos len)
    154              | `CheckSocket res ->
    155                   wrap_syscall outputs fd res
    156                     (fun () ->
    157                        try ignore (Unix.getpeername fd) with
    158                          Unix.Unix_error (Unix.ENOTCONN, _, _) ->
    159                            ignore (Unix.read fd (Bytes.create 1) 0 1))
    160              | `Wait res ->
    161                   wrap_syscall inputs fd res (fun () -> ())
    162            with Not_found ->
    163              ())
    164         writers;
    165       run thread
    166 
    167 (****)
    168 
    169 let wait_read ch =
    170   let res = Lwt.wait () in
    171   inputs := (ch, `Wait res) :: !inputs;
    172   res
    173 
    174 let wait_write ch =
    175   let res = Lwt.wait () in
    176   outputs := (ch, `Wait res) :: !outputs;
    177   res
    178 
    179 let read ch buf pos len =
    180   try
    181     if windows_hack then raise (Unix.Unix_error (Unix.EAGAIN, "", ""));
    182     Lwt.return (Unix.read ch buf pos len)
    183   with
    184     Unix.Unix_error ((Unix.EAGAIN | Unix.EWOULDBLOCK), _, _) ->
    185       let res = Lwt.wait () in
    186       inputs := (ch, `Read (buf, pos, len, res)) :: !inputs;
    187       res
    188   | e ->
    189       Lwt.fail e
    190 
    191 let write ch buf pos len =
    192   try
    193     if windows_hack then raise (Unix.Unix_error (Unix.EAGAIN, "", ""));
    194     Lwt.return (Unix.write ch buf pos len)
    195   with
    196     Unix.Unix_error ((Unix.EAGAIN | Unix.EWOULDBLOCK), _, _) ->
    197       let res = Lwt.wait () in
    198       outputs := (ch, `Write (buf, pos, len, res)) :: !outputs;
    199       res
    200   | e ->
    201       Lwt.fail e
    202 
    203 let write_substring ch buf pos len =
    204   try
    205     if windows_hack then raise (Unix.Unix_error (Unix.EAGAIN, "", ""));
    206     Lwt.return (Unix.write_substring ch buf pos len)
    207   with
    208     Unix.Unix_error ((Unix.EAGAIN | Unix.EWOULDBLOCK), _, _) ->
    209       let res = Lwt.wait () in
    210       outputs := (ch, `WriteSubstring (buf, pos, len, res)) :: !outputs;
    211       res
    212   | e ->
    213       Lwt.fail e
    214 
    215 let pipe_in ?cloexec () =
    216   let (in_fd, out_fd) as fd_pair = Unix.pipe ?cloexec () in
    217   if not windows_hack then
    218     Unix.set_nonblock in_fd;
    219   fd_pair
    220 
    221 let pipe_out ?cloexec () =
    222   let (in_fd, out_fd) as fd_pair = Unix.pipe ?cloexec () in
    223   if not windows_hack then
    224     Unix.set_nonblock out_fd;
    225   fd_pair
    226 
    227 let socket ?cloexec dom typ proto =
    228   let s = Unix.socket ?cloexec dom typ proto in
    229   if not windows_hack then Unix.set_nonblock s;
    230   s
    231 
    232 let socketpair dom typ proto =
    233   let (s1, s2) as spair = Unix.socketpair dom typ proto in
    234   if not windows_hack then begin
    235     Unix.set_nonblock s1; Unix.set_nonblock s2
    236   end;
    237   Lwt.return spair
    238 
    239 let bind = Unix.bind
    240 let setsockopt = Unix.setsockopt
    241 let listen = Unix.listen
    242 let close = Unix.close
    243 let set_close_on_exec = Unix.set_close_on_exec
    244 
    245 let accept ch =
    246   let res = Lwt.wait () in
    247   inputs := (ch, `Accept res) :: !inputs;
    248   res
    249 
    250 let check_socket ch =
    251   let res = Lwt.wait () in
    252   outputs := (ch, `CheckSocket res) :: !outputs;
    253   res
    254 
    255 let connect s addr =
    256   try
    257     Unix.connect s addr;
    258     Lwt.return ()
    259   with
    260     Unix.Unix_error
    261       ((Unix.EINPROGRESS | Unix.EWOULDBLOCK | Unix.EAGAIN), _, _) ->
    262         check_socket s
    263   | e ->
    264       Lwt.fail e
    265 
    266 (****)
    267 
    268 type lwt_in_channel = in_channel
    269 type lwt_out_channel = out_channel
    270 
    271 let intern_in_channel ch =
    272   Unix.set_nonblock (Unix.descr_of_in_channel ch); ch
    273 let intern_out_channel ch =
    274   Unix.set_nonblock (Unix.descr_of_out_channel ch); ch
    275 
    276 
    277 let wait_inchan ic = wait_read (Unix.descr_of_in_channel ic)
    278 let wait_outchan oc = wait_write (Unix.descr_of_out_channel oc)
    279 
    280 let stdlib_input_char = input_char
    281 let rec input_char ic =
    282   try
    283     Lwt.return (stdlib_input_char ic)
    284   with
    285     Sys_blocked_io ->
    286       Lwt.bind (wait_inchan ic) (fun () -> input_char ic)
    287   | e ->
    288       Lwt.fail e
    289 
    290 let input_line ic =
    291   let buf = ref (Bytes.create 128) in
    292   let pos = ref 0 in
    293   let rec loop () =
    294     if !pos = Bytes.length !buf then begin
    295       let newbuf = Bytes.create (2 * !pos) in
    296       Bytes.blit !buf 0 newbuf 0 !pos;
    297       buf := newbuf
    298     end;
    299     Lwt.bind (input_char ic) (fun c ->
    300     if c = '\n' then
    301       Lwt.return ()
    302     else begin
    303       Bytes.set !buf !pos c;
    304       incr pos;
    305       loop ()
    306     end)
    307   in
    308   Lwt.bind
    309     (Lwt.catch loop
    310        (fun e ->
    311           match e with
    312             End_of_file when !pos <> 0 ->
    313               Lwt.return ()
    314           | _ ->
    315               Lwt.fail e))
    316     (fun () ->
    317        let res = Bytes.create !pos in
    318        Bytes.blit !buf 0 res 0 !pos;
    319        Lwt.return (Bytes.to_string res))