lwt_unix_impl.ml (8885B)
(*
Non-blocking I/O and select do not (fully) work under Windows.
The library therefore does not use them under Windows, and will
therefore have the following limitations:
- No read will be performed while there are some threads ready to run
  or waiting to write;
- When a read is pending, everything else will be blocked: [sleep]
  will not terminate and other reads will not be performed before
  this read terminates;
- A write on a socket or a pipe can block the execution of the program
  if the data are never consumed at the other end of the connection.
  In particular, if both ends use this library and write at the same
  time, this could result in a dead-lock.
- [connect] is blocking
*)
let windows_hack = Sys.win32

(* Priority queue of sleeping threads.  Each entry is
   (wake-up time, registration counter, thread to wake).  Entries are
   ordered by wake-up time; equal times are ordered by the counter. *)
module SleepQueue =
  Pqueue.Make (struct
    type t = float * int * unit Lwt.t
    let compare (t, i, _) (t', i', _) =
      let c = compare t t' in
      if c = 0 then i - i' else c
  end)
let sleep_queue = ref SleepQueue.empty

(* Incremented on every call to [sleep]; stamps the queue entries. *)
let event_counter = ref 0

(* [sleep d] returns a thread that [run] wakes up once [d] seconds have
   elapsed.  A non-positive [d] is stored with the wake-up time [0.],
   which [in_the_past] treats as already due without reading the clock. *)
let sleep d =
  let res = Lwt.wait () in
  incr event_counter;
  let t = if d <= 0. then 0. else Unix.gettimeofday () +. d in
  sleep_queue :=
    SleepQueue.add (t, !event_counter, res) !sleep_queue;
  res

(* [yield ()] is a zero-length sleep. *)
let yield () = sleep 0.

(* [get_time t] returns the time cached in [t], reading the clock first
   if [t] still holds the sentinel [-1.] (meaning: not sampled yet). *)
let get_time t =
  if !t = -1. then t := Unix.gettimeofday ();
  !t

(* [in_the_past now t] tells whether the wake-up time [t] is due:
   either it is the marker [0.], or it is not later than the time
   obtained through [get_time now]. *)
let in_the_past now t =
  t = 0. || t <= get_time now

(* [restart_threads imax now] wakes up, in queue order, every sleeper at
   the head of the queue that is due and whose registration counter is
   not greater than [imax].  It stops at the first entry that does not
   qualify.  Sleepers registered while threads are being woken up get a
   counter greater than [imax] and are therefore left for a later call. *)
let rec restart_threads imax now =
  match
    try Some (SleepQueue.find_min !sleep_queue) with Not_found -> None
  with
    Some (time, i, thr) when in_the_past now time && i - imax <= 0 ->
      sleep_queue := SleepQueue.remove_min !sleep_queue;
      Lwt.wakeup thr ();
      restart_threads imax now
  | _ ->
      ()

type file_descr = Unix.file_descr

(* Puts [fd] in non-blocking mode (except under Windows) and returns it. *)
let of_unix_file_descr fd = if not windows_hack then Unix.set_nonblock fd; fd

(* Pending operations, as association lists from file descriptor to the
   operation to perform once [Unix.select] reports the descriptor ready.
   [inputs] holds the operations waiting for readability, [outputs] the
   ones waiting for writability. *)
let inputs = ref []
let outputs = ref []

(* [bad_fd fd] is [true] when [fstat] on [fd] raises a [Unix.Unix_error]. *)
let bad_fd fd =
  try ignore (Unix.LargeFile.fstat fd); false with
    Unix.Unix_error (_, _, _) ->
      true

(* [wrap_syscall queue fd cont syscall] runs [syscall ()] on behalf of
   the thread [cont] registered for [fd] in [queue].
   - On success, the registration is removed and [cont] is woken up with
     the result.
   - On [Exit], [EAGAIN], [EWOULDBLOCK] or [EINTR], nothing is changed, so
     the operation is attempted again later.
   - On any other exception, the registration is removed and [cont] is
     woken up with that exception. *)
let wrap_syscall queue fd cont syscall =
  let res =
    try
      Some (syscall ())
    with
      Exit
    | Unix.Unix_error ((Unix.EAGAIN | Unix.EWOULDBLOCK | Unix.EINTR), _, _) ->
        None
    | e ->
        queue := List.remove_assoc fd !queue;
        Lwt.wakeup_exn cont e;
        None
  in
  match res with
    Some v ->
      queue := List.remove_assoc fd !queue;
      Lwt.wakeup cont v
  | None ->
      ()

(* [run thread] is the main loop: it returns the value of [thread] as
   soon as [Lwt.poll] reports one.  Until then, each iteration waits
   with [Unix.select] for registered descriptors or for the next
   sleeper, wakes up the sleepers that are due, and performs the pending
   operations of the descriptors reported ready. *)
