remote.ml (91859B)
1 (* Unison file synchronizer: src/remote.ml *) 2 (* Copyright 1999-2020, Benjamin C. Pierce 3 4 This program is free software: you can redistribute it and/or modify 5 it under the terms of the GNU General Public License as published by 6 the Free Software Foundation, either version 3 of the License, or 7 (at your option) any later version. 8 9 This program is distributed in the hope that it will be useful, 10 but WITHOUT ANY WARRANTY; without even the implied warranty of 11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 12 GNU General Public License for more details. 13 14 You should have received a copy of the GNU General Public License 15 along with this program. If not, see <http://www.gnu.org/licenses/>. 16 *) 17 18 let (>>=) = Lwt.bind 19 20 let debug = Trace.debug "remote" 21 let debugV = Trace.debug "remote_emit+" 22 let debugE = Trace.debug "remote+" 23 let debugT = Trace.debug "remote+" 24 25 (* BCP: The previous definitions of the last two were like this: 26 let debugE = Trace.debug "remote_emit" 27 let debugT = Trace.debug "thread" 28 But that resulted in huge amounts of output from '-debug all'. 29 *) 30 31 let _ = 32 if Sys.unix || Sys.cygwin then 33 ignore(Sys.set_signal Sys.sigpipe Sys.Signal_ignore) 34 35 (* 36 Flow-control mechanism (only active under Windows). 37 Only one side is allowed to send messages at any given time. 38 Once it has finished sending messages, a special message is sent 39 meaning that the destination is now allowed to send messages. 40 41 Threads behave in a very controlled way: they only perform possibly 42 blocking I/Os through the remote module, and never call 43 Lwt_unix.yield. This mean that when one side gives up its right to 44 write, we know that no matter how long we wait, it will not have 45 anything to write. This ensures that there is no deadlock. 46 A more robust protocol would be to give up write permission 47 whenever idle (not just after having sent at least one message). 48 But then, there is the risk that the two sides exchange spurious 49 messages. 50 *) 51 52 (****) 53 54 let intSize = 5 55 56 let intHash x = ((x * 791538121) lsr 23 + 17) land 255 57 58 let encodeInt m = 59 let int_buf = Bytearray.create intSize in 60 int_buf.{0} <- Char.chr ( m land 0xff); 61 int_buf.{1} <- Char.chr ((m lsr 8) land 0xff); 62 int_buf.{2} <- Char.chr ((m lsr 16) land 0xff); 63 int_buf.{3} <- Char.chr ((m lsr 24) land 0xff); 64 int_buf.{4} <- Char.chr (intHash m); 65 (int_buf, 0, intSize) 66 67 let decodeInt int_buf i = 68 let b0 = Char.code (int_buf.{i + 0}) in 69 let b1 = Char.code (int_buf.{i + 1}) in 70 let b2 = Char.code (int_buf.{i + 2}) in 71 let b3 = Char.code (int_buf.{i + 3}) in 72 let m = (b3 lsl 24) lor (b2 lsl 16) lor (b1 lsl 8) lor b0 in 73 if Char.code (int_buf.{i + 4}) <> intHash m then 74 raise (Util.Fatal 75 "Protocol error: corrupted message received;\n\ 76 if it happens to you in a repeatable way, \n\ 77 please post a report on the unison-users mailing list."); 78 m 79 80 (*************************************************************************) 81 (* LOW-LEVEL IO *) 82 (*************************************************************************) 83 84 let ioCleanups = ref [] 85 86 let registerIOClose io f = 87 ioCleanups := (io, f) :: !ioCleanups 88 89 let lostConnectionHandler ch = 90 let aux ((i, o), f) = 91 if i = ch || o = ch then begin 92 f (); 93 false (* Each handler is run only once *) 94 end else 95 true 96 in 97 ioCleanups := Safelist.filter aux !ioCleanups 98 99 let lostConnection ch = 100 begin try lostConnectionHandler ch with _ -> () end; 101 Lwt.fail (Util.Fatal "Lost connection with the server") 102 103 let catchIoErrors ch th = 104 Lwt.catch th 105 (fun e -> 106 match e with 107 Unix.Unix_error(Unix.ECONNRESET, _, _) 108 | Unix.Unix_error(Unix.EPIPE, _, _) 109 | Unix.Unix_error(Unix.ETIMEDOUT, _, _) 110 | Unix.Unix_error(Unix.EACCES, _, _) (* Linux firewall *) 111 (* Windows may also return the following errors... *) 112 | Unix.Unix_error(Unix.EINVAL, _, _) (* ... and Linux firewall *) 113 | Unix.Unix_error(Unix.EUNKNOWNERR (-64), _, _) 114 (* ERROR_NETNAME_DELETED *) 115 | Unix.Unix_error(Unix.EUNKNOWNERR (-233), _, _) 116 (* ERROR_PIPE_NOT_CONNECTED *) 117 | Unix.Unix_error(Unix.EUNKNOWNERR (-1236), _, _) 118 (* ERROR_CONNECTION_ABORTED *) 119 (* The following may indicate a programming error, but even if that's 120 the case, let's be graceful about it rather than crash the server. 121 (Seen happening on the socket server when the connection is broken 122 before the client has read the updates sent by server.) *) 123 | Unix.Unix_error(Unix.EBADF, _, _) 124 (* The following errors _may_ be temporary but we don't know if 125 they are or for how long they will persist. We also don't have 126 a way to retry and there is no guarantee that the socket remains 127 in a usable state, so treat all these as permanent failures 128 breaking the connection. *) 129 | Unix.Unix_error(Unix.ENETUNREACH, _, _) 130 | Unix.Unix_error(Unix.EHOSTUNREACH, _, _) 131 | Unix.Unix_error(Unix.ENETDOWN, _, _) 132 | Unix.Unix_error(Unix.EHOSTDOWN, _, _) 133 | Unix.Unix_error(Unix.ENETRESET, _, _) -> 134 (* Client has closed its end of the connection *) 135 lostConnection ch 136 | _ -> 137 Lwt.fail e) 138 139 (****) 140 141 let receivedBytes = ref 0. 142 let emittedBytes = ref 0. 143 144 (****) 145 146 (* I/O buffers *) 147 148 type ioBuffer = 149 { channel : Lwt_unix.file_descr; 150 buffer : bytes; 151 mutable length : int; 152 mutable opened : bool } 153 154 let bufferSize = 16384 155 (* No point in making this larger, as the Ocaml Unix library uses a 156 buffer of this size *) 157 158 let makeBuffer ch = 159 { channel = ch; buffer = Bytes.create bufferSize; 160 length = 0; opened = true } 161 162 (****) 163 164 (* Low-level inputs *) 165 166 let fillInputBuffer conn = 167 assert (conn.length = 0); 168 catchIoErrors conn.channel 169 (fun () -> 170 Lwt_unix.read conn.channel conn.buffer 0 bufferSize >>= fun len -> 171 debugV (fun() -> 172 if len = 0 then 173 Util.msg "grab: EOF\n" 174 else 175 Util.msg "grab: %s\n" 176 (String.escaped (Bytes.sub_string conn.buffer 0 len))); 177 if len = 0 then 178 lostConnection conn.channel 179 else begin 180 receivedBytes := !receivedBytes +. float len; 181 conn.length <- len; 182 Lwt.return () 183 end) 184 185 let rec grabRec conn s pos len = 186 if conn.length = 0 then begin 187 fillInputBuffer conn >>= fun () -> 188 grabRec conn s pos len 189 end else begin 190 let l = min (len - pos) conn.length in 191 Bytearray.blit_from_bytes conn.buffer 0 s pos l; 192 conn.length <- conn.length - l; 193 if conn.length > 0 then 194 Bytes.blit conn.buffer l conn.buffer 0 conn.length; 195 if pos + l < len then 196 grabRec conn s (pos + l) len 197 else 198 Lwt.return () 199 end 200 201 let grab conn s len = 202 assert (len > 0); 203 assert (Bytearray.length s <= len); 204 grabRec conn s 0 len 205 206 let peekWithoutBlocking conn = 207 Bytes.sub conn.buffer 0 conn.length 208 209 let peekWithBlocking conn = 210 (if conn.length = 0 then begin 211 fillInputBuffer conn 212 end else 213 Lwt.return ()) >>= fun () -> 214 Lwt.return (peekWithoutBlocking conn) 215 216 (****) 217 218 (* Low-level outputs *) 219 220 let rec sendOutput conn = 221 catchIoErrors conn.channel 222 (fun () -> 223 begin if conn.opened then 224 Lwt_unix.write conn.channel conn.buffer 0 conn.length 225 else 226 Lwt.return conn.length 227 end >>= fun len -> 228 debugV (fun() -> 229 Util.msg "dump: %s\n" 230 (String.escaped (Bytes.sub_string conn.buffer 0 len))); 231 emittedBytes := !emittedBytes +. float len; 232 conn.length <- conn.length - len; 233 if conn.length > 0 then 234 Bytes.blit 235 conn.buffer len conn.buffer 0 conn.length; 236 Lwt.return ()) 237 238 let rec fillBuffer2 conn s pos len = 239 if conn.length = bufferSize then 240 sendOutput conn >>= fun () -> 241 fillBuffer2 conn s pos len 242 else begin 243 let l = min (len - pos) (bufferSize - conn.length) in 244 Bytearray.blit_to_bytes s pos conn.buffer conn.length l; 245 conn.length <- conn.length + l; 246 if pos + l < len then 247 fillBuffer2 conn s (pos + l) len 248 else 249 Lwt.return () 250 end 251 252 let rec fillBuffer conn l = 253 match l with 254 (s, pos, len) :: rem -> 255 assert (pos >= 0); 256 assert (len >= 0); 257 assert (pos <= Bytearray.length s - len); 258 fillBuffer2 conn s pos len >>= fun () -> 259 fillBuffer conn rem 260 | [] -> 261 Lwt.return () 262 263 let rec flushBuffer conn = 264 if conn.length > 0 then 265 sendOutput conn >>= fun () -> 266 flushBuffer conn 267 else 268 Lwt.return () 269 270 (****) 271 272 (* Output scheduling *) 273 274 type kind = Normal | Idle | Last | Urgent 275 276 type outputQueue = 277 { mutable available : bool; 278 mutable canWrite : bool; 279 mutable flowControl : bool; 280 writes : (kind * (unit -> unit Lwt.t) * unit Lwt.t) Queue.t; 281 urgentWrites : (kind * (unit -> unit Lwt.t) * unit Lwt.t) Queue.t; 282 idleWrites : (kind * (unit -> unit Lwt.t) * unit Lwt.t) Queue.t; 283 flush : outputQueue -> unit Lwt.t } 284 285 let rec performOutputRec q (kind, action, res) = 286 action () >>= fun () -> 287 Lwt.wakeup res (); 288 popOutputQueues q 289 290 and popOutputQueues q = 291 if not (Queue.is_empty q.urgentWrites) then 292 performOutputRec q (Queue.take q.urgentWrites) 293 else if not (Queue.is_empty q.writes) && q.canWrite then 294 performOutputRec q (Queue.take q.writes) 295 else if not (Queue.is_empty q.idleWrites) && q.canWrite then 296 performOutputRec q (Queue.take q.idleWrites) 297 else begin 298 q.available <- true; 299 (* Flush asynchronously the output *) 300 Lwt.ignore_result (q.flush q); 301 Lwt.return () 302 end 303 304 (* Perform an output action in an atomic way *) 305 let performOutput q kind action = 306 if q.available && (kind = Urgent || q.canWrite) then begin 307 q.available <- false; 308 performOutputRec q (kind, action, Lwt.wait ()) 309 end else begin 310 let res = Lwt.wait () in 311 Queue.add (kind, action, res) 312 (match kind with 313 Urgent -> q.urgentWrites 314 | Normal -> q.writes 315 | Idle -> q.idleWrites 316 | Last -> assert false); 317 res 318 end 319 320 let allowWrites q = 321 assert (not q.canWrite); 322 q.canWrite <- true; 323 q.available <- false; 324 (* We yield to let the receiving thread restart and to let some time 325 to the requests to be processed *) 326 Lwt.ignore_result (Lwt_unix.yield () >>= fun () -> popOutputQueues q) 327 328 let disableFlowControl q = 329 q.flowControl <- false; 330 if not q.canWrite then allowWrites q 331 332 let outputQueueIsEmpty q = q.available 333 334 (* Setup IO with flow control initially disabled, to do the RPC version 335 handshake. Flow control is part of RPC protocol and must be enabled 336 only after RPC version handshake is complete. *) 337 let makeOutputQueue isServer flush = 338 { available = true; canWrite = true; flowControl = false; 339 writes = Queue.create (); urgentWrites = Queue.create (); 340 idleWrites = Queue.create (); 341 flush = flush } 342 343 (****) 344 345 (* IMPORTANT: the RPC version must be increased when the RPC mechanism itself 346 changes in a breaking way. Changes on the API level (functions and data 347 types) normally do not cause a breaking change at the RPC level. *) 348 (* Version 0 is special in that it must not be listed as a supported version. 349 It is used for 2.51-compatibility mode and is never negotiated. *) 350 (* Supported RPC versions should be ordered from newest to oldest. *) 351 let rpcSupportedVersions = [1] 352 let rpcDefaultVersion = Safelist.hd rpcSupportedVersions 353 354 let rpcSupportedVersionStr = 355 String.concat ", " 356 (Safelist.map (fun v -> "\"" ^ string_of_int v ^ "\"") 357 rpcSupportedVersions) 358 359 let rpcSupportedVersionStrHdr = 360 String.concat " " 361 (Safelist.map (fun v -> string_of_int v) 362 rpcSupportedVersions) 363 364 (* FIX: Added in 2021. Should be removed after a couple of years. *) 365 let rpcServerCmdlineOverride = "__new-rpc-mode" 366 367 (****) 368 369 type connection = 370 { mutable version : int; 371 inputBuffer : ioBuffer; 372 outputBuffer : ioBuffer; 373 outputQueue : outputQueue } 374 375 let maybeFlush pendingFlush q buf = 376 (* We return immediately if a flush is already scheduled, or if the 377 output buffer is already empty. *) 378 (* If we are doing flow control and we can write, we need to send 379 a write token even when the buffer is empty. *) 380 if 381 !pendingFlush || (buf.length = 0 && not (q.flowControl && q.canWrite)) 382 then 383 Lwt.return () 384 else begin 385 pendingFlush := true; 386 (* Wait a bit, in case there are some new requests being processed *) 387 Lwt_unix.yield () >>= fun () -> 388 pendingFlush := false; 389 (* If there are other writes scheduled, we do not flush yet *) 390 if outputQueueIsEmpty q then begin 391 performOutput q Last 392 (fun () -> 393 if q.flowControl then begin 394 debugE (fun() -> Util.msg "Sending write token\n"); 395 q.canWrite <- false; 396 fillBuffer buf [encodeInt 0] >>= fun () -> 397 flushBuffer buf 398 end else 399 flushBuffer buf) >>= fun () -> 400 Lwt.return () 401 end else 402 Lwt.return () 403 end 404 405 let makeConnection isServer inCh outCh = 406 let pendingFlush = ref false in 407 let outputBuffer = makeBuffer outCh in 408 { version = rpcDefaultVersion; 409 inputBuffer = makeBuffer inCh; 410 outputBuffer = outputBuffer; 411 outputQueue = 412 makeOutputQueue isServer 413 (fun q -> maybeFlush pendingFlush q outputBuffer) } 414 415 let closeConnection conn = 416 begin try Lwt_unix.close conn.inputBuffer.channel with Unix.Unix_error _ -> () end; 417 begin try Lwt_unix.close conn.outputBuffer.channel with Unix.Unix_error _ -> () end; 418 conn.outputBuffer.opened <- false 419 420 let connectionIO conn = 421 (conn.inputBuffer.channel, conn.outputBuffer.channel) 422 423 let setConnectionVersion conn ver = 424 conn.version <- ver 425 426 let connectionVersion conn = conn.version 427 428 let connEq conn conn' = 429 conn.inputBuffer.channel = conn'.inputBuffer.channel 430 && conn.outputBuffer.channel = conn'.outputBuffer.channel 431 432 let connNeq conn conn' = not (connEq conn conn') 433 434 (* Send message [l] *) 435 let dump conn l = 436 performOutput 437 conn.outputQueue Normal (fun () -> fillBuffer conn.outputBuffer l) 438 439 (* Send message [l] when idle *) 440 let dumpIdle conn l = 441 performOutput 442 conn.outputQueue Idle (fun () -> fillBuffer conn.outputBuffer l) 443 444 (* Send message [l], even if write are disabled. This is used for 445 aborting rapidly a stream. This works as long as only one small 446 message is written at a time (the write will succeed as the pipe 447 will not be full) *) 448 let dumpUrgent conn l = 449 performOutput conn.outputQueue Urgent 450 (fun () -> 451 fillBuffer conn.outputBuffer l >>= fun () -> 452 flushBuffer conn.outputBuffer) 453 454 let enableFlowControl conn isServer = 455 let rec waitDrain () = 456 if not isServer && conn.outputBuffer.length > 0 then 457 Lwt_unix.yield () >>= waitDrain 458 else 459 Lwt.return () 460 in 461 let q = conn.outputQueue in 462 q.available <- false; 463 waitDrain () >>= fun () -> 464 q.flowControl <- true; 465 q.canWrite <- isServer; 466 if q.canWrite then 467 popOutputQueues q >>= Lwt_unix.yield >>= fun () -> 468 Lwt.return () 469 else 470 Lwt.return () 471 472 (****) 473 474 let connectionCheck = ref None 475 476 let checkConnection ioServer = 477 connectionCheck := Some ioServer; 478 (* Poke on the socket to trigger an error if connection has been lost. *) 479 Lwt_unix.run ( 480 (if Sys.win32 then Lwt.return 0 else 481 Lwt_unix.read ioServer.inputBuffer.channel ioServer.inputBuffer.buffer 0 0) 482 (* Try to make sure connection cleanup, if necessary, has finished 483 before returning. 484 Since there is no way to reliably detect when other threads have 485 finished, we just yield a bit (the same comments apply as in 486 commandLoop). *) 487 >>= fun _ -> 488 let rec wait n = 489 if n = 0 then Lwt.return () else begin 490 Lwt_unix.yield () >>= fun () -> 491 wait (n - 1) 492 end 493 in 494 wait 10); 495 connectionCheck := None 496 497 let isConnectionCheck conn = 498 match !connectionCheck with 499 | None -> false 500 | Some conn' -> connEq conn conn' 501 502 (* Due to [Common.root] currently excluding important details present in 503 [Clroot.clroot], there is a 1:N mapping between a [root] and a [clroot]. 504 For example, a [clroot] pointing to the same host but a different 505 protocol or user or port will be mapped to the same [root] as long as 506 the root fspaths are the same. 507 It is currently (Oct 2022) not seen as a critical issue to fix. 508 The code previously used to index connections just by the canonical host 509 name, ignoring all other details, including the root fspath. That code 510 was in place for over 20 years and did not seem to cause any issues. 511 The current code is safer than the previous code... *) 512 module ClientConn = struct 513 type t = 514 { clroot : Clroot.clroot; 515 root : Common.root; 516 conn : connection } 517 (* Never do polymorphic comparisons with [connection]! *) 518 519 (* The replica path of a root is not a relevant detail for a connection; 520 on the contrary, it must not impact the connection. Instead of creating 521 a new type for connection tracking, for now, just exclude the path in 522 clroot comparisons. Eventually, this could/should be solved in a better 523 way: see the comment above about [Common.root] not being sufficiently 524 detailed. *) 525 let clrootEq clroot clroot' = 526 match clroot, clroot' with 527 | Clroot.ConnectByShell (shell, host, user, port, _), 528 Clroot.ConnectByShell (shell', host', user', port', _) -> 529 shell = shell' && host = host' && user = user' && port = port' 530 | ConnectBySocket (host, port, _), 531 ConnectBySocket (host', port', _) -> 532 host = host' && port = port' 533 | ConnectByShell _, _ | _, ConnectByShell _ -> false 534 | ConnectBySocket _, _ | _, ConnectBySocket _ -> false 535 | ConnectLocal _, ConnectLocal _ -> assert false 536 537 let clrootNeq clroot clroot' = not (clrootEq clroot clroot') 538 539 let connections = ref [] 540 541 let findByClroot clroot = 542 Safelist.find (fun x -> clrootEq x.clroot clroot) !connections 543 544 let findByRoot root = Safelist.find (fun x -> x.root = root) !connections 545 546 let register clroot root conn = 547 connections := { clroot; root; conn } :: 548 Safelist.filter (fun x -> clrootNeq x.clroot clroot) !connections 549 550 let unregister conn = 551 connections := Safelist.filter (fun x -> connNeq x.conn conn) !connections 552 553 let ofRoot root = 554 try (findByRoot root).conn with 555 | Not_found -> raise (Util.Fatal "No connection with the server") 556 557 let ofRootOpt root = 558 try Some (findByRoot root).conn with 559 | Not_found -> None 560 561 let canonRootOfClroot clroot = 562 try Some (findByClroot clroot).root with 563 | Not_found -> None 564 565 let withConncheck find = 566 try 567 let conn = (find ()).conn in 568 begin try 569 checkConnection conn 570 with 571 | Unix.Unix_error (EBADF, _, _) -> (* Already closed *) 572 unregister conn 573 (* (All or most?) other exceptions should be caught by receiving and 574 sending threads. If this does not happen (it also depends on the 575 implementation of [Lwt_unix.run]) then trigger the cleanup here as 576 the last resort. *) 577 | Unix.Unix_error _ -> 578 try 579 lostConnection (fst (connectionIO conn)) |> ignore; 580 unregister conn 581 with e -> begin 582 unregister conn; 583 raise e 584 end 585 end; 586 (* [find] _must_ be duplicated after [checkConnection]! *) 587 Some (find ()).conn 588 with Not_found -> 589 None 590 591 let ofRootConncheck root = 592 withConncheck (fun () -> findByRoot root) 593 594 let ofClrootConncheck clroot = 595 withConncheck (fun () -> findByClroot clroot) 596 597 end (* module ClientConn *) 598 599 let connectionOfRoot root = ClientConn.ofRoot root 600 601 (****) 602 603 let atCloseHandlers = ref [] 604 605 let at_conn_close ?(only_server = false) f = 606 atCloseHandlers := (only_server, f) :: !atCloseHandlers 607 608 let runConnCloseHandlers isServer = 609 Safelist.iter (fun (only_server, f) -> 610 if not only_server || isServer then f ()) !atCloseHandlers 611 612 let atConnCloseHandlers = ref [] 613 614 let at_conn_close' conn f = 615 atConnCloseHandlers := (conn, f) :: !atConnCloseHandlers 616 617 let runConnCloseHandlers' conn = 618 atConnCloseHandlers := Safelist.filter (fun (c, f) -> 619 if connEq c conn then (f (); false) else true) !atConnCloseHandlers 620 621 let clientCloseCleanup () = 622 runConnCloseHandlers false 623 624 let clientConnClose conn = 625 closeConnection conn; 626 ClientConn.unregister conn; 627 runConnCloseHandlers' conn; 628 clientCloseCleanup () 629 630 let registerConnCleanup conn cleanup = 631 registerIOClose (connectionIO conn) (fun () -> clientConnClose conn); 632 match cleanup with 633 | None -> () 634 | Some f -> at_conn_close' conn f 635 636 let clientCloseRootConnection = function 637 | (Common.Local, _) -> clientCloseCleanup () 638 | (Common.Remote _, _) as root -> 639 begin match ClientConn.ofRootOpt root with 640 | Some conn -> clientConnClose conn 641 | None -> () 642 end 643 644 (****) 645 646 (* Implemented as a record to avoid polluting [Remote] namespace. If 647 the number and complexity of functions grows in future then it's 648 probably a good idea to extract this code into a separate module. *) 649 type ('a, 'b, 'c) resourceC = 650 { register : 'a -> 'a; release : 'a -> 'b; release_noerr : 'a -> 'c } 651 let resourceWithConnCleanup close close_noerr = 652 let h = Hashtbl.create 17 in 653 let closeAll () = 654 Hashtbl.iter (fun x _ -> ignore (close_noerr x)) h; 655 Hashtbl.clear h 656 in 657 at_conn_close closeAll; 658 let register x = Hashtbl.add h x true; x in 659 let release x = Hashtbl.remove h x; close x in 660 let release_noerr x = Hashtbl.remove h x; close_noerr x in 661 { register; release; release_noerr } 662 663 let lwtRegionWithConnCleanup sz = 664 let reg = ref (Lwt_util.make_region sz) in 665 let resetReg () = 666 Lwt_util.purge_region !reg; 667 (* The remaining threads should be collected by GC *) 668 reg := Lwt_util.make_region sz 669 in 670 at_conn_close resetReg; 671 reg 672 673 (****) 674 675 (* XXX *) 676 module Thread = struct 677 678 let unwindProtect f cleanup = 679 Lwt.catch f 680 (fun e -> 681 match e with 682 Util.Transient err | Util.Fatal err -> 683 debugT 684 (fun () -> 685 Util.msg 686 "Exception caught by Thread.unwindProtect: %s\n" err); 687 Lwt.catch (fun () -> cleanup e) (fun e' -> 688 Util.encodeException "Thread.unwindProtect" `Fatal e') 689 >>= (fun () -> 690 Lwt.fail e) 691 | _ -> 692 Lwt.fail e) 693 694 end 695 696 (*****************************************************************************) 697 (* MARSHALING *) 698 (*****************************************************************************) 699 700 type tag = Bytearray.t 701 702 type 'a marshalFunction = connection -> 703 'a -> (Bytearray.t * int * int) list -> (Bytearray.t * int * int) list 704 type 'a unmarshalFunction = connection -> Bytearray.t -> 'a 705 type 'a marshalingFunctions = 'a marshalFunction * 'a unmarshalFunction 706 707 type 'a convV0Fun = 708 V0 : ('a -> 'compat) * ('compat -> 'a) -> 'a convV0Fun 709 710 external id : 'a -> 'a = "%identity" 711 let convV0_id = V0 (id, id) 712 let convV0_id_pair = convV0_id, convV0_id 713 714 let makeConvV0FunArg compat_to compat_from = 715 (V0 (compat_to, compat_from)), convV0_id 716 let makeConvV0FunRet compat_to compat_from = 717 convV0_id, (V0 (compat_to, compat_from)) 718 let makeConvV0Funs compat_to compat_from compat_to2 compat_from2 = 719 (V0 (compat_to, compat_from)), (V0 (compat_to2, compat_from2)) 720 721 let registeredSet = ref Util.StringSet.empty 722 723 let rec first_chars len msg = 724 match msg with 725 [] -> 726 "" 727 | (s, p, l) :: rem -> 728 if l < len then 729 Bytearray.sub s p l ^ first_chars (len - l) rem 730 else 731 Bytearray.sub s p len 732 733 let safeMarshal marshalPayload tag data rem = 734 let (rem', length) = marshalPayload data rem in 735 let l = Bytearray.length tag in 736 debugE (fun() -> 737 let start = first_chars (min length 10) rem' in 738 let start = if length > 10 then start ^ "..." else start in 739 let start = String.escaped start in 740 Util.msg "send [%s] '%s' %d bytes\n" 741 (Bytearray.to_string tag) start length); 742 let len = l + length in 743 if (len lsr 31) lsr 1 <> 0 then (* [encodeInt] can only encode 32 bits *) 744 raise (Util.Fatal 745 "Protocol error: message data too big. This may be a bug or it\n\ 746 may be that your replicas are huge and the amount of updates\n\ 747 can't be handled by the current protocol implementation. If you\n\ 748 believe it is a bug then please consider reporting it.\n\ 749 Otherwise, try reducing the amount of updates by syncing the\n\ 750 replicas in smaller steps (using the \"path\" preference, for\n\ 751 example). You may have to do this for the initial sync only.") 752 else 753 (encodeInt len :: (tag, 0, l) :: rem') 754 755 let safeUnmarshal unmarshalPayload tag buf = 756 let taglength = Bytearray.length tag in 757 if Bytearray.prefix tag buf 0 then 758 unmarshalPayload buf taglength 759 else 760 let identifier = 761 String.escaped 762 (Bytearray.sub buf 0 (min taglength (Bytearray.length buf))) in 763 raise (Util.Fatal 764 (Printf.sprintf "[safeUnmarshal] expected '%s' but got '%s'" 765 (String.escaped (Bytearray.to_string tag)) identifier)) 766 767 let registerTag string = 768 if Util.StringSet.mem string !registeredSet then 769 raise (Util.Fatal (Printf.sprintf "tag %s is already registered" string)) 770 else 771 registeredSet := Util.StringSet.add string !registeredSet; 772 Bytearray.of_string string 773 774 let marshalV0 (V0 (to251, _)) data rem = 775 let s = Bytearray.marshal (to251 data) [Marshal.No_sharing] in 776 let l = Bytearray.length s in 777 ((s, 0, l) :: rem, l) 778 779 let unmarshalV0 (V0 (_, from251)) buf pos = 780 try from251 (Bytearray.unmarshal buf pos) 781 with Failure s -> raise (Util.Fatal (Printf.sprintf 782 "Fatal error during unmarshaling (%s), 783 possibly because client and server have been compiled with different \ 784 versions of the OCaml compiler." s)) 785 786 let marshalV1 m data rem = 787 let s = Umarshal.marshal_to_bytearray m data in 788 let l = Bytearray.length s in 789 ((s, 0, l) :: rem, l) 790 791 let unmarshalV1 m buf pos = 792 try Umarshal.unmarshal_from_bytearray m buf pos 793 with Failure s | Umarshal.Error s -> raise (Util.Fatal (Printf.sprintf 794 "Fatal error during unmarshaling (%s)" s)) 795 796 let defaultMarshalingFunctions convV0 m = 797 (fun conn -> if conn.version = 0 then marshalV0 convV0 else marshalV1 m), 798 (fun conn -> if conn.version = 0 then unmarshalV0 convV0 else unmarshalV1 m) 799 800 let makeMarshalingFunctions payloadMarshalingFunctions string = 801 let (marshalPayload, unmarshalPayload) = payloadMarshalingFunctions in 802 let tag = registerTag string in 803 let marshal conn (data : 'a) rem = safeMarshal (marshalPayload conn) tag data rem in 804 let unmarshal conn buf = (safeUnmarshal (unmarshalPayload conn) tag buf : 'a) in 805 (marshal, unmarshal) 806 807 (*****************************************************************************) 808 (* SERVER SETUP *) 809 (*****************************************************************************) 810 811 (* BCPFIX: Now that we've beefed up the clroot data structure, shouldn't 812 these be part of it too? *) 813 let sshCmd = 814 Prefs.createString "sshcmd" "ssh" 815 ~category:(`Advanced `Remote) 816 ("path to the ssh executable") 817 ("This preference can be used to explicitly set the name of the " 818 ^ "ssh executable (e.g., giving a full path name), if necessary.") 819 820 let sshargs = 821 Prefs.createString "sshargs" "" 822 ~category:(`Advanced `Remote) 823 "other arguments (if any) for remote shell command" 824 ("The string value of this preference will be passed as additional " 825 ^ "arguments (besides the host name and the name of the Unison " 826 ^ "executable on the remote system) to the \\verb|ssh| " 827 ^ "command used to invoke the remote server. The backslash is an " 828 ^ "escape character." 829 ) 830 831 (* rsh prefs removed since 2.52 *) 832 let () = Prefs.markRemoved "rshcmd" 833 let () = Prefs.markRemoved "rshargs" 834 835 let serverCmd = 836 Prefs.createString "servercmd" "" 837 ~category:(`Advanced `Remote) 838 ("name of " ^ Uutil.myName ^ " executable on remote server") 839 ("This preference can be used to explicitly set the name of the " 840 ^ "Unison executable on the remote server (e.g., giving a full " 841 ^ "path name), if necessary.") 842 843 let addversionno = 844 Prefs.createBool "addversionno" false 845 ~category:(`Advanced `Remote) 846 ("add version number to name of " ^ Uutil.myName ^ " on server") 847 ("When this flag is set to {\\tt true}, Unison " 848 ^ "will use \\texttt{unison-\\ARG{currentmajorversionnumber}} instead of " 849 ^ "just \\verb|unison| as the remote server command (note that the minor " 850 ^ "version number is dropped -- e.g., unison-2.51). This allows " 851 ^ "multiple binaries for different versions of unison to coexist " 852 ^ "conveniently on the same server: whichever version is run " 853 ^ "on the client, the same version will be selected on the server.") 854 855 (********************************************************************** 856 CLIENT/SERVER PROTOCOLS 857 **********************************************************************) 858 859 (* 860 Each protocol has a name, a client side, and a server side. 861 862 The server remembers the server side of each protocol in a table 863 indexed by protocol name. The function of the server is to wait for 864 the client to invoke a protocol, and carry out the appropriate server 865 side. 866 867 Protocols are invoked on the client with arguments for the server side. 868 The result of the protocol is the result of the server side. In types, 869 870 serverSide : 'a -> 'b 871 872 That is, the server side takes arguments of type 'a from the client, 873 and returns a result of type 'b. 874 875 A protocol is started by the client sending a Request packet and then a 876 packet containing the protocol name to the server. The server looks 877 up the server side of the protocol in its table. 878 879 Next, the client sends a packet containing marshaled arguments for the 880 server side. 881 882 The server unmarshals the arguments and invokes the server side with 883 the arguments from the client. 884 885 When the server side completes it gives a result. The server marshals 886 the result and sends it to the client. (Instead of a result, the 887 server may also send back either a Transient or a Fatal error packet). 888 Finally, the client can receive the result packet from the server and 889 unmarshal it. 890 891 The protocol is fully symmetric, so the server may send a Request 892 packet to invoke a function remotely on the client. In this case, the 893 two switch roles.) 894 *) 895 896 let receivePacket conn = 897 (* Get the length of the packet *) 898 let int_buf = Bytearray.create intSize in 899 grab conn.inputBuffer int_buf intSize >>= (fun () -> 900 let length = decodeInt int_buf 0 in 901 assert (length >= 0); 902 (* Get packet *) 903 let buf = Bytearray.create length in 904 grab conn.inputBuffer buf length >>= (fun () -> 905 (debugE (fun () -> 906 let start = 907 if length > 10 then (Bytearray.sub buf 0 10) ^ "..." 908 else Bytearray.sub buf 0 length in 909 let start = String.escaped start in 910 Util.msg "receive '%s' %d bytes\n" start length); 911 Lwt.return buf))) 912 913 type servercmd = 914 connection -> Bytearray.t -> 915 ((Bytearray.t * int * int) list -> (Bytearray.t * int * int) list) Lwt.t 916 let serverCmds = ref (Util.StringMap.empty : servercmd Util.StringMap.t) 917 918 type serverstream = 919 connection -> Bytearray.t -> unit 920 let serverStreams = ref (Util.StringMap.empty : serverstream Util.StringMap.t) 921 922 type header = 923 NormalResult 924 | TransientExn of string 925 | FatalExn of string 926 | Request of string 927 | Stream of string 928 | StreamAbort 929 930 let mheader = Umarshal.(sum6 unit string string string string unit 931 (function 932 | NormalResult -> I61 () 933 | TransientExn a -> I62 a 934 | FatalExn a -> I63 a 935 | Request a -> I64 a 936 | Stream a -> I65 a 937 | StreamAbort -> I66 ()) 938 (function 939 | I61 () -> NormalResult 940 | I62 a -> TransientExn a 941 | I63 a -> FatalExn a 942 | I64 a -> Request a 943 | I65 a -> Stream a 944 | I66 () -> StreamAbort)) 945 946 let ((marshalHeader, unmarshalHeader) : header marshalingFunctions) = 947 makeMarshalingFunctions (defaultMarshalingFunctions convV0_id mheader) "rsp" 948 949 let processRequest conn id cmdName buf = 950 let cmd = 951 try Util.StringMap.find cmdName !serverCmds 952 with Not_found -> raise (Util.Fatal (cmdName ^ " not registered!")) 953 in 954 Lwt.try_bind (fun () -> cmd conn buf) 955 (fun marshal -> 956 debugE (fun () -> Util.msg "Sending result (id: %d)\n" (decodeInt id 0)); 957 dump conn ((id, 0, intSize) :: marshalHeader conn NormalResult (marshal []))) 958 (function 959 Util.Transient s -> 960 debugE (fun () -> 961 Util.msg "Sending transient exception (id: %d)\n" (decodeInt id 0)); 962 dump conn ((id, 0, intSize) :: marshalHeader conn (TransientExn s) []) 963 | Util.Fatal s -> 964 debugE (fun () -> 965 Util.msg "Sending fatal exception (id: %d)\n" (decodeInt id 0)); 966 dump conn ((id, 0, intSize) :: marshalHeader conn (FatalExn s) []) 967 | e -> 968 Lwt.fail e) 969 (* With the current RPC protocol it is not possible to recover from situations 970 where an RPC packet is not completely transmitted (due to an interrupted 971 write syscall). The other side will hang forever, waiting for the complete 972 packet to arrive. New packets (here, transmitting the exception) can't be 973 sent because the receiver can't read them until the previous packet is 974 complete. (Best case, the server quits with a protocol error.) 975 976 Therefore, it is important that exceptions that can interrupt syscalls 977 (for example, Sys.Break (Ctrl-C)) are never wrapped into Util.Transient or 978 Util.Fatal, unless the connection is already known to be broken. Likewise, 979 other exception handlers than the one just above must avoid writing out 980 additional data to the RPC connection. *) 981 982 let streamAbortedSrc = ref 0 983 let streamAbortedDst = ref false 984 985 let streamError = Hashtbl.create 7 986 987 let resetStreamErroState () = 988 streamAbortedSrc := 0; 989 streamAbortedDst := false; 990 Hashtbl.reset streamError 991 let () = at_conn_close resetStreamErroState 992 993 let abortStream conn id = 994 if not !streamAbortedDst then begin 995 streamAbortedDst := true; 996 let request = encodeInt id :: marshalHeader conn StreamAbort [] in 997 dumpUrgent conn request 998 end else 999 Lwt.return () 1000 1001 let processStream conn id cmdName buf = 1002 let id = decodeInt id 0 in 1003 if Hashtbl.mem streamError id then 1004 abortStream conn id 1005 else begin 1006 begin try 1007 let cmd = 1008 try Util.StringMap.find cmdName !serverStreams 1009 with Not_found -> raise (Util.Fatal (cmdName ^ " not registered!")) 1010 in 1011 cmd conn buf; 1012 Lwt.return () 1013 with e -> 1014 Hashtbl.add streamError id e; 1015 abortStream conn id 1016 end 1017 end 1018 1019 (* Message ids *) 1020 type msgId = int 1021 module MsgIdMap = Map.Make (struct type t = msgId let compare = compare end) 1022 (* An integer just a little smaller than the maximum representable in 1023 30 bits *) 1024 let hugeint = 1000000000 1025 let ids = ref 1 1026 let newMsgId () = incr ids; if !ids = hugeint then ids := 2; !ids 1027 1028 (* Threads waiting for a response from the other side *) 1029 let receivers = ref MsgIdMap.empty 1030 let resetReceivers () = 1031 receivers := MsgIdMap.empty 1032 let () = at_conn_close resetReceivers 1033 1034 let find_receiver id = 1035 let thr = MsgIdMap.find id !receivers in 1036 receivers := MsgIdMap.remove id !receivers; 1037 thr 1038 1039 (* Receiving thread: read a message and dispatch it to the right 1040 thread or create a new thread to process requests. *) 1041 let rec receive conn = 1042 begin 1043 debugE (fun () -> Util.msg "Waiting for next message\n"); 1044 (* Get the message ID *) 1045 let id = Bytearray.create intSize in 1046 grab conn.inputBuffer id intSize >>= (fun () -> 1047 let num_id = decodeInt id 0 in 1048 if num_id = 0 then begin 1049 debugE (fun () -> Util.msg "Received the write permission\n"); 1050 allowWrites conn.outputQueue; 1051 receive conn 1052 end else begin 1053 debugE 1054 (fun () -> Util.msg "Message received (id: %d)\n" num_id); 1055 (* Read the header *) 1056 receivePacket conn >>= (fun buf -> 1057 let req = unmarshalHeader conn buf in 1058 begin match req with 1059 Request cmdName -> 1060 receivePacket conn >>= (fun buf -> 1061 (* We yield before starting processing the request. 1062 This way, the request may call [Lwt_unix.run] and this will 1063 not block the receiving thread. *) 1064 Lwt.ignore_result 1065 (Lwt_unix.yield () >>= (fun () -> 1066 processRequest conn id cmdName buf)); 1067 receive conn) 1068 | NormalResult -> 1069 receivePacket conn >>= (fun buf -> 1070 Lwt.wakeup (find_receiver num_id) buf; 1071 receive conn) 1072 | TransientExn s -> 1073 debugV (fun() -> Util.msg "receive: Transient remote error '%s']" s); 1074 Lwt.wakeup_exn (find_receiver num_id) (Util.Transient s); 1075 receive conn 1076 | FatalExn s -> 1077 debugV (fun() -> Util.msg "receive: Fatal remote error '%s']" s); 1078 Lwt.wakeup_exn (find_receiver num_id) (Util.Fatal ("Server: " ^ s)); 1079 receive conn 1080 | Stream cmdName -> 1081 receivePacket conn >>= fun buf -> 1082 processStream conn id cmdName buf >>= fun () -> 1083 receive conn 1084 | StreamAbort -> 1085 streamAbortedSrc := num_id; 1086 receive conn 1087 end) 1088 end) 1089 end 1090 1091 let wait_for_reply id = 1092 let res = Lwt.wait () in 1093 receivers := MsgIdMap.add id res !receivers; 1094 (* We yield to let the receiving thread restart. This way, the 1095 thread may call [Lwt_unix.run] and this will not block the 1096 receiving thread. *) 1097 Lwt.catch 1098 (fun () -> 1099 res >>= (fun v -> Lwt_unix.yield () >>= (fun () -> Lwt.return v))) 1100 (fun e -> Lwt_unix.yield () >>= (fun () -> Lwt.fail e)) 1101 1102 let registerSpecialServerCmd 1103 (cmdName : string) 1104 marshalingFunctionsArgs 1105 marshalingFunctionsResult 1106 (serverSide : connection -> 'a -> 'b Lwt.t) 1107 = 1108 (* Check that this command name has not already been bound *) 1109 if (Util.StringMap.mem cmdName !serverCmds) then 1110 raise (Util.Fatal (cmdName ^ " already registered!")); 1111 (* Create marshaling and unmarshaling functions *) 1112 let ((marshalArgs,unmarshalArgs) : 'a marshalingFunctions) = 1113 makeMarshalingFunctions marshalingFunctionsArgs (cmdName ^ "-args") in 1114 let ((marshalResult,unmarshalResult) : 'b marshalingFunctions) = 1115 makeMarshalingFunctions marshalingFunctionsResult (cmdName ^ "-res") in 1116 (* Create a server function and remember it *) 1117 let server conn buf = 1118 let args = unmarshalArgs conn buf in 1119 serverSide conn args >>= (fun answer -> 1120 Lwt.return (marshalResult conn answer)) 1121 in 1122 serverCmds := Util.StringMap.add cmdName server !serverCmds; 1123 (* Create a client function and return it *) 1124 let client conn serverArgs = 1125 let id = newMsgId () in (* Message ID *) 1126 assert (id >= 0); (* tracking down an assert failure in receivePacket... *) 1127 let request = 1128 encodeInt id :: 1129 marshalHeader conn (Request cmdName) (marshalArgs conn serverArgs []) 1130 in 1131 let reply = wait_for_reply id in 1132 debugE (fun () -> Util.msg "Sending request (id: %d)\n" id); 1133 dump conn request >>= (fun () -> 1134 reply >>= (fun buf -> 1135 Lwt.return (unmarshalResult conn buf))) 1136 in 1137 client 1138 1139 let registerServerCmd name ?(convV0=convV0_id_pair) mArg mRet f = 1140 registerSpecialServerCmd 1141 name (defaultMarshalingFunctions (fst convV0) mArg) 1142 (defaultMarshalingFunctions (snd convV0) mRet) f 1143 1144 (* RegisterHostCmd is a simpler version of registerClientServer [registerServerCmd?]. 1145 It is used to create remote procedure calls: the only communication 1146 between the client and server is the sending of arguments from 1147 client to server, and the sending of the result from the server 1148 to the client. Thus, server side does not need the file descriptors 1149 for communication with the client. 1150 1151 RegisterHostCmd recognizes the case where the server is the local 1152 host, and it avoids socket communication in this case. 1153 *) 1154 let registerHostCmd cmdName ?(convV0=convV0_id_pair) mArg mRet cmd = 1155 let serverSide = (fun _ args -> cmd args) in 1156 let client0 = 1157 registerServerCmd cmdName ~convV0 mArg mRet serverSide in 1158 let client root args = 1159 let conn = ClientConn.ofRoot root in 1160 client0 conn args in 1161 (* Return a function that runs either the proxy or the local version, 1162 depending on whether the call is to the local host or a remote one *) 1163 fun root args -> 1164 match root with 1165 | (Common.Local, _) -> cmd args 1166 | (Common.Remote _, _) -> client root args 1167 1168 (* RegisterRootCmd is like registerHostCmd but it indexes connections by 1169 root instead of host. *) 1170 let registerRootCmd (cmdName : string) 1171 ?(convV0=convV0_id_pair) mArg mRet (cmd : (Fspath.t * 'a) -> 'b) = 1172 let mArg = Umarshal.(prod2 Fspath.m mArg id id) in 1173 let r = registerHostCmd cmdName ~convV0 mArg mRet cmd in 1174 fun root args -> r root ((snd root), args) 1175 1176 let registerRootCmdWithConnection (cmdName : string) 1177 ?(convV0=convV0_id_pair) mArg mRet (cmd : connection -> 'a -> 'b) = 1178 let client0 = registerServerCmd cmdName ~convV0 mArg mRet cmd in 1179 (* Return a function that runs either the proxy or the local version, 1180 depending on whether the call is to the local host or a remote one *) 1181 fun localRoot remoteRoot args -> 1182 match (fst localRoot) with 1183 | Common.Local -> let conn = ClientConn.ofRoot remoteRoot in 1184 cmd conn args 1185 | _ -> let conn = ClientConn.ofRoot localRoot in 1186 client0 conn args 1187 1188 let streamReg = lwtRegionWithConnCleanup 1 1189 1190 let streamingActivated = 1191 Prefs.createBool "stream" true 1192 ~category:(`Advanced `Remote) 1193 ~deprecated:true 1194 ("use a streaming protocol for transferring file contents") 1195 "When this preference is set, Unison will use an experimental \ 1196 streaming protocol for transferring file contents more efficiently. \ 1197 The default value is \\texttt{true}." 1198 1199 let registerStreamCmd 1200 (cmdName : string) 1201 marshalingFunctionsArgs 1202 (serverSide : connection -> 'a -> unit) 1203 = 1204 let cmd = 1205 registerSpecialServerCmd 1206 cmdName marshalingFunctionsArgs 1207 (defaultMarshalingFunctions convV0_id Umarshal.unit) 1208 (fun conn v -> serverSide conn v; Lwt.return ()) 1209 in 1210 let ping = 1211 registerServerCmd (cmdName ^ "Ping") Umarshal.int Umarshal.unit 1212 (fun conn (id : int) -> 1213 try 1214 let e = Hashtbl.find streamError id in 1215 Hashtbl.remove streamError id; 1216 streamAbortedDst := false; 1217 Lwt.fail e 1218 with Not_found -> 1219 Lwt.return ()) 1220 in 1221 (* Check that this command name has not already been bound *) 1222 if (Util.StringMap.mem cmdName !serverStreams) then 1223 raise (Util.Fatal (cmdName ^ " already registered!")); 1224 (* Create marshaling and unmarshaling functions *) 1225 let ((marshalArgs,unmarshalArgs) : 'a marshalingFunctions) = 1226 makeMarshalingFunctions marshalingFunctionsArgs (cmdName ^ "-str") in 1227 (* Create a server function and remember it *) 1228 let server conn buf = 1229 let args = unmarshalArgs conn buf in 1230 serverSide conn args 1231 in 1232 serverStreams := Util.StringMap.add cmdName server !serverStreams; 1233 (* Create a client function and return it *) 1234 let client conn id serverArgs = 1235 debugE (fun () -> Util.msg "Sending stream chunk (id: %d)\n" id); 1236 if !streamAbortedSrc = id then raise (Util.Transient "Streaming aborted"); 1237 let request = 1238 encodeInt id :: 1239 marshalHeader conn (Stream cmdName) (marshalArgs conn serverArgs []) 1240 in 1241 dumpIdle conn request 1242 in 1243 fun conn sender -> 1244 if not (Prefs.read streamingActivated) then 1245 sender (fun v -> cmd conn v) 1246 else begin 1247 (* At most one active stream at a time *) 1248 let id = newMsgId () in (* Message ID *) 1249 Lwt.try_bind 1250 (fun () -> 1251 Lwt_util.run_in_region !streamReg 1 1252 (fun () -> sender (fun v -> client conn id v))) 1253 (fun v -> ping conn id >>= fun () -> Lwt.return v) 1254 (fun e -> 1255 if !streamAbortedSrc = id then begin 1256 debugE (fun () -> 1257 Util.msg "Pinging remote end after streaming error\n"); 1258 ping conn id >>= fun () -> Lwt.fail e 1259 end else 1260 Lwt.fail e) 1261 end 1262 1263 let commandAvailable = 1264 registerRootCmd "commandAvailable" Umarshal.string Umarshal.bool 1265 (fun (_, cmdName) -> Lwt.return (Util.StringMap.mem cmdName !serverCmds)) 1266 1267 (**************************************************************************** 1268 BUILDING CONNECTIONS TO THE SERVER 1269 ****************************************************************************) 1270 1271 let receiveUntilSep ?(space=false) ?(nl=true) ?(includesep=false) conn = 1272 assert (space || nl); 1273 let inp = Buffer.create 32 1274 and buf = Bytearray.create 1 in 1275 let add () = Buffer.add_char inp buf.{0} in 1276 let rec aux () = 1277 grab conn.inputBuffer buf 1 >>= fun () -> 1278 match buf.{0} with 1279 | ' ' | '\t' when space -> 1280 if includesep then add (); Lwt.return (Buffer.contents inp) 1281 | '\n' when nl -> 1282 if includesep then add (); Lwt.return (Buffer.contents inp) 1283 | '\r' -> 1284 aux () (* ignore *) 1285 | _ -> 1286 add (); aux () 1287 in aux () 1288 1289 (* Get input until newline (excluded), blocking *) 1290 let receiveUntilNewline conn = 1291 receiveUntilSep ~nl:true conn 1292 1293 (* Get input until space or newline, separator included by default; blocking *) 1294 let receiveUntilSpaceOrNl ?(includesep=true) conn = 1295 receiveUntilSep ~space:true ~nl:true ~includesep conn 1296 1297 (* Get input of fixed length, blocking *) 1298 let receiveString conn len = 1299 let buf = Bytearray.create len in 1300 grab conn.inputBuffer buf len >>= fun () -> 1301 Lwt.return (Bytearray.to_string buf) 1302 1303 (* Get input until newline (excluded), non-blocking *) 1304 let receiveUntilNewlineNb conn = 1305 let e = Bytes.to_string (peekWithoutBlocking conn.inputBuffer) in 1306 let len = try String.index e '\n' with Not_found -> String.length e in 1307 receiveString conn len 1308 1309 let sendStrings conn slist = 1310 dump conn 1311 (Safelist.map (fun s -> (Bytearray.of_string s, 0, String.length s)) slist) 1312 1313 let sendString conn s = sendStrings conn [s] 1314 1315 (****) 1316 1317 let rpcOk = "OK\n" 1318 1319 let rpcNokTag = "NOK " 1320 let rpcErr err = rpcNokTag ^ err ^ "\n" 1321 1322 type handshakeMsg = Ok | Error of string | Unknown of string 1323 1324 let receiveHandshakeMsg conn = 1325 receiveUntilSpaceOrNl conn >>= fun msg -> 1326 if msg = rpcOk then Lwt.return Ok 1327 else if msg = rpcNokTag then begin 1328 receiveUntilNewlineNb conn >>= fun msg -> Lwt.return (Error msg) 1329 end else 1330 Lwt.return (Unknown msg) 1331 1332 type handshakeData = Data of string | Error of string | Unknown of string 1333 1334 let receiveHandshakeData conn keyw = 1335 receiveUntilSpaceOrNl conn >>= fun msg -> 1336 if msg = keyw then 1337 receiveUntilNewline conn >>= fun data -> Lwt.return (Data data) 1338 else if msg = rpcNokTag then 1339 receiveUntilNewlineNb conn >>= fun err -> Lwt.return (Error err) 1340 else 1341 Lwt.return (Unknown msg) 1342 1343 let sendHandshakeMsg conn = function 1344 | Ok -> sendString conn rpcOk 1345 | Error err -> sendString conn (rpcErr err) 1346 | Unknown _ -> assert false 1347 1348 let sendHandshakeErr conn err = 1349 sendHandshakeMsg conn (Error err) 1350 1351 let sendHandshakeData conn keyw data = 1352 let len = String.length keyw in 1353 let keyw = if len > 0 && keyw.[len - 1] <> ' ' then keyw ^ " " else keyw in 1354 sendString conn (keyw ^ data ^ "\n") 1355 1356 (* RPC version negotiation process: 1357 1. Server sends connectionHeader and supported RPC versions. 1358 1359 2. Client receives and verifies connectionHeader. 1360 * If OK then proceeds. 1361 * If NOK then closes connection. 1362 1363 3. Client receives and verifies RPC versions. 1364 * If not correct version tag or can't parse then closes connection. 1365 1366 4. Client selects a version (typically the most recent one) from the 1367 intersection of its supported RPC versions and server's RPC versions. 1368 * If intersection is empty then closes connection. 1369 1370 5. Client sends selected RPC version to the server. 1371 1372 6. Server receives and verifies proposed version. 1373 * If OK then proceeds. 1374 * If not correct version tag, can't parse or proposed version is 1375 not supported then server sends "NOK". 1376 ** Client receives "NOK" and closes connection. 1377 1378 7. Server selects proposed version and sends "OK". 1379 1380 8. Client receives "OK". Version negotiation is complete. 1381 *) 1382 1383 let connectionHeader = "Unison RPC\n" 1384 let compatConnectionHeader = "Unison 2.51 with OCaml >= 4.01.2\n" 1385 (* Every supported version released prior to the RPC version negotiation 1386 mechanism uses this connection header string. *) 1387 let compat248ConnectionHeader = "Unison 2.48\n" 1388 (* Additionally, even 2.48 can be supported, even though that support is 1389 not official. *) 1390 1391 let rpcVersionsTag = "VERSIONS " 1392 let rpcVersionsStr = rpcVersionsTag ^ rpcSupportedVersionStrHdr ^ "\n" 1393 1394 let rpcVersionTag = "VERSION " 1395 let rpcVersionStr ver = rpcVersionTag ^ string_of_int ver ^ "\n" 1396 1397 let verIsSupported ver = 1398 Safelist.exists (fun v -> v = ver) rpcSupportedVersions 1399 1400 let handshakeFail err = 1401 Lwt.fail (Util.Fatal err) 1402 1403 let handshakeError msg = 1404 handshakeFail ("Received error from the server: \"" ^ msg ^ "\".") 1405 1406 let handshakeUnknown msg = 1407 handshakeFail ("Received unexpected header from the server: \"" 1408 ^ String.escaped msg ^ "\".") 1409 1410 let parseVersion side s = 1411 let error e = 1412 raise (Util.Transient 1413 ("Unknown " ^ side ^ " RPC version: " ^ e 1414 ^ ". Version received from " ^ side ^ ": \"" ^ String.escaped s 1415 ^ "\". Supported RPC versions: " ^ rpcSupportedVersionStr)) 1416 in 1417 if s = "" then 1418 error "invalid format" 1419 else 1420 match int_of_string s with 1421 | ver -> Some ver 1422 | exception Failure _ -> error "parse error" 1423 1424 let parseServerVersions inp = 1425 let supported l = function 1426 | "" -> l 1427 | v -> match parseVersion "server" v with 1428 | Some vi -> if verIsSupported vi then vi :: l else l 1429 | None -> l 1430 in 1431 try 1432 let vs = String.split_on_char ' ' inp in 1433 if vs = [""] then ignore (parseVersion "server" ""); (* Trigger the error *) 1434 let intersect = Safelist.fold_left supported [] vs in 1435 Lwt.return (Safelist.rev (Safelist.sort compare intersect)) 1436 with 1437 | Util.Transient e -> handshakeFail e 1438 1439 let selectServerVersion conn = 1440 let getTheRest () = Bytes.to_string (peekWithoutBlocking conn.inputBuffer) in 1441 receiveHandshakeData conn rpcVersionsTag >>= function 1442 | Error msg -> handshakeError msg 1443 | Unknown fromServ -> handshakeUnknown (fromServ ^ getTheRest ()) 1444 | Data versions -> 1445 parseServerVersions versions >>= function 1446 | [] -> 1447 handshakeFail ("None of server's RPC versions are supported. " 1448 ^ "The server may be too old or too recent. " 1449 ^ "Versions received from server: \"" 1450 ^ String.escaped versions ^ "\". " 1451 ^ "Supported RPC versions: " ^ rpcSupportedVersionStr) 1452 | ver :: _ -> 1453 setConnectionVersion conn ver; 1454 debug (fun () -> Util.msg "Selected RPC version: %i\n" ver); 1455 sendHandshakeData conn rpcVersionTag (string_of_int ver) >>= fun () -> 1456 receiveHandshakeMsg conn >>= function 1457 | Ok -> Lwt.return () 1458 | Error reply -> handshakeError reply 1459 | Unknown reply -> handshakeUnknown (reply ^ getTheRest ()) 1460 1461 let checkServerVersion conn header = 1462 if header = compatConnectionHeader then begin 1463 setConnectionVersion conn 0; 1464 debug (fun () -> Util.msg "Selected RPC version: 2.51-compatibility\n"); 1465 (* skip negotiation *) Lwt.return () 1466 end else if header = compat248ConnectionHeader then begin 1467 setConnectionVersion conn 0; 1468 debug (fun () -> Util.msg "Selected RPC version: 2.48-compatibility\n"); 1469 (* skip negotiation *) Lwt.return () 1470 end else 1471 selectServerVersion conn 1472 1473 let rec checkHeaderRec conn buffer pos len connectionHeader = 1474 if pos = len then 1475 Lwt.return connectionHeader 1476 else begin 1477 (grab conn.inputBuffer buffer 1 >>= (fun () -> 1478 let chOk = 1479 try buffer.{0} = connectionHeader.[pos] with Invalid_argument _ -> false 1480 and compatChOk = 1481 try buffer.{0} = compatConnectionHeader.[pos] with Invalid_argument _ -> false 1482 and compat248ChOk = 1483 try buffer.{0} = compat248ConnectionHeader.[pos] with Invalid_argument _ -> false 1484 in 1485 if not chOk && not compatChOk && not compat248ChOk then 1486 let prefix = 1487 String.sub connectionHeader 0 pos ^ Bytearray.to_string buffer in 1488 let rest = peekWithoutBlocking conn.inputBuffer in 1489 Lwt.fail 1490 (Util.Fatal 1491 ("Received unexpected header from the server:\n \ 1492 expected \"" 1493 ^ String.escaped (* (String.sub connectionHeader 0 (pos + 1)) *) 1494 connectionHeader 1495 ^ "\" but received \"" ^ String.escaped (prefix ^ Bytes.to_string rest) ^ "\", \n" 1496 ^ "which differs at \"" ^ String.escaped prefix ^ "\".\n" 1497 ^ "This can happen because you have different versions of Unison\n" 1498 ^ "installed on the client and server machines, or because\n" 1499 ^ "your connection is failing and somebody is printing an error\n" 1500 ^ "message, or because your remote login shell is printing\n" 1501 ^ "something itself before starting Unison.")) 1502 else 1503 if not chOk && compatChOk then 1504 (* We make use of the fact that that the new header is almost a prefix 1505 of the old header. It is not an exact comparison here but good 1506 enough for this purpose. *) 1507 checkHeaderRec conn buffer (pos + 1) 1508 (String.length compatConnectionHeader) compatConnectionHeader 1509 else if not chOk && compat248ChOk then 1510 checkHeaderRec conn buffer (pos + 1) 1511 (String.length compat248ConnectionHeader) compat248ConnectionHeader 1512 else 1513 checkHeaderRec conn buffer (pos + 1) len connectionHeader)) 1514 end 1515 1516 let checkHeader conn = 1517 checkHeaderRec conn (Bytearray.create 1) 0 1518 (String.length connectionHeader) connectionHeader 1519 1520 (****) 1521 1522 (* Magic string exchange is used within the old protocol to detect if both 1523 the server and the client support the new RPC version negotiation mechanism. 1524 1525 It works like this: 1526 1. Directly after connection header, the server sends the magic string and 1527 otherwise continues using the old RPC protocol. 1528 2. An old client will process the magic string as a valid RPC message that 1529 is effectively a no-op and continues using old RPC protocol as normal. 1530 3. A new client will notice the magic string and send the same magic string 1531 in response. It will stop the old RPC protocol and restart from header 1532 checking and what is now hopefully an RPC version negotiation. 1533 4. The server will notice client's magic string, stop the old RPC protocol 1534 and restart from connection header, this time with the new RPC version 1535 negotiation mechanism. 1536 1537 The magic string is defined as follows: 1538 1. encoded int 1 followed by 1539 2. encoded int > 0 (packet size) followed by 1540 3. a valid 2.51 protocol packet, the contents of which we don't care about, 1541 but it must be a no-op for 2.51 client (in this case a StreamAbort). 1542 1543 Int 1 is a valid 2.51 protocol message ID but it is never used with normal 1544 messages, hence its safe usage as a magic string. A StreamAbort to a client, 1545 especially with id 1, is a safe no-op. *) 1546 1547 let magicId = 1 1548 (* Although this magic packet is inherently dependent on OCaml version, 1549 it is unlikely to change and has been verified to be the same with 1550 OCaml versions 4.05 to 4.12. It is hard coded here to avoid any future 1551 changes (the idea being that old clients will not be compiled with 1552 any newer OCaml compilers). *) 1553 let magicPacket = "rsp\132\149\166\190\000\000\000\001\000\000\000\000\000\000\000\000\000\000\000\000A" 1554 let magic = encodeInt magicId :: encodeInt (String.length magicPacket) :: 1555 [Bytearray.of_string magicPacket, 0, String.length magicPacket] 1556 1557 let checkForMagicString conn = 1558 (* Fill the buffer and then peek at the contents without consuming *) 1559 peekWithBlocking conn.inputBuffer >>= fun b -> 1560 if Bytes.length b < intSize then 1561 Lwt.return false 1562 else begin 1563 let id = Bytearray.create intSize in 1564 let () = Bytearray.blit_from_bytes b 0 id 0 intSize in 1565 if decodeInt id 0 <> magicId then 1566 Lwt.return false 1567 else begin 1568 debug (fun () -> Util.msg "Received RPC version upgrade notice\n"); 1569 (* Consume magic id from buffer *) 1570 grab conn.inputBuffer id intSize >>= fun () -> 1571 (* Consume magic packet from buffer *) 1572 receivePacket conn >>= fun _ -> Lwt.return true 1573 (* We rely solely on the magic id and don't check the contents of the 1574 packet. Should it become necessary for some reason then it is 1575 possible to verify the magic packet byte by byte here. *) 1576 end 1577 end 1578 1579 let checkServerUpgrade conn header = 1580 if header <> compatConnectionHeader && header <> compat248ConnectionHeader then 1581 Lwt.return header 1582 else 1583 checkForMagicString conn >>= function 1584 | false -> Lwt.return header 1585 | true -> 1586 (* Consume write token from buffer *) 1587 let id = Bytearray.create intSize in 1588 grab conn.inputBuffer id intSize >>= fun () -> 1589 (* Send the magic string *) 1590 dumpUrgent conn magic >>= fun () -> 1591 debug (fun () -> Util.msg "Going to attempt RPC version upgrade\n"); 1592 checkHeader conn 1593 1594 (****) 1595 1596 (* 1597 Disable flow control if possible. 1598 Both hosts must use non-blocking I/O (otherwise a dead-lock is 1599 possible with ssh). 1600 *) 1601 let halfduplex = 1602 Prefs.createBool "halfduplex" false 1603 ~category:(`Advanced `Remote) 1604 ~deprecated:true 1605 "force half-duplex communication with the server" 1606 "When this flag is set to {\\tt true}, Unison network communication \ 1607 is forced to be half duplex (the client and the server never \ 1608 simultaneously emit data). If you experience unstabilities with \ 1609 your network link, this may help." 1610 1611 let negociateFlowControlLocal conn () = 1612 disableFlowControl conn.outputQueue; 1613 Lwt.return false 1614 1615 let negociateFlowControlRemote = 1616 registerServerCmd "negociateFlowControl" Umarshal.unit Umarshal.bool negociateFlowControlLocal 1617 1618 let negociateFlowControl conn = 1619 (* Flow control negotiation can be done asynchronously. *) 1620 if not (Prefs.read halfduplex) then 1621 Lwt.ignore_result 1622 (negociateFlowControlRemote conn () >>= fun needed -> 1623 if not needed then 1624 negociateFlowControlLocal conn () 1625 else 1626 Lwt.return true) 1627 1628 (****) 1629 1630 let initConnection ?(connReady=fun () -> ()) ?cleanup in_ch out_ch = 1631 (* [makeConnection] is not expected to raise any recoverable exceptions. 1632 If this assumption changes in the future then [in_ch] and [out_ch] must 1633 be closed in the recovery code. *) 1634 let conn = makeConnection false in_ch out_ch in 1635 let close_on_fail t = 1636 Lwt.catch (fun () -> t) (fun e -> closeConnection conn; Lwt.fail e) 1637 in 1638 let with_timeout t = 1639 Lwt.choose [t; 1640 Lwt_unix.sleep 120. >>= fun () -> 1641 Lwt.fail (Util.Fatal "Timed out negotiating connection with the server")] 1642 in 1643 close_on_fail (with_timeout ( 1644 peekWithBlocking conn.inputBuffer >>= fun _ -> 1645 connReady (); Lwt.return () >>= fun () -> (* Connection working, notify *) 1646 checkHeader conn >>= 1647 checkServerUpgrade conn >>= 1648 checkServerVersion conn)) >>= fun () -> 1649 registerConnCleanup conn cleanup; 1650 (* From this moment forward, the RPC version has been selected. All 1651 communication must now adhere to that version's specification. *) 1652 enableFlowControl conn false >>= (fun () -> 1653 Lwt.ignore_result (Lwt.catch 1654 (fun () -> receive conn) 1655 (fun e -> 1656 clientConnClose conn; 1657 if isConnectionCheck conn then Lwt.return () else Lwt.fail e)); 1658 negociateFlowControl conn; 1659 Lwt.return conn) 1660 1661 let rec findFirst f l = 1662 match l with 1663 [] -> Lwt.return None 1664 | x :: r -> f x >>= fun v -> 1665 match v with 1666 None -> findFirst f r 1667 | Some _ as v -> Lwt.return v 1668 1669 let printAddr host addr = 1670 match addr with 1671 Unix.ADDR_UNIX s -> 1672 s 1673 | Unix.ADDR_INET (s, p) -> 1674 Format.sprintf "%s[%s]:%d" host (Unix.string_of_inet_addr s) p 1675 1676 let buildSocket host port kind ?(err="") ai = 1677 let attemptCreation ai = 1678 Lwt.catch 1679 (fun () -> 1680 let socket = 1681 Lwt_unix.socket ~cloexec:true 1682 ai.Unix.ai_family ai.Unix.ai_socktype ai.Unix.ai_protocol 1683 in 1684 Lwt.catch 1685 (fun () -> 1686 begin match kind with 1687 `Connect -> 1688 (* Connect (synchronously) to the remote host *) 1689 Lwt_unix.connect socket ai.Unix.ai_addr 1690 | `Bind -> 1691 (* Some OS (Linux?) enable dual-stack mode by default; 1692 trying to bind both IPv4 and IPv6 sockets will fail 1693 with EADDRINUSE unless dual-stack mode is disabled. *) 1694 if ai.Unix.ai_family = Unix.PF_INET6 then 1695 Lwt_unix.setsockopt socket Unix.IPV6_ONLY true; 1696 (* Allow reuse of local addresses for bind *) 1697 if ai.Unix.ai_family <> Unix.PF_UNIX then 1698 Lwt_unix.setsockopt socket Unix.SO_REUSEADDR true; 1699 (* Bind the socket to portnum on the local host 1700 or to a filesystem path (when Unix domain socket) *) 1701 Lwt_unix.bind socket ai.Unix.ai_addr; 1702 (* Start listening, allow up to 1 pending request *) 1703 Lwt_unix.listen socket 1; 1704 Lwt.return () 1705 end >>= fun () -> 1706 Lwt.return (Some socket)) 1707 (fun e -> 1708 match e with 1709 Unix.Unix_error _ -> 1710 Lwt_unix.close socket; 1711 Lwt.fail e 1712 | _ -> 1713 Lwt.fail e)) 1714 (fun e -> 1715 match e with 1716 Unix.Unix_error (error, _, _) -> 1717 begin match error with 1718 Unix.EAFNOSUPPORT | Unix.EPROTONOSUPPORT | Unix.EINVAL -> 1719 Lwt.return None 1720 | _ -> 1721 let msg = 1722 match kind with 1723 `Connect -> 1724 Printf.sprintf "%s%s: %s\n" 1725 err 1726 (printAddr host ai.Unix.ai_addr) 1727 (Unix.error_message error) 1728 | `Bind when ai.Unix.ai_family <> Unix.PF_UNIX -> 1729 Printf.sprintf 1730 "Can't bind socket to port %s at address [%s]: %s\n" 1731 port 1732 (match ai.Unix.ai_addr with 1733 Unix.ADDR_INET (addr, _) -> 1734 Unix.string_of_inet_addr addr 1735 | _ -> 1736 assert false) 1737 (Unix.error_message error) 1738 | `Bind (* Unix.PF_UNIX *) -> 1739 Printf.sprintf 1740 "Can't bind socket to path '%s': %s\n" 1741 port 1742 (Unix.error_message error) 1743 in 1744 Lwt.fail (Util.Fatal msg) 1745 end 1746 | _ -> 1747 Lwt.fail e) 1748 in 1749 attemptCreation ai 1750 1751 let makeUnixSocketAi path = 1752 { Unix.ai_family = Unix.PF_UNIX; 1753 ai_socktype = Unix.SOCK_STREAM; 1754 ai_protocol = 0; 1755 ai_addr = Unix.ADDR_UNIX path; 1756 ai_canonname = "" } 1757 1758 let buildConnectSocketUnix path = 1759 assert (String.length path > 2); 1760 (* Unix domain socket path from [Clroot] is enclosed in curly braces. 1761 Extract the real path. *) 1762 let path = String.sub path 1 ((String.length path) - 2) in 1763 let err = "Can't connect to Unix domain socket on path " in 1764 buildSocket "" path `Connect ~err (makeUnixSocketAi path) >>= function 1765 | None -> 1766 Lwt.fail (Util.Fatal (err ^ path)) 1767 | Some x -> 1768 Lwt.return x 1769 1770 let buildConnectSocket host port = 1771 let isHost = String.length host > 0 && host.[0] <> '{' in 1772 if not isHost then buildConnectSocketUnix host else 1773 let err = "Failed to connect to the server on host " in 1774 let attemptCreation ai = buildSocket host port `Connect ~err ai in 1775 let options = [ Unix.AI_SOCKTYPE Unix.SOCK_STREAM ] in 1776 findFirst attemptCreation (Unix.getaddrinfo host port options) >>= fun res -> 1777 match res with 1778 Some socket -> 1779 Lwt.return socket 1780 | None -> 1781 let hostport = Printf.sprintf "%s:%s" host port in 1782 Lwt.fail (Util.Fatal (err ^ hostport)) 1783 1784 (* [at_exit] does not provide reliable cleanup (why?), so this 1785 complex mechanism is needed to unlink Unix domain sockets 1786 in case of exceptional termination. *) 1787 let createdUnixSockets = ref [] 1788 1789 let postponeUnixSocketCleanup path = 1790 createdUnixSockets := path :: !createdUnixSockets 1791 1792 let unixSocketCleanup () = 1793 Safelist.iter 1794 (fun path -> try Unix.unlink path with Unix.Unix_error _ -> ()) 1795 !createdUnixSockets 1796 1797 let buildListenSocketUnix path = 1798 assert (path <> ""); 1799 buildSocket "" path `Bind (makeUnixSocketAi path) >>= function 1800 | None -> 1801 Lwt.fail (Util.Fatal 1802 (Printf.sprintf "Can't bind Unix domain socket on path %s" path)) 1803 | Some x -> 1804 postponeUnixSocketCleanup path; 1805 Lwt.return [x] 1806 1807 let buildListenSocket hosts port = 1808 let isPort = try ignore (int_of_string port); true with Failure _ -> false in 1809 if not isPort then buildListenSocketUnix port else 1810 let options = [ Unix.AI_SOCKTYPE Unix.SOCK_STREAM ; Unix.AI_PASSIVE ] in 1811 hosts 1812 |> Safelist.map (fun host -> Unix.getaddrinfo host port options) 1813 |> Safelist.concat 1814 |> Lwt_util.map (buildSocket "" port `Bind) >>= fun res -> 1815 match Safelist.filter (fun x -> x <> None) res with 1816 | [] -> 1817 Lwt.fail (Util.Fatal (Printf.sprintf "Can't bind socket to port %s" port)) 1818 | s -> 1819 Lwt.return (Safelist.map (function None -> assert false | Some x -> x) s) 1820 1821 let buildSocketConnection host port = 1822 buildConnectSocket host port >>= fun socket -> 1823 initConnection socket socket 1824 1825 let buildShellConnection shell host userOpt portOpt rootName termInteract = 1826 let remoteCmd = 1827 (if Prefs.read serverCmd="" then Uutil.myName 1828 else Prefs.read serverCmd) 1829 ^ (if Prefs.read addversionno then "-" ^ Uutil.myMajorVersion else "") 1830 ^ " -server " ^ rpcServerCmdlineOverride in 1831 let userArgs = 1832 match userOpt with 1833 None -> [] 1834 | Some user -> ["-l"; user] in 1835 let portArgs = 1836 match portOpt with 1837 None -> [] 1838 | Some port -> ["-p"; port] in 1839 let shellCmd = 1840 (if shell = "ssh" then 1841 Prefs.read sshCmd 1842 else 1843 shell) in 1844 let shellCmdArgs = 1845 (if shell = "ssh" then 1846 Prefs.read sshargs 1847 else 1848 "") in 1849 let preargs = 1850 (userArgs @ portArgs @ 1851 [host]@ 1852 (if shell="ssh" then ["-e none"] else [])@ 1853 [shellCmdArgs;remoteCmd]) in 1854 (* Split compound arguments at space chars, to make 1855 create_process happy *) 1856 let args = [shellCmd] @ 1857 Safelist.concat 1858 (Safelist.map (fun s -> Util.splitIntoWords s ' ') preargs) in 1859 let argsarray = Array.of_list args in 1860 let (i1,o1) = Lwt_unix.pipe_out ~cloexec:true () in 1861 let (i2,o2) = Lwt_unix.pipe_in ~cloexec:true () in 1862 (* We need to make sure that there is only one reader and one 1863 writer by pipe, so that, when one side of the connection 1864 dies, the other side receives an EOF or a SIGPIPE. *) 1865 debug (fun ()-> Util.msg "Shell connection: %s (%s)\n" 1866 shellCmd (String.concat ", " args)); 1867 let (term, termPid) = 1868 Util.convertUnixErrorsToFatal "starting shell connection" (fun () -> 1869 match termInteract with 1870 | None -> 1871 (* Signals generated by the terminal from user input are sent to all 1872 processes in the foreground process group. This means that the ssh 1873 child process will receive SIGINT at the same time as Unison and 1874 close the connection before Unison has the chance to do cleanup with 1875 the remote end. To make matters more complicated, the ssh process 1876 must be in the foreground process group because interaction with the 1877 user is done via the terminal (not via stdin, stdout) and background 1878 processes can't read from the terminal (unless we'd set up a pty 1879 like is done for the GUI). 1880 1881 Don't let these signals reach ssh by blocking them. 1882 1883 Unfortunately, a bug introduced in OpenSSH 9.6 (also present in 9.7) 1884 breaks this workaround by unblocking SIGINT in the ssh process. 1885 1886 The signals could be ignored instead of being blocked because ssh 1887 does not set handlers for SIGINT and SIGQUIT if they've been ignored 1888 at startup. But this triggers an error in ssh. The interactive 1889 passphrase reading function captures these signals for the purpose 1890 of restoring terminal settings (echo). When receiving a signal, and 1891 after restoring previous signal handlers, it resends the signal to 1892 itself. But now the signal is ignored and instead of terminating, 1893 the process will continue running as if passphrase reading function 1894 had returned with an empty result. 1895 1896 Since the ssh process no longer receives the signals generated by 1897 user input we have to make sure that it terminates when Unison does. 1898 This usually happens due to its stdin and stdout being closed, 1899 except for when it is interacting with the user via terminal. To get 1900 around that, an [at_exit] handler is registered to send a SIGTERM 1901 and SIGKILL to the ssh process. (Note, for [at_exit] handlers to 1902 run, unison process must terminate normally, not be killed. For 1903 SIGINT, this means that [Sys.catch_break true] (or an alternative 1904 SIGINT handler) must be set before creating the ssh process.) *) 1905 let pid = Util.blockSignals [Sys.sigint] (fun () -> 1906 System.create_process shellCmd argsarray i1 o2 Unix.stderr) in 1907 let end_ssh () = 1908 let kill_noerr si = try Unix.kill pid si 1909 with Unix.Unix_error _ -> () | Invalid_argument _ -> () in 1910 match Unix.waitpid [WNOHANG] pid with 1911 | (0, _) -> 1912 (* Grace period before killing. Important to give ssh a chance 1913 to restore terminal settings, should that be needed. *) 1914 kill_noerr Sys.sigterm; Unix.sleepf 0.01; kill_noerr Sys.sigkill 1915 | _ | exception Unix.Unix_error _ -> () 1916 in 1917 let () = at_exit end_ssh in 1918 (None, pid) 1919 | Some callBack -> 1920 Terminal.create_session shellCmd argsarray i1 o2 Unix.stderr) 1921 in 1922 Unix.close i1; Unix.close o2; 1923 let writeSilentNoexn fd s pos len = 1924 ignore (try if len > 0 then Unix.write fd s pos len else 0 1925 with Unix.Unix_error _ -> 0) 1926 in 1927 let forwardShellStderr fdIn fdOut s = 1928 (* When the shell connection has been established then keep 1929 forwarding server's stderr to client's stderr; not to GUI. *) 1930 let buf = Bytes.create 16000 in 1931 let rec loop s len = 1932 (* Can't use printf because if stderr is not open in Windows, 1933 it will throw an exception when at_exit tries to flush it. *) 1934 writeSilentNoexn fdOut s 0 len; 1935 Lwt.catch (fun () -> Lwt_unix.read fdIn buf 0 16000) 1936 (fun _ -> debug (fun () -> 1937 Util.msg "Caught an exception when reading remote stderr\n"); 1938 Lwt.return 0) 1939 >>= function 1940 | 0 -> Lwt.return () 1941 | len -> loop buf len 1942 in 1943 loop (Bytes.of_string s) (String.length s) 1944 in 1945 let (connReady, getTermErr) = 1946 match term, termInteract with 1947 | Some fdTerm, Some interact -> 1948 let (setReady, handleRequests, extractRemainingOutput) = 1949 Terminal.handlePasswordRequests fdTerm (interact rootName) in 1950 Lwt.ignore_result ( 1951 handleRequests >>= 1952 forwardShellStderr (fst fdTerm) Unix.stderr); 1953 let connReady () = 1954 let s = setReady () in 1955 writeSilentNoexn Unix.stderr (Bytes.of_string s) 0 (String.length s) 1956 in 1957 (connReady, extractRemainingOutput) 1958 | _ -> 1959 (Fun.id, fun () -> Lwt.return "") 1960 in 1961 let cleanup () = 1962 (* Make sure the [handlePasswordRequests] threads will finish while 1963 silencing any exceptions (most likely EBADF) caused by having closed 1964 the terminal fds. *) 1965 Lwt.ignore_result (getTermErr () >>= fun s -> 1966 debug (fun () -> 1967 if s <> "" then Util.msg "Received from remote shell process:\n%s\n" s); 1968 Lwt.return ()); 1969 if term = None then 1970 try ignore (Terminal.safe_waitpid termPid) with Unix.Unix_error _ -> () 1971 else 1972 try Terminal.close_session termPid with Unix.Unix_error _ -> () 1973 in 1974 (* With [connReady], we know that shell connection was established (even if 1975 RPC handshake failed). This hacky way of detecting the connection is used 1976 because [Lwt_unix.wait_read] is not implemented under Windows. 1977 By this time, we are already somewhat late in the communication process. 1978 Any error output from very early stages of server startup, before other 1979 output is produced, might still end up in GUI (but this is very unlikely; 1980 it is more likely that the same error caused connection to be dropped). *) 1981 Lwt.catch 1982 (fun () -> initConnection ~connReady ~cleanup i2 o1) 1983 (fun e -> 1984 Lwt.catch 1985 (fun () -> getTermErr () >>= fun s -> 1986 if s <> "" then Util.warn s; 1987 Lwt.fail e) 1988 (* Don't close the terminal before reading the final error output 1989 or we might miss it completely. *) 1990 (fun _ -> cleanup (); Lwt.fail e)) 1991 1992 let canonizeLocally s = 1993 Fspath.canonize s 1994 1995 let canonizeOnServer = 1996 registerServerCmd "canonizeOnServer" 1997 Umarshal.(prod2 (option string) bool id id) 1998 Umarshal.(prod2 string Fspath.m id id) 1999 (fun _ (s, _) -> (* The tuple is kept for backwards API compatibility *) 2000 Lwt.return (Os.myCanonicalHostName (), canonizeLocally s)) 2001 2002 let canonizeOnServer conn s = 2003 (* The second tuple item is required for compatibility with <= 2.52 *) 2004 canonizeOnServer conn (s, true) 2005 2006 let canonize clroot = (* connection for clroot must have been set up already *) 2007 match clroot with 2008 Clroot.ConnectLocal s -> 2009 (Common.Local, canonizeLocally s) 2010 | _ -> 2011 match ClientConn.canonRootOfClroot clroot with 2012 None -> raise (Util.Fatal "Remote.canonize") 2013 | Some root -> root 2014 2015 let isRootConnected = function 2016 | (Common.Local, _) -> true 2017 | (Common.Remote _, _) as root -> ClientConn.ofRootConncheck root <> None 2018 2019 let canonizeRoot rootName clroot termInteract = 2020 let finish ioServer s = 2021 (* We need to always compute the fspath as it may have changed 2022 due to profile configuration changes *) 2023 canonizeOnServer ioServer s >>= (fun (host, fspath) -> 2024 let root = (Common.Remote host, fspath) in 2025 ClientConn.register clroot root ioServer; 2026 Lwt.return root) in 2027 match clroot with 2028 Clroot.ConnectLocal s -> 2029 Lwt.return (Common.Local, canonizeLocally s) 2030 | Clroot.ConnectBySocket(host,port,s) -> 2031 begin match ClientConn.ofClrootConncheck clroot with 2032 | Some x -> Lwt.return x 2033 | None -> buildSocketConnection host port 2034 end >>= fun ioServer -> 2035 finish ioServer s 2036 | Clroot.ConnectByShell(shell,host,userOpt,portOpt,s) -> 2037 begin match ClientConn.ofClrootConncheck clroot with 2038 | Some x -> Lwt.return x 2039 | None -> buildShellConnection 2040 shell host userOpt portOpt rootName termInteract 2041 end >>= fun ioServer -> 2042 finish ioServer s 2043 2044 (* A new interface, useful for terminal interaction, it should 2045 eventually replace canonizeRoot and buildShellConnection *) 2046 (* A preconnection is None if there's nothing more to do, and Some if 2047 terminal interaction might be required (for ssh password) *) 2048 type preconnection = 2049 (Unix.file_descr 2050 * Lwt_unix.file_descr 2051 * Lwt_unix.file_descr 2052 * Unix.file_descr 2053 * string option 2054 * (Lwt_unix.file_descr * Lwt_unix.file_descr) option 2055 * Clroot.clroot 2056 * int) 2057 let openConnectionStart clroot = 2058 match clroot with 2059 Clroot.ConnectLocal s -> 2060 None 2061 | Clroot.ConnectBySocket(host,port,s) -> 2062 Lwt_unix.run 2063 (begin match ClientConn.ofClrootConncheck clroot with 2064 | Some x -> Lwt.return x 2065 | None -> buildSocketConnection host port 2066 end >>= fun ioServer -> 2067 (* We need to always compute the fspath as it may have changed 2068 due to profile configuration changes *) 2069 canonizeOnServer ioServer s >>= fun (host, fspath) -> 2070 ClientConn.register clroot (Common.Remote host, fspath) ioServer; 2071 Lwt.return ()); 2072 None 2073 | Clroot.ConnectByShell(shell,host,userOpt,portOpt,s) -> 2074 match ClientConn.ofClrootConncheck clroot with 2075 | Some ioServer -> 2076 (* We recompute the fspath as it may have changed due to 2077 profile configuration changes *) 2078 Lwt_unix.run 2079 (canonizeOnServer ioServer s >>= fun (host, fspath) -> 2080 ClientConn.register clroot (Common.Remote host, fspath) ioServer; 2081 Lwt.return ()); 2082 None 2083 | None -> 2084 let remoteCmd = 2085 (if Prefs.read serverCmd="" then Uutil.myName 2086 else Prefs.read serverCmd) 2087 ^ (if Prefs.read addversionno then "-" ^ Uutil.myMajorVersion else "") 2088 ^ " -server " ^ rpcServerCmdlineOverride in 2089 let userArgs = 2090 match userOpt with 2091 None -> [] 2092 | Some user -> ["-l"; user] in 2093 let portArgs = 2094 match portOpt with 2095 None -> [] 2096 | Some port -> ["-p"; port] in 2097 let shellCmd = 2098 (if shell = "ssh" then 2099 Prefs.read sshCmd 2100 else 2101 shell) in 2102 let shellCmdArgs = 2103 (if shell = "ssh" then 2104 Prefs.read sshargs 2105 else 2106 "") in 2107 let preargs = 2108 ([shellCmd]@userArgs@portArgs@ 2109 [host]@ 2110 (if shell="ssh" then ["-e none"] else [])@ 2111 [shellCmdArgs;remoteCmd]) in 2112 (* Split compound arguments at space chars, to make 2113 create_process happy *) 2114 let args = 2115 Safelist.concat 2116 (Safelist.map (fun s -> Util.splitIntoWords s ' ') preargs) in 2117 let argsarray = Array.of_list args in 2118 let (i1,o1) = Lwt_unix.pipe_out ~cloexec:true () in 2119 let (i2,o2) = Lwt_unix.pipe_in ~cloexec:true () in 2120 (* We need to make sure that there is only one reader and one 2121 writer by pipe, so that, when one side of the connection 2122 dies, the other side receives an EOF or a SIGPIPE. *) 2123 debug (fun ()-> Util.msg "Shell connection: %s (%s)\n" 2124 shellCmd (String.concat ", " args)); 2125 let (term,pid) = 2126 Terminal.create_session shellCmd argsarray i1 o2 Unix.stderr in 2127 (* after terminal interact, remember to close i1 and o2 *) 2128 Some(i1,i2,o1,o2,s,term,clroot,pid) 2129 2130 let openConnectionPrompt = function 2131 (i1,i2,o1,o2,s,Some fdTerm,clroot,pid) -> 2132 let x = Terminal.termInput fdTerm i2 in 2133 x 2134 | _ -> None 2135 2136 let openConnectionReply = function 2137 (i1,i2,o1,o2,s,Some fdTerm,clroot,pid) -> 2138 (fun response -> 2139 (* FIX: should loop until everything is written... *) 2140 ignore (Lwt_unix.run (Lwt_unix.write (snd fdTerm) (Bytes.of_string (response ^ "\n")) 0 2141 (String.length response + 1)))) 2142 | _ -> (fun _ -> ()) 2143 2144 let openConnectionEnd (i1,i2,o1,o2,s,fdopt,clroot,pid) = 2145 Unix.close i1; Unix.close o2; 2146 let cleanup () = 2147 try Terminal.close_session pid with Unix.Unix_error _ -> () 2148 in 2149 Lwt_unix.run 2150 (initConnection ~cleanup i2 o1 >>= fun ioServer -> 2151 canonizeOnServer ioServer s >>= fun (host, fspath) -> 2152 ClientConn.register clroot (Common.Remote host, fspath) ioServer; 2153 Lwt.return ()) 2154 2155 let openConnectionCancel (i1,i2,o1,o2,s,fdopt,clroot,pid) = 2156 (try Unix.kill pid Sys.sigkill with Unix.Unix_error _ -> ()); 2157 (try Unix.close i1 with Unix.Unix_error _ -> ()); 2158 (try Lwt_unix.close i2 with Unix.Unix_error _ -> ()); 2159 (try Lwt_unix.close o1 with Unix.Unix_error _ -> ()); 2160 (try Unix.close o2 with Unix.Unix_error _ -> ()); 2161 (try Terminal.close_session pid with Unix.Unix_error _ -> ()) 2162 2163 (****************************************************************************) 2164 (* SERVER-MODE COMMAND PROCESSING LOOP *) 2165 (****************************************************************************) 2166 2167 let checkClientVersion conn () = 2168 let reply msg = sendHandshakeMsg conn msg in 2169 (* FIX: In future when gaining the ability to close connections from server 2170 side, make errors close the connection, not just send to client. *) 2171 let error = sendHandshakeErr conn in 2172 receiveHandshakeData conn rpcVersionTag >>= function 2173 | Error msg -> 2174 error ("Could not negotiate RPC version. " 2175 ^ "Received unexpected error from the client: \"" ^ msg ^ "\"") 2176 | Unknown fromClient -> 2177 error ("Could not negotiate RPC version. " 2178 ^ "Received unexpected header from the client: \"" 2179 ^ String.escaped (fromClient 2180 ^ Bytes.to_string (peekWithoutBlocking conn.inputBuffer)) ^ "\"") 2181 | Data buf -> 2182 match parseVersion "client" buf with 2183 | Some clientVer -> 2184 if verIsSupported clientVer then begin 2185 setConnectionVersion conn clientVer; 2186 reply Ok 2187 end else 2188 error ("Client RPC version not supported. " 2189 ^ "Version received from client: \"" 2190 ^ string_of_int clientVer ^ "\". " 2191 ^ "Supported RPC versions: " ^ rpcSupportedVersionStr) 2192 | None -> Lwt.return () 2193 | exception Util.Transient e -> error e 2194 2195 (****) 2196 2197 let showWarningOnClient = 2198 (registerServerCmd 2199 "showWarningOnClient" Umarshal.string Umarshal.unit 2200 (fun _ str -> Lwt.return (Util.warn str))) 2201 2202 let forwardMsgToClient = 2203 (registerServerCmd 2204 "forwardMsgToClient" Trace.mmsg Umarshal.unit 2205 (fun _ str -> (*msg "forwardMsgToClient: %s\n" str; *) 2206 Lwt.return (Trace.displayMessageLocally str))) 2207 2208 (* Compatibility mode for 2.51 clients. *) 2209 let compatServerInit mode conn = 2210 let compatConnectionHeader = 2211 match mode with 2212 | Some "2.48" -> compat248ConnectionHeader 2213 | _ -> compatConnectionHeader 2214 in 2215 dump conn [(Bytearray.of_string compatConnectionHeader, 0, 2216 String.length compatConnectionHeader)] >>= fun () -> 2217 (* Send the magic string to notify new clients *) 2218 dumpUrgent conn magic >>= fun () -> 2219 (* Must enable flow control because that is the default for 2.51. 2220 This must be done after dumpUrgent above to ensure that the write 2221 token is sent the last. *) 2222 enableFlowControl conn true >>= fun () -> 2223 (* Let's see if the client noticed the magic string. This is 2224 a no-op for old clients. *) 2225 checkForMagicString conn 2226 2227 let compatServerRun conn = 2228 (* Set the local warning printer to make an RPC to the client and 2229 show the warning there; ditto for the message printer *) 2230 Util.warnPrinter := 2231 Some (fun str -> Lwt_unix.run (showWarningOnClient conn str)); 2232 Trace.messageForwarder := 2233 Some (fun str -> Lwt_unix.run (forwardMsgToClient conn str)); 2234 receive conn >>= 2235 Lwt.wait 2236 2237 (* This function loops, waits for commands, and passes them to 2238 the relevant functions. *) 2239 let commandLoop ~compatMode in_ch out_ch = 2240 Trace.runningasserver := true; 2241 (* Send header indicating to the client that it has successfully 2242 connected to the server *) 2243 let conn = makeConnection true in_ch out_ch in 2244 Lwt.catch 2245 (fun () -> 2246 (if compatMode <> None then 2247 let () = setConnectionVersion conn 0 in 2248 compatServerInit compatMode conn >>= (fun upgrade -> 2249 if upgrade then begin 2250 (* Restore the state before starting protocol negotiation *) 2251 allowWrites conn.outputQueue; 2252 disableFlowControl conn.outputQueue 2253 end; 2254 Lwt.return upgrade) 2255 else 2256 Lwt.return true) >>= fun upgrade -> 2257 debug (fun () -> Util.msg "%sGoing to attempt RPC version upgrade\n" 2258 (if upgrade then "" else "NOT ")); 2259 if not upgrade then 2260 compatServerRun conn 2261 else 2262 sendStrings conn [connectionHeader; rpcVersionsStr] >>= 2263 checkClientVersion conn >>= fun () -> 2264 (* From this moment forward, the RPC version has been selected. All 2265 communication must now adhere to that version's specification. *) 2266 (* Flow control was disabled for RPC version handshake. Enable it 2267 for flow control negotiation. *) 2268 enableFlowControl conn true >>= (fun () -> 2269 (* Set the local warning printer to make an RPC to the client and 2270 show the warning there; ditto for the message printer *) 2271 Util.warnPrinter := 2272 Some (fun str -> Lwt_unix.run (showWarningOnClient conn str)); 2273 Trace.messageForwarder := 2274 Some (fun str -> Lwt_unix.run (forwardMsgToClient conn str)); 2275 receive conn >>= 2276 Lwt.wait)) 2277 (fun e -> 2278 match e with 2279 Util.Fatal "Lost connection with the server" -> 2280 debug (fun () -> Util.msg "Connection closed by the client\n"); 2281 (* We prevent new writes and wait for any current write to 2282 terminate. As we don't have a good way to wait for the 2283 writer to terminate, we just yield a bit. *) 2284 let rec wait n = 2285 if n = 0 then Lwt.return () else begin 2286 Lwt_unix.yield () >>= fun () -> 2287 wait (n - 1) 2288 end 2289 in 2290 conn.outputBuffer.opened <- false; 2291 wait 10 2292 | _ -> 2293 Lwt.fail e) 2294 2295 let killServer = 2296 Prefs.createBool "killserver" false 2297 ~category:(`Advanced `Remote) 2298 "kill server when done (even when using sockets)" 2299 ("When set to \\verb|true|, this flag causes Unison to kill the remote " 2300 ^ "server process when the synchronization is finished. This behavior " 2301 ^ "is the default for \\verb|ssh| connections, so this preference is not " 2302 ^ "normally needed when running over \\verb|ssh|; it is provided so " 2303 ^ "that socket-mode servers can be killed off after a single run of " 2304 ^ "Unison, rather than waiting to accept future connections. (Some " 2305 ^ "users prefer to start a remote socket server for each run of Unison, " 2306 ^ "rather than leaving one running all the time.)") 2307 2308 (* For backward compatibility *) 2309 let _ = Prefs.alias killServer "killServer" 2310 2311 (* FIX: This code should be removed when removing 2.51-compatibility code. *) 2312 let is248Exe = 2313 let exeName = Filename.basename (Sys.executable_name) in 2314 String.length exeName >= 11 && String.sub exeName 0 11 = "unison-2.48" 2315 2316 let rec accept_retry l = 2317 Lwt.catch 2318 (fun () -> Lwt_unix.accept l) 2319 (function 2320 (* Temporary and connection-specific errors *) 2321 | Unix.Unix_error (Unix.ECONNABORTED, _, _) 2322 | Unix.Unix_error (Unix.EPERM, _, _) (* Linux firewall *) 2323 | Unix.Unix_error 2324 (* Resource exhaustion: could be considered temporary *) 2325 (Unix.(EMFILE | ENFILE | ENOBUFS | ENOMEM), _, _) 2326 (* Linux curiosity: accept(2) may return errors on the new socket *) 2327 | Unix.Unix_error (Unix.ENETUNREACH, _, _) 2328 | Unix.Unix_error (Unix.EHOSTUNREACH, _, _) 2329 | Unix.Unix_error (Unix.ENETDOWN, _, _) 2330 | Unix.Unix_error (Unix.EHOSTDOWN, _, _) 2331 | Unix.Unix_error (Unix.ETIMEDOUT, _, _) as e -> 2332 let errmsg = match e with 2333 | Unix.Unix_error (err, _, _) -> Unix.error_message err 2334 | _ -> Printexc.to_string e in 2335 Util.msg "server: continuing after receiving an error \ 2336 when accepting client connection: %s\n" errmsg; 2337 accept_retry l 2338 (* Permanent errors *) 2339 | e -> Lwt.fail e) 2340 2341 (* Used by the socket mechanism: Create a socket on portNum and wait 2342 for a request. Each request is processed by commandLoop. When a 2343 session finishes, the server waits for another request. *) 2344 let waitOnPort hosts port = 2345 Util.convertUnixErrorsToFatal "waiting on port" 2346 (fun () -> 2347 let hosts = match hosts with [] -> [""] | _ -> hosts in 2348 let listening = Lwt_unix.run (buildListenSocket hosts port) in 2349 let accepting = Array.make (Safelist.length listening) None in 2350 let rec accept i l = 2351 match accepting.(i) with 2352 | None -> 2353 let st = accept_retry l >>= fun s -> Lwt.return (i, s) in 2354 let () = accepting.(i) <- Some st in 2355 st 2356 | Some st -> st 2357 and serve (i, s) = accepting.(i) <- None; setKeepalive s; s 2358 and setKeepalive = function 2359 | (_, Unix.ADDR_UNIX _) -> () 2360 | (c, ADDR_INET _) -> Lwt_unix.setsockopt c Unix.SO_KEEPALIVE true 2361 in 2362 Util.msg "server started\n"; 2363 let rec handleClients () = 2364 let (connected, _) = 2365 serve @@ Lwt_unix.run (Lwt.choose (List.mapi accept listening)) 2366 in 2367 registerIOClose (connected, connected) (fun () -> doCleanup connected); 2368 begin try 2369 (* Accept a connection *) 2370 let compatMode = Some (if is248Exe then "2.48" else "2.51") in 2371 Lwt_unix.run (commandLoop ~compatMode connected connected) 2372 with Util.Fatal "Lost connection with the server" -> () end; 2373 (* The client has closed its end of the connection *) 2374 if not (Prefs.read killServer) then handleClients () 2375 and doCleanup socket = 2376 begin try Lwt_unix.close socket with Unix.Unix_error _ -> () end; 2377 if not (Prefs.read killServer) then runConnCloseHandlers true 2378 in 2379 try 2380 Sys.catch_break true; 2381 handleClients (); 2382 unixSocketCleanup () 2383 with 2384 | Sys.Break -> 2385 unixSocketCleanup () 2386 | (Util.Fatal _ | Unix.Unix_error _) as e -> 2387 unixSocketCleanup (); 2388 raise e 2389 ) 2390 2391 let beAServer () = 2392 begin try 2393 let home = System.getenv "HOME" in 2394 Util.convertUnixErrorsToFatal 2395 "changing working directory" 2396 (fun () -> System.chdir home) 2397 with Not_found -> 2398 Util.msg 2399 "Environment variable HOME unbound: \ 2400 executing server in current directory\n" 2401 end; 2402 (* Let's start with 2.51-compatibility mode. Newer clients will add 2403 a special override keyword in server args that will disable the 2404 compatibility mode. 2405 2406 FIX: It is a bit of a hack, so better not make it permanent. 2407 It was added in 2021 and should be removed after a couple of years. *) 2408 let compatMode = 2409 try 2410 not (Prefs.scanCmdLine "" |> Util.StringMap.find "rest" 2411 |> Safelist.mem rpcServerCmdlineOverride) 2412 with Not_found -> true 2413 in 2414 (* Additionally, do a best effort emulation of 2.48. 2415 FIX: remove together with code above. *) 2416 let compatMode = 2417 match compatMode with 2418 | true when is248Exe -> Some "2.48" 2419 | true -> Some "2.51" 2420 | false -> None 2421 in 2422 begin end; 2423 Lwt_unix.run 2424 (commandLoop ~compatMode 2425 (Lwt_unix.of_unix_file_descr Unix.stdin) 2426 (Lwt_unix.of_unix_file_descr Unix.stdout))