(* transfer.ml (31980B) *)
(* Unison file synchronizer: src/transfer.ml *)
(* Copyright 1999-2020, Benjamin C. Pierce

    This program is free software: you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
    the Free Software Foundation, either version 3 of the License, or
    (at your option) any later version.

    This program is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU General Public License for more details.

    You should have received a copy of the GNU General Public License
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*)


(* rsync compression algorithm

     To compress, we use a compression buffer with a size a lot
   greater than the size of a block, typically half a megabyte. This
   buffer is loaded with the file contents. Its valid part is
   represented by its limit 'length'.
     We scan the file contents by sliding a window with the size of a
   block over the compression buffer. This window is represented by
   its 'offset' and its size 'blockSize'.
     We transmit STRING tokens, containing the differences between the
   files, and BLOCK tokens, containing the number of a block from the
   old file found in the new one. The data not transmitted yet is
   pointed to by 'toBeSent'.
     For each position of the window, we compute the checksum of the
   block it contains and try to find a matching entry in the hashed
   block information data. If there is a match, we compute the
   fingerprint of our block to match it with the candidates'
   fingerprints:
   - if there is a match, we have a hit: we transmit the data not
   sent yet as a STRING token and emit a BLOCK token representing our
   match, then we slide the window one block ahead and try again;
   - in any other case, we have a miss: we just slide the window one
   character ahead and try again.
     If the file size is greater than the compression buffer size,
   then we have to update the compression buffer when the window
   reaches its limit. We do so by sending any data not sent yet, then
   copying the end of the buffer to its beginning and filling it up
   with the file contents coming next. We now place our window at the
   beginning of the buffer and we continue the process.
     The compression is over when we reach the end of the file. We
   just have to send the data not sent yet together with the last
   characters that could not fill a block. *)

let debug  = Trace.debug "transfer"
let debugV = Trace.debug "transfer+"
let debugToken = Trace.debug "rsynctoken"
let debugLog   = Trace.debug "rsynclog"

open Lwt

(* A transfer instruction is a triple (data, pos, len): the encoded tokens
   occupy [len] bytes of [data] starting at [pos]. *)
type transfer_instruction = Bytearray.t * int * int

type transmitter = transfer_instruction -> unit Lwt.t

(*************************************************************************)
(*                          BUFFERED DISK I/O                            *)
(*************************************************************************)

(* [reallyRead infd buffer pos length] reads from [infd] into [buffer],
   starting at [pos], calling [input] repeatedly until either [length]
   bytes have been read or [input] returns 0.  Returns the number of
   bytes actually read, which is smaller than [length] only in the
   latter case. *)
let reallyRead infd buffer pos length =
  let rec read pos length =
    let n = input infd buffer pos length in
    if n = length || n = 0 then pos + n else
    read (pos + n) (length - n)
  in
  read pos length - pos

(* [reallyWrite outfd buffer pos length] writes [length] bytes of the
   byte sequence [buffer], starting at [pos], to [outfd].
   (Not recursive: the former [rec] flag was unused.) *)
let reallyWrite outfd buffer pos length =
  output outfd buffer pos length

(* Same as [reallyWrite], for an immutable string [buffer]. *)
let reallyWriteSubstring outfd buffer pos length =
  output_substring outfd buffer pos length

(*************************************************************************)
(*                            TOKEN QUEUE                                *)
(*************************************************************************)

(* The token queue serves two purposes:
   1) merging consecutive compatible tokens (STRING tokens are
      concatenated, and BLOCK tokens are combined when the blocks they
      reference are consecutive);
   2) delaying the transmission of the tokens across the network until
      their total size is greater than a limit, so that we do not make
      a costly RPC for each token (therefore, the rsync module uses
      memory up to (2 * comprBufSize + tokenQueueLimit) bytes at a
      time) *)

type token =
  | STRING of bytes * int * int
  | BLOCK of int
  | EOF

(* Minimum size of a block *)
let minBlockSize = 700

(* This should be at most 65535+3 bytes, as we are using this size to
   ensure that string token lengths will fit in 2 bytes. *)
