unison

Fork of Unison, a bi-directional file synchronization tool
git clone git://git.laack.co/unison.git
Log | Files | Refs | README | LICENSE

transfer.ml (31980B)


      1 (* Unison file synchronizer: src/transfer.ml *)
      2 (* Copyright 1999-2020, Benjamin C. Pierce
      3 
      4     This program is free software: you can redistribute it and/or modify
      5     it under the terms of the GNU General Public License as published by
      6     the Free Software Foundation, either version 3 of the License, or
      7     (at your option) any later version.
      8 
      9     This program is distributed in the hope that it will be useful,
     10     but WITHOUT ANY WARRANTY; without even the implied warranty of
     11     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
     12     GNU General Public License for more details.
     13 
     14     You should have received a copy of the GNU General Public License
     15     along with this program.  If not, see <http://www.gnu.org/licenses/>.
     16 *)
     17 
     18 
     19 (* rsync compression algorithm
     20 
     21      To compress, we use a compression buffer with a size a lot
     22    greater than the size of a block, typically half a megabyte. This
     23    buffer is loaded with the file contents. Its valid part is
     24    represented by its limit 'length'.
     25      We scan the file contents by sliding a window with the size of a
     26    block over the compression buffer. This window is represented by
     27    its 'offset' and its size 'blockSize'.
     28      We transmit STRING tokens, containing the differences between the
     29    files, and BLOCK tokens, containing the number of a block from the
     30    old file found in the new one. The data not transmitted yet are
     31    pointed by 'toBeSent'.
     32      For each position of the window, we compute the checksum of the
     33    block it contains and try to find a matching entry in the hashed
     34    block information data. If there is a match, we compute the
     35    fingerprint of our block to match it with the candidates'
     36    fingerprints :
     37    - if there is a match, we've just hit, we can transmit the data not
     38    sent yet as a STRING token and emit a BLOCK token representing our
     39    match, then we slide the window one block ahead and try again;
     40   - in any other case, we've missed, we just slide the window one
     41    character ahead and try again.
     42      If the file size is greater than the compression buffer size,
     43    then we have to update the compression buffer when the window
     44    reaches its limit. We do so by sending any data not sent yet, then
     45    copying the end of the buffer at its beginning and filling it up
     46    with the file contents coming next. We now place our window at the
     47    beginning of the buffer and we continue the process.
     48      The compression is over when we reach the end of the file. We
     49    just have to send the data not sent yet together with the last
     50    characters that could not fill a block.  *)
     51 
(* Debug hooks, one per Trace category name.  Each is invoked below with
   a thunk (fun () -> Util.msg ...) that produces the message. *)
let debug  = Trace.debug "transfer"
let debugV = Trace.debug "transfer+"
let debugToken = Trace.debug "rsynctoken"
let debugLog =   Trace.debug "rsynclog"
     56 
     57 open Lwt
     58 
(* A chunk of encoded tokens: the byte array holding them, the offset of
   the first byte and the number of bytes.  flushQueue builds such a
   triple; receive and Rsync.rsyncDecompress decode one. *)
type transfer_instruction = Bytearray.t * int * int

(* Function that ships one transfer instruction and returns an Lwt
   thread. *)
type transmitter = transfer_instruction -> unit Lwt.t
     62 
     63 (*************************************************************************)
     64 (*                          BUFFERED DISK I/O                            *)
     65 (*************************************************************************)
     66 
     67 let reallyRead infd buffer pos length =
     68   let rec read pos length =
     69     let n = input infd buffer pos length in
     70     if n = length || n = 0 then pos + n else
     71     read (pos + n) (length - n)
     72   in
     73   read pos length - pos
     74 
     75 let rec reallyWrite outfd buffer pos length =
     76   output outfd buffer pos length
     77 
     78 let rec reallyWriteSubstring outfd buffer pos length =
     79   output_substring outfd buffer pos length
     80 
     81 (*************************************************************************)
     82 (*                            TOKEN QUEUE                                *)
     83 (*************************************************************************)
     84 
     85 (* There are two goals:
     86    1) to merge consecutive compatible tokens (catenating STRING tokens
     87       and combining BLOCK tokens when the referenced blocks are
     88       consecutive)
     89    2) to delay the transmission of the tokens across the network until
     90       their total size is greater than a limit, not to make a costly
     91       RPC for each token (therefore, the rsync module uses memory up to
     92       (2 * comprBufSize + tokenQueueLimit) bytes at a time) *)
     93 
(* Tokens accepted by queueToken:
   - STRING (s, pos, len): [len] literal bytes of [s] starting at [pos];
   - BLOCK n: reference to block number [n];
   - EOF: end-of-stream marker. *)
type token =
  | STRING   of bytes * int * int
  | BLOCK of int
  | EOF
     98 
(* Size of a block *)
let minBlockSize = 700

(* This should be at most 65535+3 bytes, as we are using this size to
   ensure that string token lengths will fit in 2 bytes. *)
let queueSize = 65500
(* The same limit as a Uutil.Filesize.t value. *)
let queueSizeFS = Uutil.Filesize.ofInt queueSize
(* State of the token queue.  Tokens are stored in [data] in their wire
   encoding: 'S' + 2-byte length + bytes for a string, 'B' + 3-byte block
   number + 1-byte count for a run of blocks, 'E' for end of file. *)
