unison

Fork of Unison, a bi-directional file synchronization tool
git clone git://git.laack.co/unison.git
Log | Files | Refs | README | LICENSE

lwt.ml (3990B)


      1 
      2 (* Either a thread ['a t] has terminated, eithera successfully [Return of 'a] or
      3  *  unsuccessfully [Fail of exn], or it is sleeping
      4  *)
      5 type 'a state =
      6     Return of 'a
      7   | Fail of exn
      8   | Sleep
      9 
     10 (* A suspended thread is described by ['a t]
     11  * It could have several [waiters], which are thunk functions
     12  *)
     13 type 'a t =
     14   { mutable state : 'a state;
     15     mutable waiters : (unit -> unit) list }
     16 
     17 (* [make st] returns a thread of state [st] and no waiters *)
     18 let make st = { state = st; waiters = [] }
     19 
     20 (* add a thunk [f] to the waiting list of thread [t] *)
     21 let add_waiter t f = t.waiters <- f :: t.waiters
     22 
     23 (* restart a sleeping thread [t], run all its waiters
     24  * and running all the waiters, and make the terminating state [st]
     25  * [caller] is a string that describes the caller
     26  *)
     27 let restart t st caller =
     28   assert (st <> Sleep);
     29   if t.state <> Sleep then invalid_arg caller;
     30   t.state <- st;
     31   List.iter (fun f -> f ()) t.waiters;
     32   t.waiters <- []
     33 
     34 (*
     35  * pre-condition: [t.state] is Sleep (i.e., not terminated)
     36  * [connect t t'] connects the two processes when t' finishes up
     37  * connecting means: running all the waiters for [t']
     38  * and assigning the state of [t'] to [t]
     39  *)
     40 let rec connect t t' =
     41   if t.state <> Sleep then invalid_arg "connect";
     42   if t'.state = Sleep then
     43     add_waiter t' (fun () -> connect t t')
     44   else begin
     45     t.state <- t'.state;
     46     begin match t.waiters with
     47       [f] ->
     48         t.waiters <- [];
     49         f ()
     50     | _ ->
     51         List.iter (fun f -> f ()) t.waiters;
     52         t.waiters <- []
     53     end
     54   end
     55 
     56 (* apply function, reifying explicit exceptions into the thread type
     57  * apply: ('a -(exn)-> 'b t) -> ('a -(n)-> 'b t)
     58  * semantically a natural transformation TE -> T, where T is the thread
     59  * monad, which is layered over exception monad E.
     60  *)
     61 let apply f x = try f x with e -> make (Fail e)
     62 
     63 (****)
     64 
     65 let return v = make (Return v)
     66 let fail e = make (Fail e)
     67 
     68 let wait () = make Sleep
     69 let wakeup t v = restart t (Return v) "wakeup"
     70 let wakeup_exn t e = restart t (Fail e) "wakeup_exn"
     71 
     72 let rec bind x f =
     73   match x.state with
     74     Return v ->
     75       f v
     76   | Fail e ->
     77       fail e
     78   | Sleep ->
     79       let res = wait () in
     80       add_waiter x (fun () -> connect res (bind x (apply f)));
     81       res
     82 let (>>=) = bind
     83 
     84 let rec catch_rec x f =
     85   match x.state with
     86     Return v ->
     87       x
     88   | Fail e ->
     89       f e
     90   | Sleep ->
     91       let res = wait () in
     92       add_waiter x (fun () -> connect res (catch_rec x (apply f)));
     93       res
     94 
     95 let catch x f = catch_rec (apply x ()) f
     96 
     97 let rec try_bind_rec x f g =
     98   match x.state with
     99     Return v ->
    100       f v
    101   | Fail e ->
    102       apply g e
    103   | Sleep ->
    104       let res = wait () in
    105       add_waiter x (fun () -> connect res (try_bind_rec x (apply f) g));
    106       res
    107 
    108 let try_bind x f = try_bind_rec (apply x ()) f
    109 
    110 let poll x =
    111   match x.state with
    112     Fail e   -> raise e
    113   | Return v -> Some v
    114   | Sleep -> None
    115 
    116 let rec ignore_result x =
    117   match x.state with
    118     Return v ->
    119       ()
    120   | Fail e ->
    121       raise e
    122   | Sleep ->
    123       add_waiter x (fun () -> ignore_result x)
    124 
    125 let rec nth_ready l n =
    126   match l with
    127     [] ->
    128       assert false
    129   | x :: rem ->
    130       if x.state = Sleep then
    131         nth_ready rem n
    132       else if n > 0 then
    133         nth_ready rem (n - 1)
    134       else
    135         x
    136 
    137 let choose l =
    138   let ready = ref 0 in
    139   List.iter (fun x -> if x.state <> Sleep then incr ready) l;
    140   if !ready > 0 then
    141     nth_ready l (Random.int !ready)
    142   else
    143     let res = wait () in
    144     (* All waiters for this [choose] need to be remembered and cleared
    145        out once one of the threads finishes, to not leak memory. *)
    146     let waits = ref [] in
    147     let choose_done x =
    148       List.iter (fun (t, waiter) ->
    149           t.waiters <- List.filter (fun f -> f !=(*phys*) waiter) t.waiters)
    150         !waits;
    151       connect res x
    152     in
    153     let remember_waiter x =
    154       let waiter () = choose_done x in
    155       waits := (x, waiter) :: !waits;
    156       waiter
    157     in
    158     List.iter (fun x -> remember_waiter x |> add_waiter x) l;
    159     res