lwt.ml (3990B)
1 2 (* Either a thread ['a t] has terminated, eithera successfully [Return of 'a] or 3 * unsuccessfully [Fail of exn], or it is sleeping 4 *) 5 type 'a state = 6 Return of 'a 7 | Fail of exn 8 | Sleep 9 10 (* A suspended thread is described by ['a t] 11 * It could have several [waiters], which are thunk functions 12 *) 13 type 'a t = 14 { mutable state : 'a state; 15 mutable waiters : (unit -> unit) list } 16 17 (* [make st] returns a thread of state [st] and no waiters *) 18 let make st = { state = st; waiters = [] } 19 20 (* add a thunk [f] to the waiting list of thread [t] *) 21 let add_waiter t f = t.waiters <- f :: t.waiters 22 23 (* restart a sleeping thread [t], run all its waiters 24 * and running all the waiters, and make the terminating state [st] 25 * [caller] is a string that describes the caller 26 *) 27 let restart t st caller = 28 assert (st <> Sleep); 29 if t.state <> Sleep then invalid_arg caller; 30 t.state <- st; 31 List.iter (fun f -> f ()) t.waiters; 32 t.waiters <- [] 33 34 (* 35 * pre-condition: [t.state] is Sleep (i.e., not terminated) 36 * [connect t t'] connects the two processes when t' finishes up 37 * connecting means: running all the waiters for [t'] 38 * and assigning the state of [t'] to [t] 39 *) 40 let rec connect t t' = 41 if t.state <> Sleep then invalid_arg "connect"; 42 if t'.state = Sleep then 43 add_waiter t' (fun () -> connect t t') 44 else begin 45 t.state <- t'.state; 46 begin match t.waiters with 47 [f] -> 48 t.waiters <- []; 49 f () 50 | _ -> 51 List.iter (fun f -> f ()) t.waiters; 52 t.waiters <- [] 53 end 54 end 55 56 (* apply function, reifying explicit exceptions into the thread type 57 * apply: ('a -(exn)-> 'b t) -> ('a -(n)-> 'b t) 58 * semantically a natural transformation TE -> T, where T is the thread 59 * monad, which is layered over exception monad E. 60 *) 61 let apply f x = try f x with e -> make (Fail e) 62 63 (****) 64 65 let return v = make (Return v) 66 let fail e = make (Fail e) 67 68 let wait () = make Sleep 69 let wakeup t v = restart t (Return v) "wakeup" 70 let wakeup_exn t e = restart t (Fail e) "wakeup_exn" 71 72 let rec bind x f = 73 match x.state with 74 Return v -> 75 f v 76 | Fail e -> 77 fail e 78 | Sleep -> 79 let res = wait () in 80 add_waiter x (fun () -> connect res (bind x (apply f))); 81 res 82 let (>>=) = bind 83 84 let rec catch_rec x f = 85 match x.state with 86 Return v -> 87 x 88 | Fail e -> 89 f e 90 | Sleep -> 91 let res = wait () in 92 add_waiter x (fun () -> connect res (catch_rec x (apply f))); 93 res 94 95 let catch x f = catch_rec (apply x ()) f 96 97 let rec try_bind_rec x f g = 98 match x.state with 99 Return v -> 100 f v 101 | Fail e -> 102 apply g e 103 | Sleep -> 104 let res = wait () in 105 add_waiter x (fun () -> connect res (try_bind_rec x (apply f) g)); 106 res 107 108 let try_bind x f = try_bind_rec (apply x ()) f 109 110 let poll x = 111 match x.state with 112 Fail e -> raise e 113 | Return v -> Some v 114 | Sleep -> None 115 116 let rec ignore_result x = 117 match x.state with 118 Return v -> 119 () 120 | Fail e -> 121 raise e 122 | Sleep -> 123 add_waiter x (fun () -> ignore_result x) 124 125 let rec nth_ready l n = 126 match l with 127 [] -> 128 assert false 129 | x :: rem -> 130 if x.state = Sleep then 131 nth_ready rem n 132 else if n > 0 then 133 nth_ready rem (n - 1) 134 else 135 x 136 137 let choose l = 138 let ready = ref 0 in 139 List.iter (fun x -> if x.state <> Sleep then incr ready) l; 140 if !ready > 0 then 141 nth_ready l (Random.int !ready) 142 else 143 let res = wait () in 144 (* All waiters for this [choose] need to be remembered and cleared 145 out once one of the threads finishes, to not leak memory. *) 146 let waits = ref [] in 147 let choose_done x = 148 List.iter (fun (t, waiter) -> 149 t.waiters <- List.filter (fun f -> f !=(*phys*) waiter) t.waiters) 150 !waits; 151 connect res x 152 in 153 let remember_waiter x = 154 let waiter () = choose_done x in 155 waits := (x, waiter) :: !waits; 156 waiter 157 in 158 List.iter (fun x -> remember_waiter x |> add_waiter x) l; 159 res