type tokenQueue =
  { mutable data : Bytearray.t;  (* the queued (encoded) tokens *)
    mutable previous : [`Str of int | `Block of int | `None];
                                 (* information about the last queued
                                    token, used for merging: `Str l is
                                    the length of the last string token,
                                    `Block n is the block number that
                                    would extend the last block run,
                                    `None means nothing can be merged *)
    mutable pos : int;           (* number of bytes used in [data], i.e.
                                    the index where the next token is
                                    written *)
    mutable prog : int;          (* the size of the data the queued
                                    tokens represent; passed to
                                    showProgress when flushing *)
    mutable bSize : int }        (* block size, added to [prog] for each
                                    queued block *)
    114 
    115 let encodeInt3 s pos i =
    116   assert (i >= 0 && i < 256 * 256 * 256);
    117   s.{pos + 0} <- Char.chr ((i lsr 0) land 0xff);
    118   s.{pos + 1} <- Char.chr ((i lsr 8) land 0xff);
    119   s.{pos + 2} <- Char.chr ((i lsr 16) land 0xff)
    120 
    121 let decodeInt3 s pos =
    122   (Char.code s.{pos + 0} lsl 0) lor
    123   (Char.code s.{pos + 1} lsl 8) lor
    124   (Char.code s.{pos + 2} lsl 16)
    125 
    126 let encodeInt2 s pos i =
    127   assert (i >= 0 && i < 65536);
    128   s.{pos + 0} <- Char.chr ((i lsr 0) land 0xff);
    129   s.{pos + 1} <- Char.chr ((i lsr 8) land 0xff)
    130 
    131 let decodeInt2 s pos =
    132   (Char.code s.{pos + 0} lsl 0) lor (Char.code s.{pos + 1} lsl 8)
    133 
    134 let encodeInt1 s pos i =
    135   assert (i >= 0 && i < 256);
    136   s.{pos + 0} <- Char.chr i
    137 
    138 let decodeInt1 s pos =
    139   Char.code s.{pos + 0}
    140 
    141 (* Transmit the contents of the tokenQueue *)
    142 let flushQueue q showProgress transmit cond =
    143   if cond && q.pos > 0 then begin
    144     debugToken (fun() -> Util.msg "flushing the token queue\n");
    145     transmit (q.data, 0, q.pos) >>= (fun () ->
    146     showProgress q.prog;
    147     q.pos <- 0; q.prog <- 0; q.previous <- `None;
    148     return ())
    149   end else
    150     return ()
    151 
    152 let pushEOF q showProgress transmit =
    153   if Trace.enabled "rsynctoken" then
    154     debugToken (fun() ->
    155       Util.msg "pushing EOF (pos:%d/%d)\n" q.pos queueSize);
    156   flushQueue q showProgress transmit
    157     (q.pos + 1 > queueSize) >>= (fun () ->
    158   assert (q.pos < queueSize);
    159   q.data.{q.pos} <- 'E';
    160   q.pos <- q.pos + 1;
    161   q.previous <- `None;
    162   return ())
    163 
    164 let rec pushString q id transmit s pos len =
    165   flushQueue q id transmit (q.pos + len + 3 > queueSize) >>= fun () ->
    166   if Trace.enabled "rsynctoken" then
    167     debugToken (fun() ->
    168       Util.msg "pushing string (pos:%d/%d len:%d)\n" q.pos queueSize len);
    169   let l = min len (queueSize - q.pos - 3) in
    170   assert (l > 0);
    171   q.data.{q.pos} <- 'S';
    172   encodeInt2 q.data (q.pos + 1) l;
    173   Bytearray.blit_from_bytes s pos q.data (q.pos + 3) l;
    174   q.pos <- q.pos + l + 3;
    175   q.prog <- q.prog + l;
    176   q.previous <- `Str l;
    177   if l < len then
    178     pushString q id transmit s (pos + l) (len - l)
    179   else
    180     return ()
    181 
    182 let growString q id transmit len' s pos len =
    183   if Trace.enabled "rsynctoken" then
    184     debugToken (fun() ->
    185       Util.msg "growing string (pos:%d/%d len:%d+%d)\n"
    186         q.pos queueSize len' len);
    187   let l = min (queueSize - q.pos) len in
    188   Bytearray.blit_from_bytes s pos q.data q.pos l;
    189   assert (q.pos - len' - 3 >= 0);
    190   assert (q.data.{q.pos - len' - 3} = 'S');
    191   assert (decodeInt2 q.data (q.pos - len' - 2) = len');
    192   let len'' = len' + l in
    193   encodeInt2 q.data (q.pos - len' - 2) len'';
    194   q.pos <- q.pos + l;
    195   q.prog <- q.prog + l;
    196   q.previous <- `Str len'';
    197   if l < len then
    198     pushString q id transmit s (pos + l) (len - l)
    199   else
    200     return ()
    201 
    202 let pushBlock q id transmit pos =
    203   flushQueue q id transmit (q.pos + 5 > queueSize) >>= (fun () ->
    204   if Trace.enabled "rsynctoken" then
    205     debugToken (fun() ->
    206       Util.msg "pushing block (pos:%d/%d)\n" q.pos queueSize);
    207   assert (q.pos + 5 <= queueSize);
    208   q.data.{q.pos} <- 'B';
    209   encodeInt3 q.data (q.pos + 1) pos;
    210   encodeInt1 q.data (q.pos + 4) 1;
    211   q.pos <- q.pos + 5;
    212   q.prog <- q.prog + q.bSize;
    213   q.previous <- `Block (pos + 1);
    214   return ())
    215 
    216 let growBlock q id transmit pos =
    217   if Trace.enabled "rsynctoken" then
    218     debugToken (fun() ->
    219       Util.msg "growing blocks (pos:%d/%d)\n" q.pos queueSize);
    220   assert (q.pos >= 5);
    221   let count = decodeInt1 q.data (q.pos - 1) in
    222   assert (q.data.{q.pos - 5} = 'B');
    223   assert (decodeInt3 q.data (q.pos - 4) + count = pos);
    224   assert (count < 255);
    225   encodeInt1 q.data (q.pos - 1) (count + 1);
    226   q.prog <- q.prog + q.bSize;
    227   q.previous <- if count = 254 then `None else `Block (pos + 1);
    228   return ()
    229 
    230 (* Queue a new token, possibly merging it with a previous compatible
    231    token and flushing the queue if its size becomes greater than the
    232    limit *)
    233 let queueToken q id transmit token =
    234   match token, q.previous with
    235     EOF, _ ->
    236       pushEOF q id transmit
    237   | STRING (s, pos, len), `Str len' ->
    238       growString q id transmit len' s pos len
    239   | STRING (s, pos, len), _ ->
    240       pushString q id transmit s pos len
    241   | BLOCK pos, `Block pos' when pos = pos' ->
    242       growBlock q id transmit pos
    243   | BLOCK pos, _ ->
    244       pushBlock q id transmit pos
    245 
    246 let makeQueue blockSize =
    247   { data =
    248       (* We need to make sure here that the size of the queue is not
    249          larger than 65538
    250          (1 byte: header, 2 bytes: string size, 65535 bytes: string) *)
    251       Bytearray.create queueSize;
    252     pos = 0; previous = `None; prog = 0;
    253     bSize = blockSize }
    254 
    255 (*************************************************************************)
    256 (* GENERIC TRANSMISSION                                                  *)
    257 (*************************************************************************)
    258 
