transport.ml (8993B)
1 (* Unison file synchronizer: src/transport.ml *) 2 (* Copyright 1999-2020, Benjamin C. Pierce 3 4 This program is free software: you can redistribute it and/or modify 5 it under the terms of the GNU General Public License as published by 6 the Free Software Foundation, either version 3 of the License, or 7 (at your option) any later version. 8 9 This program is distributed in the hope that it will be useful, 10 but WITHOUT ANY WARRANTY; without even the implied warranty of 11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 12 GNU General Public License for more details. 13 14 You should have received a copy of the GNU General Public License 15 along with this program. If not, see <http://www.gnu.org/licenses/>. 16 *) 17 18 19 open Common 20 open Lwt 21 22 let debug = Trace.debug "transport" 23 24 (*****************************************************************************) 25 (* MAIN FUNCTIONS *) 26 (*****************************************************************************) 27 28 let fileSize uiFrom uiTo = 29 match uiFrom, uiTo with 30 _, Updates (File (props, ContentsUpdated (_, _, ress)), _) -> 31 (Props.length props, Osx.ressLength ress) 32 | Updates (_, Previous (`FILE, props, _, ress)), 33 (NoUpdates | Updates (File (_, ContentsSame), _)) -> 34 (Props.length props, Osx.ressLength ress) 35 | _ -> 36 assert false 37 38 let maxthreads = 39 Prefs.createInt "maxthreads" 0 40 ~category:(`Advanced `General) 41 "maximum number of simultaneous file transfers" 42 ("This preference controls how much concurrency is allowed during \ 43 the transport phase. Normally, it should be set reasonably high \ 44 to maximize performance, but when Unison is used over a \ 45 low-bandwidth link it may be helpful to set it lower (e.g. \ 46 to 1) so that Unison doesn't soak up all the available bandwidth. \ 47 The default is the special value 0, which mean 20 threads \ 48 when file content streaming is deactivated and 1000 threads \ 49 when it is activated.") 50 51 let maxThreads () = 52 let n = Prefs.read maxthreads in 53 if n > 0 then n else 54 if Prefs.read Remote.streamingActivated then 1000 else 20 55 56 let run dispenseTask = 57 let runConcurrent limit dispenseTask = 58 let avail = ref limit in 59 let rec runTask thr = 60 Lwt.try_bind thr 61 (fun () -> nextTask ()) 62 (fun _ -> assert false) 63 (* It is a programming error for an exception to reach this far. *) 64 and nextTask () = 65 match dispenseTask () with 66 | None -> Lwt.return (incr avail) 67 | Some thr -> runTask thr 68 in 69 let rec fillPool () = 70 match dispenseTask () with 71 | None -> () 72 | Some thr -> 73 decr avail; 74 let _ : unit Lwt.t = runTask thr in 75 if !avail > 0 then fillPool () 76 in 77 fillPool () 78 in 79 (* When streaming, we can transfer many file simultaneously: 80 as the contents of only one file is transferred in one direction 81 at any time, little resource is consumed this way. *) 82 let limit = maxThreads () in 83 Lwt_util.resize_region !Files.copyReg limit; 84 runConcurrent limit dispenseTask 85 86 (* Logging for a thread: write a message before and a message after the 87 execution of the thread. *) 88 let logLwt (msgBegin: string) 89 (t: unit -> 'a Lwt.t) 90 (fMsgEnd: 'a -> string) 91 : 'a Lwt.t = 92 Trace.log msgBegin; 93 Lwt.bind (t ()) (fun v -> 94 Trace.log (fMsgEnd v); 95 Lwt.return v) 96 97 (* [logLwtNumbered desc t] provides convenient logging for a thread given a 98 description [desc] of the thread [t ()], generate pair of messages of the 99 following form in the log: 100 * 101 [BGN] <desc> 102 ... 103 [END] <desc> 104 **) 105 let rLogCounter = ref 0 106 let logLwtNumbered (lwtDescription: string) (lwtShortDescription: string) 107 (t: unit -> 'a Lwt.t): 'a Lwt.t = 108 let _ = (rLogCounter := (!rLogCounter) + 1; !rLogCounter) in 109 let lwtDescription = Util.replacesubstring lwtDescription "\n " "" in 110 logLwt (Printf.sprintf "[BGN] %s\n" lwtDescription) t 111 (fun _ -> 112 Printf.sprintf "[END] %s\n" lwtShortDescription) 113 114 let doAction 115 fromRoot fromPath fromContents toRoot toPath toContents notDefault id = 116 if not !Trace.sendLogMsgsToStderr then 117 Trace.statusDetail (Path.toString toPath); 118 Remote.Thread.unwindProtect (fun () -> 119 match fromContents, toContents with 120 {typ = `ABSENT}, {ui = uiTo} -> 121 logLwtNumbered 122 ("Deleting " ^ Path.toString toPath ^ 123 "\n from "^ root2string toRoot) 124 ("Deleting " ^ Path.toString toPath) 125 (fun () -> 126 Files.delete fromRoot fromPath toRoot toPath uiTo notDefault) 127 (* No need to transfer the whole directory/file if there were only 128 property modifications on one side. (And actually, it would be 129 incorrect to transfer a directory in this case.) *) 130 | {status= `Unchanged | `PropsChanged; desc= fromProps; ui= uiFrom}, 131 {status= `Unchanged | `PropsChanged; desc= toProps; ui = uiTo} -> 132 logLwtNumbered 133 ("Copying properties for " ^ Path.toString toPath 134 ^ "\n from " ^ root2string fromRoot ^ "\n to " ^ 135 root2string toRoot) 136 ("Copying properties for " ^ Path.toString toPath) 137 (fun () -> 138 Files.setProp 139 fromRoot fromPath toRoot toPath fromProps toProps uiFrom uiTo) 140 | {typ = `FILE; ui = uiFrom}, {typ = `FILE; ui = uiTo} -> 141 logLwtNumbered 142 ("Updating file " ^ Path.toString toPath ^ "\n from " ^ 143 root2string fromRoot ^ "\n to " ^ 144 root2string toRoot) 145 ("Updating file " ^ Path.toString toPath) 146 (fun () -> 147 Files.copy (`Update (fileSize uiFrom uiTo)) 148 fromRoot fromPath uiFrom [] toRoot toPath uiTo [] 149 notDefault id) 150 | {ui = uiFrom; props = propsFrom}, {ui = uiTo; props = propsTo} -> 151 logLwtNumbered 152 ("Copying " ^ Path.toString toPath ^ "\n from " ^ 153 root2string fromRoot ^ "\n to " ^ 154 root2string toRoot) 155 ("Copying " ^ Path.toString toPath) 156 (fun () -> 157 Files.copy `Copy 158 fromRoot fromPath uiFrom propsFrom 159 toRoot toPath uiTo propsTo 160 notDefault id)) 161 (fun e -> Trace.logonly 162 (Printf.sprintf 163 "Failed [%s]: %s\n" (Path.toString toPath) (Util.printException e)); 164 return ()) 165 166 let propagate root1 root2 reconItem id showMergeFn = 167 let path = reconItem.path1 in 168 match reconItem.replicas with 169 Problem p -> 170 Trace.log (Printf.sprintf "[ERROR] Skipping %s\n %s\n" 171 (Path.toString path) p); 172 return () 173 | Different 174 {rc1 = rc1; rc2 = rc2; direction = dir; default_direction = def} -> 175 let notDefault = dir <> def in 176 match dir with 177 Conflict c -> 178 Trace.log (Printf.sprintf "[CONFLICT] Skipping %s\n %s\n" 179 (Path.toString path) c); 180 return () 181 | Replica1ToReplica2 -> 182 doAction 183 root1 reconItem.path1 rc1 root2 reconItem.path2 rc2 notDefault id 184 | Replica2ToReplica1 -> 185 doAction 186 root2 reconItem.path2 rc2 root1 reconItem.path1 rc1 notDefault id 187 | Merge -> 188 if rc1.typ <> `FILE || rc2.typ <> `FILE then 189 raise (Util.Transient "Can only merge two existing files"); 190 Files.merge 191 root1 reconItem.path1 rc1.ui root2 reconItem.path2 rc2.ui id 192 showMergeFn; 193 return () 194 195 let transportItem reconItem id showMergeFn = 196 let (root1,root2) = Globals.roots() in 197 propagate root1 root2 reconItem id showMergeFn 198 199 (* ---------------------------------------------------------------------- *) 200 201 let lastLogStart = ref 0. 202 203 let logStart () = 204 Abort.reset (); 205 let t = Unix.gettimeofday () in 206 lastLogStart := t; 207 let tm = Util.localtime t in 208 let m = 209 Printf.sprintf 210 "%s%s started propagating changes at %02d:%02d:%02d.%02d on %02d %s %04d\n" 211 (if Prefs.read Trace.terse || Prefs.read Globals.batch then "" else "\n\n") 212 (String.capitalize_ascii Uutil.myNameAndVersion) 213 tm.Unix.tm_hour tm.Unix.tm_min tm.Unix.tm_sec 214 (min 99 (truncate (mod_float t 1. *. 100.))) 215 tm.Unix.tm_mday (Util.monthname tm.Unix.tm_mon) 216 (tm.Unix.tm_year+1900) in 217 Trace.logverbose m 218 219 let logFinish () = 220 let t = Unix.gettimeofday () in 221 let tm = Util.localtime t in 222 let m = 223 Printf.sprintf 224 "%s finished propagating changes at %02d:%02d:%02d.%02d on %02d %s %04d, %.3f s\n%s" 225 (String.capitalize_ascii Uutil.myNameAndVersion) 226 tm.Unix.tm_hour tm.Unix.tm_min tm.Unix.tm_sec 227 (min 99 (truncate (mod_float t 1. *. 100.))) 228 tm.Unix.tm_mday (Util.monthname tm.Unix.tm_mon) 229 (tm.Unix.tm_year+1900) 230 ( t -. !lastLogStart ) 231 (if Prefs.read Trace.terse || Prefs.read Globals.batch then "" else "\n\n") in 232 Trace.logverbose m