(* watchercommon.ml (18408B) *)
(* Unison file synchronizer: src/fsmonitoring/watchercommon.ml *)
(* Copyright 2012, Benjamin C. Pierce

    This program is free software: you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
    the Free Software Foundation, either version 3 of the License, or
    (at your option) any later version.

    This program is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU General Public License for more details.

    You should have received a copy of the GNU General Public License
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*)

(* Set to true by the DEBUG command; enables tracing on stderr. *)
let debug = ref false

(* Ignore SIGPIPE on Unix-like systems, so that a write to a closed pipe
   is reported as a Unix_error (EPIPE is handled in the main loop)
   instead of killing the process. *)
let _ =
  if Sys.unix || Sys.cygwin then
    ignore(Sys.set_signal Sys.sigpipe Sys.Signal_ignore)

module StringMap = Map.Make(String)
module StringSet = Set.Make(String)
module IntSet =
  Set.Make
    (struct type t = int let compare (x : int) (y : int) = compare x y end)

(* [disallowed_char c] is true when [c] must be percent-encoded by
   [quote], i.e. when it is not one of the characters listed below.
   Note that space, newline and '%' are all disallowed. *)
let disallowed_char c =
  match c with
    'A'..'Z' | 'a'..'z' | '0'..'9' | '-' | '_' | '.' | '~'
  | '!' | '*' | '\'' | '(' | ')' | ';' | ':' | '@' | '&'
  | '=' | '+' | '$' | ',' | '/' | '?' | '#' | '[' | ']' ->
      false
  | _ ->
      true

(* [quote s] percent-encodes every disallowed character of [s] as '%'
   followed by two uppercase hexadecimal digits.  [s] itself is returned
   when nothing needs quoting. *)
let quote s =
  let l = String.length s in
  let n = ref 0 in
  (* First pass: count the characters to escape, to size the result. *)
  for i = 0 to l - 1 do if disallowed_char s.[i] then incr n done;
  if !n = 0 then s else begin
    (* Each escaped character grows from one byte to three. *)
    let q = Bytes.create (l + 2 * !n) in
    let j = ref 0 in
    let hex = "0123456789ABCDEF" in
    for i = 0 to l - 1 do
      let c = s.[i] in
      if disallowed_char s.[i] then begin
        Bytes.set q !j '%';
        Bytes.set q (!j + 1) hex.[Char.code c lsr 4];
        Bytes.set q (!j + 2) hex.[Char.code c land 15];
        j := !j + 3
      end else begin
        Bytes.set q !j c;
        incr j
      end
    done;
    Bytes.to_string q
  end

(* [unquote s] is the inverse of [quote]: every '%' followed by two
   hexadecimal digits (either case) is replaced by the corresponding
   byte.  [s] itself is returned when it contains no '%'.

   Raises [Invalid_argument "unquote"] when [s] is not well formed:
   a '%' that is not followed by two hexadecimal digits, including an
   escape truncated by the end of the string. *)
let unquote s =
  let l = String.length s in
  let n = ref 0 in
  for i = 0 to l - 1 do if s.[i] = '%' then incr n done;
  if !n = 0 then s else begin
    let hex_char c =
      match c with
        '0'..'9' -> Char.code c - Char.code '0'
      | 'a'..'f' -> Char.code c - Char.code 'a' + 10
      | 'A'..'F' -> Char.code c - Char.code 'A' + 10
      | _        -> invalid_arg "unquote"
    in
    (* A well-formed input shrinks by two bytes per escape.  A negative
       length can only come from malformed input. *)
    let len = l - 2 * !n in
    if len < 0 then invalid_arg "unquote";
    let u = Bytes.create len in
    let j = ref 0 in
    for i = 0 to len - 1 do
      let c = s.[!j] in
      if c = '%' then begin
        (* Reject an escape cut short by the end of the string. *)
        if !j + 2 >= l then invalid_arg "unquote";
        Bytes.set u i
          (Char.chr ((hex_char s.[!j + 1]) lsl 4 + hex_char s.[!j + 2]));
        j := !j + 3
      end else begin
        Bytes.set u i c;
        incr j
      end
    done;
    (* If part of the input was not consumed, the input was malformed;
       fail rather than silently returning a truncated string. *)
    if !j <> l then invalid_arg "unquote";
    Bytes.to_string u
  end

