lwt_unix_impl.ml (12784B)
1 (* 2 - should check all events before looping again for avoiding race 3 conditions... 4 (we have the first, scan the subsequent ones) 5 *) 6 7 let no_overlapped_io = false 8 let d = ref false 9 10 (****) 11 12 type buffer = 13 (char, Bigarray.int8_unsigned_elt, Bigarray.c_layout) Bigarray.Array1.t 14 15 let buffer_create l = Bigarray.Array1.create Bigarray.char Bigarray.c_layout l 16 17 external unsafe_blit_string_to_buffer : 18 string -> int -> buffer -> int -> int -> unit = "ml_blit_string_to_buffer" 19 external unsafe_blit_bytes_to_buffer : 20 bytes -> int -> buffer -> int -> int -> unit = "ml_blit_bytes_to_buffer" 21 external unsafe_blit_buffer_to_bytes : 22 buffer -> int -> bytes -> int -> int -> unit = "ml_blit_buffer_to_bytes" 23 24 let buffer_length = Bigarray.Array1.dim 25 26 let blit_string_to_buffer s i a j l = 27 if l < 0 || i < 0 || i > String.length s - l 28 || j < 0 || j > buffer_length a - l 29 then invalid_arg "Lwt_unix.blit_string_to_buffer" 30 else unsafe_blit_string_to_buffer s i a j l 31 32 let blit_bytes_to_buffer s i a j l = 33 if l < 0 || i < 0 || i > Bytes.length s - l 34 || j < 0 || j > buffer_length a - l 35 then invalid_arg "Lwt_unix.blit_bytes_to_buffer" 36 else unsafe_blit_bytes_to_buffer s i a j l 37 38 let blit_buffer_to_bytes a i s j l = 39 if l < 0 || i < 0 || i > buffer_length a - l 40 || j < 0 || j > Bytes.length s - l 41 then invalid_arg "Lwt_unix.blit_buffer_to_bytes" 42 else unsafe_blit_buffer_to_bytes a i s j l 43 44 let buffer_size = 16384 45 46 let avail_buffers = ref [] 47 48 let acquire_buffer () = 49 match !avail_buffers with 50 [] -> buffer_create buffer_size 51 | b :: r -> avail_buffers := r; b 52 53 let release_buffer b = avail_buffers := b :: !avail_buffers 54 55 (****) 56 57 let last_id = ref 0 58 let free_list = ref (Array.init 1 (fun i -> i)) 59 60 let acquire_id () = 61 let len = Array.length !free_list in 62 if !last_id = len then begin 63 let a = Array.init (len * 2) (fun i -> i) in 64 Array.blit !free_list 0 a 0 len; 65 free_list := a 66 end; 67 let i = !free_list.(!last_id) in 68 incr last_id; 69 i 70 71 let release_id i = 72 decr last_id; 73 !free_list.(!last_id) <- i 74 75 (****) 76 77 let completionEvents = ref [] 78 79 let actionCompleted id len errno name = 80 completionEvents := (id, len, errno, name) :: !completionEvents 81 82 external init_lwt : 83 (int -> int -> Unix.error -> string -> unit) -> int = "init_lwt" 84 85 let max_event_count = init_lwt actionCompleted 86 87 let acquire_event l nm = 88 if List.length l = max_event_count then 89 raise (Unix.Unix_error (Unix.EAGAIN, nm, "")) 90 91 (****) 92 93 type helpers 94 type file_descr = { fd : Unix.file_descr; helpers : helpers } 95 96 external of_unix_file_descr : Unix.file_descr -> file_descr = "win_wrap_fd" 97 98 external win_wrap_async : Unix.file_descr -> file_descr = "win_wrap_overlapped" 99 100 let wrap_async = 101 if no_overlapped_io then of_unix_file_descr else win_wrap_async 102 103 (****) 104 105 module SleepQueue = 106 Pqueue.Make (struct 107 type t = float * int * unit Lwt.t 108 let compare (t, i, _) (t', i', _) = 109 let c = compare t t' in 110 if c = 0 then i - i' else c 111 end) 112 let sleep_queue = ref SleepQueue.empty 113 114 let event_counter = ref 0 115 116 let sleep d = 117 let res = Lwt.wait () in 118 incr event_counter; 119 let t = if d <= 0. then 0. else Unix.gettimeofday () +. d in 120 sleep_queue := 121 SleepQueue.add (t, !event_counter, res) !sleep_queue; 122 res 123 124 let yield () = sleep 0. 125 126 let get_time t = 127 if !t = -1. then t := Unix.gettimeofday (); 128 !t 129 130 let in_the_past now t = 131 t = 0. || t <= get_time now 132 133 let rec restart_threads imax now = 134 match 135 try Some (SleepQueue.find_min !sleep_queue) with Not_found -> None 136 with 137 Some (time, i, thr) when in_the_past now time && i - imax <= 0 -> 138 sleep_queue := SleepQueue.remove_min !sleep_queue; 139 if !d then Format.eprintf "RESTART@."; 140 Lwt.wakeup thr (); 141 if !d then Format.eprintf "RESTART...DONE@."; 142 restart_threads imax now 143 | _ -> 144 () 145 146 module IntTbl = 147 Hashtbl.Make 148 (struct type t = int let equal (x : int) y = x = y let hash x = x end) 149 150 let ioInFlight = IntTbl.create 17 151 152 let handleCompletionEvent (id, len, errno, name) = 153 if !d then Format.eprintf "Handling event %d (len %d)@." id len; 154 let (action, buf, res) = 155 try IntTbl.find ioInFlight id with Not_found -> assert false 156 in 157 begin match action with 158 `Write -> () 159 | `Read (s, pos) -> if len > 0 then blit_buffer_to_bytes buf 0 s pos len 160 | `Readdirectorychanges -> () 161 end; 162 IntTbl.remove ioInFlight id; 163 release_id id; 164 release_buffer buf; 165 if len = -1 then 166 Lwt.wakeup_exn res (Unix.Unix_error (errno, name, "")) 167 else 168 Lwt.wakeup res len 169 170 type handle 171 172 let connInFlight = ref [] 173 174 type kind = CONNECT | ACCEPT 175 176 external win_wait : int -> handle list -> int = "win_wait" 177 178 external win_register_wait : 179 Unix.file_descr -> kind -> handle = "win_register_wait" 180 181 external win_check_connection : 182 Unix.file_descr -> kind -> handle -> unit = "win_check_connection" 183 184 let handle_wait_event h ch kind cont action = 185 if !d then prerr_endline "MMM"; 186 let res = 187 try 188 Some (action ()) 189 with 190 Unix.Unix_error ((Unix.EAGAIN | Unix.EWOULDBLOCK | Unix.EINTR), _, _) -> 191 if !d then prerr_endline "NNN"; 192 let h' = win_register_wait ch.fd kind in 193 connInFlight := List.map (fun el -> if fst el <> h then el else (h', snd el)) !connInFlight; 194 None 195 | e -> 196 if !d then prerr_endline "OOO"; 197 connInFlight := List.filter (fun (h', _) -> h' <> h) !connInFlight; 198 Lwt.wakeup_exn cont e; 199 None 200 in 201 match res with 202 Some v -> 203 if !d then prerr_endline "PPP"; 204 connInFlight := List.filter (fun (h', _) -> h' <> h) !connInFlight; 205 Lwt.wakeup cont v 206 | None -> 207 () 208 209 let rec run thread = 210 if !d then Format.eprintf "Main loop@."; 211 match Lwt.poll thread with 212 Some v -> 213 if !d then Format.eprintf "DONE!@."; 214 v 215 | None -> 216 let next_event = 217 try 218 let (time, _, _) = SleepQueue.find_min !sleep_queue in Some time 219 with Not_found -> 220 None 221 in 222 let now = ref (-1.) in 223 let delay = 224 match next_event with 225 None -> -1. 226 | Some 0. -> 0. 227 | Some time -> max 0. (time -. get_time now) 228 in 229 if !d then Format.eprintf "vvv@."; 230 let i = 231 try 232 win_wait (truncate (ceil (delay *. 1000.))) (List.map fst !connInFlight) 233 with 234 Sys.Break as e -> raise e 235 | _ -> assert false 236 in 237 if !d then Format.eprintf "^^^@."; 238 if i = -1 then now := !now +. delay; 239 restart_threads !event_counter now; 240 if !d then Format.eprintf "threads restarted@."; 241 let ev = !completionEvents in 242 completionEvents := []; 243 List.iter handleCompletionEvent (List.rev ev); 244 if i >= 0 then begin 245 let (h, (kind, ch)) = 246 try List.nth !connInFlight i with Failure _ -> assert false in 247 match kind with 248 `CheckSocket res -> 249 if !d then prerr_endline "CHECK CONN"; 250 handle_wait_event h ch CONNECT res 251 (fun () -> win_check_connection ch.fd CONNECT h) 252 | `Accept res -> 253 if !d then prerr_endline "ACCEPT"; 254 handle_wait_event h ch ACCEPT res 255 (fun () -> 256 win_check_connection ch.fd ACCEPT h; 257 let (v, info) = Unix.accept ~cloexec:true ch.fd in 258 (wrap_async v, info)) 259 end; 260 run thread 261 262 (****) 263 264 let wait_read ch = assert false 265 266 let wait_write ch = assert false 267 268 external start_read : 269 file_descr -> buffer -> int -> int -> int -> unit = "win_read" 270 external start_write : 271 file_descr -> buffer -> int -> int -> int -> unit = "win_write" 272 273 let read ch s pos len = 274 if !d then Format.eprintf "Start reading@."; 275 let id = acquire_id () in 276 let buf = acquire_buffer () in 277 let len = if len > buffer_size then buffer_size else len in 278 let res = Lwt.wait () in 279 IntTbl.add ioInFlight id (`Read (s, pos), buf, res); 280 start_read ch buf 0 len id; 281 if !d then Format.eprintf "Reading started@."; 282 res 283 284 let write ch s pos len = 285 if !d then Format.eprintf "Start writing@."; 286 let id = acquire_id () in 287 let buf = acquire_buffer () in 288 let len = if len > buffer_size then buffer_size else len in 289 blit_bytes_to_buffer s pos buf 0 len; 290 let res = Lwt.wait () in 291 IntTbl.add ioInFlight id (`Write, buf, res); 292 start_write ch buf 0 len id; 293 if !d then Format.eprintf "Writing started@."; 294 res 295 296 let write_substring ch s pos len = 297 if !d then Format.eprintf "Start writing@."; 298 let id = acquire_id () in 299 let buf = acquire_buffer () in 300 let len = if len > buffer_size then buffer_size else len in 301 blit_string_to_buffer s pos buf 0 len; 302 let res = Lwt.wait () in 303 IntTbl.add ioInFlight id (`Write, buf, res); 304 start_write ch buf 0 len id; 305 if !d then Format.eprintf "Writing started@."; 306 res 307 308 external win_pipe_in : 309 ?cloexec:bool -> unit -> Unix.file_descr * Unix.file_descr = "win_pipe_in" 310 external win_pipe_out : 311 ?cloexec:bool -> unit -> Unix.file_descr * Unix.file_descr = "win_pipe_out" 312 313 let pipe_in ?cloexec () = 314 let (i, o) = if no_overlapped_io then Unix.pipe () else win_pipe_in ?cloexec () in 315 (wrap_async i, o) 316 let pipe_out ?cloexec () = 317 let (i, o) = if no_overlapped_io then Unix.pipe () else win_pipe_out ?cloexec () in 318 (i, wrap_async o) 319 320 external win_socket : ?cloexec:bool -> 321 Unix.socket_domain -> Unix.socket_type -> int -> Unix.file_descr = 322 "win_socket" 323 324 let socket ?cloexec d t p = 325 let s = if no_overlapped_io then Unix.socket ?cloexec d t p 326 else win_socket ?cloexec d t p in 327 Unix.set_nonblock s; 328 wrap_async s 329 330 let bind ch addr = Unix.bind ch.fd addr 331 let setsockopt ch opt v = Unix.setsockopt ch.fd opt v 332 let listen ch n = Unix.listen ch.fd n 333 let set_close_on_exec ch = Unix.set_close_on_exec ch.fd 334 335 external kill_threads : file_descr -> unit = "win_kill_threads" 336 337 let close ch = Unix.close ch.fd; kill_threads ch 338 339 let accept ch = 340 let res = Lwt.wait () in 341 let () = acquire_event !connInFlight "accept" in 342 let h = win_register_wait ch.fd ACCEPT in 343 connInFlight := (h, (`Accept res, ch)) :: !connInFlight; 344 res 345 346 let check_socket ch = 347 let res = Lwt.wait () in 348 let () = acquire_event !connInFlight "connect" in 349 let h = win_register_wait ch.fd CONNECT in 350 connInFlight := (h, (`CheckSocket res, ch)) :: !connInFlight; 351 res 352 353 let connect s addr = 354 try 355 Unix.connect s.fd addr; 356 if !d then prerr_endline "AAA"; 357 Lwt.return () 358 with 359 Unix.Unix_error 360 ((Unix.EINPROGRESS | Unix.EWOULDBLOCK | Unix.EAGAIN), _, _) -> 361 if !d then prerr_endline "BBB"; 362 check_socket s 363 | e -> 364 if !d then prerr_endline "CCC"; 365 Lwt.fail e 366 367 368 type lwt_in_channel 369 let input_line _ = assert false (*XXXXX*) 370 let intern_in_channel _ = assert false (*XXXXX*) 371 372 (***) 373 374 type directory_handle = Unix.file_descr 375 376 external open_dir : string -> directory_handle = "win_open_directory" 377 let open_directory f = open_dir (System_win.extendedPath f) 378 379 type notify_filter_flag = 380 FILE_NOTIFY_CHANGE_FILE_NAME | FILE_NOTIFY_CHANGE_DIR_NAME 381 | FILE_NOTIFY_CHANGE_ATTRIBUTES | FILE_NOTIFY_CHANGE_SIZE 382 | FILE_NOTIFY_CHANGE_LAST_WRITE | FILE_NOTIFY_CHANGE_LAST_ACCESS 383 | FILE_NOTIFY_CHANGE_CREATION | FILE_NOTIFY_CHANGE_SECURITY 384 385 external start_read_dir_changes : 386 directory_handle -> buffer -> bool -> notify_filter_flag list -> int -> unit = 387 "win_readdirtorychanges" 388 389 type file_action = 390 FILE_ACTION_ADDED | FILE_ACTION_REMOVED 391 | FILE_ACTION_MODIFIED | FILE_ACTION_RENAMED_OLD_NAME 392 | FILE_ACTION_RENAMED_NEW_NAME 393 394 external parse_directory_changes : buffer -> (string * file_action) list 395 = "win_parse_directory_changes" 396 397 let readdirectorychanges ch recursive flags = 398 if !d then Format.eprintf "Start reading directory changes@."; 399 let id = acquire_id () in 400 let buf = acquire_buffer () in 401 let res = Lwt.wait () in 402 IntTbl.add ioInFlight id (`Readdirectorychanges, buf, res); 403 start_read_dir_changes ch buf recursive flags id; 404 if !d then Format.eprintf "Reading started@."; 405 Lwt.bind res (fun len -> 406 if len = 0 then 407 Lwt.return [] 408 else 409 Lwt.return (List.rev (parse_directory_changes buf))) 410 411 let close_dir = Unix.close 412 413 external long_name : string -> string = "win_long_path_name" 414 415 let longpathname root path = 416 (* Parameter [path] can be relative. Result value must then also be relative. 417 Input parameter to [long_name] must always be absolute path. *) 418 let epath = System_win.extendedPath (Filename.concat root path) 419 and root = System_win.extendedPath (Filename.concat root "") in 420 let start = String.length root 421 and ln = long_name epath in 422 try 423 (* The assumption is that [root] does not change in [long_name]. The 424 Windows fsmonitor operates under this assumption, so it is ok here. 425 To remove this assumption, we'd have to pass [root] separately through 426 [long_name]. *) 427 String.sub ln start (String.length ln - start) 428 with 429 | Invalid_argument _ -> ln