unison

Fork of Unison, a bi-directional file synchronization tool
git clone git://git.laack.co/unison.git
Log | Files | Refs | README | LICENSE

lwt_util.ml (2085B)


      1 
      2 open Lwt
      3 
      4 let rec iter f l =
      5   let l = List.fold_left (fun acc a -> f a :: acc) [] l in
      6   let l = List.rev l in
      7   List.fold_left (fun rt t -> t >>= fun () -> rt) (Lwt.return ()) l
      8 
      9 let rec map f l =
     10   match l with
     11     [] ->
     12       return []
     13   | v :: r ->
     14       let t = f v in
     15       let rt = map f r in
     16       t >>= (fun v' ->
     17       rt >>= (fun l' ->
     18       return (v' :: l')))
     19 
     20 let map_with_waiting_action f wa l =
     21   let rec loop l =
     22     match l with
     23       [] ->
     24         return []
     25     | v :: r ->
     26         let t = f v in
     27         let rt = loop r in
     28         t >>= (fun v' ->
     29           (* Perform the specified "waiting action" for the next    *)
     30           (* item in the list.                                      *)
     31           if r <> [] then
     32             wa (List.hd r)
     33           else
     34             ();
     35           rt >>= (fun l' ->
     36             return (v' :: l')))
     37   in
     38   if l <> [] then
     39     wa (List.hd l)
     40   else
     41     ();
     42   loop l
     43 
     44 let rec map_serial f l =
     45   match l with
     46     [] ->
     47       return []
     48   | v :: r ->
     49       f v >>= (fun v' ->
     50       map_serial f r >>= (fun l' ->
     51       return (v' :: l')))
     52 
     53 let join l = iter (fun x -> x) l
     54 
     55 type region =
     56   { mutable size : int;
     57     mutable count : int;
     58     waiters : (unit Lwt.t * int) Queue.t }
     59 
     60 let make_region count = { size = count; count = 0; waiters = Queue.create () }
     61 
     62 let resize_region reg sz = reg.size <- sz
     63 
     64 let purge_region reg = Queue.clear reg.waiters
     65 
     66 let leave_region reg sz =
     67    try
     68      if reg.count - sz >= reg.size then raise Queue.Empty;
     69      let (w, sz') = Queue.take reg.waiters in
     70      reg.count <- reg.count - sz + sz';
     71      Lwt.wakeup w ()
     72    with Queue.Empty ->
     73      reg.count <- reg.count - sz
     74 
     75 let run_in_region_1 reg sz thr =
     76   (catch
     77      (fun () -> thr () >>= (fun v -> leave_region reg sz; return v))
     78      (fun e -> leave_region reg sz; fail e))
     79 
     80 let run_in_region reg sz thr =
     81   if reg.count >= reg.size then begin
     82     let res = wait () in
     83     Queue.add (res, sz) reg.waiters;
     84     res >>= (fun () -> run_in_region_1 reg sz thr)
     85   end else begin
     86     reg.count <- reg.count + sz;
     87     run_in_region_1 reg sz thr
     88   end