(* [split_on_space s] splits [s] at its first space and returns the text
   before and after it.  Returns [(s, "")] when [s] has no space. *)
let split_on_space s =
  try
    let i = String.index s ' ' in
    (String.sub s 0 i,
     String.sub s (i + 1) (String.length s - i - 1))
  with Not_found ->
    (s, "")

let (>>=) = Lwt.bind

(* Write [len] bytes of [s] starting at [pos] to [o], retrying with the
   remainder for as long as a write is only partial. *)
let rec really_write_substring o s pos len =
  Lwt_unix.write_substring o s pos len >>= fun l ->
  if l = len then
    Lwt.return ()
  else
    really_write_substring o s (pos + l) (len - l)

(* Render an exception as text.  Unix errors show the error message, the
   failing function and its argument (if any), and the numeric code for
   unknown errors. *)
let format_exc e =
  match e with
    Unix.Unix_error (code, funct, arg) ->
      Format.sprintf "%s [%s%s]%s@."
        (Unix.error_message code) funct
        (if String.length arg > 0 then "(" ^ arg ^ ")" else "")
        (match code with
           Unix.EUNKNOWNERR n -> Format.sprintf " (code %d)" n
         | _                  -> "")
  | _ ->
      Format.sprintf "uncaught exception %s@." (Printexc.to_string e)

(****)

(* Protocol channels: commands are read from stdin, replies are written
   to stdout. *)
let _in = (*Lwt_unix.stdin*) Lwt_unix.of_unix_file_descr Unix.stdin
let _out = (*Lwt_unix.stdout*) Lwt_unix.of_unix_file_descr Unix.stdout

(* printf-style output to [_out]; the result is a thread that completes
   once the whole formatted string has been written. *)
let printf fmt =
  Printf.ksprintf (fun s -> really_write_substring _out s 0 (String.length s)) fmt

(* [read_line ()] returns the next line read from [_in], without its
   terminating newline.  Input is read in chunks of at most 160 bytes;
   [b] accumulates the pieces of a line that spans several chunks, and
   [start] / [last] delimit the unconsumed part of [buf].
   Raises [End_of_file] when a read returns no data. *)
let read_line =
  let b = Buffer.create 160 in
  let buf = Bytes.create 160 in
  let start = ref 0 in
  let last = ref 0 in
  let rec read_line () =
    begin if !start = !last then begin
      (* The chunk is exhausted: refill it. *)
      Lwt_unix.read _in buf 0 160 >>= fun l ->
      if l = 0 then raise End_of_file;
      start := 0; last := l;
      Lwt.return ()
    end else
      Lwt.return ()
    end >>= fun () ->
    try
      let i = Bytes.index_from buf !start '\n' in
      (* A newline at or after [last] is stale data from an earlier read. *)
      if i >= !last then raise Not_found;
      Buffer.add_subbytes b buf !start (i - !start);
      start := i + 1;
      let s = Buffer.contents b in
      Buffer.clear b;
      Lwt.return s
    with Not_found ->
      (* No complete line yet: keep what we have and read more. *)
      Buffer.add_subbytes b buf !start (!last - !start);
      start := 0; last := 0;
      read_line ()
  in
  read_line

(* Report a fatal error on the output channel (as an ERROR line carrying
   the quoted message) and terminate with exit code 1. *)
let error msg =
  Lwt_unix.run (printf "ERROR %s\n" (quote msg));
  exit 1

(****)

(* Handled by [watch_path] below; expected to be raised by [add_watch]
   when the path to watch cannot be watched anymore. *)
exception Already_lost

(* State shared by all watcher implementations, parameterized by the
   type [watch] of the platform-specific watch handles. *)