let rec run thread =
  match Lwt.poll thread with
    Some v ->
      v
  | None ->
      let next_event =
        try
          let (time, _, _) = SleepQueue.find_min !sleep_queue in Some time
        with Not_found ->
          None
      in
      (* Clock value for this iteration, sampled lazily by [get_time]. *)
      let now = ref (-1.) in
      let delay =
        match next_event with
          None      -> -1.  (* no sleeper: negative timeout for select *)
        | Some 0.   -> 0.   (* a sleeper is due immediately *)
        | Some time -> max 0. (time -. get_time now)
      in
      let infds = List.map fst !inputs in
      let outfds = List.map fst !outputs in
      let (readers, writers, _) =
        if infds = [] && outfds = [] && delay = 0. then
          (* Nothing to wait for: skip the system call. *)
          ([], [], [])
        else
          try
            let (readers, writers, _) as res =
              Unix.select infds outfds [] delay in
            (* Nothing became ready although a positive delay was given:
               advance the cached clock by [delay] instead of reading
               the clock again. *)
            if delay > 0. && !now <> -1. && readers = [] && writers = [] then
              now := !now +. delay;
            res
          with
            Unix.Unix_error (Unix.EINTR, _, _) ->
              ([], [], [])
          | Unix.Unix_error (Unix.EBADF, _, _) ->
              (* Report the descriptors for which [bad_fd] holds as
                 ready, so that their pending operation is attempted
                 below. *)
              (List.filter bad_fd infds, List.filter bad_fd outfds, [])
      in
      restart_threads !event_counter now;
      List.iter
        (fun fd ->
           try
             match List.assoc fd !inputs with
               `Read (buf, pos, len, res) ->
                  wrap_syscall inputs fd res
                    (fun () -> Unix.read fd buf pos len)
             | `Accept res ->
                  wrap_syscall inputs fd res
                    (fun () ->
                       let (s, _) as v = Unix.accept ~cloexec:true fd in
                       if not windows_hack then Unix.set_nonblock s;
                       v)
             | `Wait res ->
                  wrap_syscall inputs fd res (fun () -> ())
           with Not_found ->
             ())
        readers;
      List.iter
        (fun fd ->
           try
             match List.assoc fd !outputs with
               `Write (buf, pos, len, res) ->
                  wrap_syscall outputs fd res
                    (fun () -> Unix.write fd buf pos len)
             | `WriteSubstring (buf, pos, len, res) ->
                  wrap_syscall outputs fd res
                    (fun () -> Unix.write_substring fd buf pos len)
             | `CheckSocket res ->
                  wrap_syscall outputs fd res
                    (fun () ->
                       try ignore (Unix.getpeername fd) with
                         Unix.Unix_error (Unix.ENOTCONN, _, _) ->
                           (* Not connected: read one byte so that the
                              error raised by the read is the one
                              delivered to the waiting thread. *)
                           ignore (Unix.read fd (Bytes.create 1) 0 1))
             | `Wait res ->
                  (* This entry was registered in [outputs] by
                     [wait_write], so it must be removed from [outputs].
                     Removing from [inputs] would leave this entry in
                     place and could drop a pending read on the same
                     descriptor. *)
                  wrap_syscall outputs fd res (fun () -> ())
           with Not_found ->
             ())
        writers;
      run thread

(****)

(* [wait_read ch] returns a thread woken up once [run] sees [ch]
   reported as readable. *)
let wait_read ch =
  let res = Lwt.wait () in
  inputs := (ch, `Wait res) :: !inputs;
  res

(* [wait_write ch] returns a thread woken up once [run] sees [ch]
   reported as writable. *)
let wait_write ch =
  let res = Lwt.wait () in
  outputs := (ch, `Wait res) :: !outputs;
  res

(* [read ch buf pos len] tries [Unix.read] immediately.  If it would
   block, the read is registered in [inputs] and performed by [run].
   Under Windows the immediate attempt is skipped and the read is always
   registered.  Other exceptions are returned as a failed thread. *)
let read ch buf pos len =
  try
    if windows_hack then raise (Unix.Unix_error (Unix.EAGAIN, "", ""));
    Lwt.return (Unix.read ch buf pos len)
  with
    Unix.Unix_error ((Unix.EAGAIN | Unix.EWOULDBLOCK), _, _) ->
      let res = Lwt.wait () in
      inputs := (ch, `Read (buf, pos, len, res)) :: !inputs;
      res
  | e ->
      Lwt.fail e

(* [write ch buf pos len] tries [Unix.write] immediately.  If it would
   block, the write is registered in [outputs] and performed by [run].
   Under Windows the immediate attempt is skipped and the write is
   always registered.  Other exceptions are returned as a failed
   thread. *)
let write ch buf pos len =
  try
    if windows_hack then raise (Unix.Unix_error (Unix.EAGAIN, "", ""));
    Lwt.return (Unix.write ch buf pos len)
  with
    Unix.Unix_error ((Unix.EAGAIN | Unix.EWOULDBLOCK), _, _) ->
      let res = Lwt.wait () in
      outputs := (ch, `Write (buf, pos, len, res)) :: !outputs;
      res
  | e ->
      Lwt.fail e

(* Same as [write], for a string buffer, using [Unix.write_substring]. *)
let write_substring ch buf pos len =
  try
    if windows_hack then raise (Unix.Unix_error (Unix.EAGAIN, "", ""));
    Lwt.return (Unix.write_substring ch buf pos len)
  with
    Unix.Unix_error ((Unix.EAGAIN | Unix.EWOULDBLOCK), _, _) ->
      let res = Lwt.wait () in
      outputs := (ch, `WriteSubstring (buf, pos, len, res)) :: !outputs;
      res
  | e ->
      Lwt.fail e