(* From here on [debug] refers to the "generic" Trace category; this
   shadows the earlier "transfer" binding. *)
let debug = Trace.debug "generic"
    260 
    261 (* Slice the file into STRING tokens that are transmitted incrementally *)
    262 let send infd length showProgress transmit =
    263   debug (fun() -> Util.msg "sending file\n");
    264   let timer = Trace.startTimer "Sending file using generic transmission" in
    265   let bufSz = 8192 in
    266   let bufSzFS = Uutil.Filesize.ofInt 8192 in
    267   let buf = Bytes.create bufSz in
    268   let q = makeQueue 0 in
    269   let rec sendSlice length =
    270     if length > Uutil.Filesize.zero then begin
    271       let count =
    272         reallyRead infd buf 0
    273           (if length > bufSzFS then bufSz else Uutil.Filesize.toInt length) in
    274       if count = 0 then
    275         Lwt.return ()
    276       else begin
    277         queueToken q showProgress transmit (STRING (buf, 0, count))
    278           >>= fun () ->
    279         let length = Uutil.Filesize.sub length (Uutil.Filesize.ofInt count) in
    280         sendSlice length
    281       end
    282     end else
    283       Lwt.return ()
    284   in
    285   sendSlice length >>= (fun () ->
    286   queueToken q showProgress transmit EOF >>= (fun () ->
    287   flushQueue q showProgress transmit true >>= (fun () ->
    288   Trace.showTimer timer;
    289   return ())))
    290 
    291 let rec receiveRec outfd showProgress data pos maxPos =
    292   if pos = maxPos then false else
    293   match data.{pos} with
    294     'S' ->
    295       let length = decodeInt2 data (pos + 1) in
    296       if Trace.enabled "generic" then debug (fun() -> Util.msg
    297           "receiving %d bytes\n" length);
    298       reallyWriteSubstring outfd (Bytearray.sub data (pos + 3) length) 0 length;
    299       showProgress length;
    300       receiveRec outfd showProgress data (pos + length + 3) maxPos
    301   | 'E' ->
    302       true
    303   | _   ->
    304       assert false
    305 
    306 let receive outfd showProgress (data, pos, len) =
    307   receiveRec outfd showProgress data pos (pos + len)
    308 
    309 (*************************************************************************)
    310 (* RSYNC TRANSMISSION                                                    *)
    311 (*************************************************************************)
    312 
    313 module Rsync =
    314 struct
    315 
  (* Debug messages: inside this module [debug] refers to the "rsync"
     Trace category. *)
  let debug =      Trace.debug "rsync"
    318 
    319 
    320 (**************************** DESTINATION HOST ***************************)
    321 
    322   (* It is impossible to use rsync when the file size is smaller than
    323      the size of a block *)
    324   let minBlockSizeFs = Uutil.Filesize.ofInt minBlockSize
    325   let aboveRsyncThreshold sz = sz > minBlockSizeFs
    326 
  (* The type of the info that will be sent to the source host *)
  type rsync_block_info =
    { blockSize : int;       (* size of a block, in bytes *)
      blockCount : int;      (* number of blocks that have a signature *)
      checksumSize : int;    (* bytes of strong checksum kept per block *)
      weakChecksum :
        (int32, Bigarray.int32_elt, Bigarray.c_layout) Bigarray.Array1.t;
                             (* weak checksum of block k at index k *)
      strongChecksum : Bytearray.t }
                             (* strong checksums, [checksumSize] bytes
                                per block, block k at offset
                                k * checksumSize *)
    335 
  (* Umarshal description of [rsync_block_info]: the record is converted
     to and from the 5-tuple of its fields, in declaration order. *)
  let mrsync_block_info =
    Umarshal.(prod5 int int int int32bigarray Bytearray.m
                (fun {blockSize; blockCount; checksumSize; weakChecksum; strongChecksum} ->
                  blockSize, blockCount, checksumSize, weakChecksum, strongChecksum)
                (fun (blockSize, blockCount, checksumSize, weakChecksum, strongChecksum) ->
                  {blockSize; blockCount; checksumSize; weakChecksum; strongChecksum}))
    342 
    343   (*** PREPROCESS ***)
    344 
  (* Worst case probability of a failure, as a base-2 logarithm *)
  let logProba = -27. (* One time in 100 millions *)
  (* Strength of the weak checksum
     (how many bits of the weak checksum we can rely on) *)
  let weakLen = 27.
  (* This is what rsync uses:
       let logProba = -10.
       let weakLen = 31.
     This would save almost 3 bytes per block, but one needs to be able
     to recover from an rsync error.
     (We would have to take into account that our weak checksum is
      only 31 bits.)
  *)
    358   (* Block size *)
    359   (* Block size (including the minimum and maximum limits used here) is
    360      calculated similar to the rsync reference implementation. The main
    361      difference here is rounding down to a power of 2 to allow block-level
    362      links on filesystems that support it. *)
    363   let computeBlockSize l =
    364     1 lsl (truncate (max 10. (min (Float.round (log (sqrt l) /. log 2.)) 17.)))
    365   (* Size of each strong checksum *)
    366   let checksumSize bs sl dl =
    367     let bits =
    368       -. logProba -. weakLen +. log (sl *. dl /. float bs) /. log 2. in
    369     max 2 (min 16 (truncate ((bits +. 7.99) /. 8.)))
    370 
    371   let sizes srcLength dstLength =
    372     let blockSize = computeBlockSize (Uutil.Filesize.toFloat dstLength) in
    373     let blockCount =
    374       let count =
    375         Int64.div (Uutil.Filesize.toInt64 dstLength) (Int64.of_int blockSize)
    376       in
    377       Int64.to_int (min 16777216L count)
    378     in
    379     let csSize =
    380       checksumSize blockSize
    381         (Uutil.Filesize.toFloat srcLength)(Uutil.Filesize.toFloat dstLength)
    382     in
    383     (blockSize, blockCount, csSize)
    384 
    385   (* Incrementally build arg by executing f on successive blocks (of size
    386      'blockSize') of the input stream (pointed by 'infd').
    387      The procedure uses a buffer of size 'bufferSize' to load the input,
    388      and eventually handles the buffer update. *)
    389   let blockIter infd f blockSize maxCount =
    390     let bufferSize = 8192 + blockSize in
    391     let buffer = Bytes.create bufferSize in
    392     let rec iter count offset length =
    393       if count = maxCount then
    394         count
    395       else begin
    396         let newOffset = offset + blockSize in
    397         if newOffset <= length then begin
    398           f count buffer offset;
    399           iter (count + 1) newOffset length
    400         end else if offset > 0 then begin
    401           let chunkSize = length - offset in
    402           Bytes.blit buffer offset buffer 0 chunkSize;
    403           iter count 0 chunkSize
    404         end else begin
    405           let l = input infd buffer length (bufferSize - length) in
    406           if l = 0 then
    407             count
    408           else
    409             iter count 0 (length + l)
    410         end
    411       end
    412     in
    413     iter 0 0 0
    414 
    415   (* Given a block size, get blocks from the old file and compute a
    416      checksum and a fingerprint for each one. *)
    417   let rsyncPreprocess infd srcLength dstLength =
    418     debug (fun() -> Util.msg "preprocessing\n");
    419     let (blockSize, blockCount, csSize) = sizes srcLength dstLength in
    420     debugLog (fun() ->
    421       Util.msg "block size = %d bytes; block count = %d; \
    422                 strong checksum size = %d\n" blockSize blockCount csSize);
    423     let timer = Trace.startTimer "Preprocessing old file" in
    424     let weakCs =
    425       Bigarray.Array1.create Bigarray.int32 Bigarray.c_layout blockCount in
    426     let strongCs = Bytearray.create (blockCount * csSize) in
    427     let addBlock i buf offset =
    428       weakCs.{i} <- Int32.of_int (Checksum.subbytes buf offset blockSize);
    429       Bytearray.blit_from_string
    430         (Digest.subbytes buf offset blockSize) 0 strongCs (i * csSize) csSize
    431     in
    432     (* Make sure we are at the beginning of the file
    433        (important for AppleDouble files *)
    434     LargeFile.seek_in infd 0L;
    435     let count =
    436       (* Limit the number of blocks so that there is no overflow in
    437          encodeInt3 *)
    438       blockIter infd addBlock blockSize (min blockCount (256*256*256)) in
    439     debugLog (fun() -> Util.msg "%d blocks\n" count);
    440     Trace.showTimer timer;
    441     let sigs =
    442       { blockSize = blockSize; blockCount = count; checksumSize = csSize;
    443         weakChecksum = weakCs; strongChecksum = strongCs } in
    444     if
    445       sigs.blockCount > Bigarray.Array1.dim sigs.weakChecksum ||
    446       sigs.blockCount * sigs.checksumSize >
    447       Bigarray.Array1.dim sigs.strongChecksum
    448     then
    449       raise
    450         (Util.Transient
    451            (Format.sprintf
    452               "Internal error during rsync transfer (preprocess), \
    453                please report: %d %d - %d %d"
    454               sigs.blockCount (Bigarray.Array1.dim sigs.weakChecksum)
    455               (sigs.blockCount * sigs.checksumSize)
    456               (Bigarray.Array1.dim sigs.strongChecksum)));
    457     (sigs, blockSize)
    458 
    459   (* Expected size of the [rsync_block_info] datastructure (in KiB). *)
    460   let memoryFootprint srcLength dstLength =
    461     let (blockSize, blockCount, csSize) = sizes srcLength dstLength in
    462     blockCount * (csSize + 4)
    463 
    464   (*** DECOMPRESSION ***)
    465 
  (* Decompression buffer size: size in bytes of the buffer through
     which rsyncDecompress copies block data from the old file *)
  let decomprBufSize = 8192
    468 
    469   (* For each transfer instruction, either output a string or copy one or
    470      several blocks from the old file. *)
    471   let rsyncDecompress blockSize infd outfd ?copyFn showProgress (data, pos, len) =
    472     let decomprBuf = Bytes.create decomprBufSize in
    473     let progress = ref 0 in
    474     let rec copy length =
    475       if length > decomprBufSize then begin
    476         let _ = reallyRead infd decomprBuf 0 decomprBufSize in
    477         reallyWrite outfd decomprBuf 0 decomprBufSize;
    478         copy (length - decomprBufSize)
    479       end else
    480         let _ = reallyRead infd decomprBuf 0 length in
    481         reallyWrite outfd decomprBuf 0 length
    482     in
    483     let copyBlocks' offs length =
    484       LargeFile.seek_in infd offs;
    485       copy length
    486     in
    487     let copyBlocks n k =
    488       let offs = Int64.mul n (Int64.of_int blockSize) in
    489       let length = k * blockSize in
    490       begin match copyFn with
    491       | None -> copyBlocks' offs length
    492       | Some f ->
    493           let fallback copied =
    494             let offs = Int64.add offs (Uutil.Filesize.toInt64 copied)
    495             and length = length - (Uutil.Filesize.toInt copied) in
    496             copyBlocks' offs length
    497           in
    498           f (Uutil.Filesize.ofInt64 offs) (Uutil.Filesize.ofInt length) ~fallback;
    499       end;
    500       progress := !progress + length
    501     in
    502     let maxPos = pos + len in
    503     let rec decode pos =
    504       if pos = maxPos then false else
    505       match data.{pos} with
    506         'S' ->
    507           let length = decodeInt2 data (pos + 1) in
    508           if Trace.enabled "rsynctoken" then
    509             debugToken (fun() ->
    510               Util.msg "decompressing string (%d bytes)\n" length);
    511           reallyWriteSubstring outfd (Bytearray.sub data (pos + 3) length) 0 length;
    512           progress := !progress + length;
    513           decode (pos + length + 3)
    514       | 'B' ->
    515           let n = decodeInt3 data (pos + 1) in
    516           let k = decodeInt1 data (pos + 4) in
    517           if Trace.enabled "rsynctoken" then
    518             debugToken (fun() -> Util.msg
    519                 "decompressing %d block(s) (sequence %d->%d)\n"
    520                 k n (n + k - 1));
    521           copyBlocks (Int64.of_int n) k;
    522           decode (pos + 5)
    523       | 'E' ->
    524           true
    525       | _ ->
    526           assert false
    527     in
    528     let finished = decode pos in
    529     showProgress !progress;
    530     finished
    531 
    532 (***************************** SOURCE HOST *******************************)
    533 
    534   (*** CUSTOM HASH TABLE ***)
    535 
    536   (* Half the maximum number of entries in the hash table.
    537      MUST be a power of 2 !
    538      Typical values are around an average 2 * fileSize / blockSize. *)
    539   let hashTableMaxLength = 1024 * 1024
    540 
    541   let rec upperPowerOfTwo n n2 =
    542     if (n2 >= n) || (n2 = hashTableMaxLength) then
    543       n2
    544     else
    545       upperPowerOfTwo n (2 * n2)
    546 
  (* Hash function applied to a weak checksum before it is reduced to a
     table index; currently the identity. *)
  let hash checksum = checksum
    548 
    549   (* Compute the hash table length as a function of the number of blocks *)
    550   let computeHashTableLength signatures =
    551     2 * (upperPowerOfTwo signatures.blockCount 32)
    552 
    553   (* Hash the block signatures into the hash table *)
    554   let hashSig hashTableLength signatures =
    555     let hashTable = Array.make hashTableLength [] in
    556     for k = 0 to signatures.blockCount - 1 do
    557       let cs = Int32.to_int signatures.weakChecksum.{k} land 0x7fffffff in
    558       let h = (hash cs) land (hashTableLength - 1) in
    559       hashTable.(h) <- (k, cs) :: hashTable.(h)
    560     done;
    561     hashTable
    562 
    563   (* Given a key, retrieve the corresponding entry in the table *)
    564   let findEntry hashTable hashTableLength checksum :
    565       (int * Checksum.t) list =
    566     let i = (hash checksum) land (hashTableLength - 1) in
    567     hashTable.(i)
    568 
    569   let sigFilter hashTableLength signatures =
    570     let len = hashTableLength lsl 2 in
    571     let filter = Bytes.make len '\000' in
    572     for k = 0 to signatures.blockCount - 1 do
    573       let cs = Int32.to_int signatures.weakChecksum.{k} land 0x7fffffff in
    574       let h1 = cs lsr 28 in
    575       assert (h1 >= 0 && h1 < 8);
    576       let h2 = (cs lsr 5) land (len - 1) in
    577       let mask = 1 lsl h1 in
    578       Bytes.set filter h2 (Char.chr (Char.code (Bytes.get filter h2) lor mask))
    579     done;
    580     Bytes.to_string filter
    581 
    582   let filterMem filter hashTableLength checksum =
    583     let len = hashTableLength lsl 2 in
    584     let h2 = (checksum lsr 5) land (len - 1) in
    585     let h1 = checksum lsr 28 in
    586     let mask = 1 lsl h1 in
    587     Char.code (String.unsafe_get filter h2) land mask <> 0
    588 
    589   (* Log the values of the parameters associated with the hash table *)
    590   let logHash hashTable hashTableLength =
    591     let rec probe empty collision i =
    592       if i = hashTableLength then (empty, collision)
    593       else begin
    594         let length = Safelist.length hashTable.(i) in
    595         let next =
    596           if length = 0 then probe (empty + 1) collision
    597           else if length > 1 then probe empty (collision + 1)
    598           else probe empty collision
    599         in
    600         next (i + 1)
    601       end
    602     in
    603     let (empty, collision) = probe 0 0 0 in
    604     debugLog (fun() -> Util.msg "%d hash table entries\n" hashTableLength);
    605     debugLog (fun() -> Util.msg
    606         "%d empty, %d used, %d collided\n"
    607         empty (hashTableLength - empty) collision)
    608 
    609   (*** MEASURES ***)
    610 
  (* Counters collected during compression and reported by logMeasures.
     NOTE(review): the meaning of each counter is inferred from its name
     and from the hit-rate formula in logMeasures
     (hitHit / (hitHit + hitMiss)); confirm against the code that
     updates them. *)
  type probes = {
      mutable hitHit : int;
      mutable hitMiss : int;
      mutable missMiss : int;
      mutable nbBlock : int;
      mutable nbString : int;
      mutable stringSize : int
    }
    619 
    620   let logMeasures pb =
    621     debugLog (fun() -> Util.msg
    622         "hit-hit = %d, hit-miss = %d, miss-miss = %d, hit rate = %d%%\n"
    623         pb.hitHit pb.hitMiss pb.missMiss
    624         (if pb.hitHit <> 0 then
    625            pb.hitHit * 100 / (pb.hitHit + pb.hitMiss)
    626          else 0))
    627 (*
    628     debugLog (fun() -> Util.msg
    629         "%d strings (%d bytes), %d blocks\n"
    630         pb.nbString pb.stringSize pb.nbBlock);
    631     let generic = pb.stringSize + pb.nbBlock * blockSize in
    632     debugLog (fun() -> Util.msg
    633         "file size = %d bytes\n"
    634         generic);
    635     debug (fun() -> Util.msg
    636         "compression rate = %d%%\n" ((pb.stringSize * 100) / generic))
    637 *)
    638 
    639 
    640   (*** COMPRESSION ***)
    641 
  (* Compression buffer size *)
  (* MUST be >= 2 * blockSize *)
  (* NOTE(review): the beginning of rsyncCompress derives its buffer size
     from the block size and does not mention this constant; check
     whether it is still used. *)
  let minComprBufSize = 8192
    645 
  (* Mutable state of the compressor; see the description of the
     algorithm at the top of the file for 'offset', 'toBeSent' and
     'length'. *)
  type compressorState =
    { (* Rolling checksum data *)
      mutable checksum : int;
      mutable cksumOutgoing : char;
          (* presumably the character about to leave the window --
             TODO confirm *)
      (* Buffering *)
      mutable offset : int;      (* position of the window in the
                                    compression buffer *)
      mutable toBeSent : int;    (* start of the data not transmitted
                                    yet *)
      mutable length : int;      (* limit of the valid part of the
                                    compression buffer *)
      (* Position in file *)
      mutable absolutePos : Uutil.Filesize.t }
    656 
    657   (* Compress the file using the algorithm described in the header *)
    658   let rsyncCompress sigs infd srcLength showProgress transmit =
    659     debug (fun() -> Util.msg "compressing\n");
    660     if
    661       sigs.blockCount > Bigarray.Array1.dim sigs.weakChecksum ||
    662       sigs.blockCount * sigs.checksumSize >
    663       Bigarray.Array1.dim sigs.strongChecksum
    664     then
    665       raise
    666         (Util.Transient
    667            (Format.sprintf
    668               "Internal error during rsync transfer (compression), \
    669                please report: %d %d - %d %d"
    670               sigs.blockCount (Bigarray.Array1.dim sigs.weakChecksum)
    671               (sigs.blockCount * sigs.checksumSize)
    672               (Bigarray.Array1.dim sigs.strongChecksum)));
    673     let blockSize = sigs.blockSize in
    674     let comprBufSize = (2 * blockSize + 8191) land (-8192) in
    675     let comprBufSizeFS = Uutil.Filesize.ofInt comprBufSize in
    676     debugLog (fun() -> Util.msg
    677         "compression buffer size = %d bytes\n" comprBufSize);
    678     debugLog (fun() -> Util.msg "block size = %d bytes\n" blockSize);
    679     assert (comprBufSize >= 2 * blockSize);
    680     let timer = Trace.startTimer "Compressing the new file" in
    681 
    682     (* Measures *)
    683     let pb =
    684       { hitHit = 0; hitMiss = 0; missMiss = 0;
    685         nbBlock = 0; nbString = 0; stringSize = 0 } in
    686 (*
    687     let transmit tokenList =
    688       Safelist.iter
    689         (fun token ->
    690            match token with
    691            | STRING s ->
    692                let length = String.length s in
    693                if Trace.enabled "rsynctoken" then debugToken (fun() ->
    694                  Util.msg "transmitting string (%d bytes)\n" length);
    695                pb.nbString <- pb.nbString + 1;
    696                pb.stringSize <- pb.stringSize + length
    697            | BLOCK n ->
    698                if Trace.enabled "rsynctoken" then debugToken (fun() -> Util.msg
    699                    "transmitting %d block(s) (sequence %d->%d)\n"
    700                    1 n (n));
    701                pb.nbBlock <- pb.nbBlock + k)
    702         tokenList;
    703       transmit tokenList
    704     in
    705 *)
    706 
    (* Enable token buffering.  The unbuffered [transmit] is kept under its
       own name because the buffered wrapper defined just below shadows it;
       both the wrapper and the flush function forward to the unbuffered
       one. *)
    let sendRaw = transmit in
    let tokenQueue = makeQueue blockSize in
    let flushTokenQueue () =
      flushQueue tokenQueue showProgress sendRaw true
    in
    let transmit tok =
      queueToken tokenQueue showProgress sendRaw tok
    in

    (* Set up the hash table for fast checksum look-up *)
    let hashTableLength = computeHashTableLength sigs in
    let blockTable = hashSig hashTableLength sigs in
    logHash blockTable hashTableLength;

    (* Membership filter queried (through filterMem) by rollChecksum before
       it bothers to search [blockTable] *)
    let filter = sigFilter hashTableLength sigs in
    719 
    (* [fingerprintMatchRec checksums pos fp i] is true when the first [i]
       characters of the string [fp] equal the [i] elements of the bigarray
       [checksums] starting at index [pos].  The comparison runs from the
       last of those positions down to the first and stops at the first
       difference; with [i <= 0] nothing is compared and the result is
       true. *)
    let rec fingerprintMatchRec checksums pos fp i =
      if i <= 0 then
        true
      else begin
        let j = i - 1 in
        if fp.[j] <> checksums.{pos + j} then
          false
        else
          fingerprintMatchRec checksums pos fp j
      end
    in
    (* [fingerprintMatch k fp] compares the fingerprint [fp] with the strong
       checksum stored for block [k], i.e. the [sigs.checksumSize] entries
       of [sigs.strongChecksum] starting at [k * sigs.checksumSize].
       Raises [Util.Transient] if that range would run past the end of the
       strong checksum array. *)
    let fingerprintMatch k fp =
      let csSize = sigs.checksumSize in
      let pos = k * csSize in
      let dim = Bigarray.Array1.dim sigs.strongChecksum in
      (*FIX: temporary debugging code... *)
      if pos + csSize > dim then
        raise
          (Util.Transient
             (Format.sprintf "Internal error during rsync transfer, \
                              please report: \
                              k:%d/%d pos:%d csSize:%d dim:%d"
                k sigs.blockCount pos csSize dim));
      fingerprintMatchRec sigs.strongChecksum pos fp csSize
    in
    741 
    (* Create the compression buffer *)
    let comprBuf = Bytes.create comprBufSize in

    (* Send the bytes of the compression buffer lying between [toBeSent]
       (inclusive) and [offset] (exclusive) as one STRING token.  When that
       range is empty nothing is transmitted. *)
    let transmitString toBeSent offset =
      if offset <= toBeSent then
        return ()
      else
        transmit (STRING (comprBuf, toBeSent, offset - toBeSent))
    in
    752 
    (* Set up the rolling checksum data *)
    let cksumTable = Checksum.init blockSize in

    (* Starting state.  [offset], [toBeSent] and [length] all sit at the end
       of a buffer-sized window, so no block fits and (for a positive block
       size) the first call to slideWindow goes straight to the branch that
       loads data from the file. *)
    let initialState =
      { checksum = 0;
        cksumOutgoing = ' ';
        offset = comprBufSize;
        toBeSent = comprBufSize;
        length = comprBufSize;
        absolutePos = Uutil.Filesize.zero }
    in
    761 
    (* Check the new window position.  Three cases:
       - a whole block still fits in the valid part of the buffer: go on
         and compute its checksum;
       - it does not fit and the buffer is not full (the previous load came
         up short, presumably because the source is exhausted): send what
         is left as a STRING token, then EOF;
       - it does not fit and the buffer is full: send the pending data,
         move the unscanned tail to the front of the buffer, read more of
         the file behind it and try again. *)
    let rec slideWindow st miss : unit Lwt.t =
      if st.offset + blockSize <= st.length then
        computeChecksum st miss
      else if st.length <> comprBufSize then
        transmitString st.toBeSent st.length >>= (fun () ->
        transmit EOF)
      else begin
        transmitString st.toBeSent st.offset >>= (fun () ->
        (* Bytes from the current offset to the end of the buffer have not
           been scanned yet; keep them at the start of the buffer *)
        let tailLen = st.length - st.offset in
        if tailLen > 0 then begin
          assert (comprBufSize >= blockSize);
          Bytes.blit comprBuf st.offset comprBuf 0 tailLen
        end;
        let remaining = Uutil.Filesize.sub srcLength st.absolutePos in
        let room = comprBufSize - tailLen in
        (* Ask for as much as fits, but never for more than the source
           still has according to srcLength *)
        let wanted =
          if remaining > comprBufSizeFS then room
          else min (Uutil.Filesize.toInt remaining) room
        in
        let nRead = reallyRead infd comprBuf tailLen wanted in
        st.absolutePos <-
          Uutil.Filesize.add st.absolutePos (Uutil.Filesize.ofInt nRead);
        st.offset <- 0;
        st.toBeSent <- 0;
        st.length <- tailLen + nRead;
        debugToken (fun () -> Util.msg "updating the compression buffer\n");
        debugToken (fun () -> Util.msg "new length = %d bytes\n" st.length);
        slideWindow st miss)
      end
    792 
    (* Compute the checksum of the window contents.  After a miss the
       previous checksum is still valid for the window one character back,
       so it is simply rolled forward; otherwise the checksum is computed
       from scratch over the whole block and the block is looked up. *)
    and computeChecksum st miss =
      if not miss then begin
        st.checksum <- Checksum.subbytes comprBuf st.offset blockSize;
        (* First character of the window: the one a later roll drops *)
        st.cksumOutgoing <- Bytes.unsafe_get comprBuf st.offset;
        processBlock st
      end else
        rollChecksum st
    804 
    (* Roll the checksum forward by one character: the saved outgoing
       character leaves the window and the last character of the current
       window enters it.  The block is looked up only if the filter says
       the new checksum may be present; otherwise it is a miss. *)
    and rollChecksum st =
      let lastIdx = st.offset + blockSize - 1 in
      let rolled =
        Checksum.roll cksumTable st.checksum st.cksumOutgoing
          (Bytes.unsafe_get comprBuf lastIdx)
      in
      st.checksum <- rolled;
      st.cksumOutgoing <- Bytes.unsafe_get comprBuf st.offset;
      if not (filterMem filter hashTableLength rolled) then
        miss st
      else
        processBlock st
    816 
    (* Try to match the current block with one existing in the old file.
       Updates the measures: missMiss when the hash table has no entry for
       the checksum, hitMiss when an entry exists but no block in it
       matches, hitHit when a block is found. *)
    and processBlock st =
      let weak = st.checksum in
      match findEntry blockTable hashTableLength weak with
      | [] ->
          pb.missMiss <- succ pb.missMiss;
          miss st
      | candidates ->
          begin match findBlock st weak candidates None with
          | -1 ->
              pb.hitMiss <- succ pb.hitMiss;
              miss st
          | blockNum ->
              pb.hitHit <- succ pb.hitHit;
              hit st blockNum
          end
    833 
    (* Walk the hash table entry looking for a node whose checksum equals
       [checksum] and whose stored fingerprint matches the current window.
       Returns the block number of the first such node, or -1 if there is
       none.  [fingerprint] caches the digest of the window: it is computed
       only when the first node with the right checksum is met, and then
       passed along as [Some _] for the rest of the list. *)
    and findBlock st checksum entry fingerprint =
      match entry with
      | [] ->
          -1
      | (k, cs) :: rest ->
          if cs <> checksum then
            findBlock st checksum rest fingerprint
          else begin
            let fp =
              match fingerprint with
              | Some fp -> fp
              | None -> Digest.subbytes comprBuf st.offset blockSize
            in
            if fingerprintMatch k fp then k
            else findBlock st checksum rest (Some fp)
          end
    849 
    (* Miss : move the window one character ahead.  If a whole block still
       fits in the buffer the checksum is rolled directly; otherwise
       slideWindow is asked to deal with the end of the buffer, with the
       miss flag set. *)
    and miss st =
      let next = st.offset + 1 in
      st.offset <- next;
      if next + blockSize > st.length then
        slideWindow st true
      else
        rollChecksum st
    857 
    (* Hit : send the data waiting in front of the window as a STRING
       token, then a BLOCK token for the matched block, then move the
       window one block ahead.  [toBeSent] is moved past the matched block
       before the BLOCK token is transmitted. *)
    and hit st blockNum =
      let advance () =
        st.offset <- st.offset + blockSize;
        slideWindow st false
      in
      let sendBlock () =
        st.toBeSent <- st.offset + blockSize;
        transmit (BLOCK blockNum) >>= advance
      in
      transmitString st.toBeSent st.offset >>= sendBlock
    in
    868 
    (* Initialization and termination: scan the whole file, flush the
       tokens still sitting in the queue, then report the measures and the
       timer *)
    let finish () =
      logMeasures pb;
      Trace.showTimer timer;
      return ()
    in
    slideWindow initialState false >>= (fun () ->
    flushTokenQueue () >>= finish)
    875 
    876 end