module F (M : sig type watch end) = struct
  include M

  (* How a changed child first showed up: as a modification of an
     existing entry, or as a newly created entry. *)
  type status = Modified | Created

  (* A node of the tree of watched directories.
     - [id]: unique identifier, allocated by [new_file];
     - [gen]: value of [current_gen] when the node was last registered
       by [start_watching];
     - [watch]: platform-specific watch handle, if any;
     - [subdirs]: children, indexed by name;
     - [parent]: a root (filesystem path and path) or a named child of
       another node;
     - [archive_hash]: hash of the replica the node belongs to;
     - [changed]: set on a root when the root itself has changed;
     - [changed_children]: changed children, indexed by name, with the
       kind and the time of the last recorded change. *)
  type t =
    { id : int; mutable gen : int;
      mutable watch : watch option;
      mutable subdirs : t StringMap.t;
      parent : parent;
      archive_hash : string;
      mutable changed : bool;
      mutable changed_children : (status * float) StringMap.t }

  and parent = Root of string * string | Parent of string * t

  let get_id file = file.id
  let get_watch file = file.watch
  let set_watch file watch = file.watch <- watch
  let get_subdirs file = file.subdirs

  (* Generation counter, incremented at the end of each [start_watching]. *)
  let current_gen = ref 0

  (* All live nodes, indexed by [id]. *)
  let file_by_id = Hashtbl.create 16
  (* Root nodes, indexed by (archive hash, fspath, path). *)
  let roots = Hashtbl.create 16

  (* Like [Filename.concat], except that an empty [path] yields [fspath]
     unchanged. *)
  let concat fspath path =
    if path = "" then fspath else Filename.concat fspath path

  let is_root file =
    match file.parent with
      Root _   -> true
    | Parent _ -> false

  (* [dir_path dir path] is the full filesystem path of [path] taken
     relative to the node [dir], built by walking up to the root. *)
  let rec dir_path dir path =
    match dir.parent with
      Root (fspath, path') -> concat fspath (concat path' path)
    | Parent (name, dir)   -> dir_path dir (concat name path)

  (****)

  (* A change is only reported once it is older than [delay] seconds. *)
  let delay = 0.5

  (* Pending changes: maps an archive hash to a table associating
     (directory id, child name) with the time of the last change. *)
  let changes = ref StringMap.empty

  (* Hashes of the replicas for which a WAIT is outstanding. *)
  let waiting_for_changes = ref StringSet.empty
  (* True while a delayed re-check is already scheduled. *)
  let active_wait = ref false

  (* The change table of replica [hash], created on first use. *)
  let change_table hash =
    try
      StringMap.find hash !changes
    with Not_found ->
      let h = Hashtbl.create 1024 in
      changes := StringMap.add hash h !changes;
      h

  (* Send a CHANGES line listing the given replicas, and cancel all
     outstanding waits. *)
  let signal_changes replicas_with_changes =
    waiting_for_changes := StringSet.empty;
    printf "CHANGES %s\n"
      (String.concat " "
         (List.map quote (StringSet.elements replicas_with_changes)))

  (* The subset of [watched_replicas] with reportable changes: replicas
     with a root flagged as changed, or with at least one pending change
     older than [delay]. *)
  let replicas_with_changes watched_replicas =
    let time = Unix.gettimeofday () in
    let changed = ref StringSet.empty in
    Hashtbl.iter
      (fun (hash', _, _) r ->
         if
           r.changed &&
           not (StringSet.mem hash' !changed) &&
           StringSet.mem hash' watched_replicas
         then
           changed := StringSet.add hash' !changed)
      roots;
    StringSet.iter
      (fun hash ->
         if not (StringSet.mem hash !changed) then
           try
             (* [Exit] is used to stop at the first old enough change. *)
             Hashtbl.iter
               (fun _ time_ref -> if time -. time_ref > delay then raise Exit)
               (change_table hash)
           with Exit ->
             changed := StringSet.add hash !changed)
      watched_replicas;
    !changed

  (* True when some replica of [watched_replicas] has a pending change,
     however recent. *)
  let has_impending_changes watched_replicas =
    try
      StringSet.iter
        (fun hash -> Hashtbl.iter (fun _ _ -> raise Exit) (change_table hash))
        watched_replicas;
      false
    with Exit ->
      true

  (* Report the changes of [watched_replicas] if some are old enough;
     otherwise, if there are more recent changes, schedule a re-check. *)
  let rec wait_for_changes watched_replicas =
    if not (StringSet.is_empty watched_replicas) then begin
      let changed = replicas_with_changes watched_replicas in
      if not (StringSet.is_empty changed) then signal_changes changed else
      if has_impending_changes watched_replicas then signal_impending_changes ()
      else Lwt.return ()
    end else
      Lwt.return ()

  (* When a wait is outstanding and no re-check is scheduled yet, sleep
     for [delay] seconds and then check again the replicas waited for. *)
  and signal_impending_changes () =
    if not (StringSet.is_empty !waiting_for_changes || !active_wait) then begin
      active_wait := true;
      Lwt_unix.sleep delay >>= fun () ->
      active_wait := false;
      wait_for_changes !waiting_for_changes
    end else
      Lwt.return ()

  (* Register a wait on replica [hash] and check it right away. *)
  let wait hash =
    waiting_for_changes := StringSet.add hash !waiting_for_changes;
    ignore (wait_for_changes (StringSet.singleton hash))

  (* Record a pending change for child [nm] of [dir] at time [time]. *)
  let add_change dir nm time =
    Hashtbl.replace (change_table dir.archive_hash) (dir.id, nm) time;
    ignore (signal_impending_changes ())
  let remove_change dir nm =
    Hashtbl.remove (change_table dir.archive_hash) (dir.id, nm)
  let clear_change_table hash =
    changes := StringMap.remove hash !changes

  (* Forget, for all the roots of replica [hash], the root change flag
     and every child change older than [delay] relative to [time].  More
     recent changes are kept. *)
  let clear_changes hash time =
    let rec clear_rec f =
      f.changed_children <-
        StringMap.filter
          (fun nm (_, time_ref) ->
             if time -. time_ref <= delay then true else begin
               remove_change f nm;
               false
             end)
          f.changed_children;
      StringMap.iter (fun _ f' -> clear_rec f') f.subdirs
    in
    Hashtbl.iter
      (fun (hash', _, _) f ->
         if hash' = hash then begin
           f.changed <- false;
           clear_rec f
         end)
      roots

  (****)

  (* Record an event of kind [kind] at time [time], either on the child
     [nm] of [dir] ([Some nm]) or on [dir] itself ([None]).  An event on
     a directory itself is recorded as a change of its entry in the
     parent directory, or sets the [changed] flag for a root. *)
  let rec signal_change time dir nm_opt kind =
    match nm_opt with
      Some nm ->
        begin try
          let (st, _) = StringMap.find nm dir.changed_children in
          if
            st = Created && kind = `DEL &&
            not (StringMap.mem nm dir.subdirs)
          then begin
            (* Created then deleted, and not a known subdirectory: the
               two events cancel out. *)
            if !debug then Format.eprintf "Deleted: %s@." (dir_path dir nm);
            dir.changed_children <- StringMap.remove nm dir.changed_children;
            remove_change dir nm
          end else begin
            (* Keep the initial status, refresh the time. *)
            dir.changed_children <-
              StringMap.add nm (st, time) dir.changed_children;
            add_change dir nm time
          end
        with Not_found ->
          if kind = `CREAT && dir.gen <> !current_gen then begin
            if !debug then Format.eprintf "Created: %s@." (dir_path dir nm);
            dir.changed_children <-
              StringMap.add nm (Created, time) dir.changed_children;
            add_change dir nm time
          end else begin
            if !debug then Format.eprintf "Modified: %s@." (dir_path dir nm);
            dir.changed_children <-
              StringMap.add nm (Modified, time) dir.changed_children;
            add_change dir nm time
          end
        end
    | None ->
        match dir.parent with
          Root _ ->
            dir.changed <- true;
            ignore (signal_impending_changes ())
        | Parent (nm, parent_dir) ->
            signal_change time parent_dir (Some nm) kind

  (* Flag every root as changed and immediately report all the replicas
     being waited for. *)
  let signal_overflow () =
    Hashtbl.iter (fun _ r -> r.changed <- true) roots;
    if not (StringSet.is_empty !waiting_for_changes) then
      ignore (signal_changes !waiting_for_changes)

  (****)

  (* Operations a platform-specific watcher has to provide. *)
  module type S = sig
    val add_watch : string -> t -> bool -> unit
    val release_watch : t -> unit
    val watch : unit -> unit
    val clear_event_memory : unit -> unit
  end

  (* The protocol implementation.  Applying this functor runs [watch ()]
     and then the command loop (see the end of the module). *)
  module F (M : S) = struct
    include M

    (* The paths of replica [hash] whose change is older than [delay]
       relative to [time].  A changed root contributes its own path.
       The subtree of a child reported as changed is not explored. *)
    let gather_changes hash time =
      clear_event_memory ();
      let rec gather_rec path r l =
        let c =
          StringMap.filter (fun _ (_, time_ref) -> time -. time_ref > delay)
            r.changed_children
        in
        let l = StringMap.fold (fun nm _ l -> concat path nm :: l) c l in
        StringMap.fold
          (fun nm r' l ->
             if StringMap.mem nm c then l else
             gather_rec (concat path nm) r' l)
          r.subdirs l
      in
      List.rev
        (Hashtbl.fold
           (fun (hash', _, path) r l ->
              if hash' <> hash then l else
              if r.changed then gather_rec path r (path :: l) else
              gather_rec path r l)
           roots [])

    (* Find a registered root of replica [hash] under [fspath] whose path
       is [path] or a prefix of it obtained by removing trailing
       '/'-separated components; the longest such prefix is tried first,
       the empty path last. *)
    let rec find_root hash fspath path =
      if Hashtbl.mem roots (hash, fspath, path) then
        Some (fspath, path)
      else
        try
          let i = String.rindex path '/' in
          find_root hash fspath (String.sub path 0 i)
        with Not_found ->
          if path = "" then
            None
          else
            find_root hash fspath ""

    let last_file_id = ref 0

    (* Allocate a node with a fresh id and register it in [file_by_id]. *)
    let new_file hash parent =
      let f =
        { id = !last_file_id; watch = None; gen = -1;
          parent = parent; archive_hash = hash; subdirs = StringMap.empty;
          changed = false; changed_children = StringMap.empty }
      in
      incr last_file_id;
      Hashtbl.add file_by_id f.id f;
      f

    (* Create a root node and register it in [roots]. *)
    let new_root hash fspath path =
      if !debug then Format.eprintf "ROOT %s %s@." fspath path;
      let r = new_file hash (Root (fspath, path)) in
      Hashtbl.add roots (hash, fspath, path) r;
      r

    (* The child of [dir] named [name], created if it does not exist. *)
    let dir_child dir name =
      try
        StringMap.find name dir.subdirs
      with Not_found ->
        let d = new_file dir.archive_hash (Parent (name, dir)) in
        dir.subdirs <- StringMap.add name d dir.subdirs;
        d

    (* Walk down from [dir] along the '/'-separated [path], starting at
       offset [pos].  All the components but the last one must already
       exist (assertion failure otherwise); the last one is created on
       demand. *)
    let rec follow_path dir path pos =
      if path = "" then dir else
      try
        let i = String.index_from path pos '/' in
        try
          let dir = StringMap.find (String.sub path pos (i - pos)) dir.subdirs in
          follow_path dir path (i + 1)
        with Not_found ->
          assert false
      with Not_found ->
        dir_child dir (String.sub path pos (String.length path - pos))

    (* Same as [follow_path], except that a new root is created for
       [path] when an intermediate component is missing. *)
    let rec follow_fspath hash fspath dir path pos =
      if path = "" then dir else
      try
        let i = String.index_from path pos '/' in
        try
          let dir = StringMap.find (String.sub path pos (i - pos)) dir.subdirs in
          follow_fspath hash fspath dir path (i + 1)
        with Not_found ->
          new_root hash fspath path
      with Not_found ->
        dir_child dir (String.sub path pos (String.length path - pos))

    (* The node for [path] in replica [hash] under [fspath]: found or
       created below an existing enclosing root, or created as a new
       root when there is none. *)
    let find_start hash fspath path =
      match find_root hash fspath path with
        None ->
          new_root hash fspath path
      | Some (root_fspath, root_path) ->
          let root = Hashtbl.find roots (hash, root_fspath, root_path) in
          if fspath = root_fspath && path = root_path then
            root
          else
            (* Skip the root path (and the following '/') in [path]. *)
            follow_fspath hash fspath root path
              (if root_path = "" then 0 else String.length root_path + 1)

    (* Forget all the changes recorded on [file] itself. *)
    let clear_file_changes file =
      StringMap.iter (fun nm _ -> remove_change file nm) file.changed_children;
      file.changed_children <- StringMap.empty;
      file.changed <- false

    (* Remove [file] and its whole subtree: the watches are released,
       the recorded changes dropped, and the node is detached from its
       parent. *)
    let rec remove_file file =
      if !debug then Format.eprintf "REMOVING %s@." (dir_path file "");
      StringMap.iter (fun _ f -> remove_file f) file.subdirs;
      Hashtbl.remove file_by_id file.id;
      release_watch file;
      clear_file_changes file;
      match file.parent with
        Root _          -> ()
      | Parent (nm, p) -> p.subdirs <- StringMap.remove nm p.subdirs

    (* Remove the nodes that were not registered during the current
       generation, as well as the non-root nodes left with neither a
       watch nor a subdirectory. *)
    let rec remove_old_files file =
      if file.gen <> !current_gen then remove_file file else begin
        StringMap.iter (fun _ f -> remove_old_files f) file.subdirs;
        if
          file.watch = None && StringMap.is_empty file.subdirs &&
          not (is_root file)
        then
          remove_file file
      end

    (* Install a watch on [path] for the node [file].  When the path is
       already lost, this is a fatal error for a root, and is recorded
       as a deletion otherwise. *)
    let watch_path path file follow =
      try
        add_watch path file follow
      with
      | Already_lost ->
          if is_root file then
            error (Format.sprintf "Path '%s' does not exist" path)
          else
            signal_change 0. file None `DEL
            (* Most likely cause: A subdir was deleted during the scan.
               Report it as a deletion. If this was not a deletion (could
               have been an unmount, perhaps even a lost network connection)
               then reporting it as a deletion will do no harm. Unison will
               only know that "there was a change" and do its own scan based
               on the report.

               While getting here is itself a rare event, it is most likely
               that the real deletion was already reported via [watch].
               The case of needing to report it here is really exceptional
               but can happen the very first time a watcher is started on
               a replica. *)

    let print_ack () = printf "OK\n"

    (* Handle a START command: register the start node, then read DIR
       and LINK commands (the path is watched with the [follow] flag
       false for DIR, true for LINK), acknowledging each one with OK,
       until DONE is received.  Nodes not registered during this
       generation are then removed. *)
    let start_watching hash fspath path =
      let start_file = find_start hash fspath path in
      clear_file_changes start_file;
      start_file.gen <- !current_gen;
      let fspath = concat fspath path in
(*Format.eprintf ">>> %s@." fspath;*)
      if is_root start_file then watch_path fspath start_file false;
      print_ack () >>= fun () ->
      let rec add_directories () =
        read_line () >>= fun l ->
        let (cmd, path) = split_on_space l in
        let path = unquote path in
        match cmd with
          "DIR" ->
(*Format.eprintf "DIR %s@." path;*)
            let fullpath = concat fspath path in
            let file = follow_path start_file path 0 in
            clear_file_changes file;
            file.gen <- !current_gen;
(*Format.eprintf "%s@." fullpath;*)
            watch_path fullpath file false;
            print_ack () >>= fun () ->
            add_directories ()
        | "LINK" ->
(*Format.eprintf "LINK %s@." path;*)
            let fullpath = concat fspath path in
            let file = follow_path start_file path 0 in
            clear_file_changes file;
            file.gen <- !current_gen;
(*Format.eprintf "%s@." fullpath;*)
            watch_path fullpath file true;
            print_ack () >>= fun () ->
            add_directories ()
        | "DONE" ->
            Lwt.return ()
        | _ ->
            error (Format.sprintf "unknown command '%s'" cmd)
      in
      add_directories () >>= fun () ->
      (* We remove any file which is not watched anymore,
         as well as files which are not in fact watched. *)
      remove_old_files start_file;
      incr current_gen;
      Lwt.return ()

    (****)

    (* Handle a RESET command: drop all the roots of replica [hash]
       together with the pending changes of the replica. *)
    let reset hash =
      let l = ref [] in
      (* The keys are collected first and removed afterwards, so that
         [roots] is not modified while being iterated over. *)
      Hashtbl.iter
        (fun ((hash', _, _) as key) f ->
           if hash' = hash then begin
             l := key :: !l;
             remove_file f
           end)
        roots;
      List.iter (fun key -> Hashtbl.remove roots key) !l;
      clear_event_memory ();
      clear_change_table hash

    (****)

    (* Right fold where the rest of the fold is passed to [f] as a thunk,
       so that it can be run after a thread has completed. *)
    let rec lazy_fold_right f l accu =
      match l with
        []   -> accu ()
      | a::l -> f a (fun () -> lazy_fold_right f l accu)

    (* Handle a CHANGES command: send one RECURSIVE line per changed
       path of replica [hash], then DONE.  The reported changes are
       forgotten. *)
    let output_changes hash =
      let time = Unix.gettimeofday () in
      let lst = gather_changes hash time in
      clear_changes hash time;
      lazy_fold_right (fun p cont -> printf "RECURSIVE %s\n" (quote p) >>= cont)
        lst (fun () -> printf "DONE\n")

    (* Command loop: read one command per line and dispatch on it.
       Any unknown command is a fatal error. *)
    let rec loop () =
      read_line () >>= fun l ->
      (* Cancel any wait when receiving a command *)
      let (cmd, args) = split_on_space l in
      if cmd <> "WAIT" then waiting_for_changes := StringSet.empty;
      match cmd with
        "VERSION" ->
          loop ()
      | "DEBUG" ->
          debug := true;
          loop ()
      | "START" ->
          let (hash, rem) = split_on_space args in
          let (fspath, path) = split_on_space rem in
          start_watching (unquote hash) (unquote fspath) (unquote path)
            >>= fun () ->
          loop ()
      | "WAIT" ->
          wait (unquote args);
          loop ()
      | "CHANGES" ->
          output_changes (unquote args) >>= fun () ->
          loop ()
      | "RESET" ->
          reset (unquote args);
          loop ()
      | _ ->
          error (Format.sprintf "unknown command '%s'" cmd)

    (* Entry point: call [watch ()], announce the protocol version, and
       run the command loop.  End of input and a broken pipe terminate
       the loop silently; any other exception is reported as a fatal
       error. *)
    let _ =
      watch ();
      Lwt_unix.run
        (printf "VERSION 1\n" >>= fun () ->
         Lwt.catch (fun () -> loop ())
           (fun e ->
              match e with
                End_of_file | Unix.Unix_error (Unix.EPIPE, _, _) ->
                  Lwt.return ()
              | _ ->
                  if !debug then Format.eprintf "%s@." (format_exc e);
                  error ("error while communicating with Unison: " ^ format_exc e)))

  end
end