let queueSize = 65500
let queueSizeFS = Uutil.Filesize.ofInt queueSize
type tokenQueue =
  { mutable data : Bytearray.t;  (* the queued tokens *)
    mutable previous : [`Str of int | `Block of int | `None];
                                 (* some information about the
                                    previous token *)
    mutable pos : int;           (* head of the queue *)
    mutable prog : int;          (* the size of the data they represent *)
    mutable bSize : int }        (* block size *)

(* Fixed-width integer codecs.  Values are stored least significant
   byte first; the encoders assert that the value fits in the width. *)
let encodeInt3 s pos i =
  assert (i >= 0 && i < 256 * 256 * 256);
  let byte shift = Char.chr ((i lsr shift) land 0xff) in
  s.{pos} <- byte 0;
  s.{pos + 1} <- byte 8;
  s.{pos + 2} <- byte 16

let decodeInt3 s pos =
  let byte k = Char.code s.{pos + k} in
  byte 0 lor (byte 1 lsl 8) lor (byte 2 lsl 16)

let encodeInt2 s pos i =
  assert (i >= 0 && i < 65536);
  let byte shift = Char.chr ((i lsr shift) land 0xff) in
  s.{pos} <- byte 0;
  s.{pos + 1} <- byte 8

let decodeInt2 s pos =
  let byte k = Char.code s.{pos + k} in
  byte 0 lor (byte 1 lsl 8)

let encodeInt1 s pos i =
  assert (i >= 0 && i < 256);
  s.{pos} <- Char.chr i

let decodeInt1 s pos =
  Char.code s.{pos}

(* Transmit the contents of the token queue when [cond] holds and the
   queue is not empty, then report the accumulated progress and reset
   the queue.  Does nothing otherwise. *)
let flushQueue q showProgress transmit cond =
  if not (cond && q.pos > 0) then
    return ()
  else begin
    debugToken (fun() -> Util.msg "flushing the token queue\n");
    transmit (q.data, 0, q.pos) >>= fun () ->
    showProgress q.prog;
    q.pos <- 0;
    q.prog <- 0;
    q.previous <- `None;
    return ()
  end

(* Append an end-of-file marker (a single 'E' byte), flushing first if
   the queue has no room left for it. *)
let pushEOF q showProgress transmit =
  if Trace.enabled "rsynctoken" then
    debugToken (fun() ->
      Util.msg "pushing EOF (pos:%d/%d)\n" q.pos queueSize);
  let queueFull = q.pos + 1 > queueSize in
  flushQueue q showProgress transmit queueFull >>= fun () ->
  assert (q.pos < queueSize);
  q.data.{q.pos} <- 'E';
  q.pos <- q.pos + 1;
  q.previous <- `None;
  return ()

(* Append a new string token: an 'S' byte, a 2-byte length, then the
   payload taken from [s] at [pos].  The queue is flushed first when the
   whole string does not fit; a string larger than the queue itself is
   split over several tokens by the recursive call. *)
let rec pushString q id transmit s pos len =
  flushQueue q id transmit (q.pos + len + 3 > queueSize) >>= fun () ->
  if Trace.enabled "rsynctoken" then
    debugToken (fun() ->
      Util.msg "pushing string (pos:%d/%d len:%d)\n" q.pos queueSize len);
  let chunk = min len (queueSize - q.pos - 3) in
  assert (chunk > 0);
  let start = q.pos in
  q.data.{start} <- 'S';
  encodeInt2 q.data (start + 1) chunk;
  Bytearray.blit_from_bytes s pos q.data (start + 3) chunk;
  q.pos <- start + chunk + 3;
  q.prog <- q.prog + chunk;
  q.previous <- `Str chunk;
  if chunk < len then
    pushString q id transmit s (pos + chunk) (len - chunk)
  else
    return ()

(* Extend the string token that ends at the head of the queue (its
   current payload length is [len']) with as much of the new data as
   fits, patching its length field in place; whatever does not fit is
   handed to [pushString]. *)
let growString q id transmit len' s pos len =
  if Trace.enabled "rsynctoken" then
    debugToken (fun() ->
      Util.msg "growing string (pos:%d/%d len:%d+%d)\n"
        q.pos queueSize len' len);
  let extra = min (queueSize - q.pos) len in
  Bytearray.blit_from_bytes s pos q.data q.pos extra;
  (* Position of the 'S' header of the token being extended *)
  let header = q.pos - len' - 3 in
  assert (header >= 0);
  assert (q.data.{header} = 'S');
  assert (decodeInt2 q.data (header + 1) = len');
  let total = len' + extra in
  encodeInt2 q.data (header + 1) total;
  q.pos <- q.pos + extra;
  q.prog <- q.prog + extra;
  q.previous <- `Str total;
  if extra < len then
    pushString q id transmit s (pos + extra) (len - extra)
  else
    return ()

(* Append a new block token: a 'B' byte, the 3-byte block number [pos]
   and a 1-byte repeat count initialised to 1.  The queue is flushed
   first if the 5 bytes do not fit. *)
let pushBlock q id transmit pos =
  flushQueue q id transmit (q.pos + 5 > queueSize) >>= fun () ->
  if Trace.enabled "rsynctoken" then
    debugToken (fun() ->
      Util.msg "pushing block (pos:%d/%d)\n" q.pos queueSize);
  let start = q.pos in
  assert (start + 5 <= queueSize);
  q.data.{start} <- 'B';
  encodeInt3 q.data (start + 1) pos;
  encodeInt1 q.data (start + 4) 1;
  q.pos <- start + 5;
  q.prog <- q.prog + q.bSize;
  q.previous <- `Block (pos + 1);
  return ()

(* Extend the block token at the head of the queue by one block by
   incrementing its repeat count.  Once the count reaches 255 the token
   can no longer grow, so [previous] is reset to [`None]. *)
let growBlock q id transmit pos =
  if Trace.enabled "rsynctoken" then
    debugToken (fun() ->
      Util.msg "growing blocks (pos:%d/%d)\n" q.pos queueSize);
  assert (q.pos >= 5);
  let countPos = q.pos - 1 in
  let count = decodeInt1 q.data countPos in
  assert (q.data.{q.pos - 5} = 'B');
  assert (decodeInt3 q.data (q.pos - 4) + count = pos);
  assert (count < 255);
  encodeInt1 q.data countPos (count + 1);
  q.prog <- q.prog + q.bSize;
  q.previous <- (if count = 254 then `None else `Block (pos + 1));
  return ()

(* Queue a new token, possibly merging it with a previous compatible
   token and flushing the queue if its size becomes greater than the
   limit *)
let queueToken q id transmit token =
  match token with
  | EOF ->
      pushEOF q id transmit
  | STRING (s, pos, len) ->
      begin match q.previous with
      | `Str len' -> growString q id transmit len' s pos len
      | `Block _ | `None -> pushString q id transmit s pos len
      end
  | BLOCK pos ->
      begin match q.previous with
      | `Block expected when expected = pos -> growBlock q id transmit pos
      | `Block _ | `Str _ | `None -> pushBlock q id transmit pos
      end

(* Create an empty token queue for blocks of [blockSize] bytes. *)
let makeQueue blockSize =
  (* We need to make sure here that the size of the queue is not
     larger than 65538
     (1 byte: header, 2 bytes: string size, 65535 bytes: string) *)
  let data = Bytearray.create queueSize in
  { data; previous = `None; pos = 0; prog = 0; bSize = blockSize }

(*************************************************************************)
(*                         GENERIC TRANSMISSION                          *)
(*************************************************************************)

let debug = Trace.debug "generic"

(* Slice the file into STRING tokens that are transmitted incrementally.
   At most [length] bytes are read from [infd], in slices of up to
   [bufSz] bytes; sending stops early if a read returns no data.  An EOF
   token is always queued at the end and the queue is then flushed. *)
let send infd length showProgress transmit =
  debug (fun() -> Util.msg "sending file\n");
  let timer = Trace.startTimer "Sending file using generic transmission" in
  let bufSz = 8192 in
  (* Derived from [bufSz] so that the two can never get out of sync *)
  let bufSzFS = Uutil.Filesize.ofInt bufSz in
  let buf = Bytes.create bufSz in
  let q = makeQueue 0 in
  let rec sendSlice length =
    if length > Uutil.Filesize.zero then begin
      let count =
        reallyRead infd buf 0
          (if length > bufSzFS then bufSz else Uutil.Filesize.toInt length) in
      if count = 0 then
        Lwt.return ()
      else begin
        queueToken q showProgress transmit (STRING (buf, 0, count))
          >>= fun () ->
        let length = Uutil.Filesize.sub length (Uutil.Filesize.ofInt count) in
        sendSlice length
      end
    end else
      Lwt.return ()
  in
  sendSlice length >>= (fun () ->
  queueToken q showProgress transmit EOF >>= (fun () ->
  flushQueue q showProgress transmit true >>= (fun () ->
  Trace.showTimer timer;
  return ())))

(* Decode the tokens found in [data] between [pos] and [maxPos], writing
   the payload of each string token to [outfd].  Returns [true] if an EOF
   token was met and [false] if [maxPos] was reached without one.  Block
   tokens are not expected here. *)
let rec receiveRec outfd showProgress data pos maxPos =
  if pos = maxPos then false else
  match data.{pos} with
    'S' ->
      let length = decodeInt2 data (pos + 1) in
      if Trace.enabled "generic" then debug (fun() -> Util.msg
          "receiving %d bytes\n" length);
      reallyWriteSubstring outfd (Bytearray.sub data (pos + 3) length) 0 length;
      showProgress length;
      receiveRec outfd showProgress data (pos + length + 3) maxPos
  | 'E' ->
      true
  | _   ->
      assert false

(* Process one transfer instruction; see [receiveRec] for the result. *)
let receive outfd showProgress (data, pos, len) =
  receiveRec outfd showProgress data pos (pos + len)

(*************************************************************************)
(*                            RSYNC TRANSMISSION                         *)
(*************************************************************************)

module Rsync =
struct

  (* Debug messages *)
  let debug = Trace.debug "rsync"


(**************************** DESTINATION HOST ***************************)

  (* It is impossible to use rsync when the file size is smaller than
     the size of a block *)
  let minBlockSizeFs = Uutil.Filesize.ofInt minBlockSize
  let aboveRsyncThreshold sz = sz > minBlockSizeFs

  (* The type of the info that will be sent to the source host *)
  type rsync_block_info =
    { blockSize : int;
      blockCount : int;
      checksumSize : int;
      weakChecksum :
        (int32, Bigarray.int32_elt, Bigarray.c_layout) Bigarray.Array1.t;
      strongChecksum : Bytearray.t }

  let mrsync_block_info =
    Umarshal.(prod5 int int int int32bigarray Bytearray.m
                (fun {blockSize; blockCount; checksumSize; weakChecksum; strongChecksum} ->
                   blockSize, blockCount, checksumSize, weakChecksum, strongChecksum)
                (fun (blockSize, blockCount, checksumSize, weakChecksum, strongChecksum) ->
                   {blockSize; blockCount; checksumSize; weakChecksum; strongChecksum}))

  (*** PREPROCESS ***)

  (* Worst case probability of a failure *)
  let logProba = -27. (* One time in 100 millions *)
  (* Strength of the weak checksum
     (how many bits of the weak checksum we can rely on) *)
  let weakLen = 27.
  (* This is what rsync uses:
       let logProba = -10.
       let weakLen = 31.
     This would save almost 3 bytes per block, but one needs to be able
     to recover from an rsync error.
     (We would have to take into account that our weak checksum is
      only 31 bits.)
  *)
  (* Block size *)
  (* Block size (including the minimum and maximum limits used here) is
     calculated similar to the rsync reference implementation. The main
     difference here is that the result is always a power of 2, to allow
     block-level links on filesystems that support it: the exponent is
     log2 (sqrt l) rounded to the nearest integer and clamped to the
     range 10..17. *)
  let computeBlockSize l =
    1 lsl (truncate (max 10. (min (Float.round (log (sqrt l) /. log 2.)) 17.)))
  (* Size of each strong checksum, in bytes (between 2 and 16) *)
  let checksumSize bs sl dl =
    let bits =
      -. logProba -. weakLen +. log (sl *. dl /. float bs) /. log 2. in
    max 2 (min 16 (truncate ((bits +. 7.99) /. 8.)))

  (* Compute (block size, block count, strong checksum size) for a
     transfer from a source of length [srcLength] over an old file of
     length [dstLength].  Only complete blocks of the old file are
     counted, and the count is capped at 16777216. *)
  let sizes srcLength dstLength =
    let blockSize = computeBlockSize (Uutil.Filesize.toFloat dstLength) in
    let blockCount =
      let count =
        Int64.div (Uutil.Filesize.toInt64 dstLength) (Int64.of_int blockSize)
      in
      Int64.to_int (min 16777216L count)
    in
    let csSize =
      checksumSize blockSize
        (Uutil.Filesize.toFloat srcLength)(Uutil.Filesize.toFloat dstLength)
    in
    (blockSize, blockCount, csSize)

  (* Call [f count buffer offset] on each successive complete block (of
     size 'blockSize') of the input stream (pointed by 'infd'), where
     [count] is the index of the block, starting from 0, and the block
     is found in [buffer] at [offset].
     The procedure uses a buffer of size 'bufferSize' to load the input:
     when fewer than 'blockSize' bytes remain in it, they are moved to
     its beginning before more input is read.
     Iteration stops after [maxCount] blocks or when no more input is
     available; a trailing partial block is never passed to [f].
     Returns the number of blocks processed. *)
  let blockIter infd f blockSize maxCount =
    let bufferSize = 8192 + blockSize in
    let buffer = Bytes.create bufferSize in
    let rec iter count offset length =
      if count = maxCount then
        count
      else begin
        let newOffset = offset + blockSize in
        if newOffset <= length then begin
          f count buffer offset;
          iter (count + 1) newOffset length
        end else if offset > 0 then begin
          let chunkSize = length - offset in
          Bytes.blit buffer offset buffer 0 chunkSize;
          iter count 0 chunkSize
        end else begin
          let l = input infd buffer length (bufferSize - length) in
          if l = 0 then
            count
          else
            iter count 0 (length + l)
        end
      end
    in
    iter 0 0 0

  (* Sanity check shared by the preprocessing and compression phases:
     the checksum arrays must be large enough for [sigs.blockCount]
     blocks.  Raises [Util.Transient] otherwise; [phase] only appears in
     the error message. *)
  let checkSignatures phase sigs =
    let weakDim = Bigarray.Array1.dim sigs.weakChecksum in
    let strongDim = Bigarray.Array1.dim sigs.strongChecksum in
    let strongLen = sigs.blockCount * sigs.checksumSize in
    if sigs.blockCount > weakDim || strongLen > strongDim then
      raise
        (Util.Transient
           (Format.sprintf
              "Internal error during rsync transfer (%s), \
               please report: %d %d - %d %d"
              phase sigs.blockCount weakDim strongLen strongDim))

  (* Given a block size, get blocks from the old file and compute a
     checksum and a fingerprint for each one.
     Returns the block signatures together with the block size. *)
  let rsyncPreprocess infd srcLength dstLength =
    debug (fun() -> Util.msg "preprocessing\n");
    let (blockSize, blockCount, csSize) = sizes srcLength dstLength in
    debugLog (fun() ->
      Util.msg "block size = %d bytes; block count = %d; \
                strong checksum size = %d\n" blockSize blockCount csSize);
    let timer = Trace.startTimer "Preprocessing old file" in
    let weakCs =
      Bigarray.Array1.create Bigarray.int32 Bigarray.c_layout blockCount in
    let strongCs = Bytearray.create (blockCount * csSize) in
    (* Only the first [csSize] bytes of each digest are kept *)
    let addBlock i buf offset =
      weakCs.{i} <- Int32.of_int (Checksum.subbytes buf offset blockSize);
      Bytearray.blit_from_string
        (Digest.subbytes buf offset blockSize) 0 strongCs (i * csSize) csSize
    in
    (* Make sure we are at the beginning of the file
       (important for AppleDouble files) *)
    LargeFile.seek_in infd 0L;
    let count =
      (* Limit the number of blocks so that there is no overflow in
         encodeInt3 *)
      blockIter infd addBlock blockSize (min blockCount (256*256*256)) in
    debugLog (fun() -> Util.msg "%d blocks\n" count);
    Trace.showTimer timer;
    (* [count] may be smaller than [blockCount] if the file turned out to
       be shorter than announced *)
    let sigs =
      { blockSize = blockSize; blockCount = count; checksumSize = csSize;
        weakChecksum = weakCs; strongChecksum = strongCs } in
    checkSignatures "preprocess" sigs;
    (sigs, blockSize)

  (* Expected size of the [rsync_block_info] datastructure: for each
     block, one 4-byte weak checksum plus [csSize] bytes of strong
     checksum.
     NOTE(review): this comment used to say the result is in KiB, but the
     expression below counts bytes; confirm which unit callers expect. *)
  let memoryFootprint srcLength dstLength =
    let (_blockSize, blockCount, csSize) = sizes srcLength dstLength in
    blockCount * (csSize + 4)

  (*** DECOMPRESSION ***)

  (* Decompression buffer size *)
  let decomprBufSize = 8192

  (* For each transfer instruction, either output a string or copy one or
     several blocks from the old file.
     [infd] is the old file and [outfd] the file being rebuilt.  When
     [copyFn] is given, it is called to copy blocks, with a [fallback]
     that copies the remainder through the decompression buffer.
     [showProgress] is called once, with the number of bytes produced.
     Returns [true] if an EOF token was decoded, [false] otherwise. *)
  let rsyncDecompress blockSize infd outfd ?copyFn showProgress (data, pos, len) =
    let decomprBuf = Bytes.create decomprBufSize in
    let progress = ref 0 in
    (* Copy [length] bytes from the current position of [infd] to [outfd].
       NOTE(review): the number of bytes actually read is ignored, so a
       short read would write stale buffer contents; presumably this is
       caught by a later integrity check -- confirm. *)
    let rec copy length =
      if length > decomprBufSize then begin
        ignore (reallyRead infd decomprBuf 0 decomprBufSize : int);
        reallyWrite outfd decomprBuf 0 decomprBufSize;
        copy (length - decomprBufSize)
      end else begin
        ignore (reallyRead infd decomprBuf 0 length : int);
        reallyWrite outfd decomprBuf 0 length
      end
    in
    let copyBlocks' offs length =
      LargeFile.seek_in infd offs;
      copy length
    in
    (* Copy [k] consecutive blocks of the old file, starting at block [n] *)
    let copyBlocks n k =
      let offs = Int64.mul n (Int64.of_int blockSize) in
      let length = k * blockSize in
      begin match copyFn with
      | None -> copyBlocks' offs length
      | Some f ->
          let fallback copied =
            let offs = Int64.add offs (Uutil.Filesize.toInt64 copied)
            and length = length - (Uutil.Filesize.toInt copied) in
            copyBlocks' offs length
          in
          f (Uutil.Filesize.ofInt64 offs) (Uutil.Filesize.ofInt length) ~fallback
      end;
      progress := !progress + length
    in
    let maxPos = pos + len in
    let rec decode pos =
      if pos = maxPos then false else
      match data.{pos} with
        'S' ->
          let length = decodeInt2 data (pos + 1) in
          if Trace.enabled "rsynctoken" then
            debugToken (fun() ->
              Util.msg "decompressing string (%d bytes)\n" length);
          reallyWriteSubstring outfd (Bytearray.sub data (pos + 3) length) 0 length;
          progress := !progress + length;
          decode (pos + length + 3)
      | 'B' ->
          let n = decodeInt3 data (pos + 1) in
          let k = decodeInt1 data (pos + 4) in
          if Trace.enabled "rsynctoken" then
            debugToken (fun() -> Util.msg
                "decompressing %d block(s) (sequence %d->%d)\n"
                k n (n + k - 1));
          copyBlocks (Int64.of_int n) k;
          decode (pos + 5)
      | 'E' ->
          true
      | _ ->
          assert false
    in
    let finished = decode pos in
    showProgress !progress;
    finished

(***************************** SOURCE HOST *******************************)

  (*** CUSTOM HASH TABLE ***)

  (* Half the maximum number of entries in the hash table.
     MUST be a power of 2 !
     Typical values are around an average 2 * fileSize / blockSize. *)
  let hashTableMaxLength = 1024 * 1024

  (* Smallest value obtained by repeatedly doubling [n2] that is at least
     [n], capped at [hashTableMaxLength] *)
  let rec upperPowerOfTwo n n2 =
    if (n2 >= n) || (n2 = hashTableMaxLength) then
      n2
    else
      upperPowerOfTwo n (2 * n2)

  let hash checksum = checksum

  (* Compute the hash table length as a function of the number of blocks *)
  let computeHashTableLength signatures =
    2 * (upperPowerOfTwo signatures.blockCount 32)

  (* Hash the block signatures into the hash table.  Each bucket is a
     list of (block number, weak checksum) pairs. *)
  let hashSig hashTableLength signatures =
    let hashTable = Array.make hashTableLength [] in
    for k = 0 to signatures.blockCount - 1 do
      let cs = Int32.to_int signatures.weakChecksum.{k} land 0x7fffffff in
      let h = (hash cs) land (hashTableLength - 1) in
      hashTable.(h) <- (k, cs) :: hashTable.(h)
    done;
    hashTable

  (* Given a key, retrieve the corresponding entry in the table *)
  let findEntry hashTable hashTableLength checksum :
      (int * Checksum.t) list =
    let i = (hash checksum) land (hashTableLength - 1) in
    hashTable.(i)

  (* Build a bit filter over the weak checksums of all the blocks: for
     each checksum, one bit (selected by its 3 high-order bits) is set in
     one byte (selected by its other bits, shifted right by 5) of a
     string of [4 * hashTableLength] bytes.  See [filterMem]. *)
  let sigFilter hashTableLength signatures =
    let len = hashTableLength lsl 2 in
    let filter = Bytes.make len '\000' in
    for k = 0 to signatures.blockCount - 1 do
      let cs = Int32.to_int signatures.weakChecksum.{k} land 0x7fffffff in
      let h1 = cs lsr 28 in
      assert (h1 >= 0 && h1 < 8);
      let h2 = (cs lsr 5) land (len - 1) in
      let mask = 1 lsl h1 in
      Bytes.set filter h2 (Char.chr (Char.code (Bytes.get filter h2) lor mask))
    done;
    Bytes.to_string filter

  (* Test whether the bit associated with [checksum] is set in a filter
     built by [sigFilter] with the same [hashTableLength].  A [false]
     result means that no block has this weak checksum. *)
  let filterMem filter hashTableLength checksum =
    let len = hashTableLength lsl 2 in
    let h2 = (checksum lsr 5) land (len - 1) in
    let h1 = checksum lsr 28 in
    let mask = 1 lsl h1 in
    Char.code (String.unsafe_get filter h2) land mask <> 0

  (* Log the values of the parameters associated with the hash table *)
  let logHash hashTable hashTableLength =
    let rec probe empty collision i =
      if i = hashTableLength then (empty, collision)
      else begin
        let length = Safelist.length hashTable.(i) in
        let next =
          if length = 0 then probe (empty + 1) collision
          else if length > 1 then probe empty (collision + 1)
          else probe empty collision
        in
        next (i + 1)
      end
    in
    let (empty, collision) = probe 0 0 0 in
    debugLog (fun() -> Util.msg "%d hash table entries\n" hashTableLength);
    debugLog (fun() -> Util.msg
        "%d empty, %d used, %d collided\n"
        empty (hashTableLength - empty) collision)

  (*** MEASURES ***)

  type probes = {
      mutable hitHit : int;
      mutable hitMiss : int;
      mutable missMiss : int;
      mutable nbBlock : int;
      mutable nbString : int;
      mutable stringSize : int
    }

  let logMeasures pb =
    debugLog (fun() -> Util.msg
      "hit-hit = %d, hit-miss = %d, miss-miss = %d, hit rate = %d%%\n"
      pb.hitHit pb.hitMiss pb.missMiss
      (if pb.hitHit <> 0 then
         pb.hitHit * 100 / (pb.hitHit + pb.hitMiss)
       else 0))
(*
    debugLog (fun() -> Util.msg
      "%d strings (%d bytes), %d blocks\n"
      pb.nbString pb.stringSize pb.nbBlock);
    let generic = pb.stringSize + pb.nbBlock * blockSize in
    debugLog (fun() -> Util.msg
      "file size = %d bytes\n"
      generic);
    debug (fun() -> Util.msg
      "compression rate = %d%%\n" ((pb.stringSize * 100) / generic))
*)


  (*** COMPRESSION ***)

  (* Compression buffer size *)
  (* MUST be >= 2 * blockSize *)
  let minComprBufSize = 8192

  type compressorState =
    { (* Rolling checksum data *)
      mutable checksum : int;
      mutable cksumOutgoing : char;
      (* Buffering *)
      mutable offset : int;
      mutable toBeSent : int;
      mutable length : int;
      (* Position in file *)
      mutable absolutePos : Uutil.Filesize.t }

  (* Compress the file using the algorithm described in the header *)
  let rsyncCompress sigs infd srcLength showProgress transmit =
    debug (fun() -> Util.msg "compressing\n");
    checkSignatures "compression" sigs;
    let blockSize = sigs.blockSize in
    (* Twice the block size, rounded up to a multiple of 8192 *)
    let comprBufSize = (2 * blockSize + 8191) land (-8192) in
    let comprBufSizeFS = Uutil.Filesize.ofInt comprBufSize in
    debugLog (fun() -> Util.msg
        "compression buffer size = %d bytes\n" comprBufSize);
    debugLog (fun() -> Util.msg "block size = %d bytes\n" blockSize);
    assert (comprBufSize >= 2 * blockSize);
    let timer = Trace.startTimer "Compressing the new file" in

    (* Measures *)
    let pb =
      { hitHit = 0; hitMiss = 0; missMiss = 0;
        nbBlock = 0; nbString = 0; stringSize = 0 } in
(*
    let transmit tokenList =
      Safelist.iter
        (fun token ->
           match token with
           | STRING s ->
               let length = String.length s in
               if Trace.enabled "rsynctoken" then debugToken (fun() ->
                 Util.msg "transmitting string (%d bytes)\n" length);
               pb.nbString <- pb.nbString + 1;
               pb.stringSize <- pb.stringSize + length
           | BLOCK n ->
               if Trace.enabled "rsynctoken" then debugToken (fun() -> Util.msg
                   "transmitting %d block(s) (sequence %d->%d)\n"
                   1 n (n));
               pb.nbBlock <- pb.nbBlock + k)
        tokenList;
      transmit tokenList
    in
*)

    (* Enable token buffering *)
    let tokenQueue = makeQueue blockSize in
    let flushTokenQueue () =
      flushQueue tokenQueue showProgress transmit true in
    let transmit token = queueToken tokenQueue showProgress transmit token in

    (* Set up the hash table for fast checksum look-up *)
    let hashTableLength = computeHashTableLength sigs in
    let blockTable = hashSig hashTableLength sigs in
    logHash blockTable hashTableLength;

    let filter = sigFilter hashTableLength sigs in

    (* Compare the first [i] bytes of the fingerprint [fp] with the
       strong checksum stored at [pos], from the last byte backwards *)
    let rec fingerprintMatchRec checksums pos fp i =
      let i = i - 1 in
      i < 0 ||
      (fp.[i] = checksums.{pos + i} &&
       fingerprintMatchRec checksums pos fp i)
    in
    let fingerprintMatch k fp =
      let pos = k * sigs.checksumSize in
      (*FIX: temporary debugging code... *)
      if
        pos + sigs.checksumSize > Bigarray.Array1.dim sigs.strongChecksum
      then
        raise
          (Util.Transient
             (Format.sprintf "Internal error during rsync transfer, \
                              please report: \
                              k:%d/%d pos:%d csSize:%d dim:%d"
                k sigs.blockCount pos sigs.checksumSize
                (Bigarray.Array1.dim sigs.strongChecksum)));
      fingerprintMatchRec sigs.strongChecksum pos fp sigs.checksumSize
    in

    (* Create the compression buffer *)
    let comprBuf = Bytes.create comprBufSize in

    (* If there is data waiting to be sent, transmit it as a STRING token *)
    let transmitString toBeSent offset =
      if offset > toBeSent then
        transmit (STRING (comprBuf, toBeSent, offset - toBeSent))
      else
        return ()
    in

    (* Set up the rolling checksum data *)
    let cksumTable = Checksum.init blockSize in

    (* The window starts at the very end of an (empty) full buffer, so
       that the first call to [slideWindow] loads the buffer *)
    let initialState =
      { checksum = 0; cksumOutgoing = ' ';
        offset = comprBufSize; toBeSent = comprBufSize; length = comprBufSize;
        absolutePos = Uutil.Filesize.zero }
    in

    (* Check the new window position and update the compression buffer
       if its end has been reached *)
    let rec slideWindow st miss : unit Lwt.t =
      if st.offset + blockSize <= st.length then
        computeChecksum st miss
      else if st.length = comprBufSize then begin
        transmitString st.toBeSent st.offset >>= (fun () ->
        let chunkSize = st.length - st.offset in
        if chunkSize > 0 then begin
          assert(comprBufSize >= blockSize);
          Bytes.blit comprBuf st.offset comprBuf 0 chunkSize
        end;
        let rem = Uutil.Filesize.sub srcLength st.absolutePos in
        let avail = comprBufSize - chunkSize in
        let l =
          reallyRead infd comprBuf chunkSize
            (if rem > comprBufSizeFS then avail else
             min (Uutil.Filesize.toInt rem) avail)
        in
        st.absolutePos <-
          Uutil.Filesize.add st.absolutePos (Uutil.Filesize.ofInt l);
        st.offset <- 0;
        st.toBeSent <- 0;
        st.length <- chunkSize + l;
        debugToken (fun() -> Util.msg "updating the compression buffer\n");
        debugToken (fun() -> Util.msg "new length = %d bytes\n" st.length);
        slideWindow st miss)
      end else
        (* The buffer is not full: the end of the input has been reached *)
        transmitString st.toBeSent st.length >>= (fun () ->
        transmit EOF)

    (* Compute the window contents checksum, in a rolling fashion if there
       was a miss *)
    and computeChecksum st miss =
      if miss then
        rollChecksum st
      else begin
        let cksum = Checksum.subbytes comprBuf st.offset blockSize in
        st.checksum <- cksum;
        st.cksumOutgoing <- Bytes.unsafe_get comprBuf st.offset;
        processBlock st
      end

    and rollChecksum st =
      let ingoingChar =
        Bytes.unsafe_get comprBuf (st.offset + blockSize - 1) in
      let cksum =
        Checksum.roll cksumTable st.checksum st.cksumOutgoing ingoingChar in
      st.checksum <- cksum;
      st.cksumOutgoing <- Bytes.unsafe_get comprBuf st.offset;
      (* Skip the hash table look-up when the filter rules out a match *)
      if filterMem filter hashTableLength cksum then
        processBlock st
      else
        miss st

    (* Try to match the current block with one existing in the old file *)
    and processBlock st =
      let checksum = st.checksum in
      match findEntry blockTable hashTableLength checksum with
      | [] ->
          pb.missMiss <- pb.missMiss + 1;
          miss st
      | entry ->
          let blockNum = findBlock st checksum entry None in
          if blockNum = -1 then begin
            pb.hitMiss <- pb.hitMiss + 1;
            miss st
          end else begin
            pb.hitHit <- pb.hitHit + 1;
            hit st blockNum
          end

    (* In the hash table entry, find nodes with the right checksum and
       match fingerprints.  The fingerprint of the current block is only
       computed once a node with the right checksum has been found.
       Returns the number of the matching block, or -1 if there is none. *)
    and findBlock st checksum entry fingerprint =
      match entry, fingerprint with
      | [], _ ->
          -1
      | (_, cs) :: _, None
        when cs = checksum ->
          let fingerprint = Digest.subbytes comprBuf st.offset blockSize in
          findBlock st checksum entry (Some fingerprint)
      | (k, cs) :: _, Some fingerprint
        when cs = checksum && fingerprintMatch k fingerprint ->
          k
      | _ :: tl, _ ->
          findBlock st checksum tl fingerprint

    (* Miss : slide the window one character ahead *)
    and miss st =
      st.offset <- st.offset + 1;
      if st.offset + blockSize <= st.length then
        rollChecksum st
      else
        slideWindow st true

    (* Hit : send the data waiting and a BLOCK token, then slide the window
       one block ahead *)
    and hit st blockNum =
      transmitString st.toBeSent st.offset >>= (fun () ->
      let sent = st.offset in
      st.toBeSent <- sent + blockSize;
      transmit (BLOCK blockNum) >>= (fun () ->
      st.offset <- st.offset + blockSize;
      slideWindow st false))
    in

    (* Initialization and termination *)
    slideWindow initialState false >>= (fun () ->
    flushTokenQueue () >>= (fun () ->
    logMeasures pb;
    Trace.showTimer timer;
    return ()))

end