(* Creates a pipe whose reading end is non-blocking (except under
   Windows).  Returns the pair (reading end, writing end). *)
let pipe_in ?cloexec () =
  let (in_fd, _) as fd_pair = Unix.pipe ?cloexec () in
  if not windows_hack then
    Unix.set_nonblock in_fd;
  fd_pair

(* Creates a pipe whose writing end is non-blocking (except under
   Windows).  Returns the pair (reading end, writing end). *)
let pipe_out ?cloexec () =
  let (_, out_fd) as fd_pair = Unix.pipe ?cloexec () in
  if not windows_hack then
    Unix.set_nonblock out_fd;
  fd_pair

(* Creates a socket, non-blocking except under Windows. *)
let socket ?cloexec dom typ proto =
  let s = Unix.socket ?cloexec dom typ proto in
  if not windows_hack then Unix.set_nonblock s;
  s

(* Creates a pair of connected sockets, both non-blocking except under
   Windows, and returns the pair as an already terminated thread. *)
let socketpair dom typ proto =
  let (s1, s2) as spair = Unix.socketpair dom typ proto in
  if not windows_hack then begin
    Unix.set_nonblock s1; Unix.set_nonblock s2
  end;
  Lwt.return spair

let bind = Unix.bind
let setsockopt = Unix.setsockopt
let listen = Unix.listen
let close = Unix.close
let set_close_on_exec = Unix.set_close_on_exec

(* [accept ch] registers an accept on [ch]; [run] performs it once [ch]
   is reported readable and wakes the returned thread with the result
   of [Unix.accept]. *)
let accept ch =
  let res = Lwt.wait () in
  inputs := (ch, `Accept res) :: !inputs;
  res

(* [check_socket ch] registers a connection check on [ch]; [run]
   performs it once [ch] is reported writable. *)
let check_socket ch =
  let res = Lwt.wait () in
  outputs := (ch, `CheckSocket res) :: !outputs;
  res

(* [connect s addr] calls [Unix.connect].  If the connection cannot be
   completed immediately, the returned thread is the one created by
   [check_socket s].  Other exceptions are returned as a failed
   thread. *)
let connect s addr =
  try
    Unix.connect s addr;
    Lwt.return ()
  with
    Unix.Unix_error
      ((Unix.EINPROGRESS | Unix.EWOULDBLOCK | Unix.EAGAIN), _, _) ->
        check_socket s
  | e ->
      Lwt.fail e

(****)

(* Lwt channels are plain standard-library channels. *)
type lwt_in_channel = in_channel
type lwt_out_channel = out_channel

(* Switch the descriptor underlying an input channel to non-blocking
   mode and hand the channel back. *)
let intern_in_channel ch =
  let fd = Unix.descr_of_in_channel ch in
  Unix.set_nonblock fd;
  ch

(* Switch the descriptor underlying an output channel to non-blocking
   mode and hand the channel back. *)
let intern_out_channel ch =
  let fd = Unix.descr_of_out_channel ch in
  Unix.set_nonblock fd;
  ch

(* Wait until the descriptor of the channel is readable / writable. *)
let wait_inchan ic = wait_read (Unix.descr_of_in_channel ic)
let wait_outchan oc = wait_write (Unix.descr_of_out_channel oc)

(* Keep a handle on the standard-library function before shadowing it. *)
let stdlib_input_char = input_char

(* [input_char ic] reads one character from [ic].  When the channel
   raises [Sys_blocked_io], the read is retried after the channel has
   been reported readable.  Any other exception is returned as a failed
   thread. *)
let rec input_char ic =
  match stdlib_input_char ic with
  | c ->
      Lwt.return c
  | exception Sys_blocked_io ->
      Lwt.bind (wait_inchan ic) (fun () -> input_char ic)
  | exception e ->
      Lwt.fail e

(* [input_line ic] reads characters from [ic] up to the next newline and
   returns them as a string, without the newline.  If [End_of_file]
   occurs after at least one character has been read, the characters
   read so far are returned; otherwise the exception is propagated as a
   failed thread. *)
let input_line ic =
  let line = ref (Bytes.create 128) in
  let filled = ref 0 in
  (* Double the buffer once every byte of it is in use. *)
  let grow_if_full () =
    if !filled = Bytes.length !line then
      line := Bytes.extend !line 0 !filled
  in
  let rec read_chars () =
    grow_if_full ();
    Lwt.bind (input_char ic) (fun c ->
      if c <> '\n' then begin
        Bytes.set !line !filled c;
        incr filled;
        read_chars ()
      end else
        Lwt.return ())
  in
  let on_error = function
    | End_of_file when !filled <> 0 -> Lwt.return ()
    | e -> Lwt.fail e
  in
  Lwt.bind
    (Lwt.catch read_chars on_error)
    (fun () -> Lwt.return (Bytes.sub_string !line 0 !filled))