unison

Fork of Unison, a bi-directional file synchronization tool
git clone git://git.laack.co/unison.git
Log | Files | Refs | README | LICENSE

lwt_unix_impl.ml (12784B)


(*
   TODO: all signaled events should be checked before looping again, in
   order to avoid race conditions.  At the moment only the first signaled
   event is handled; the subsequent ones should be scanned as well.
*)
      6 
      7 let no_overlapped_io = false
      8 let d = ref false
      9 
     10 (****)
     11 
     12 type buffer =
     13   (char, Bigarray.int8_unsigned_elt, Bigarray.c_layout) Bigarray.Array1.t
     14 
     15 let buffer_create l = Bigarray.Array1.create Bigarray.char Bigarray.c_layout l
     16 
     17 external unsafe_blit_string_to_buffer :
     18   string -> int -> buffer -> int -> int -> unit = "ml_blit_string_to_buffer"
     19 external unsafe_blit_bytes_to_buffer :
     20   bytes -> int -> buffer -> int -> int -> unit = "ml_blit_bytes_to_buffer"
     21 external unsafe_blit_buffer_to_bytes :
     22   buffer -> int -> bytes -> int -> int -> unit = "ml_blit_buffer_to_bytes"
     23 
     24 let buffer_length = Bigarray.Array1.dim
     25 
     26 let blit_string_to_buffer s i a j l =
     27   if l < 0 || i < 0 || i > String.length s - l
     28            || j < 0 || j > buffer_length a - l
     29   then invalid_arg "Lwt_unix.blit_string_to_buffer"
     30   else unsafe_blit_string_to_buffer s i a j l
     31 
     32 let blit_bytes_to_buffer s i a j l =
     33   if l < 0 || i < 0 || i > Bytes.length s - l
     34            || j < 0 || j > buffer_length a - l
     35   then invalid_arg "Lwt_unix.blit_bytes_to_buffer"
     36   else unsafe_blit_bytes_to_buffer s i a j l
     37 
     38 let blit_buffer_to_bytes a i s j l =
     39   if l < 0 || i < 0 || i > buffer_length a - l
     40            || j < 0 || j > Bytes.length s - l
     41   then invalid_arg "Lwt_unix.blit_buffer_to_bytes"
     42   else unsafe_blit_buffer_to_bytes a i s j l
     43 
     44 let buffer_size = 16384
     45 
     46 let avail_buffers = ref []
     47 
     48 let acquire_buffer () =
     49   match !avail_buffers with
     50     []     -> buffer_create buffer_size
     51   | b :: r -> avail_buffers := r; b
     52 
     53 let release_buffer b = avail_buffers := b :: !avail_buffers
     54 
     55 (****)
     56 
     57 let last_id = ref 0
     58 let free_list = ref (Array.init 1 (fun i -> i))
     59 
     60 let acquire_id () =
     61   let len = Array.length !free_list in
     62   if !last_id = len then begin
     63     let a = Array.init (len * 2) (fun i -> i) in
     64     Array.blit !free_list 0 a 0 len;
     65     free_list := a
     66   end;
     67   let i = !free_list.(!last_id) in
     68   incr last_id;
     69   i
     70 
     71 let release_id i =
     72   decr last_id;
     73   !free_list.(!last_id) <- i
     74 
     75 (****)
     76 
     77 let completionEvents = ref []
     78 
     79 let actionCompleted id len errno name =
     80   completionEvents := (id, len, errno, name) :: !completionEvents
     81 
     82 external init_lwt :
     83   (int -> int -> Unix.error -> string -> unit) -> int = "init_lwt"
     84 
     85 let max_event_count = init_lwt actionCompleted
     86 
     87 let acquire_event l nm =
     88   if List.length l = max_event_count then
     89     raise (Unix.Unix_error (Unix.EAGAIN, nm, ""))
     90 
     91 (****)
     92 
     93 type helpers
     94 type file_descr = { fd : Unix.file_descr; helpers : helpers }
     95 
     96 external of_unix_file_descr : Unix.file_descr -> file_descr = "win_wrap_fd"
     97 
     98 external win_wrap_async : Unix.file_descr -> file_descr = "win_wrap_overlapped"
     99 
    100 let wrap_async =
    101   if no_overlapped_io then of_unix_file_descr else win_wrap_async
    102 
    103 (****)
    104 
    105 module SleepQueue =
    106   Pqueue.Make (struct
    107     type t = float * int * unit Lwt.t
    108     let compare (t, i, _) (t', i', _) =
    109       let c = compare t t' in
    110       if c = 0 then i - i' else c
    111   end)
    112 let sleep_queue = ref SleepQueue.empty
    113 
    114 let event_counter = ref 0
    115 
    116 let sleep d =
    117   let res = Lwt.wait () in
    118   incr event_counter;
    119   let t = if d <= 0. then 0. else Unix.gettimeofday () +. d in
    120   sleep_queue :=
    121     SleepQueue.add (t, !event_counter, res) !sleep_queue;
    122   res
    123 
    124 let yield () = sleep 0.
    125 
    126 let get_time t =
    127   if !t = -1. then t := Unix.gettimeofday ();
    128   !t
    129 
    130 let in_the_past now t =
    131   t = 0. || t <= get_time now
    132 
    133 let rec restart_threads imax now =
    134   match
    135     try Some (SleepQueue.find_min !sleep_queue) with Not_found -> None
    136   with
    137     Some (time, i, thr) when in_the_past now time && i - imax <= 0 ->
    138       sleep_queue := SleepQueue.remove_min !sleep_queue;
    139 if !d then Format.eprintf "RESTART@.";
    140       Lwt.wakeup thr ();
    141 if !d then Format.eprintf "RESTART...DONE@.";
    142       restart_threads imax now
    143   | _ ->
    144       ()
    145 
    146 module IntTbl =
    147   Hashtbl.Make
    148     (struct type t = int let equal (x : int) y = x = y let hash x = x end)
    149 
    150 let ioInFlight = IntTbl.create 17
    151 
    152 let handleCompletionEvent (id, len, errno, name) =
    153 if !d then Format.eprintf "Handling event %d (len %d)@." id len;
    154   let (action, buf, res) =
    155     try IntTbl.find ioInFlight id with Not_found -> assert false
    156   in
    157   begin match action with
    158     `Write         -> ()
    159   | `Read (s, pos) -> if len > 0 then blit_buffer_to_bytes buf 0 s pos len
    160   | `Readdirectorychanges -> ()
    161   end;
    162   IntTbl.remove ioInFlight id;
    163   release_id id;
    164   release_buffer buf;
    165   if len = -1 then
    166     Lwt.wakeup_exn res (Unix.Unix_error (errno, name, ""))
    167   else
    168     Lwt.wakeup res len
    169 
    170 type handle
    171 
    172 let connInFlight = ref []
    173 
    174 type kind = CONNECT | ACCEPT
    175 
    176 external win_wait : int -> handle list -> int = "win_wait"
    177 
    178 external win_register_wait :
    179   Unix.file_descr -> kind -> handle = "win_register_wait"
    180 
    181 external win_check_connection :
    182   Unix.file_descr -> kind -> handle -> unit = "win_check_connection"
    183 
    184 let handle_wait_event h ch kind cont action =
    185 if !d then prerr_endline "MMM";
    186   let res =
    187     try
    188       Some (action ())
    189     with
    190       Unix.Unix_error ((Unix.EAGAIN | Unix.EWOULDBLOCK | Unix.EINTR), _, _) ->
    191 if !d then prerr_endline "NNN";
    192         let h' = win_register_wait ch.fd kind in
    193         connInFlight := List.map (fun el -> if fst el <> h then el else (h', snd el)) !connInFlight;
    194         None
    195     | e ->
    196 if !d then prerr_endline "OOO";
    197         connInFlight := List.filter (fun (h', _) -> h' <> h) !connInFlight;
    198         Lwt.wakeup_exn cont e;
    199         None
    200   in
    201   match res with
    202     Some v ->
    203 if !d then prerr_endline "PPP";
    204       connInFlight := List.filter (fun (h', _) -> h' <> h) !connInFlight;
    205       Lwt.wakeup cont v
    206   | None ->
    207       ()
    208 
    209 let rec run thread =
    210 if !d then Format.eprintf "Main loop@.";
    211   match Lwt.poll thread with
    212     Some v ->
    213 if !d then Format.eprintf "DONE!@.";
    214       v
    215   | None ->
    216       let next_event =
    217         try
    218           let (time, _, _) = SleepQueue.find_min !sleep_queue in Some time
    219         with Not_found ->
    220           None
    221       in
    222       let now = ref (-1.) in
    223       let delay =
    224         match next_event with
    225           None      -> -1.
    226         | Some 0.   -> 0.
    227         | Some time -> max 0. (time -. get_time now)
    228       in
    229 if !d then Format.eprintf "vvv@.";
    230       let i =
    231         try
    232           win_wait (truncate (ceil (delay *. 1000.))) (List.map fst !connInFlight)
    233         with
    234           Sys.Break as e -> raise e
    235         | _              -> assert false
    236       in
    237 if !d then Format.eprintf "^^^@.";
    238       if i = -1 then now := !now +. delay;
    239       restart_threads !event_counter now;
    240 if !d then Format.eprintf "threads restarted@.";
    241       let ev = !completionEvents in
    242       completionEvents := [];
    243       List.iter handleCompletionEvent (List.rev ev);
    244       if i >= 0 then begin
    245         let (h, (kind, ch)) =
    246           try List.nth !connInFlight i with Failure _ -> assert false in
    247         match kind with
    248           `CheckSocket res  ->
    249 if !d then prerr_endline "CHECK CONN";
    250             handle_wait_event h ch CONNECT res
    251                (fun () -> win_check_connection ch.fd CONNECT h)
    252         | `Accept res ->
    253 if !d then prerr_endline "ACCEPT";
    254             handle_wait_event h ch ACCEPT res
    255               (fun () ->
    256                  win_check_connection ch.fd ACCEPT h;
    257                  let (v, info) = Unix.accept ~cloexec:true ch.fd in
    258                  (wrap_async v, info))
    259       end;
    260       run thread
    261 
    262 (****)
    263 
    264 let wait_read ch = assert false
    265 
    266 let wait_write ch = assert false
    267 
    268 external start_read :
    269   file_descr -> buffer -> int -> int -> int -> unit = "win_read"
    270 external start_write :
    271   file_descr -> buffer -> int -> int -> int -> unit = "win_write"
    272 
    273 let read ch s pos len =
    274 if !d then Format.eprintf "Start reading@.";
    275   let id = acquire_id () in
    276   let buf = acquire_buffer () in
    277   let len = if len > buffer_size then buffer_size else len in
    278   let res = Lwt.wait () in
    279   IntTbl.add ioInFlight id (`Read (s, pos), buf, res);
    280   start_read ch buf 0 len id;
    281 if !d then Format.eprintf "Reading started@.";
    282   res
    283 
    284 let write ch s pos len =
    285 if !d then Format.eprintf "Start writing@.";
    286   let id = acquire_id () in
    287   let buf = acquire_buffer () in
    288   let len = if len > buffer_size then buffer_size else len in
    289   blit_bytes_to_buffer s pos buf 0 len;
    290   let res = Lwt.wait () in
    291   IntTbl.add ioInFlight id (`Write, buf, res);
    292   start_write ch buf 0 len id;
    293 if !d then Format.eprintf "Writing started@.";
    294   res
    295 
    296 let write_substring ch s pos len =
    297 if !d then Format.eprintf "Start writing@.";
    298   let id = acquire_id () in
    299   let buf = acquire_buffer () in
    300   let len = if len > buffer_size then buffer_size else len in
    301   blit_string_to_buffer s pos buf 0 len;
    302   let res = Lwt.wait () in
    303   IntTbl.add ioInFlight id (`Write, buf, res);
    304   start_write ch buf 0 len id;
    305 if !d then Format.eprintf "Writing started@.";
    306   res
    307 
    308 external win_pipe_in :
    309   ?cloexec:bool -> unit -> Unix.file_descr * Unix.file_descr = "win_pipe_in"
    310 external win_pipe_out :
    311   ?cloexec:bool -> unit -> Unix.file_descr * Unix.file_descr = "win_pipe_out"
    312 
    313 let pipe_in ?cloexec () =
    314   let (i, o) = if no_overlapped_io then Unix.pipe () else win_pipe_in ?cloexec () in
    315   (wrap_async i, o)
    316 let pipe_out ?cloexec () =
    317   let (i, o) = if no_overlapped_io then Unix.pipe () else win_pipe_out ?cloexec () in
    318   (i, wrap_async o)
    319 
    320 external win_socket : ?cloexec:bool ->
    321   Unix.socket_domain -> Unix.socket_type -> int -> Unix.file_descr =
    322   "win_socket"
    323 
    324 let socket ?cloexec d t p =
    325   let s = if no_overlapped_io then Unix.socket ?cloexec d t p
    326           else win_socket ?cloexec d t p in
    327   Unix.set_nonblock s;
    328   wrap_async s
    329 
    330 let bind ch addr = Unix.bind ch.fd addr
    331 let setsockopt ch opt v = Unix.setsockopt ch.fd opt v
    332 let listen ch n = Unix.listen ch.fd n
    333 let set_close_on_exec ch = Unix.set_close_on_exec ch.fd
    334 
    335 external kill_threads : file_descr -> unit = "win_kill_threads"
    336 
    337 let close ch = Unix.close ch.fd; kill_threads ch
    338 
    339 let accept ch =
    340   let res = Lwt.wait () in
    341   let () = acquire_event !connInFlight "accept" in
    342   let h = win_register_wait ch.fd ACCEPT in
    343   connInFlight := (h, (`Accept res, ch)) :: !connInFlight;
    344   res
    345 
    346 let check_socket ch =
    347   let res = Lwt.wait () in
    348   let () = acquire_event !connInFlight "connect" in
    349   let h = win_register_wait ch.fd CONNECT in
    350   connInFlight := (h, (`CheckSocket res, ch)) :: !connInFlight;
    351   res
    352 
    353 let connect s addr =
    354   try
    355     Unix.connect s.fd addr;
    356 if !d then prerr_endline "AAA";
    357     Lwt.return ()
    358   with
    359     Unix.Unix_error
    360       ((Unix.EINPROGRESS | Unix.EWOULDBLOCK | Unix.EAGAIN), _, _) ->
    361 if !d then prerr_endline "BBB";
    362         check_socket s
    363   | e ->
    364 if !d then prerr_endline "CCC";
    365       Lwt.fail e
    366 
    367 
    368 type lwt_in_channel
    369 let input_line _ = assert false (*XXXXX*)
    370 let intern_in_channel _ = assert false (*XXXXX*)
    371 
    372 (***)
    373 
    374 type directory_handle = Unix.file_descr
    375 
    376 external open_dir : string -> directory_handle = "win_open_directory"
    377 let open_directory f = open_dir (System_win.extendedPath f)
    378 
    379 type notify_filter_flag =
    380     FILE_NOTIFY_CHANGE_FILE_NAME | FILE_NOTIFY_CHANGE_DIR_NAME
    381   | FILE_NOTIFY_CHANGE_ATTRIBUTES | FILE_NOTIFY_CHANGE_SIZE
    382   | FILE_NOTIFY_CHANGE_LAST_WRITE | FILE_NOTIFY_CHANGE_LAST_ACCESS
    383   | FILE_NOTIFY_CHANGE_CREATION | FILE_NOTIFY_CHANGE_SECURITY
    384 
    385 external start_read_dir_changes :
    386   directory_handle -> buffer -> bool -> notify_filter_flag list -> int -> unit =
    387   "win_readdirtorychanges"
    388 
    389 type file_action =
    390     FILE_ACTION_ADDED | FILE_ACTION_REMOVED
    391   | FILE_ACTION_MODIFIED | FILE_ACTION_RENAMED_OLD_NAME
    392   | FILE_ACTION_RENAMED_NEW_NAME
    393 
    394 external parse_directory_changes : buffer -> (string * file_action) list
    395   = "win_parse_directory_changes"
    396 
    397 let readdirectorychanges ch recursive flags =
    398 if !d then Format.eprintf "Start reading directory changes@.";
    399   let id = acquire_id () in
    400   let buf = acquire_buffer () in
    401   let res = Lwt.wait () in
    402   IntTbl.add ioInFlight id (`Readdirectorychanges, buf, res);
    403   start_read_dir_changes ch buf recursive flags id;
    404 if !d then Format.eprintf "Reading started@.";
    405   Lwt.bind res (fun len ->
    406   if len = 0 then
    407     Lwt.return []
    408   else
    409     Lwt.return (List.rev (parse_directory_changes buf)))
    410 
    411 let close_dir = Unix.close
    412 
    413 external long_name : string -> string = "win_long_path_name"
    414 
    415 let longpathname root path =
    416   (* Parameter [path] can be relative. Result value must then also be relative.
    417      Input parameter to [long_name] must always be absolute path. *)
    418   let epath = System_win.extendedPath (Filename.concat root path)
    419   and root = System_win.extendedPath (Filename.concat root "") in
    420   let start = String.length root
    421   and ln = long_name epath in
    422   try
    423     (* The assumption is that [root] does not change in [long_name]. The
    424        Windows fsmonitor operates under this assumption, so it is ok here.
    425        To remove this assumption, we'd have to pass [root] separately through
    426        [long_name]. *)
    427     String.sub ln start (String.length ln - start)
    428   with
    429   | Invalid_argument _ -> ln