unison

Fork of Unison, a bi-directional file synchronization tool
git clone git://git.laack.co/unison.git
Log | Files | Refs | README | LICENSE

transport.ml (8993B)


      1 (* Unison file synchronizer: src/transport.ml *)
      2 (* Copyright 1999-2020, Benjamin C. Pierce
      3 
      4     This program is free software: you can redistribute it and/or modify
      5     it under the terms of the GNU General Public License as published by
      6     the Free Software Foundation, either version 3 of the License, or
      7     (at your option) any later version.
      8 
      9     This program is distributed in the hope that it will be useful,
     10     but WITHOUT ANY WARRANTY; without even the implied warranty of
     11     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
     12     GNU General Public License for more details.
     13 
     14     You should have received a copy of the GNU General Public License
     15     along with this program.  If not, see <http://www.gnu.org/licenses/>.
     16 *)
     17 
     18 
     19 open Common
     20 open Lwt
     21 
(* Debugging helper for this module: [Trace.debug] partially applied to the
   tag "transport". *)
let debug = Trace.debug "transport"
     23 
     24 (*****************************************************************************)
     25 (*                              MAIN FUNCTIONS                               *)
     26 (*****************************************************************************)
     27 
     28 let fileSize uiFrom uiTo =
     29   match uiFrom, uiTo with
     30     _, Updates (File (props, ContentsUpdated (_, _, ress)), _) ->
     31       (Props.length props, Osx.ressLength ress)
     32   | Updates (_, Previous (`FILE, props, _, ress)),
     33     (NoUpdates | Updates (File (_, ContentsSame), _)) ->
     34       (Props.length props, Osx.ressLength ress)
     35   | _ ->
     36       assert false
     37 
     38 let maxthreads =
     39   Prefs.createInt "maxthreads" 0
     40     ~category:(`Advanced `General)
     41     "maximum number of simultaneous file transfers"
     42     ("This preference controls how much concurrency is allowed during \
     43       the transport phase.  Normally, it should be set reasonably high \
     44       to maximize performance, but when Unison is used over a \
     45       low-bandwidth link it may be helpful to set it lower (e.g. \
     46       to 1) so that Unison doesn't soak up all the available bandwidth. \
     47       The default is the special value 0, which mean 20 threads \
     48       when file content streaming is deactivated and 1000 threads \
     49       when it is activated.")
     50 
     51 let maxThreads () =
     52   let n = Prefs.read maxthreads in
     53   if n > 0 then n else
     54   if Prefs.read Remote.streamingActivated then 1000 else 20
     55 
     56 let run dispenseTask =
     57   let runConcurrent limit dispenseTask =
     58     let avail = ref limit in
     59     let rec runTask thr =
     60       Lwt.try_bind thr
     61         (fun () -> nextTask ())
     62         (fun _ -> assert false)
     63         (* It is a programming error for an exception to reach this far. *)
     64     and nextTask () =
     65        match dispenseTask () with
     66        | None -> Lwt.return (incr avail)
     67        | Some thr -> runTask thr
     68     in
     69     let rec fillPool () =
     70       match dispenseTask () with
     71       | None -> ()
     72       | Some thr ->
     73           decr avail;
     74           let _ : unit Lwt.t = runTask thr in
     75           if !avail > 0 then fillPool ()
     76     in
     77     fillPool ()
     78   in
     79   (* When streaming, we can transfer many file simultaneously:
     80      as the contents of only one file is transferred in one direction
     81      at any time, little resource is consumed this way. *)
     82   let limit = maxThreads () in
     83   Lwt_util.resize_region !Files.copyReg limit;
     84   runConcurrent limit dispenseTask
     85 
     86 (* Logging for a thread: write a message before and a message after the
     87    execution of the thread. *)
     88 let logLwt (msgBegin: string)
     89     (t: unit -> 'a Lwt.t)
     90     (fMsgEnd: 'a -> string)
     91     : 'a Lwt.t =
     92   Trace.log msgBegin;
     93   Lwt.bind (t ()) (fun v ->
     94     Trace.log (fMsgEnd v);
     95     Lwt.return v)
     96 
     97 (* [logLwtNumbered desc t] provides convenient logging for a thread given a
     98    description [desc] of the thread [t ()], generate pair of messages of the
     99    following form in the log:
    100  *
    101     [BGN] <desc>
    102      ...
    103     [END] <desc>
    104  **)
    105 let rLogCounter = ref 0
    106 let logLwtNumbered (lwtDescription: string) (lwtShortDescription: string)
    107     (t: unit -> 'a Lwt.t): 'a Lwt.t =
    108   let _ = (rLogCounter := (!rLogCounter) + 1; !rLogCounter) in
    109   let lwtDescription = Util.replacesubstring lwtDescription "\n " "" in
    110   logLwt (Printf.sprintf "[BGN] %s\n" lwtDescription) t
    111     (fun _ ->
    112       Printf.sprintf "[END] %s\n" lwtShortDescription)
    113 
    114 let doAction
    115       fromRoot fromPath fromContents toRoot toPath toContents notDefault id =
    116   if not !Trace.sendLogMsgsToStderr then
    117     Trace.statusDetail (Path.toString toPath);
    118   Remote.Thread.unwindProtect (fun () ->
    119     match fromContents, toContents with
    120         {typ = `ABSENT}, {ui = uiTo} ->
    121            logLwtNumbered
    122              ("Deleting " ^ Path.toString toPath ^
    123               "\n  from "^ root2string toRoot)
    124              ("Deleting " ^ Path.toString toPath)
    125              (fun () ->
    126                 Files.delete fromRoot fromPath toRoot toPath uiTo notDefault)
    127       (* No need to transfer the whole directory/file if there were only
    128          property modifications on one side.  (And actually, it would be
    129          incorrect to transfer a directory in this case.) *)
    130       | {status= `Unchanged | `PropsChanged; desc= fromProps; ui= uiFrom},
    131         {status= `Unchanged | `PropsChanged; desc= toProps; ui = uiTo} ->
    132           logLwtNumbered
    133             ("Copying properties for " ^ Path.toString toPath
    134              ^ "\n  from " ^ root2string fromRoot ^ "\n  to " ^
    135              root2string toRoot)
    136             ("Copying properties for " ^ Path.toString toPath)
    137             (fun () ->
    138               Files.setProp
    139                 fromRoot fromPath toRoot toPath fromProps toProps uiFrom uiTo)
    140       | {typ = `FILE; ui = uiFrom}, {typ = `FILE; ui = uiTo} ->
    141           logLwtNumbered
    142             ("Updating file " ^ Path.toString toPath ^ "\n  from " ^
    143              root2string fromRoot ^ "\n  to " ^
    144              root2string toRoot)
    145             ("Updating file " ^ Path.toString toPath)
    146             (fun () ->
    147               Files.copy (`Update (fileSize uiFrom uiTo))
    148                 fromRoot fromPath uiFrom [] toRoot toPath uiTo []
    149                 notDefault id)
    150       | {ui = uiFrom; props = propsFrom}, {ui = uiTo; props = propsTo} ->
    151           logLwtNumbered
    152             ("Copying " ^ Path.toString toPath ^ "\n  from " ^
    153              root2string fromRoot ^ "\n  to " ^
    154              root2string toRoot)
    155             ("Copying " ^ Path.toString toPath)
    156             (fun () ->
    157                Files.copy `Copy
    158                  fromRoot fromPath uiFrom propsFrom
    159                  toRoot toPath uiTo propsTo
    160                  notDefault id))
    161     (fun e -> Trace.logonly
    162         (Printf.sprintf
    163            "Failed [%s]: %s\n" (Path.toString toPath) (Util.printException e));
    164       return ())
    165 
    166 let propagate root1 root2 reconItem id showMergeFn =
    167   let path = reconItem.path1 in
    168   match reconItem.replicas with
    169     Problem p ->
    170       Trace.log (Printf.sprintf "[ERROR] Skipping %s\n  %s\n"
    171                    (Path.toString path) p);
    172       return ()
    173   | Different
    174         {rc1 = rc1; rc2 = rc2; direction = dir; default_direction = def} ->
    175       let notDefault = dir <> def in
    176       match dir with
    177         Conflict c ->
    178           Trace.log (Printf.sprintf "[CONFLICT] Skipping %s\n  %s\n"
    179                        (Path.toString path) c);
    180           return ()
    181       | Replica1ToReplica2 ->
    182           doAction
    183             root1 reconItem.path1 rc1 root2 reconItem.path2 rc2 notDefault id
    184       | Replica2ToReplica1 ->
    185           doAction
    186             root2 reconItem.path2 rc2 root1 reconItem.path1 rc1 notDefault id
    187       | Merge ->
    188           if rc1.typ <> `FILE || rc2.typ <> `FILE then
    189             raise (Util.Transient "Can only merge two existing files");
    190           Files.merge
    191             root1 reconItem.path1 rc1.ui root2 reconItem.path2 rc2.ui id
    192             showMergeFn;
    193           return ()
    194 
    195 let transportItem reconItem id showMergeFn =
    196   let (root1,root2) = Globals.roots() in
    197   propagate root1 root2 reconItem id showMergeFn
    198 
    199 (* ---------------------------------------------------------------------- *)
    200 
    201 let lastLogStart = ref 0.
    202 
    203 let logStart () =
    204   Abort.reset ();
    205   let t = Unix.gettimeofday () in
    206   lastLogStart := t;
    207   let tm = Util.localtime t in
    208   let m =
    209     Printf.sprintf
    210       "%s%s started propagating changes at %02d:%02d:%02d.%02d on %02d %s %04d\n"
    211       (if Prefs.read Trace.terse || Prefs.read Globals.batch then "" else "\n\n")
    212       (String.capitalize_ascii Uutil.myNameAndVersion)
    213       tm.Unix.tm_hour tm.Unix.tm_min tm.Unix.tm_sec
    214       (min 99 (truncate (mod_float t 1. *. 100.)))
    215       tm.Unix.tm_mday (Util.monthname tm.Unix.tm_mon)
    216       (tm.Unix.tm_year+1900) in
    217   Trace.logverbose m
    218 
    219 let logFinish () =
    220   let t = Unix.gettimeofday () in
    221   let tm = Util.localtime t in
    222   let m =
    223     Printf.sprintf
    224       "%s finished propagating changes at %02d:%02d:%02d.%02d on %02d %s %04d, %.3f s\n%s"
    225       (String.capitalize_ascii Uutil.myNameAndVersion)
    226       tm.Unix.tm_hour tm.Unix.tm_min tm.Unix.tm_sec
    227       (min 99 (truncate (mod_float t 1. *. 100.)))
    228       tm.Unix.tm_mday (Util.monthname tm.Unix.tm_mon)
    229       (tm.Unix.tm_year+1900)
    230       ( t -. !lastLogStart )
    231       (if Prefs.read Trace.terse || Prefs.read Globals.batch then "" else "\n\n") in
    232   Trace.logverbose m