unison

Fork of Unison, a bi-directional file synchronization tool
git clone git://git.laack.co/unison.git
Log | Files | Refs | README | LICENSE

remote.ml (91859B)


      1 (* Unison file synchronizer: src/remote.ml *)
      2 (* Copyright 1999-2020, Benjamin C. Pierce
      3 
      4     This program is free software: you can redistribute it and/or modify
      5     it under the terms of the GNU General Public License as published by
      6     the Free Software Foundation, either version 3 of the License, or
      7     (at your option) any later version.
      8 
      9     This program is distributed in the hope that it will be useful,
     10     but WITHOUT ANY WARRANTY; without even the implied warranty of
     11     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
     12     GNU General Public License for more details.
     13 
     14     You should have received a copy of the GNU General Public License
     15     along with this program.  If not, see <http://www.gnu.org/licenses/>.
     16 *)
     17 
     18 let (>>=) = Lwt.bind
     19 
     20 let debug = Trace.debug "remote"
     21 let debugV = Trace.debug "remote_emit+"
     22 let debugE = Trace.debug "remote+"
     23 let debugT = Trace.debug "remote+"
     24 
     25 (* BCP: The previous definitions of the last two were like this:
     26      let debugE = Trace.debug "remote_emit"
     27      let debugT = Trace.debug "thread"
     28    But that resulted in huge amounts of output from '-debug all'.
     29 *)
     30 
     31 let _ =
     32   if Sys.unix || Sys.cygwin then
     33     ignore(Sys.set_signal Sys.sigpipe Sys.Signal_ignore)
     34 
     35 (*
     36    Flow-control mechanism (only active under Windows).
     37    Only one side is allowed to send messages at any given time.
     38    Once it has finished sending messages, a special message is sent
     39    meaning that the destination is now allowed to send messages.
     40 
     41    Threads behave in a very controlled way: they only perform possibly
     42    blocking I/Os through the remote module, and never call
     43    Lwt_unix.yield.  This mean that when one side gives up its right to
     44    write, we know that no matter how long we wait, it will not have
     45    anything to write.  This ensures that there is no deadlock.
     46    A more robust protocol would be to give up write permission
     47    whenever idle (not just after having sent at least one message).
     48    But then, there is the risk that the two sides exchange spurious
     49    messages.
     50 *)
     51 
     52 (****)
     53 
     54 let intSize = 5
     55 
     56 let intHash x = ((x * 791538121) lsr 23 + 17) land 255
     57 
     58 let encodeInt m =
     59   let int_buf = Bytearray.create intSize in
     60   int_buf.{0} <- Char.chr ( m         land 0xff);
     61   int_buf.{1} <- Char.chr ((m lsr 8)  land 0xff);
     62   int_buf.{2} <- Char.chr ((m lsr 16) land 0xff);
     63   int_buf.{3} <- Char.chr ((m lsr 24) land 0xff);
     64   int_buf.{4} <- Char.chr (intHash m);
     65   (int_buf, 0, intSize)
     66 
     67 let decodeInt int_buf i =
     68   let b0 = Char.code (int_buf.{i + 0}) in
     69   let b1 = Char.code (int_buf.{i + 1}) in
     70   let b2 = Char.code (int_buf.{i + 2}) in
     71   let b3 = Char.code (int_buf.{i + 3}) in
     72   let m = (b3 lsl 24) lor (b2 lsl 16) lor (b1 lsl 8) lor b0 in
     73   if Char.code (int_buf.{i + 4}) <> intHash m then
     74     raise (Util.Fatal
     75              "Protocol error: corrupted message received;\n\
     76               if it happens to you in a repeatable way, \n\
     77               please post a report on the unison-users mailing list.");
     78   m
     79 
     80 (*************************************************************************)
     81 (*                           LOW-LEVEL IO                                *)
     82 (*************************************************************************)
     83 
     84 let ioCleanups = ref []
     85 
     86 let registerIOClose io f =
     87   ioCleanups := (io, f) :: !ioCleanups
     88 
     89 let lostConnectionHandler ch =
     90   let aux ((i, o), f) =
     91     if i = ch || o = ch then begin
     92       f ();
     93       false (* Each handler is run only once *)
     94     end else
     95       true
     96   in
     97   ioCleanups := Safelist.filter aux !ioCleanups
     98 
     99 let lostConnection ch =
    100   begin try lostConnectionHandler ch with _ -> () end;
    101   Lwt.fail (Util.Fatal "Lost connection with the server")
    102 
    103 let catchIoErrors ch th =
    104   Lwt.catch th
    105     (fun e ->
    106        match e with
    107          Unix.Unix_error(Unix.ECONNRESET, _, _)
    108        | Unix.Unix_error(Unix.EPIPE, _, _)
    109        | Unix.Unix_error(Unix.ETIMEDOUT, _, _)
    110        | Unix.Unix_error(Unix.EACCES, _, _)        (* Linux firewall *)
    111          (* Windows may also return the following errors... *)
    112        | Unix.Unix_error(Unix.EINVAL, _, _) (* ... and Linux firewall *)
    113        | Unix.Unix_error(Unix.EUNKNOWNERR (-64), _, _)
    114                          (* ERROR_NETNAME_DELETED *)
    115        | Unix.Unix_error(Unix.EUNKNOWNERR (-233), _, _)
    116                          (* ERROR_PIPE_NOT_CONNECTED *)
    117        | Unix.Unix_error(Unix.EUNKNOWNERR (-1236), _, _)
    118                          (* ERROR_CONNECTION_ABORTED *)
    119          (* The following may indicate a programming error, but even if that's
    120             the case, let's be graceful about it rather than crash the server.
    121             (Seen happening on the socket server when the connection is broken
    122             before the client has read the updates sent by server.) *)
    123        | Unix.Unix_error(Unix.EBADF, _, _)
    124          (* The following errors _may_ be temporary but we don't know if
    125             they are or for how long they will persist. We also don't have
    126             a way to retry and there is no guarantee that the socket remains
    127             in a usable state, so treat all these as permanent failures
    128             breaking the connection. *)
    129        | Unix.Unix_error(Unix.ENETUNREACH, _, _)
    130        | Unix.Unix_error(Unix.EHOSTUNREACH, _, _)
    131        | Unix.Unix_error(Unix.ENETDOWN, _, _)
    132        | Unix.Unix_error(Unix.EHOSTDOWN, _, _)
    133        | Unix.Unix_error(Unix.ENETRESET, _, _) ->
    134          (* Client has closed its end of the connection *)
    135            lostConnection ch
    136        | _ ->
    137            Lwt.fail e)
    138 
    139 (****)
    140 
    141 let receivedBytes = ref 0.
    142 let emittedBytes = ref 0.
    143 
    144 (****)
    145 
    146 (* I/O buffers *)
    147 
    148 type ioBuffer =
    149   { channel : Lwt_unix.file_descr;
    150     buffer : bytes;
    151     mutable length : int;
    152     mutable opened : bool }
    153 
    154 let bufferSize = 16384
    155 (* No point in making this larger, as the Ocaml Unix library uses a
    156    buffer of this size *)
    157 
    158 let makeBuffer ch =
    159   { channel = ch; buffer = Bytes.create bufferSize;
    160     length = 0; opened = true }
    161 
    162 (****)
    163 
    164 (* Low-level inputs *)
    165 
    166 let fillInputBuffer conn =
    167   assert (conn.length = 0);
    168   catchIoErrors conn.channel
    169     (fun () ->
    170        Lwt_unix.read conn.channel conn.buffer 0 bufferSize >>= fun len ->
    171        debugV (fun() ->
    172          if len = 0 then
    173            Util.msg "grab: EOF\n"
    174          else
    175            Util.msg "grab: %s\n"
    176              (String.escaped (Bytes.sub_string conn.buffer 0 len)));
    177        if len = 0 then
    178          lostConnection conn.channel
    179        else begin
    180          receivedBytes := !receivedBytes +. float len;
    181          conn.length <- len;
    182          Lwt.return ()
    183        end)
    184 
    185 let rec grabRec conn s pos len =
    186   if conn.length = 0 then begin
    187     fillInputBuffer conn >>= fun () ->
    188     grabRec conn s pos len
    189   end else begin
    190     let l = min (len - pos) conn.length in
    191     Bytearray.blit_from_bytes conn.buffer 0 s pos l;
    192     conn.length <- conn.length - l;
    193     if conn.length > 0 then
    194       Bytes.blit conn.buffer l conn.buffer 0 conn.length;
    195     if pos + l < len then
    196       grabRec conn s (pos + l) len
    197     else
    198       Lwt.return ()
    199   end
    200 
    201 let grab conn s len =
    202   assert (len > 0);
    203   assert (Bytearray.length s <= len);
    204   grabRec conn s 0 len
    205 
    206 let peekWithoutBlocking conn =
    207   Bytes.sub conn.buffer 0 conn.length
    208 
    209 let peekWithBlocking conn =
    210   (if conn.length = 0 then begin
    211     fillInputBuffer conn
    212   end else
    213     Lwt.return ()) >>= fun () ->
    214   Lwt.return (peekWithoutBlocking conn)
    215 
    216 (****)
    217 
    218 (* Low-level outputs *)
    219 
    220 let rec sendOutput conn =
    221   catchIoErrors conn.channel
    222     (fun () ->
    223        begin if conn.opened then
    224          Lwt_unix.write conn.channel conn.buffer 0 conn.length
    225        else
    226          Lwt.return conn.length
    227        end >>= fun len ->
    228        debugV (fun() ->
    229          Util.msg "dump: %s\n"
    230            (String.escaped (Bytes.sub_string conn.buffer 0 len)));
    231        emittedBytes := !emittedBytes +. float len;
    232        conn.length <- conn.length - len;
    233        if conn.length > 0 then
    234          Bytes.blit
    235            conn.buffer len conn.buffer 0 conn.length;
    236        Lwt.return ())
    237 
    238 let rec fillBuffer2 conn s pos len =
    239   if conn.length = bufferSize then
    240     sendOutput conn >>= fun () ->
    241     fillBuffer2 conn s pos len
    242   else begin
    243     let l = min (len - pos) (bufferSize - conn.length) in
    244     Bytearray.blit_to_bytes s pos conn.buffer conn.length l;
    245     conn.length <- conn.length + l;
    246     if pos + l < len then
    247       fillBuffer2 conn s (pos + l) len
    248     else
    249       Lwt.return ()
    250   end
    251 
    252 let rec fillBuffer conn l =
    253   match l with
    254     (s, pos, len) :: rem ->
    255       assert (pos >= 0);
    256       assert (len >= 0);
    257       assert (pos <= Bytearray.length s - len);
    258       fillBuffer2 conn s pos len >>= fun () ->
    259       fillBuffer conn rem
    260   | [] ->
    261       Lwt.return ()
    262 
    263 let rec flushBuffer conn =
    264   if conn.length > 0 then
    265     sendOutput conn >>= fun () ->
    266     flushBuffer conn
    267   else
    268     Lwt.return ()
    269 
    270 (****)
    271 
    272 (* Output scheduling *)
    273 
    274 type kind = Normal | Idle | Last | Urgent
    275 
    276 type outputQueue =
    277   { mutable available : bool;
    278     mutable canWrite : bool;
    279     mutable flowControl : bool;
    280     writes : (kind * (unit -> unit Lwt.t) * unit Lwt.t) Queue.t;
    281     urgentWrites : (kind * (unit -> unit Lwt.t) * unit Lwt.t) Queue.t;
    282     idleWrites : (kind * (unit -> unit Lwt.t) * unit Lwt.t) Queue.t;
    283     flush : outputQueue -> unit Lwt.t }
    284 
    285 let rec performOutputRec q (kind, action, res) =
    286   action () >>= fun () ->
    287   Lwt.wakeup res ();
    288   popOutputQueues q
    289 
    290 and popOutputQueues q =
    291   if not (Queue.is_empty q.urgentWrites) then
    292     performOutputRec q (Queue.take q.urgentWrites)
    293   else if not (Queue.is_empty q.writes) && q.canWrite then
    294     performOutputRec q (Queue.take q.writes)
    295   else if not (Queue.is_empty q.idleWrites) && q.canWrite then
    296     performOutputRec q (Queue.take q.idleWrites)
    297   else begin
    298     q.available <- true;
    299     (* Flush asynchronously the output *)
    300     Lwt.ignore_result (q.flush q);
    301     Lwt.return ()
    302   end
    303 
    304 (* Perform an output action in an atomic way *)
    305 let performOutput q kind action =
    306   if q.available && (kind = Urgent || q.canWrite) then begin
    307     q.available <- false;
    308     performOutputRec q (kind, action, Lwt.wait ())
    309   end else begin
    310     let res = Lwt.wait () in
    311     Queue.add (kind, action, res)
    312       (match kind with
    313          Urgent -> q.urgentWrites
    314        | Normal -> q.writes
    315        | Idle   -> q.idleWrites
    316        | Last   -> assert false);
    317     res
    318   end
    319 
    320 let allowWrites q =
    321   assert (not q.canWrite);
    322   q.canWrite <- true;
    323   q.available <- false;
    324   (* We yield to let the receiving thread restart and to let some time
    325      to the requests to be processed *)
    326   Lwt.ignore_result (Lwt_unix.yield () >>= fun () -> popOutputQueues q)
    327 
    328 let disableFlowControl q =
    329   q.flowControl <- false;
    330   if not q.canWrite then allowWrites q
    331 
    332 let outputQueueIsEmpty q = q.available
    333 
    334 (* Setup IO with flow control initially disabled, to do the RPC version
    335    handshake. Flow control is part of RPC protocol and must be enabled
    336    only after RPC version handshake is complete. *)
    337 let makeOutputQueue isServer flush =
    338   { available = true; canWrite = true; flowControl = false;
    339     writes = Queue.create (); urgentWrites = Queue.create ();
    340     idleWrites = Queue.create ();
    341     flush = flush }
    342 
    343 (****)
    344 
    345 (* IMPORTANT: the RPC version must be increased when the RPC mechanism itself
    346    changes in a breaking way. Changes on the API level (functions and data
    347    types) normally do not cause a breaking change at the RPC level. *)
    348 (* Version 0 is special in that it must not be listed as a supported version.
    349    It is used for 2.51-compatibility mode and is never negotiated. *)
    350 (* Supported RPC versions should be ordered from newest to oldest. *)
    351 let rpcSupportedVersions = [1]
    352 let rpcDefaultVersion = Safelist.hd rpcSupportedVersions
    353 
    354 let rpcSupportedVersionStr =
    355   String.concat ", "
    356     (Safelist.map (fun v -> "\"" ^ string_of_int v ^ "\"")
    357        rpcSupportedVersions)
    358 
    359 let rpcSupportedVersionStrHdr =
    360   String.concat " "
    361     (Safelist.map (fun v -> string_of_int v)
    362        rpcSupportedVersions)
    363 
    364 (* FIX: Added in 2021. Should be removed after a couple of years. *)
    365 let rpcServerCmdlineOverride = "__new-rpc-mode"
    366 
    367 (****)
    368 
    369 type connection =
    370   { mutable version : int;
    371     inputBuffer : ioBuffer;
    372     outputBuffer : ioBuffer;
    373     outputQueue : outputQueue }
    374 
    375 let maybeFlush pendingFlush q buf =
    376   (* We return immediately if a flush is already scheduled, or if the
    377      output buffer is already empty. *)
    378   (* If we are doing flow control and we can write, we need to send
    379      a write token even when the buffer is empty. *)
    380   if
    381     !pendingFlush || (buf.length = 0 && not (q.flowControl && q.canWrite))
    382   then
    383     Lwt.return ()
    384   else begin
    385     pendingFlush := true;
    386     (* Wait a bit, in case there are some new requests being processed *)
    387     Lwt_unix.yield () >>= fun () ->
    388     pendingFlush := false;
    389     (* If there are other writes scheduled, we do not flush yet *)
    390     if outputQueueIsEmpty q then begin
    391       performOutput q Last
    392         (fun () ->
    393            if q.flowControl then begin
    394              debugE (fun() -> Util.msg "Sending write token\n");
    395              q.canWrite <- false;
    396              fillBuffer buf [encodeInt 0] >>= fun () ->
    397              flushBuffer buf
    398            end else
    399              flushBuffer buf) >>= fun () ->
    400       Lwt.return ()
    401     end else
    402       Lwt.return ()
    403   end
    404 
    405 let makeConnection isServer inCh outCh =
    406   let pendingFlush = ref false in
    407   let outputBuffer = makeBuffer outCh in
    408   { version = rpcDefaultVersion;
    409     inputBuffer = makeBuffer inCh;
    410     outputBuffer = outputBuffer;
    411     outputQueue =
    412       makeOutputQueue isServer
    413         (fun q -> maybeFlush pendingFlush q outputBuffer) }
    414 
    415 let closeConnection conn =
    416   begin try Lwt_unix.close conn.inputBuffer.channel with Unix.Unix_error _ -> () end;
    417   begin try Lwt_unix.close conn.outputBuffer.channel with Unix.Unix_error _ -> () end;
    418   conn.outputBuffer.opened <- false
    419 
    420 let connectionIO conn =
    421   (conn.inputBuffer.channel, conn.outputBuffer.channel)
    422 
    423 let setConnectionVersion conn ver =
    424   conn.version <- ver
    425 
    426 let connectionVersion conn = conn.version
    427 
    428 let connEq conn conn' =
    429   conn.inputBuffer.channel = conn'.inputBuffer.channel
    430     && conn.outputBuffer.channel = conn'.outputBuffer.channel
    431 
    432 let connNeq conn conn' = not (connEq conn conn')
    433 
    434 (* Send message [l] *)
    435 let dump conn l =
    436   performOutput
    437     conn.outputQueue Normal (fun () -> fillBuffer conn.outputBuffer l)
    438 
    439 (* Send message [l] when idle *)
    440 let dumpIdle conn l =
    441   performOutput
    442     conn.outputQueue Idle (fun () -> fillBuffer conn.outputBuffer l)
    443 
    444 (* Send message [l], even if write are disabled.  This is used for
    445    aborting rapidly a stream.  This works as long as only one small
    446    message is written at a time (the write will succeed as the pipe
    447    will not be full) *)
    448 let dumpUrgent conn l =
    449   performOutput conn.outputQueue Urgent
    450     (fun () ->
    451        fillBuffer conn.outputBuffer l >>= fun () ->
    452        flushBuffer conn.outputBuffer)
    453 
    454 let enableFlowControl conn isServer =
    455   let rec waitDrain () =
    456     if not isServer && conn.outputBuffer.length > 0 then
    457       Lwt_unix.yield () >>= waitDrain
    458     else
    459       Lwt.return ()
    460   in
    461   let q = conn.outputQueue in
    462   q.available <- false;
    463   waitDrain () >>= fun () ->
    464   q.flowControl <- true;
    465   q.canWrite <- isServer;
    466   if q.canWrite then
    467     popOutputQueues q >>= Lwt_unix.yield >>= fun () ->
    468     Lwt.return ()
    469   else
    470     Lwt.return ()
    471 
    472 (****)
    473 
    474 let connectionCheck = ref None
    475 
    476 let checkConnection ioServer =
    477   connectionCheck := Some ioServer;
    478   (* Poke on the socket to trigger an error if connection has been lost. *)
    479   Lwt_unix.run (
    480     (if Sys.win32 then Lwt.return 0 else
    481     Lwt_unix.read ioServer.inputBuffer.channel ioServer.inputBuffer.buffer 0 0)
    482     (* Try to make sure connection cleanup, if necessary, has finished
    483        before returning.
    484        Since there is no way to reliably detect when other threads have
    485        finished, we just yield a bit (the same comments apply as in
    486        commandLoop). *)
    487     >>= fun _ ->
    488     let rec wait n =
    489       if n = 0 then Lwt.return () else begin
    490         Lwt_unix.yield () >>= fun () ->
    491         wait (n - 1)
    492       end
    493     in
    494     wait 10);
    495   connectionCheck := None
    496 
    497 let isConnectionCheck conn =
    498   match !connectionCheck with
    499   | None -> false
    500   | Some conn' -> connEq conn conn'
    501 
    502 (* Due to [Common.root] currently excluding important details present in
    503    [Clroot.clroot], there is a 1:N mapping between a [root] and a [clroot].
    504    For example, a [clroot] pointing to the same host but a different
    505    protocol or user or port will be mapped to the same [root] as long as
    506    the root fspaths are the same.
    507    It is currently (Oct 2022) not seen as a critical issue to fix.
    508    The code previously used to index connections just by the canonical host
    509    name, ignoring all other details, including the root fspath. That code
    510    was in place for over 20 years and did not seem to cause any issues.
    511    The current code is safer than the previous code... *)
    512 module ClientConn = struct
    513   type t =
    514     { clroot : Clroot.clroot;
    515       root : Common.root;
    516       conn : connection }
    517       (* Never do polymorphic comparisons with [connection]! *)
    518 
    519   (* The replica path of a root is not a relevant detail for a connection;
    520      on the contrary, it must not impact the connection. Instead of creating
    521      a new type for connection tracking, for now, just exclude the path in
    522      clroot comparisons. Eventually, this could/should be solved in a better
    523      way: see the comment above about [Common.root] not being sufficiently
    524      detailed. *)
    525   let clrootEq clroot clroot' =
    526     match clroot, clroot' with
    527     | Clroot.ConnectByShell (shell, host, user, port, _),
    528       Clroot.ConnectByShell (shell', host', user', port', _) ->
    529         shell = shell' && host = host' && user = user' && port = port'
    530     | ConnectBySocket (host, port, _),
    531       ConnectBySocket (host', port', _) ->
    532         host = host' && port = port'
    533     | ConnectByShell _, _ | _, ConnectByShell _ -> false
    534     | ConnectBySocket _, _ | _, ConnectBySocket _ -> false
    535     | ConnectLocal _, ConnectLocal _ -> assert false
    536 
    537   let clrootNeq clroot clroot' = not (clrootEq clroot clroot')
    538 
    539   let connections = ref []
    540 
    541   let findByClroot clroot =
    542     Safelist.find (fun x -> clrootEq x.clroot clroot) !connections
    543 
    544   let findByRoot root = Safelist.find (fun x -> x.root = root) !connections
    545 
    546   let register clroot root conn =
    547     connections := { clroot; root; conn } ::
    548       Safelist.filter (fun x -> clrootNeq x.clroot clroot) !connections
    549 
    550   let unregister conn =
    551     connections := Safelist.filter (fun x -> connNeq x.conn conn) !connections
    552 
    553   let ofRoot root =
    554     try (findByRoot root).conn with
    555     | Not_found -> raise (Util.Fatal "No connection with the server")
    556 
    557   let ofRootOpt root =
    558     try Some (findByRoot root).conn with
    559     | Not_found -> None
    560 
    561   let canonRootOfClroot clroot =
    562     try Some (findByClroot clroot).root with
    563     | Not_found -> None
    564 
    565   let withConncheck find =
    566     try
    567       let conn = (find ()).conn in
    568       begin try
    569         checkConnection conn
    570       with
    571       | Unix.Unix_error (EBADF, _, _) -> (* Already closed *)
    572           unregister conn
    573         (* (All or most?) other exceptions should be caught by receiving and
    574            sending threads. If this does not happen (it also depends on the
    575            implementation of [Lwt_unix.run]) then trigger the cleanup here as
    576            the last resort. *)
    577       | Unix.Unix_error _ ->
    578           try
    579             lostConnection (fst (connectionIO conn)) |> ignore;
    580             unregister conn
    581           with e -> begin
    582             unregister conn;
    583             raise e
    584           end
    585       end;
    586       (* [find] _must_ be duplicated after [checkConnection]! *)
    587       Some (find ()).conn
    588     with Not_found ->
    589       None
    590 
    591   let ofRootConncheck root =
    592     withConncheck (fun () -> findByRoot root)
    593 
    594   let ofClrootConncheck clroot =
    595     withConncheck (fun () -> findByClroot clroot)
    596 
    597 end (* module ClientConn *)
    598 
    599 let connectionOfRoot root = ClientConn.ofRoot root
    600 
    601 (****)
    602 
    603 let atCloseHandlers = ref []
    604 
    605 let at_conn_close ?(only_server = false) f =
    606   atCloseHandlers := (only_server, f) :: !atCloseHandlers
    607 
    608 let runConnCloseHandlers isServer =
    609   Safelist.iter (fun (only_server, f) ->
    610     if not only_server || isServer then f ()) !atCloseHandlers
    611 
    612 let atConnCloseHandlers = ref []
    613 
    614 let at_conn_close' conn f =
    615   atConnCloseHandlers := (conn, f) :: !atConnCloseHandlers
    616 
    617 let runConnCloseHandlers' conn =
    618   atConnCloseHandlers := Safelist.filter (fun (c, f) ->
    619     if connEq c conn then (f (); false) else true) !atConnCloseHandlers
    620 
    621 let clientCloseCleanup () =
    622   runConnCloseHandlers false
    623 
    624 let clientConnClose conn =
    625   closeConnection conn;
    626   ClientConn.unregister conn;
    627   runConnCloseHandlers' conn;
    628   clientCloseCleanup ()
    629 
    630 let registerConnCleanup conn cleanup =
    631   registerIOClose (connectionIO conn) (fun () -> clientConnClose conn);
    632   match cleanup with
    633   | None -> ()
    634   | Some f -> at_conn_close' conn f
    635 
    636 let clientCloseRootConnection = function
    637   | (Common.Local, _) -> clientCloseCleanup ()
    638   | (Common.Remote _, _) as root ->
    639       begin match ClientConn.ofRootOpt root with
    640       | Some conn -> clientConnClose conn
    641       | None -> ()
    642       end
    643 
    644 (****)
    645 
    646 (* Implemented as a record to avoid polluting [Remote] namespace. If
    647    the number and complexity of functions grows in future then it's
    648    probably a good idea to extract this code into a separate module. *)
    649 type ('a, 'b, 'c) resourceC =
    650   { register : 'a -> 'a; release : 'a -> 'b; release_noerr : 'a -> 'c }
    651 let resourceWithConnCleanup close close_noerr =
    652   let h = Hashtbl.create 17 in
    653   let closeAll () =
    654     Hashtbl.iter (fun x _ -> ignore (close_noerr x)) h;
    655     Hashtbl.clear h
    656   in
    657   at_conn_close closeAll;
    658   let register x = Hashtbl.add h x true; x in
    659   let release x = Hashtbl.remove h x; close x in
    660   let release_noerr x = Hashtbl.remove h x; close_noerr x in
    661   { register; release; release_noerr }
    662 
    663 let lwtRegionWithConnCleanup sz =
    664   let reg = ref (Lwt_util.make_region sz) in
    665   let resetReg () =
    666     Lwt_util.purge_region !reg;
    667     (* The remaining threads should be collected by GC *)
    668     reg := Lwt_util.make_region sz
    669   in
    670   at_conn_close resetReg;
    671   reg
    672 
    673 (****)
    674 
    675 (* XXX *)
    676 module Thread = struct
    677 
    678   let unwindProtect f cleanup =
    679     Lwt.catch f
    680       (fun e ->
    681          match e with
    682            Util.Transient err | Util.Fatal err ->
    683              debugT
    684                (fun () ->
    685                   Util.msg
    686                     "Exception caught by Thread.unwindProtect: %s\n" err);
    687              Lwt.catch (fun () -> cleanup e) (fun e' ->
    688                Util.encodeException "Thread.unwindProtect" `Fatal e')
    689                  >>= (fun () ->
    690              Lwt.fail e)
    691          | _ ->
    692              Lwt.fail e)
    693 
    694 end
    695 
    696 (*****************************************************************************)
    697 (*                              MARSHALING                                   *)
    698 (*****************************************************************************)
    699 
    700 type tag = Bytearray.t
    701 
    702 type 'a marshalFunction = connection ->
    703   'a -> (Bytearray.t * int * int) list -> (Bytearray.t * int * int) list
    704 type 'a unmarshalFunction = connection -> Bytearray.t -> 'a
    705 type 'a marshalingFunctions = 'a marshalFunction * 'a unmarshalFunction
    706 
    707 type 'a convV0Fun =
    708   V0 : ('a -> 'compat) * ('compat -> 'a) -> 'a convV0Fun
    709 
    710 external id : 'a -> 'a = "%identity"
    711 let convV0_id = V0 (id, id)
    712 let convV0_id_pair = convV0_id, convV0_id
    713 
    714 let makeConvV0FunArg compat_to compat_from =
    715   (V0 (compat_to, compat_from)), convV0_id
    716 let makeConvV0FunRet compat_to compat_from =
    717   convV0_id, (V0 (compat_to, compat_from))
    718 let makeConvV0Funs compat_to compat_from compat_to2 compat_from2 =
    719   (V0 (compat_to, compat_from)), (V0 (compat_to2, compat_from2))
    720 
    721 let registeredSet = ref Util.StringSet.empty
    722 
    723 let rec first_chars len msg =
    724   match msg with
    725     [] ->
    726       ""
    727   | (s, p, l) :: rem ->
    728       if l < len then
    729         Bytearray.sub s p l ^ first_chars (len - l) rem
    730       else
    731         Bytearray.sub s p len
    732 
    733 let safeMarshal marshalPayload tag data rem =
    734   let (rem', length) = marshalPayload data rem in
    735   let l = Bytearray.length tag in
    736   debugE (fun() ->
    737             let start = first_chars (min length 10) rem' in
    738             let start = if length > 10 then start ^ "..." else start in
    739             let start = String.escaped start in
    740             Util.msg "send [%s] '%s' %d bytes\n"
    741               (Bytearray.to_string tag) start length);
    742   let len = l + length in
    743   if (len lsr 31) lsr 1 <> 0 then (* [encodeInt] can only encode 32 bits *)
    744     raise (Util.Fatal
    745             "Protocol error: message data too big. This may be a bug or it\n\
    746              may be that your replicas are huge and the amount of updates\n\
    747              can't be handled by the current protocol implementation. If you\n\
    748              believe it is a bug then please consider reporting it.\n\
    749              Otherwise, try reducing the amount of updates by syncing the\n\
    750              replicas in smaller steps (using the \"path\" preference, for\n\
    751              example). You may have to do this for the initial sync only.")
    752   else
    753     (encodeInt len :: (tag, 0, l) :: rem')
    754 
    755 let safeUnmarshal unmarshalPayload tag buf =
    756   let taglength = Bytearray.length tag in
    757   if Bytearray.prefix tag buf 0 then
    758     unmarshalPayload buf taglength
    759   else
    760     let identifier =
    761       String.escaped
    762         (Bytearray.sub buf 0 (min taglength (Bytearray.length buf))) in
    763     raise (Util.Fatal
    764              (Printf.sprintf "[safeUnmarshal] expected '%s' but got '%s'"
    765                 (String.escaped (Bytearray.to_string tag)) identifier))
    766 
    767 let registerTag string =
    768   if Util.StringSet.mem string !registeredSet then
    769     raise (Util.Fatal (Printf.sprintf "tag %s is already registered" string))
    770   else
    771     registeredSet := Util.StringSet.add string !registeredSet;
    772   Bytearray.of_string string
    773 
    774 let marshalV0 (V0 (to251, _)) data rem =
    775   let s = Bytearray.marshal (to251 data) [Marshal.No_sharing] in
    776   let l = Bytearray.length s in
    777   ((s, 0, l) :: rem, l)
    778 
    779 let unmarshalV0 (V0 (_, from251)) buf pos =
    780   try from251 (Bytearray.unmarshal buf pos)
    781   with Failure s -> raise (Util.Fatal (Printf.sprintf
    782 "Fatal error during unmarshaling (%s),
    783 possibly because client and server have been compiled with different \
    784 versions of the OCaml compiler." s))
    785 
    786 let marshalV1 m data rem =
    787   let s = Umarshal.marshal_to_bytearray m data in
    788   let l = Bytearray.length s in
    789   ((s, 0, l) :: rem, l)
    790 
    791 let unmarshalV1 m buf pos =
    792   try Umarshal.unmarshal_from_bytearray m buf pos
    793   with Failure s | Umarshal.Error s -> raise (Util.Fatal (Printf.sprintf
    794 "Fatal error during unmarshaling (%s)" s))
    795 
    796 let defaultMarshalingFunctions convV0 m =
    797   (fun conn -> if conn.version = 0 then marshalV0 convV0 else marshalV1 m),
    798   (fun conn -> if conn.version = 0 then unmarshalV0 convV0 else unmarshalV1 m)
    799 
    800 let makeMarshalingFunctions payloadMarshalingFunctions string =
    801   let (marshalPayload, unmarshalPayload) = payloadMarshalingFunctions in
    802   let tag = registerTag string in
    803   let marshal conn (data : 'a) rem = safeMarshal (marshalPayload conn) tag data rem in
    804   let unmarshal conn buf = (safeUnmarshal (unmarshalPayload conn) tag buf : 'a) in
    805   (marshal, unmarshal)
    806 
    807 (*****************************************************************************)
    808 (*                              SERVER SETUP                                 *)
    809 (*****************************************************************************)
    810 
    811 (* BCPFIX: Now that we've beefed up the clroot data structure, shouldn't
    812    these be part of it too? *)
    813 let sshCmd =
    814   Prefs.createString "sshcmd" "ssh"
    815     ~category:(`Advanced `Remote)
    816     ("path to the ssh executable")
    817     ("This preference can be used to explicitly set the name of the "
    818      ^ "ssh executable (e.g., giving a full path name), if necessary.")
    819 
    820 let sshargs =
    821   Prefs.createString "sshargs" ""
    822     ~category:(`Advanced `Remote)
    823     "other arguments (if any) for remote shell command"
    824     ("The string value of this preference will be passed as additional "
    825      ^ "arguments (besides the host name and the name of the Unison "
    826      ^ "executable on the remote system) to the \\verb|ssh| "
    827      ^ "command used to invoke the remote server. The backslash is an "
    828      ^ "escape character."
    829      )
    830 
    831 (* rsh prefs removed since 2.52 *)
    832 let () = Prefs.markRemoved "rshcmd"
    833 let () = Prefs.markRemoved "rshargs"
    834 
    835 let serverCmd =
    836   Prefs.createString "servercmd" ""
    837     ~category:(`Advanced `Remote)
    838     ("name of " ^ Uutil.myName ^ " executable on remote server")
    839     ("This preference can be used to explicitly set the name of the "
    840      ^ "Unison executable on the remote server (e.g., giving a full "
    841      ^ "path name), if necessary.")
    842 
    843 let addversionno =
    844   Prefs.createBool "addversionno" false
    845     ~category:(`Advanced `Remote)
    846     ("add version number to name of " ^ Uutil.myName ^ " on server")
    847     ("When this flag is set to {\\tt true}, Unison "
    848      ^ "will use \\texttt{unison-\\ARG{currentmajorversionnumber}} instead of "
    849      ^ "just \\verb|unison| as the remote server command (note that the minor "
    850      ^ "version number is dropped -- e.g., unison-2.51).  This allows "
    851      ^ "multiple binaries for different versions of unison to coexist "
    852      ^ "conveniently on the same server: whichever version is run "
    853      ^ "on the client, the same version will be selected on the server.")
    854 
    855 (**********************************************************************
    856                        CLIENT/SERVER PROTOCOLS
    857  **********************************************************************)
    858 
    859 (*
    860 Each protocol has a name, a client side, and a server side.
    861 
    862 The server remembers the server side of each protocol in a table
    863 indexed by protocol name. The function of the server is to wait for
    864 the client to invoke a protocol, and carry out the appropriate server
    865 side.
    866 
    867 Protocols are invoked on the client with arguments for the server side.
    868 The result of the protocol is the result of the server side. In types,
    869 
    870   serverSide : 'a -> 'b
    871 
    872 That is, the server side takes arguments of type 'a from the client,
    873 and returns a result of type 'b.
    874 
    875 A protocol is started by the client sending a Request packet and then a
    876 packet containing the protocol name to the server.  The server looks
    877 up the server side of the protocol in its table.
    878 
    879 Next, the client sends a packet containing marshaled arguments for the
    880 server side.
    881 
    882 The server unmarshals the arguments and invokes the server side with
    883 the arguments from the client.
    884 
    885 When the server side completes it gives a result. The server marshals
    886 the result and sends it to the client.  (Instead of a result, the
    887 server may also send back either a Transient or a Fatal error packet).
    888 Finally, the client can receive the result packet from the server and
    889 unmarshal it.
    890 
    891 The protocol is fully symmetric, so the server may send a Request
    892 packet to invoke a function remotely on the client.  In this case, the
    893 two switch roles.)
    894 *)
    895 
    896 let receivePacket conn =
    897   (* Get the length of the packet *)
    898   let int_buf = Bytearray.create intSize in
    899   grab conn.inputBuffer int_buf intSize >>= (fun () ->
    900   let length = decodeInt int_buf 0 in
    901   assert (length >= 0);
    902   (* Get packet *)
    903   let buf = Bytearray.create length in
    904   grab conn.inputBuffer buf length >>= (fun () ->
    905   (debugE (fun () ->
    906              let start =
    907                if length > 10 then (Bytearray.sub buf 0 10) ^ "..."
    908                else Bytearray.sub buf 0 length in
    909              let start = String.escaped start in
    910              Util.msg "receive '%s' %d bytes\n" start length);
    911    Lwt.return buf)))
    912 
    913 type servercmd =
    914   connection -> Bytearray.t ->
    915   ((Bytearray.t * int * int) list -> (Bytearray.t * int * int) list) Lwt.t
    916 let serverCmds = ref (Util.StringMap.empty : servercmd Util.StringMap.t)
    917 
    918 type serverstream =
    919   connection -> Bytearray.t -> unit
    920 let serverStreams = ref (Util.StringMap.empty : serverstream Util.StringMap.t)
    921 
    922 type header =
    923     NormalResult
    924   | TransientExn of string
    925   | FatalExn of string
    926   | Request of string
    927   | Stream of string
    928   | StreamAbort
    929 
    930 let mheader = Umarshal.(sum6 unit string string string string unit
    931                           (function
    932                            | NormalResult -> I61 ()
    933                            | TransientExn a -> I62 a
    934                            | FatalExn a -> I63 a
    935                            | Request a -> I64 a
    936                            | Stream a -> I65 a
    937                            | StreamAbort -> I66 ())
    938                           (function
    939                            | I61 () -> NormalResult
    940                            | I62 a -> TransientExn a
    941                            | I63 a -> FatalExn a
    942                            | I64 a -> Request a
    943                            | I65 a -> Stream a
    944                            | I66 () -> StreamAbort))
    945 
    946 let ((marshalHeader, unmarshalHeader) : header marshalingFunctions) =
    947   makeMarshalingFunctions (defaultMarshalingFunctions convV0_id mheader) "rsp"
    948 
    949 let processRequest conn id cmdName buf =
    950   let cmd =
    951     try Util.StringMap.find cmdName !serverCmds
    952     with Not_found -> raise (Util.Fatal (cmdName ^ " not registered!"))
    953   in
    954   Lwt.try_bind (fun () -> cmd conn buf)
    955     (fun marshal ->
    956        debugE (fun () -> Util.msg "Sending result (id: %d)\n" (decodeInt id 0));
    957        dump conn ((id, 0, intSize) :: marshalHeader conn NormalResult (marshal [])))
    958     (function
    959        Util.Transient s ->
    960          debugE (fun () ->
    961            Util.msg "Sending transient exception (id: %d)\n" (decodeInt id 0));
    962          dump conn ((id, 0, intSize) :: marshalHeader conn (TransientExn s) [])
    963      | Util.Fatal s ->
    964          debugE (fun () ->
    965            Util.msg "Sending fatal exception (id: %d)\n" (decodeInt id 0));
    966          dump conn ((id, 0, intSize) :: marshalHeader conn (FatalExn s) [])
    967      | e ->
    968          Lwt.fail e)
    969   (* With the current RPC protocol it is not possible to recover from situations
    970      where an RPC packet is not completely transmitted (due to an interrupted
    971      write syscall). The other side will hang forever, waiting for the complete
    972      packet to arrive. New packets (here, transmitting the exception) can't be
    973      sent because the receiver can't read them until the previous packet is
    974      complete. (Best case, the server quits with a protocol error.)
    975 
    976      Therefore, it is important that exceptions that can interrupt syscalls
    977      (for example, Sys.Break (Ctrl-C)) are never wrapped into Util.Transient or
    978      Util.Fatal, unless the connection is already known to be broken. Likewise,
    979      other exception handlers than the one just above must avoid writing out
    980      additional data to the RPC connection. *)
    981 
    982 let streamAbortedSrc = ref 0
    983 let streamAbortedDst = ref false
    984 
    985 let streamError = Hashtbl.create 7
    986 
    987 let resetStreamErroState () =
    988   streamAbortedSrc := 0;
    989   streamAbortedDst := false;
    990   Hashtbl.reset streamError
    991 let () = at_conn_close resetStreamErroState
    992 
    993 let abortStream conn id =
    994   if not !streamAbortedDst then begin
    995     streamAbortedDst := true;
    996     let request = encodeInt id :: marshalHeader conn StreamAbort [] in
    997     dumpUrgent conn request
    998   end else
    999     Lwt.return ()
   1000 
   1001 let processStream conn id cmdName buf =
   1002   let id = decodeInt id 0 in
   1003   if Hashtbl.mem streamError id then
   1004    abortStream conn id
   1005   else begin
   1006     begin try
   1007       let cmd =
   1008         try Util.StringMap.find cmdName !serverStreams
   1009         with Not_found -> raise (Util.Fatal (cmdName ^ " not registered!"))
   1010       in
   1011       cmd conn buf;
   1012       Lwt.return ()
   1013     with e ->
   1014       Hashtbl.add streamError id e;
   1015       abortStream conn id
   1016     end
   1017   end
   1018 
   1019 (* Message ids *)
   1020 type msgId = int
   1021 module MsgIdMap = Map.Make (struct type t = msgId let compare = compare end)
   1022 (* An integer just a little smaller than the maximum representable in
   1023    30 bits *)
   1024 let hugeint = 1000000000
   1025 let ids = ref 1
   1026 let newMsgId () = incr ids; if !ids = hugeint then ids := 2; !ids
   1027 
   1028 (* Threads waiting for a response from the other side *)
   1029 let receivers = ref MsgIdMap.empty
   1030 let resetReceivers () =
   1031   receivers := MsgIdMap.empty
   1032 let () = at_conn_close resetReceivers
   1033 
   1034 let find_receiver id =
   1035   let thr = MsgIdMap.find id !receivers in
   1036   receivers := MsgIdMap.remove id !receivers;
   1037   thr
   1038 
   1039 (* Receiving thread: read a message and dispatch it to the right
   1040    thread or create a new thread to process requests. *)
   1041 let rec receive conn =
   1042   begin
   1043     debugE (fun () -> Util.msg "Waiting for next message\n");
   1044     (* Get the message ID *)
   1045     let id = Bytearray.create intSize in
   1046     grab conn.inputBuffer id intSize >>= (fun () ->
   1047     let num_id = decodeInt id 0 in
   1048     if num_id = 0 then begin
   1049       debugE (fun () -> Util.msg "Received the write permission\n");
   1050       allowWrites conn.outputQueue;
   1051       receive conn
   1052     end else begin
   1053       debugE
   1054         (fun () -> Util.msg "Message received (id: %d)\n" num_id);
   1055       (* Read the header *)
   1056       receivePacket conn >>= (fun buf ->
   1057       let req = unmarshalHeader conn buf in
   1058       begin match req with
   1059         Request cmdName ->
   1060           receivePacket conn >>= (fun buf ->
   1061           (* We yield before starting processing the request.
   1062              This way, the request may call [Lwt_unix.run] and this will
   1063              not block the receiving thread. *)
   1064           Lwt.ignore_result
   1065             (Lwt_unix.yield () >>= (fun () ->
   1066              processRequest conn id cmdName buf));
   1067           receive conn)
   1068       | NormalResult ->
   1069           receivePacket conn >>= (fun buf ->
   1070           Lwt.wakeup (find_receiver num_id) buf;
   1071           receive conn)
   1072       | TransientExn s ->
   1073           debugV (fun() -> Util.msg "receive: Transient remote error '%s']" s);
   1074           Lwt.wakeup_exn (find_receiver num_id) (Util.Transient s);
   1075           receive conn
   1076       | FatalExn s ->
   1077           debugV (fun() -> Util.msg "receive: Fatal remote error '%s']" s);
   1078           Lwt.wakeup_exn (find_receiver num_id) (Util.Fatal ("Server: " ^ s));
   1079           receive conn
   1080       | Stream cmdName ->
   1081           receivePacket conn >>= fun buf ->
   1082           processStream conn id cmdName buf >>= fun () ->
   1083           receive conn
   1084       | StreamAbort ->
   1085           streamAbortedSrc := num_id;
   1086           receive conn
   1087       end)
   1088     end)
   1089   end
   1090 
   1091 let wait_for_reply id =
   1092   let res = Lwt.wait () in
   1093   receivers := MsgIdMap.add id res !receivers;
   1094   (* We yield to let the receiving thread restart.  This way, the
   1095      thread may call [Lwt_unix.run] and this will not block the
   1096      receiving thread. *)
   1097   Lwt.catch
   1098     (fun () ->
   1099        res >>= (fun v -> Lwt_unix.yield () >>= (fun () -> Lwt.return v)))
   1100     (fun e -> Lwt_unix.yield () >>= (fun () -> Lwt.fail e))
   1101 
   1102 let registerSpecialServerCmd
   1103     (cmdName : string)
   1104     marshalingFunctionsArgs
   1105     marshalingFunctionsResult
   1106     (serverSide : connection -> 'a -> 'b Lwt.t)
   1107     =
   1108   (* Check that this command name has not already been bound *)
   1109   if (Util.StringMap.mem cmdName !serverCmds) then
   1110     raise (Util.Fatal (cmdName ^ " already registered!"));
   1111   (* Create marshaling and unmarshaling functions *)
   1112   let ((marshalArgs,unmarshalArgs) : 'a marshalingFunctions) =
   1113     makeMarshalingFunctions marshalingFunctionsArgs (cmdName ^ "-args") in
   1114   let ((marshalResult,unmarshalResult) : 'b marshalingFunctions) =
   1115     makeMarshalingFunctions marshalingFunctionsResult (cmdName ^ "-res") in
   1116   (* Create a server function and remember it *)
   1117   let server conn buf =
   1118     let args = unmarshalArgs conn buf in
   1119     serverSide conn args >>= (fun answer ->
   1120     Lwt.return (marshalResult conn answer))
   1121   in
   1122   serverCmds := Util.StringMap.add cmdName server !serverCmds;
   1123   (* Create a client function and return it *)
   1124   let client conn serverArgs =
   1125     let id = newMsgId () in (* Message ID *)
   1126     assert (id >= 0); (* tracking down an assert failure in receivePacket... *)
   1127     let request =
   1128       encodeInt id ::
   1129       marshalHeader conn (Request cmdName) (marshalArgs conn serverArgs [])
   1130     in
   1131     let reply = wait_for_reply id in
   1132     debugE (fun () -> Util.msg "Sending request (id: %d)\n" id);
   1133     dump conn request >>= (fun () ->
   1134     reply >>= (fun buf ->
   1135     Lwt.return (unmarshalResult conn buf)))
   1136   in
   1137   client
   1138 
   1139 let registerServerCmd name ?(convV0=convV0_id_pair) mArg mRet f =
   1140   registerSpecialServerCmd
   1141     name (defaultMarshalingFunctions (fst convV0) mArg)
   1142          (defaultMarshalingFunctions (snd convV0) mRet) f
   1143 
   1144 (* RegisterHostCmd is a simpler version of registerClientServer [registerServerCmd?].
   1145    It is used to create remote procedure calls: the only communication
   1146    between the client and server is the sending of arguments from
   1147    client to server, and the sending of the result from the server
   1148    to the client. Thus, server side does not need the file descriptors
   1149    for communication with the client.
   1150 
   1151    RegisterHostCmd recognizes the case where the server is the local
   1152    host, and it avoids socket communication in this case.
   1153 *)
   1154 let registerHostCmd cmdName ?(convV0=convV0_id_pair) mArg mRet cmd =
   1155   let serverSide = (fun _ args -> cmd args) in
   1156   let client0 =
   1157     registerServerCmd cmdName ~convV0 mArg mRet serverSide in
   1158   let client root args =
   1159     let conn = ClientConn.ofRoot root in
   1160     client0 conn args in
   1161   (* Return a function that runs either the proxy or the local version,
   1162      depending on whether the call is to the local host or a remote one *)
   1163   fun root args ->
   1164     match root with
   1165     | (Common.Local, _) -> cmd args
   1166     | (Common.Remote _, _) -> client root args
   1167 
   1168 (* RegisterRootCmd is like registerHostCmd but it indexes connections by
   1169    root instead of host. *)
   1170 let registerRootCmd (cmdName : string)
   1171   ?(convV0=convV0_id_pair) mArg mRet (cmd : (Fspath.t * 'a) -> 'b) =
   1172   let mArg = Umarshal.(prod2 Fspath.m mArg id id) in
   1173   let r = registerHostCmd cmdName ~convV0 mArg mRet cmd in
   1174   fun root args -> r root ((snd root), args)
   1175 
   1176 let registerRootCmdWithConnection (cmdName : string)
   1177   ?(convV0=convV0_id_pair) mArg mRet (cmd : connection -> 'a -> 'b) =
   1178   let client0 = registerServerCmd cmdName ~convV0 mArg mRet cmd in
   1179   (* Return a function that runs either the proxy or the local version,
   1180      depending on whether the call is to the local host or a remote one *)
   1181   fun localRoot remoteRoot args ->
   1182     match (fst localRoot) with
   1183     | Common.Local -> let conn = ClientConn.ofRoot remoteRoot in
   1184             cmd conn args
   1185     | _  -> let conn = ClientConn.ofRoot localRoot in
   1186             client0 conn args
   1187 
   1188 let streamReg = lwtRegionWithConnCleanup 1
   1189 
   1190 let streamingActivated =
   1191   Prefs.createBool "stream" true
   1192     ~category:(`Advanced `Remote)
   1193     ~deprecated:true
   1194     ("use a streaming protocol for transferring file contents")
   1195     "When this preference is set, Unison will use an experimental \
   1196      streaming protocol for transferring file contents more efficiently. \
   1197      The default value is \\texttt{true}."
   1198 
   1199 let registerStreamCmd
   1200     (cmdName : string)
   1201     marshalingFunctionsArgs
   1202     (serverSide : connection -> 'a -> unit)
   1203     =
   1204   let cmd =
   1205     registerSpecialServerCmd
   1206       cmdName marshalingFunctionsArgs
   1207       (defaultMarshalingFunctions convV0_id Umarshal.unit)
   1208       (fun conn v -> serverSide conn v; Lwt.return ())
   1209   in
   1210   let ping =
   1211     registerServerCmd (cmdName ^ "Ping") Umarshal.int Umarshal.unit
   1212       (fun conn (id : int) ->
   1213          try
   1214            let e = Hashtbl.find streamError id in
   1215            Hashtbl.remove streamError id;
   1216            streamAbortedDst := false;
   1217            Lwt.fail e
   1218          with Not_found ->
   1219            Lwt.return ())
   1220   in
   1221   (* Check that this command name has not already been bound *)
   1222   if (Util.StringMap.mem cmdName !serverStreams) then
   1223     raise (Util.Fatal (cmdName ^ " already registered!"));
   1224   (* Create marshaling and unmarshaling functions *)
   1225   let ((marshalArgs,unmarshalArgs) : 'a marshalingFunctions) =
   1226     makeMarshalingFunctions marshalingFunctionsArgs (cmdName ^ "-str") in
   1227   (* Create a server function and remember it *)
   1228   let server conn buf =
   1229     let args = unmarshalArgs conn buf in
   1230     serverSide conn args
   1231   in
   1232   serverStreams := Util.StringMap.add cmdName server !serverStreams;
   1233   (* Create a client function and return it *)
   1234   let client conn id serverArgs =
   1235     debugE (fun () -> Util.msg "Sending stream chunk (id: %d)\n" id);
   1236     if !streamAbortedSrc = id then raise (Util.Transient "Streaming aborted");
   1237     let request =
   1238       encodeInt id ::
   1239       marshalHeader conn (Stream cmdName) (marshalArgs conn serverArgs [])
   1240     in
   1241     dumpIdle conn request
   1242   in
   1243   fun conn sender ->
   1244     if not (Prefs.read streamingActivated) then
   1245       sender (fun v -> cmd conn v)
   1246     else begin
   1247       (* At most one active stream at a time *)
   1248       let id = newMsgId () in (* Message ID *)
   1249       Lwt.try_bind
   1250         (fun () ->
   1251            Lwt_util.run_in_region !streamReg 1
   1252              (fun () -> sender (fun v -> client conn id v)))
   1253         (fun v -> ping conn id >>= fun () -> Lwt.return v)
   1254         (fun e ->
   1255            if !streamAbortedSrc = id then begin
   1256              debugE (fun () ->
   1257                Util.msg "Pinging remote end after streaming error\n");
   1258              ping conn id >>= fun () -> Lwt.fail e
   1259            end else
   1260              Lwt.fail e)
   1261     end
   1262 
   1263 let commandAvailable =
   1264   registerRootCmd "commandAvailable" Umarshal.string Umarshal.bool
   1265     (fun (_, cmdName) -> Lwt.return (Util.StringMap.mem cmdName !serverCmds))
   1266 
   1267 (****************************************************************************
   1268                      BUILDING CONNECTIONS TO THE SERVER
   1269  ****************************************************************************)
   1270 
   1271 let receiveUntilSep ?(space=false) ?(nl=true) ?(includesep=false) conn =
   1272   assert (space || nl);
   1273   let inp = Buffer.create 32
   1274   and buf = Bytearray.create 1 in
   1275   let add () = Buffer.add_char inp buf.{0} in
   1276   let rec aux () =
   1277     grab conn.inputBuffer buf 1 >>= fun () ->
   1278     match buf.{0} with
   1279     | ' ' | '\t' when space ->
   1280         if includesep then add (); Lwt.return (Buffer.contents inp)
   1281     | '\n' when nl ->
   1282         if includesep then add (); Lwt.return (Buffer.contents inp)
   1283     | '\r' ->
   1284         aux () (* ignore *)
   1285     | _ ->
   1286         add (); aux ()
   1287   in aux ()
   1288 
   1289 (* Get input until newline (excluded), blocking *)
   1290 let receiveUntilNewline conn =
   1291   receiveUntilSep ~nl:true conn
   1292 
   1293 (* Get input until space or newline, separator included by default; blocking *)
   1294 let receiveUntilSpaceOrNl ?(includesep=true) conn =
   1295   receiveUntilSep ~space:true ~nl:true ~includesep conn
   1296 
   1297 (* Get input of fixed length, blocking *)
   1298 let receiveString conn len =
   1299   let buf = Bytearray.create len in
   1300   grab conn.inputBuffer buf len >>= fun () ->
   1301   Lwt.return (Bytearray.to_string buf)
   1302 
   1303 (* Get input until newline (excluded), non-blocking *)
   1304 let receiveUntilNewlineNb conn =
   1305   let e = Bytes.to_string (peekWithoutBlocking conn.inputBuffer) in
   1306   let len = try String.index e '\n' with Not_found -> String.length e in
   1307   receiveString conn len
   1308 
   1309 let sendStrings conn slist =
   1310   dump conn
   1311     (Safelist.map (fun s -> (Bytearray.of_string s, 0, String.length s)) slist)
   1312 
   1313 let sendString conn s = sendStrings conn [s]
   1314 
   1315 (****)
   1316 
   1317 let rpcOk = "OK\n"
   1318 
   1319 let rpcNokTag = "NOK "
   1320 let rpcErr err = rpcNokTag ^ err ^ "\n"
   1321 
   1322 type handshakeMsg = Ok | Error of string | Unknown of string
   1323 
   1324 let receiveHandshakeMsg conn =
   1325   receiveUntilSpaceOrNl conn >>= fun msg ->
   1326   if msg = rpcOk then Lwt.return Ok
   1327   else if msg = rpcNokTag then begin
   1328     receiveUntilNewlineNb conn >>= fun msg -> Lwt.return (Error msg)
   1329   end else
   1330     Lwt.return (Unknown msg)
   1331 
   1332 type handshakeData = Data of string | Error of string | Unknown of string
   1333 
   1334 let receiveHandshakeData conn keyw =
   1335   receiveUntilSpaceOrNl conn >>= fun msg ->
   1336   if msg = keyw then
   1337     receiveUntilNewline conn >>= fun data -> Lwt.return (Data data)
   1338   else if msg = rpcNokTag then
   1339     receiveUntilNewlineNb conn >>= fun err -> Lwt.return (Error err)
   1340   else
   1341     Lwt.return (Unknown msg)
   1342 
   1343 let sendHandshakeMsg conn = function
   1344   | Ok -> sendString conn rpcOk
   1345   | Error err -> sendString conn (rpcErr err)
   1346   | Unknown _ -> assert false
   1347 
   1348 let sendHandshakeErr conn err =
   1349   sendHandshakeMsg conn (Error err)
   1350 
   1351 let sendHandshakeData conn keyw data =
   1352   let len = String.length keyw in
   1353   let keyw = if len > 0 && keyw.[len - 1] <> ' ' then keyw ^ " " else keyw in
   1354   sendString conn (keyw ^ data ^ "\n")
   1355 
   1356 (* RPC version negotiation process:
   1357    1. Server sends connectionHeader and supported RPC versions.
   1358 
   1359    2. Client receives and verifies connectionHeader.
   1360       * If OK then proceeds.
   1361       * If NOK then closes connection.
   1362 
   1363    3. Client receives and verifies RPC versions.
   1364       * If not correct version tag or can't parse then closes connection.
   1365 
   1366    4. Client selects a version (typically the most recent one) from the
   1367       intersection of its supported RPC versions and server's RPC versions.
   1368       * If intersection is empty then closes connection.
   1369 
   1370    5. Client sends selected RPC version to the server.
   1371 
   1372    6. Server receives and verifies proposed version.
   1373       * If OK then proceeds.
   1374       * If not correct version tag, can't parse or proposed version is
   1375         not supported then server sends "NOK".
   1376       ** Client receives "NOK" and closes connection.
   1377 
   1378    7. Server selects proposed version and sends "OK".
   1379 
   1380    8. Client receives "OK". Version negotiation is complete.
   1381 *)
   1382 
   1383 let connectionHeader = "Unison RPC\n"
   1384 let compatConnectionHeader = "Unison 2.51 with OCaml >= 4.01.2\n"
   1385 (* Every supported version released prior to the RPC version negotiation
   1386    mechanism uses this connection header string. *)
   1387 let compat248ConnectionHeader = "Unison 2.48\n"
   1388 (* Additionally, even 2.48 can be supported, even though that support is
   1389    not official. *)
   1390 
   1391 let rpcVersionsTag = "VERSIONS "
   1392 let rpcVersionsStr = rpcVersionsTag ^ rpcSupportedVersionStrHdr ^ "\n"
   1393 
   1394 let rpcVersionTag = "VERSION "
   1395 let rpcVersionStr ver = rpcVersionTag ^ string_of_int ver ^ "\n"
   1396 
   1397 let verIsSupported ver =
   1398   Safelist.exists (fun v -> v = ver) rpcSupportedVersions
   1399 
   1400 let handshakeFail err =
   1401   Lwt.fail (Util.Fatal err)
   1402 
   1403 let handshakeError msg =
   1404   handshakeFail ("Received error from the server: \"" ^ msg ^ "\".")
   1405 
   1406 let handshakeUnknown msg =
   1407   handshakeFail ("Received unexpected header from the server: \""
   1408                  ^ String.escaped msg ^ "\".")
   1409 
   1410 let parseVersion side s =
   1411   let error e =
   1412     raise (Util.Transient
   1413             ("Unknown " ^ side ^ " RPC version: " ^ e
   1414              ^ ". Version received from " ^ side ^ ": \"" ^ String.escaped s
   1415              ^ "\". Supported RPC versions: " ^ rpcSupportedVersionStr))
   1416   in
   1417   if s = "" then
   1418     error "invalid format"
   1419   else
   1420     match int_of_string s with
   1421     | ver -> Some ver
   1422     | exception Failure _ -> error "parse error"
   1423 
   1424 let parseServerVersions inp =
   1425   let supported l = function
   1426     | "" -> l
   1427     | v -> match parseVersion "server" v with
   1428            | Some vi -> if verIsSupported vi then vi :: l else l
   1429            | None -> l
   1430   in
   1431   try
   1432     let vs = String.split_on_char ' ' inp in
   1433     if vs = [""] then ignore (parseVersion "server" ""); (* Trigger the error *)
   1434     let intersect = Safelist.fold_left supported [] vs in
   1435     Lwt.return (Safelist.rev (Safelist.sort compare intersect))
   1436   with
   1437   | Util.Transient e -> handshakeFail e
   1438 
   1439 let selectServerVersion conn =
   1440   let getTheRest () = Bytes.to_string (peekWithoutBlocking conn.inputBuffer) in
   1441   receiveHandshakeData conn rpcVersionsTag >>= function
   1442   | Error msg -> handshakeError msg
   1443   | Unknown fromServ -> handshakeUnknown (fromServ ^ getTheRest ())
   1444   | Data versions ->
   1445       parseServerVersions versions >>= function
   1446       | [] ->
   1447           handshakeFail ("None of server's RPC versions are supported. "
   1448                          ^ "The server may be too old or too recent. "
   1449                          ^ "Versions received from server: \""
   1450                          ^ String.escaped versions ^ "\". "
   1451                          ^ "Supported RPC versions: " ^ rpcSupportedVersionStr)
   1452       | ver :: _ ->
   1453           setConnectionVersion conn ver;
   1454           debug (fun () -> Util.msg "Selected RPC version: %i\n" ver);
   1455           sendHandshakeData conn rpcVersionTag (string_of_int ver) >>= fun () ->
   1456           receiveHandshakeMsg conn >>= function
   1457           | Ok -> Lwt.return ()
   1458           | Error reply -> handshakeError reply
   1459           | Unknown reply -> handshakeUnknown (reply ^ getTheRest ())
   1460 
   1461 let checkServerVersion conn header =
   1462   if header = compatConnectionHeader then begin
   1463     setConnectionVersion conn 0;
   1464     debug (fun () -> Util.msg "Selected RPC version: 2.51-compatibility\n");
   1465     (* skip negotiation *) Lwt.return ()
   1466   end else if header = compat248ConnectionHeader then begin
   1467     setConnectionVersion conn 0;
   1468     debug (fun () -> Util.msg "Selected RPC version: 2.48-compatibility\n");
   1469     (* skip negotiation *) Lwt.return ()
   1470   end else
   1471     selectServerVersion conn
   1472 
   1473 let rec checkHeaderRec conn buffer pos len connectionHeader =
   1474   if pos = len then
   1475     Lwt.return connectionHeader
   1476   else begin
   1477     (grab conn.inputBuffer buffer 1 >>= (fun () ->
   1478     let chOk =
   1479       try buffer.{0} = connectionHeader.[pos] with Invalid_argument _ -> false
   1480     and compatChOk =
   1481       try buffer.{0} = compatConnectionHeader.[pos] with Invalid_argument _ -> false
   1482     and compat248ChOk =
   1483       try buffer.{0} = compat248ConnectionHeader.[pos] with Invalid_argument _ -> false
   1484     in
   1485     if not chOk && not compatChOk && not compat248ChOk then
   1486       let prefix =
   1487         String.sub connectionHeader 0 pos ^ Bytearray.to_string buffer in
   1488       let rest = peekWithoutBlocking conn.inputBuffer in
   1489       Lwt.fail
   1490         (Util.Fatal
   1491            ("Received unexpected header from the server:\n \
   1492              expected \""
   1493            ^ String.escaped (* (String.sub connectionHeader 0 (pos + 1)) *)
   1494                connectionHeader
   1495            ^ "\" but received \"" ^ String.escaped (prefix ^ Bytes.to_string rest) ^ "\", \n"
   1496            ^ "which differs at \"" ^ String.escaped prefix ^ "\".\n"
   1497            ^ "This can happen because you have different versions of Unison\n"
   1498            ^ "installed on the client and server machines, or because\n"
   1499            ^ "your connection is failing and somebody is printing an error\n"
   1500            ^ "message, or because your remote login shell is printing\n"
   1501            ^ "something itself before starting Unison."))
   1502     else
   1503     if not chOk && compatChOk then
   1504       (* We make use of the fact that that the new header is almost a prefix
   1505          of the old header. It is not an exact comparison here but good
   1506          enough for this purpose. *)
   1507       checkHeaderRec conn buffer (pos + 1)
   1508         (String.length compatConnectionHeader) compatConnectionHeader
   1509     else if not chOk && compat248ChOk then
   1510       checkHeaderRec conn buffer (pos + 1)
   1511         (String.length compat248ConnectionHeader) compat248ConnectionHeader
   1512     else
   1513       checkHeaderRec conn buffer (pos + 1) len connectionHeader))
   1514   end
   1515 
   1516 let checkHeader conn =
   1517   checkHeaderRec conn (Bytearray.create 1) 0
   1518     (String.length connectionHeader) connectionHeader
   1519 
   1520 (****)
   1521 
   1522 (* Magic string exchange is used within the old protocol to detect if both
   1523    the server and the client support the new RPC version negotiation mechanism.
   1524 
   1525    It works like this:
   1526     1. Directly after connection header, the server sends the magic string and
   1527        otherwise continues using the old RPC protocol.
   1528     2. An old client will process the magic string as a valid RPC message that
   1529        is effectively a no-op and continues using old RPC protocol as normal.
   1530     3. A new client will notice the magic string and send the same magic string
   1531        in response. It will stop the old RPC protocol and restart from header
   1532        checking and what is now hopefully an RPC version negotiation.
   1533     4. The server will notice client's magic string, stop the old RPC protocol
   1534        and restart from connection header, this time with the new RPC version
   1535        negotiation mechanism.
   1536 
   1537    The magic string is defined as follows:
   1538     1. encoded int 1 followed by
   1539     2. encoded int > 0 (packet size) followed by
   1540     3. a valid 2.51 protocol packet, the contents of which we don't care about,
   1541        but it must be a no-op for 2.51 client (in this case a StreamAbort).
   1542 
   1543    Int 1 is a valid 2.51 protocol message ID but it is never used with normal
   1544    messages, hence its safe usage as a magic string. A StreamAbort to a client,
   1545    especially with id 1, is a safe no-op. *)
   1546 
   1547 let magicId = 1
   1548 (* Although this magic packet is inherently dependent on OCaml version,
   1549    it is unlikely to change and has been verified to be the same with
   1550    OCaml versions 4.05 to 4.12. It is hard coded here to avoid any future
   1551    changes (the idea being that old clients will not be compiled with
   1552    any newer OCaml compilers). *)
   1553 let magicPacket = "rsp\132\149\166\190\000\000\000\001\000\000\000\000\000\000\000\000\000\000\000\000A"
   1554 let magic = encodeInt magicId :: encodeInt (String.length magicPacket) ::
   1555               [Bytearray.of_string magicPacket, 0, String.length magicPacket]
   1556 
   1557 let checkForMagicString conn =
   1558   (* Fill the buffer and then peek at the contents without consuming *)
   1559   peekWithBlocking conn.inputBuffer >>= fun b ->
   1560   if Bytes.length b < intSize then
   1561     Lwt.return false
   1562   else begin
   1563     let id = Bytearray.create intSize in
   1564     let () = Bytearray.blit_from_bytes b 0 id 0 intSize in
   1565     if decodeInt id 0 <> magicId then
   1566       Lwt.return false
   1567     else begin
   1568       debug (fun () -> Util.msg "Received RPC version upgrade notice\n");
   1569       (* Consume magic id from buffer *)
   1570       grab conn.inputBuffer id intSize >>= fun () ->
   1571       (* Consume magic packet from buffer *)
   1572       receivePacket conn >>= fun _ -> Lwt.return true
   1573       (* We rely solely on the magic id and don't check the contents of the
   1574          packet. Should it become necessary for some reason then it is
   1575          possible to verify the magic packet byte by byte here. *)
   1576     end
   1577   end
   1578 
   1579 let checkServerUpgrade conn header =
   1580   if header <> compatConnectionHeader && header <> compat248ConnectionHeader then
   1581     Lwt.return header
   1582   else
   1583     checkForMagicString conn >>= function
   1584     | false -> Lwt.return header
   1585     | true ->
   1586         (* Consume write token from buffer *)
   1587         let id = Bytearray.create intSize in
   1588         grab conn.inputBuffer id intSize >>= fun () ->
   1589         (* Send the magic string *)
   1590         dumpUrgent conn magic >>= fun () ->
   1591         debug (fun () -> Util.msg "Going to attempt RPC version upgrade\n");
   1592         checkHeader conn
   1593 
   1594 (****)
   1595 
   1596 (*
   1597    Disable flow control if possible.
   1598    Both hosts must use non-blocking I/O (otherwise a dead-lock is
   1599    possible with ssh).
   1600 *)
   1601 let halfduplex =
   1602   Prefs.createBool "halfduplex" false
   1603     ~category:(`Advanced `Remote)
   1604     ~deprecated:true
   1605     "force half-duplex communication with the server"
   1606     "When this flag is set to {\\tt true}, Unison network communication \
   1607      is forced to be half duplex (the client and the server never \
   1608      simultaneously emit data).  If you experience unstabilities with \
   1609      your network link, this may help."
   1610 
   1611 let negociateFlowControlLocal conn () =
   1612   disableFlowControl conn.outputQueue;
   1613   Lwt.return false
   1614 
   1615 let negociateFlowControlRemote =
   1616   registerServerCmd "negociateFlowControl" Umarshal.unit Umarshal.bool negociateFlowControlLocal
   1617 
   1618 let negociateFlowControl conn =
   1619   (* Flow control negotiation can be done asynchronously. *)
   1620   if not (Prefs.read halfduplex) then
   1621     Lwt.ignore_result
   1622       (negociateFlowControlRemote conn () >>= fun needed ->
   1623        if not needed then
   1624          negociateFlowControlLocal conn ()
   1625        else
   1626          Lwt.return true)
   1627 
   1628 (****)
   1629 
   1630 let initConnection ?(connReady=fun () -> ()) ?cleanup in_ch out_ch =
   1631   (* [makeConnection] is not expected to raise any recoverable exceptions.
   1632      If this assumption changes in the future then [in_ch] and [out_ch] must
   1633      be closed in the recovery code. *)
   1634   let conn = makeConnection false in_ch out_ch in
   1635   let close_on_fail t =
   1636     Lwt.catch (fun () -> t) (fun e -> closeConnection conn; Lwt.fail e)
   1637   in
   1638   let with_timeout t =
   1639     Lwt.choose [t;
   1640       Lwt_unix.sleep 120. >>= fun () ->
   1641       Lwt.fail (Util.Fatal "Timed out negotiating connection with the server")]
   1642   in
   1643   close_on_fail (with_timeout (
   1644     peekWithBlocking conn.inputBuffer >>= fun _ ->
   1645     connReady (); Lwt.return () >>= fun () -> (* Connection working, notify *)
   1646     checkHeader conn >>=
   1647     checkServerUpgrade conn >>=
   1648     checkServerVersion conn)) >>= fun () ->
   1649   registerConnCleanup conn cleanup;
   1650   (* From this moment forward, the RPC version has been selected. All
   1651      communication must now adhere to that version's specification. *)
   1652   enableFlowControl conn false >>= (fun () ->
   1653   Lwt.ignore_result (Lwt.catch
   1654     (fun () -> receive conn)
   1655     (fun e ->
   1656       clientConnClose conn;
   1657       if isConnectionCheck conn then Lwt.return () else Lwt.fail e));
   1658   negociateFlowControl conn;
   1659   Lwt.return conn)
   1660 
   1661 let rec findFirst f l =
   1662   match l with
   1663     []     -> Lwt.return None
   1664   | x :: r -> f x >>= fun v ->
   1665               match v with
   1666                 None        -> findFirst f r
   1667               | Some _ as v -> Lwt.return v
   1668 
   1669 let printAddr host addr =
   1670   match addr with
   1671     Unix.ADDR_UNIX s ->
   1672       s
   1673   | Unix.ADDR_INET (s, p) ->
   1674       Format.sprintf "%s[%s]:%d" host (Unix.string_of_inet_addr s) p
   1675 
   1676 let buildSocket host port kind ?(err="") ai =
   1677   let attemptCreation ai =
   1678     Lwt.catch
   1679       (fun () ->
   1680          let socket =
   1681            Lwt_unix.socket ~cloexec:true
   1682              ai.Unix.ai_family ai.Unix.ai_socktype ai.Unix.ai_protocol
   1683          in
   1684          Lwt.catch
   1685            (fun () ->
   1686               begin match kind with
   1687                 `Connect ->
   1688                   (* Connect (synchronously) to the remote host *)
   1689                   Lwt_unix.connect socket ai.Unix.ai_addr
   1690               | `Bind ->
   1691                   (* Some OS (Linux?) enable dual-stack mode by default;
   1692                      trying to bind both IPv4 and IPv6 sockets will fail
   1693                      with EADDRINUSE unless dual-stack mode is disabled. *)
   1694                   if ai.Unix.ai_family = Unix.PF_INET6 then
   1695                     Lwt_unix.setsockopt socket Unix.IPV6_ONLY true;
   1696                   (* Allow reuse of local addresses for bind *)
   1697                   if ai.Unix.ai_family <> Unix.PF_UNIX then
   1698                     Lwt_unix.setsockopt socket Unix.SO_REUSEADDR true;
   1699                   (* Bind the socket to portnum on the local host
   1700                      or to a filesystem path (when Unix domain socket) *)
   1701                   Lwt_unix.bind socket ai.Unix.ai_addr;
   1702                   (* Start listening, allow up to 1 pending request *)
   1703                   Lwt_unix.listen socket 1;
   1704                   Lwt.return ()
   1705               end >>= fun () ->
   1706               Lwt.return (Some socket))
   1707            (fun e ->
   1708               match e with
   1709                 Unix.Unix_error _ ->
   1710                   Lwt_unix.close socket;
   1711                   Lwt.fail e
   1712               | _ ->
   1713                   Lwt.fail e))
   1714       (fun e ->
   1715          match e with
   1716            Unix.Unix_error (error, _, _) ->
   1717              begin match error with
   1718                Unix.EAFNOSUPPORT | Unix.EPROTONOSUPPORT | Unix.EINVAL ->
   1719                  Lwt.return None
   1720              | _  ->
   1721                  let msg =
   1722                    match kind with
   1723                      `Connect ->
   1724                        Printf.sprintf "%s%s: %s\n"
   1725                          err
   1726                          (printAddr host ai.Unix.ai_addr)
   1727                          (Unix.error_message error)
   1728                    | `Bind when ai.Unix.ai_family <> Unix.PF_UNIX ->
   1729                        Printf.sprintf
   1730                          "Can't bind socket to port %s at address [%s]: %s\n"
   1731                          port
   1732                          (match ai.Unix.ai_addr with
   1733                             Unix.ADDR_INET (addr, _) ->
   1734                               Unix.string_of_inet_addr addr
   1735                           | _ ->
   1736                               assert false)
   1737                          (Unix.error_message error)
   1738                    | `Bind (* Unix.PF_UNIX *) ->
   1739                        Printf.sprintf
   1740                          "Can't bind socket to path '%s': %s\n"
   1741                          port
   1742                          (Unix.error_message error)
   1743                  in
   1744                  Lwt.fail (Util.Fatal msg)
   1745              end
   1746          | _ ->
   1747              Lwt.fail e)
   1748   in
   1749   attemptCreation ai
   1750 
   1751 let makeUnixSocketAi path =
   1752   { Unix.ai_family = Unix.PF_UNIX;
   1753     ai_socktype = Unix.SOCK_STREAM;
   1754     ai_protocol = 0;
   1755     ai_addr = Unix.ADDR_UNIX path;
   1756     ai_canonname = "" }
   1757 
   1758 let buildConnectSocketUnix path =
   1759   assert (String.length path > 2);
   1760   (* Unix domain socket path from [Clroot] is enclosed in curly braces.
   1761      Extract the real path. *)
   1762   let path = String.sub path 1 ((String.length path) - 2) in
   1763   let err = "Can't connect to Unix domain socket on path " in
   1764   buildSocket "" path `Connect ~err (makeUnixSocketAi path) >>= function
   1765   | None ->
   1766       Lwt.fail (Util.Fatal (err ^ path))
   1767   | Some x ->
   1768       Lwt.return x
   1769 
   1770 let buildConnectSocket host port =
   1771   let isHost = String.length host > 0 && host.[0] <> '{' in
   1772   if not isHost then buildConnectSocketUnix host else
   1773   let err = "Failed to connect to the server on host " in
   1774   let attemptCreation ai = buildSocket host port `Connect ~err ai in
   1775   let options = [ Unix.AI_SOCKTYPE Unix.SOCK_STREAM ] in
   1776   findFirst attemptCreation (Unix.getaddrinfo host port options) >>= fun res ->
   1777   match res with
   1778     Some socket ->
   1779       Lwt.return socket
   1780   | None ->
   1781       let hostport = Printf.sprintf "%s:%s" host port in
   1782       Lwt.fail (Util.Fatal (err ^ hostport))
   1783 
   1784 (* [at_exit] does not provide reliable cleanup (why?), so this
   1785    complex mechanism is needed to unlink Unix domain sockets
   1786    in case of exceptional termination. *)
   1787 let createdUnixSockets = ref []
   1788 
   1789 let postponeUnixSocketCleanup path =
   1790   createdUnixSockets := path :: !createdUnixSockets
   1791 
   1792 let unixSocketCleanup () =
   1793   Safelist.iter
   1794     (fun path -> try Unix.unlink path with Unix.Unix_error _ -> ())
   1795     !createdUnixSockets
   1796 
   1797 let buildListenSocketUnix path =
   1798   assert (path <> "");
   1799   buildSocket "" path `Bind (makeUnixSocketAi path) >>= function
   1800   | None ->
   1801       Lwt.fail (Util.Fatal
   1802         (Printf.sprintf "Can't bind Unix domain socket on path %s" path))
   1803   | Some x ->
   1804       postponeUnixSocketCleanup path;
   1805       Lwt.return [x]
   1806 
   1807 let buildListenSocket hosts port =
   1808   let isPort = try ignore (int_of_string port); true with Failure _ -> false in
   1809   if not isPort then buildListenSocketUnix port else
   1810   let options = [ Unix.AI_SOCKTYPE Unix.SOCK_STREAM ; Unix.AI_PASSIVE ] in
   1811   hosts
   1812   |> Safelist.map (fun host -> Unix.getaddrinfo host port options)
   1813   |> Safelist.concat
   1814   |> Lwt_util.map (buildSocket "" port `Bind) >>= fun res ->
   1815   match Safelist.filter (fun x -> x <> None) res with
   1816   | [] ->
   1817       Lwt.fail (Util.Fatal (Printf.sprintf "Can't bind socket to port %s" port))
   1818   | s ->
   1819       Lwt.return (Safelist.map (function None -> assert false | Some x -> x) s)
   1820 
   1821 let buildSocketConnection host port =
   1822   buildConnectSocket host port >>= fun socket ->
   1823   initConnection socket socket
   1824 
   1825 let buildShellConnection shell host userOpt portOpt rootName termInteract =
   1826   let remoteCmd =
   1827     (if Prefs.read serverCmd="" then Uutil.myName
   1828      else Prefs.read serverCmd)
   1829     ^ (if Prefs.read addversionno then "-" ^ Uutil.myMajorVersion else "")
   1830     ^ " -server " ^ rpcServerCmdlineOverride in
   1831   let userArgs =
   1832     match userOpt with
   1833       None -> []
   1834     | Some user -> ["-l"; user] in
   1835   let portArgs =
   1836     match portOpt with
   1837       None -> []
   1838     | Some port -> ["-p"; port] in
   1839   let shellCmd =
   1840     (if shell = "ssh" then
   1841       Prefs.read sshCmd
   1842     else
   1843       shell) in
   1844   let shellCmdArgs =
   1845     (if shell = "ssh" then
   1846       Prefs.read sshargs
   1847     else
   1848       "") in
   1849   let preargs =
   1850     (userArgs @ portArgs @
   1851        [host]@
   1852        (if shell="ssh" then ["-e none"] else [])@
   1853        [shellCmdArgs;remoteCmd]) in
   1854   (* Split compound arguments at space chars, to make
   1855      create_process happy *)
   1856   let args = [shellCmd] @
   1857     Safelist.concat
   1858       (Safelist.map (fun s -> Util.splitIntoWords s ' ') preargs) in
   1859   let argsarray = Array.of_list args in
   1860   let (i1,o1) = Lwt_unix.pipe_out ~cloexec:true () in
   1861   let (i2,o2) = Lwt_unix.pipe_in ~cloexec:true () in
   1862   (* We need to make sure that there is only one reader and one
   1863      writer by pipe, so that, when one side of the connection
   1864      dies, the other side receives an EOF or a SIGPIPE. *)
   1865   debug (fun ()-> Util.msg "Shell connection: %s (%s)\n"
   1866            shellCmd (String.concat ", " args));
   1867   let (term, termPid) =
   1868     Util.convertUnixErrorsToFatal "starting shell connection" (fun () ->
   1869     match termInteract with
   1870     | None ->
   1871         (* Signals generated by the terminal from user input are sent to all
   1872            processes in the foreground process group. This means that the ssh
   1873            child process will receive SIGINT at the same time as Unison and
   1874            close the connection before Unison has the chance to do cleanup with
   1875            the remote end. To make matters more complicated, the ssh process
   1876            must be in the foreground process group because interaction with the
   1877            user is done via the terminal (not via stdin, stdout) and background
   1878            processes can't read from the terminal (unless we'd set up a pty
   1879            like is done for the GUI).
   1880 
   1881            Don't let these signals reach ssh by blocking them.
   1882 
   1883            Unfortunately, a bug introduced in OpenSSH 9.6 (also present in 9.7)
   1884            breaks this workaround by unblocking SIGINT in the ssh process.
   1885 
   1886            The signals could be ignored instead of being blocked because ssh
   1887            does not set handlers for SIGINT and SIGQUIT if they've been ignored
   1888            at startup. But this triggers an error in ssh. The interactive
   1889            passphrase reading function captures these signals for the purpose
   1890            of restoring terminal settings (echo). When receiving a signal, and
   1891            after restoring previous signal handlers, it resends the signal to
   1892            itself. But now the signal is ignored and instead of terminating,
   1893            the process will continue running as if passphrase reading function
   1894            had returned with an empty result.
   1895 
   1896            Since the ssh process no longer receives the signals generated by
   1897            user input we have to make sure that it terminates when Unison does.
   1898            This usually happens due to its stdin and stdout being closed,
   1899            except for when it is interacting with the user via terminal. To get
   1900            around that, an [at_exit] handler is registered to send a SIGTERM
   1901            and SIGKILL to the ssh process.  (Note, for [at_exit] handlers to
   1902            run, unison process must terminate normally, not be killed. For
   1903            SIGINT, this means that [Sys.catch_break true] (or an alternative
   1904            SIGINT handler) must be set before creating the ssh process.) *)
   1905         let pid = Util.blockSignals [Sys.sigint] (fun () ->
   1906           System.create_process shellCmd argsarray i1 o2 Unix.stderr) in
   1907         let end_ssh () =
   1908           let kill_noerr si = try Unix.kill pid si
   1909             with Unix.Unix_error _ -> () | Invalid_argument _ -> () in
   1910           match Unix.waitpid [WNOHANG] pid with
   1911           | (0, _) ->
   1912               (* Grace period before killing. Important to give ssh a chance
   1913                  to restore terminal settings, should that be needed. *)
   1914               kill_noerr Sys.sigterm; Unix.sleepf 0.01; kill_noerr Sys.sigkill
   1915           | _ | exception Unix.Unix_error _ -> ()
   1916         in
   1917         let () = at_exit end_ssh in
   1918         (None, pid)
   1919     | Some callBack ->
   1920         Terminal.create_session shellCmd argsarray i1 o2 Unix.stderr)
   1921   in
   1922   Unix.close i1; Unix.close o2;
   1923   let writeSilentNoexn fd s pos len =
   1924     ignore (try if len > 0 then Unix.write fd s pos len else 0
   1925             with Unix.Unix_error _ -> 0)
   1926   in
   1927   let forwardShellStderr fdIn fdOut s =
   1928     (* When the shell connection has been established then keep
   1929        forwarding server's stderr to client's stderr; not to GUI. *)
   1930     let buf = Bytes.create 16000 in
   1931     let rec loop s len =
   1932       (* Can't use printf because if stderr is not open in Windows,
   1933          it will throw an exception when at_exit tries to flush it. *)
   1934       writeSilentNoexn fdOut s 0 len;
   1935       Lwt.catch (fun () -> Lwt_unix.read fdIn buf 0 16000)
   1936         (fun _ -> debug (fun () ->
   1937            Util.msg "Caught an exception when reading remote stderr\n");
   1938            Lwt.return 0)
   1939       >>= function
   1940       | 0 -> Lwt.return ()
   1941       | len -> loop buf len
   1942     in
   1943     loop (Bytes.of_string s) (String.length s)
   1944   in
   1945   let (connReady, getTermErr) =
   1946     match term, termInteract with
   1947     | Some fdTerm, Some interact ->
   1948         let (setReady, handleRequests, extractRemainingOutput) =
   1949           Terminal.handlePasswordRequests fdTerm (interact rootName) in
   1950         Lwt.ignore_result (
   1951           handleRequests >>=
   1952           forwardShellStderr (fst fdTerm) Unix.stderr);
   1953         let connReady () =
   1954           let s = setReady () in
   1955           writeSilentNoexn Unix.stderr (Bytes.of_string s) 0 (String.length s)
   1956         in
   1957         (connReady, extractRemainingOutput)
   1958     | _ ->
   1959         (Fun.id, fun () -> Lwt.return "")
   1960   in
   1961   let cleanup () =
   1962     (* Make sure the [handlePasswordRequests] threads will finish while
   1963        silencing any exceptions (most likely EBADF) caused by having closed
   1964        the terminal fds. *)
   1965     Lwt.ignore_result (getTermErr () >>= fun s ->
   1966       debug (fun () ->
   1967         if s <> "" then Util.msg "Received from remote shell process:\n%s\n" s);
   1968       Lwt.return ());
   1969     if term = None then
   1970       try ignore (Terminal.safe_waitpid termPid) with Unix.Unix_error _ -> ()
   1971     else
   1972       try Terminal.close_session termPid with Unix.Unix_error _ -> ()
   1973   in
   1974   (* With [connReady], we know that shell connection was established (even if
   1975      RPC handshake failed). This hacky way of detecting the connection is used
   1976      because [Lwt_unix.wait_read] is not implemented under Windows.
   1977      By this time, we are already somewhat late in the communication process.
   1978      Any error output from very early stages of server startup, before other
   1979      output is produced, might still end up in GUI (but this is very unlikely;
   1980      it is more likely that the same error caused connection to be dropped). *)
   1981   Lwt.catch
   1982     (fun () -> initConnection ~connReady ~cleanup i2 o1)
   1983     (fun e ->
   1984       Lwt.catch
   1985         (fun () -> getTermErr () >>= fun s ->
   1986                    if s <> "" then Util.warn s;
   1987                    Lwt.fail e)
   1988         (* Don't close the terminal before reading the final error output
   1989            or we might miss it completely. *)
   1990         (fun _ ->  cleanup (); Lwt.fail e))
   1991 
   1992 let canonizeLocally s =
   1993   Fspath.canonize s
   1994 
   1995 let canonizeOnServer =
   1996   registerServerCmd "canonizeOnServer"
   1997     Umarshal.(prod2 (option string) bool id id)
   1998     Umarshal.(prod2 string Fspath.m id id)
   1999     (fun _ (s, _) -> (* The tuple is kept for backwards API compatibility *)
   2000        Lwt.return (Os.myCanonicalHostName (), canonizeLocally s))
   2001 
   2002 let canonizeOnServer conn s =
   2003   (* The second tuple item is required for compatibility with <= 2.52 *)
   2004   canonizeOnServer conn (s, true)
   2005 
   2006 let canonize clroot = (* connection for clroot must have been set up already *)
   2007   match clroot with
   2008     Clroot.ConnectLocal s ->
   2009       (Common.Local, canonizeLocally s)
   2010   | _ ->
   2011       match ClientConn.canonRootOfClroot clroot with
   2012         None                -> raise (Util.Fatal "Remote.canonize")
   2013       | Some root -> root
   2014 
   2015 let isRootConnected = function
   2016   | (Common.Local, _) -> true
   2017   | (Common.Remote _, _) as root -> ClientConn.ofRootConncheck root <> None
   2018 
   2019 let canonizeRoot rootName clroot termInteract =
   2020   let finish ioServer s =
   2021     (* We need to always compute the fspath as it may have changed
   2022        due to profile configuration changes *)
   2023     canonizeOnServer ioServer s >>= (fun (host, fspath) ->
   2024     let root = (Common.Remote host, fspath) in
   2025     ClientConn.register clroot root ioServer;
   2026     Lwt.return root) in
   2027   match clroot with
   2028     Clroot.ConnectLocal s ->
   2029       Lwt.return (Common.Local, canonizeLocally s)
   2030   | Clroot.ConnectBySocket(host,port,s) ->
   2031       begin match ClientConn.ofClrootConncheck clroot with
   2032       | Some x -> Lwt.return x
   2033       | None   -> buildSocketConnection host port
   2034       end >>= fun ioServer ->
   2035       finish ioServer s
   2036   | Clroot.ConnectByShell(shell,host,userOpt,portOpt,s) ->
   2037       begin match ClientConn.ofClrootConncheck clroot with
   2038       | Some x -> Lwt.return x
   2039       | None   -> buildShellConnection
   2040                    shell host userOpt portOpt rootName termInteract
   2041       end >>= fun ioServer ->
   2042       finish ioServer s
   2043 
   2044 (* A new interface, useful for terminal interaction, it should
   2045    eventually replace canonizeRoot and buildShellConnection *)
   2046 (* A preconnection is None if there's nothing more to do, and Some if
   2047    terminal interaction might be required (for ssh password) *)
   2048 type preconnection =
   2049      (Unix.file_descr
   2050      * Lwt_unix.file_descr
   2051      * Lwt_unix.file_descr
   2052      * Unix.file_descr
   2053      * string option
   2054      * (Lwt_unix.file_descr * Lwt_unix.file_descr) option
   2055      * Clroot.clroot
   2056      * int)
   2057 let openConnectionStart clroot =
   2058   match clroot with
   2059     Clroot.ConnectLocal s ->
   2060       None
   2061   | Clroot.ConnectBySocket(host,port,s) ->
   2062       Lwt_unix.run
   2063         (begin match ClientConn.ofClrootConncheck clroot with
   2064          | Some x -> Lwt.return x
   2065          | None   -> buildSocketConnection host port
   2066          end >>= fun ioServer ->
   2067          (* We need to always compute the fspath as it may have changed
   2068             due to profile configuration changes *)
   2069          canonizeOnServer ioServer s >>= fun (host, fspath) ->
   2070          ClientConn.register clroot (Common.Remote host, fspath) ioServer;
   2071          Lwt.return ());
   2072       None
   2073   | Clroot.ConnectByShell(shell,host,userOpt,portOpt,s) ->
   2074       match ClientConn.ofClrootConncheck clroot with
   2075       | Some ioServer ->
   2076            (* We recompute the fspath as it may have changed due to
   2077               profile configuration changes *)
   2078            Lwt_unix.run
   2079              (canonizeOnServer ioServer s >>= fun (host, fspath) ->
   2080               ClientConn.register clroot (Common.Remote host, fspath) ioServer;
   2081               Lwt.return ());
   2082            None
   2083       | None ->
   2084           let remoteCmd =
   2085             (if Prefs.read serverCmd="" then Uutil.myName
   2086              else Prefs.read serverCmd)
   2087             ^ (if Prefs.read addversionno then "-" ^ Uutil.myMajorVersion else "")
   2088             ^ " -server " ^ rpcServerCmdlineOverride in
   2089           let userArgs =
   2090             match userOpt with
   2091               None -> []
   2092             | Some user -> ["-l"; user] in
   2093           let portArgs =
   2094             match portOpt with
   2095               None -> []
   2096             | Some port -> ["-p"; port] in
   2097           let shellCmd =
   2098             (if shell = "ssh" then
   2099               Prefs.read sshCmd
   2100             else
   2101               shell) in
   2102           let shellCmdArgs =
   2103             (if shell = "ssh" then
   2104               Prefs.read sshargs
   2105             else
   2106               "") in
   2107           let preargs =
   2108               ([shellCmd]@userArgs@portArgs@
   2109                [host]@
   2110                (if shell="ssh" then ["-e none"] else [])@
   2111                [shellCmdArgs;remoteCmd]) in
   2112           (* Split compound arguments at space chars, to make
   2113              create_process happy *)
   2114           let args =
   2115             Safelist.concat
   2116               (Safelist.map (fun s -> Util.splitIntoWords s ' ') preargs) in
   2117           let argsarray = Array.of_list args in
   2118           let (i1,o1) = Lwt_unix.pipe_out ~cloexec:true () in
   2119           let (i2,o2) = Lwt_unix.pipe_in ~cloexec:true () in
   2120           (* We need to make sure that there is only one reader and one
   2121              writer by pipe, so that, when one side of the connection
   2122              dies, the other side receives an EOF or a SIGPIPE. *)
   2123           debug (fun ()-> Util.msg "Shell connection: %s (%s)\n"
   2124                    shellCmd (String.concat ", " args));
   2125           let (term,pid) =
   2126             Terminal.create_session shellCmd argsarray i1 o2 Unix.stderr in
   2127           (* after terminal interact, remember to close i1 and o2 *)
   2128           Some(i1,i2,o1,o2,s,term,clroot,pid)
   2129 
   2130 let openConnectionPrompt = function
   2131     (i1,i2,o1,o2,s,Some fdTerm,clroot,pid) ->
   2132       let x = Terminal.termInput fdTerm i2 in
   2133       x
   2134   | _ -> None
   2135 
   2136 let openConnectionReply = function
   2137     (i1,i2,o1,o2,s,Some fdTerm,clroot,pid) ->
   2138     (fun response ->
   2139       (* FIX: should loop until everything is written... *)
   2140       ignore (Lwt_unix.run (Lwt_unix.write (snd fdTerm) (Bytes.of_string (response ^ "\n")) 0
   2141                               (String.length response + 1))))
   2142   | _ -> (fun _ -> ())
   2143 
   2144 let openConnectionEnd (i1,i2,o1,o2,s,fdopt,clroot,pid) =
   2145       Unix.close i1; Unix.close o2;
   2146       let cleanup () =
   2147         try Terminal.close_session pid with Unix.Unix_error _ -> ()
   2148       in
   2149       Lwt_unix.run
   2150         (initConnection ~cleanup i2 o1 >>= fun ioServer ->
   2151          canonizeOnServer ioServer s >>= fun (host, fspath) ->
   2152          ClientConn.register clroot (Common.Remote host, fspath) ioServer;
   2153          Lwt.return ())
   2154 
   2155 let openConnectionCancel (i1,i2,o1,o2,s,fdopt,clroot,pid) =
   2156   (try Unix.kill pid Sys.sigkill with Unix.Unix_error _ -> ());
   2157   (try Unix.close i1 with Unix.Unix_error _ -> ());
   2158   (try Lwt_unix.close i2 with Unix.Unix_error _ -> ());
   2159   (try Lwt_unix.close o1 with Unix.Unix_error _ -> ());
   2160   (try Unix.close o2 with Unix.Unix_error _ -> ());
   2161   (try Terminal.close_session pid with Unix.Unix_error _ -> ())
   2162 
   2163 (****************************************************************************)
   2164 (*                     SERVER-MODE COMMAND PROCESSING LOOP                  *)
   2165 (****************************************************************************)
   2166 
   2167 let checkClientVersion conn () =
   2168   let reply msg = sendHandshakeMsg conn msg in
   2169   (* FIX: In future when gaining the ability to close connections from server
   2170      side, make errors close the connection, not just send to client. *)
   2171   let error = sendHandshakeErr conn in
   2172   receiveHandshakeData conn rpcVersionTag >>= function
   2173   | Error msg ->
   2174       error ("Could not negotiate RPC version. "
   2175              ^ "Received unexpected error from the client: \"" ^ msg ^ "\"")
   2176   | Unknown fromClient ->
   2177       error ("Could not negotiate RPC version. "
   2178              ^ "Received unexpected header from the client: \""
   2179              ^ String.escaped (fromClient
   2180              ^ Bytes.to_string (peekWithoutBlocking conn.inputBuffer)) ^ "\"")
   2181   | Data buf ->
   2182       match parseVersion "client" buf with
   2183       | Some clientVer ->
   2184           if verIsSupported clientVer then begin
   2185             setConnectionVersion conn clientVer;
   2186             reply Ok
   2187           end else
   2188             error ("Client RPC version not supported. "
   2189                    ^ "Version received from client: \""
   2190                    ^ string_of_int clientVer ^ "\". "
   2191                    ^ "Supported RPC versions: " ^ rpcSupportedVersionStr)
   2192       | None -> Lwt.return ()
   2193       | exception Util.Transient e -> error e
   2194 
   2195 (****)
   2196 
   2197 let showWarningOnClient =
   2198     (registerServerCmd
   2199        "showWarningOnClient" Umarshal.string Umarshal.unit
   2200        (fun _ str -> Lwt.return (Util.warn str)))
   2201 
   2202 let forwardMsgToClient =
   2203     (registerServerCmd
   2204        "forwardMsgToClient" Trace.mmsg Umarshal.unit
   2205        (fun _ str -> (*msg "forwardMsgToClient: %s\n" str; *)
   2206           Lwt.return (Trace.displayMessageLocally str)))
   2207 
   2208 (* Compatibility mode for 2.51 clients. *)
   2209 let compatServerInit mode conn =
   2210   let compatConnectionHeader =
   2211     match mode with
   2212     | Some "2.48" -> compat248ConnectionHeader
   2213     | _ -> compatConnectionHeader
   2214   in
   2215   dump conn [(Bytearray.of_string compatConnectionHeader, 0,
   2216                 String.length compatConnectionHeader)] >>= fun () ->
   2217   (* Send the magic string to notify new clients *)
   2218   dumpUrgent conn magic >>= fun () ->
   2219   (* Must enable flow control because that is the default for 2.51.
   2220      This must be done after dumpUrgent above to ensure that the write
   2221      token is sent the last. *)
   2222   enableFlowControl conn true >>= fun () ->
   2223   (* Let's see if the client noticed the magic string. This is
   2224      a no-op for old clients. *)
   2225   checkForMagicString conn
   2226 
   2227 let compatServerRun conn =
   2228   (* Set the local warning printer to make an RPC to the client and
   2229      show the warning there; ditto for the message printer *)
   2230   Util.warnPrinter :=
   2231     Some (fun str -> Lwt_unix.run (showWarningOnClient conn str));
   2232   Trace.messageForwarder :=
   2233     Some (fun str -> Lwt_unix.run (forwardMsgToClient conn str));
   2234   receive conn >>=
   2235   Lwt.wait
   2236 
   2237 (* This function loops, waits for commands, and passes them to
   2238    the relevant functions. *)
   2239 let commandLoop ~compatMode in_ch out_ch =
   2240   Trace.runningasserver := true;
   2241   (* Send header indicating to the client that it has successfully
   2242      connected to the server *)
   2243   let conn = makeConnection true in_ch out_ch in
   2244   Lwt.catch
   2245     (fun () ->
   2246        (if compatMode <> None then
   2247          let () = setConnectionVersion conn 0 in
   2248          compatServerInit compatMode conn >>= (fun upgrade ->
   2249          if upgrade then begin
   2250            (* Restore the state before starting protocol negotiation *)
   2251            allowWrites conn.outputQueue;
   2252            disableFlowControl conn.outputQueue
   2253          end;
   2254          Lwt.return upgrade)
   2255        else
   2256          Lwt.return true) >>= fun upgrade ->
   2257        debug (fun () -> Util.msg "%sGoing to attempt RPC version upgrade\n"
   2258                  (if upgrade then "" else "NOT "));
   2259        if not upgrade then
   2260          compatServerRun conn
   2261        else
   2262        sendStrings conn [connectionHeader; rpcVersionsStr] >>=
   2263        checkClientVersion conn >>= fun () ->
   2264        (* From this moment forward, the RPC version has been selected. All
   2265           communication must now adhere to that version's specification. *)
   2266        (* Flow control was disabled for RPC version handshake. Enable it
   2267           for flow control negotiation. *)
   2268        enableFlowControl conn true >>= (fun () ->
   2269        (* Set the local warning printer to make an RPC to the client and
   2270           show the warning there; ditto for the message printer *)
   2271        Util.warnPrinter :=
   2272          Some (fun str -> Lwt_unix.run (showWarningOnClient conn str));
   2273        Trace.messageForwarder :=
   2274          Some (fun str -> Lwt_unix.run (forwardMsgToClient conn str));
   2275        receive conn >>=
   2276        Lwt.wait))
   2277     (fun e ->
   2278        match e with
   2279          Util.Fatal "Lost connection with the server" ->
   2280            debug (fun () -> Util.msg "Connection closed by the client\n");
   2281            (* We prevent new writes and wait for any current write to
   2282               terminate.  As we don't have a good way to wait for the
   2283               writer to terminate, we just yield a bit. *)
   2284            let rec wait n =
   2285              if n = 0 then Lwt.return () else begin
   2286                Lwt_unix.yield () >>= fun () ->
   2287                wait (n - 1)
   2288              end
   2289            in
   2290            conn.outputBuffer.opened <- false;
   2291            wait 10
   2292        | _ ->
   2293            Lwt.fail e)
   2294 
   2295 let killServer =
   2296   Prefs.createBool "killserver" false
   2297     ~category:(`Advanced `Remote)
   2298     "kill server when done (even when using sockets)"
   2299     ("When set to \\verb|true|, this flag causes Unison to kill the remote "
   2300      ^ "server process when the synchronization is finished.  This behavior "
   2301      ^ "is the default for \\verb|ssh| connections, so this preference is not "
   2302      ^ "normally needed when running over \\verb|ssh|; it is provided so "
   2303      ^ "that socket-mode servers can be killed off after a single run of "
   2304      ^ "Unison, rather than waiting to accept future connections.  (Some "
   2305      ^ "users prefer to start a remote socket server for each run of Unison, "
   2306      ^ "rather than leaving one running all the time.)")
   2307 
   2308 (* For backward compatibility *)
   2309 let _ = Prefs.alias killServer "killServer"
   2310 
   2311 (* FIX: This code should be removed when removing 2.51-compatibility code. *)
   2312 let is248Exe =
   2313   let exeName = Filename.basename (Sys.executable_name) in
   2314   String.length exeName >= 11 && String.sub exeName 0 11 = "unison-2.48"
   2315 
   2316 let rec accept_retry l =
   2317   Lwt.catch
   2318     (fun () -> Lwt_unix.accept l)
   2319     (function
   2320      (* Temporary and connection-specific errors *)
   2321      | Unix.Unix_error (Unix.ECONNABORTED, _, _)
   2322      | Unix.Unix_error (Unix.EPERM, _, _)  (* Linux firewall *)
   2323      | Unix.Unix_error
   2324          (* Resource exhaustion: could be considered temporary *)
   2325          (Unix.(EMFILE | ENFILE | ENOBUFS | ENOMEM), _, _)
   2326        (* Linux curiosity: accept(2) may return errors on the new socket *)
   2327      | Unix.Unix_error (Unix.ENETUNREACH, _, _)
   2328      | Unix.Unix_error (Unix.EHOSTUNREACH, _, _)
   2329      | Unix.Unix_error (Unix.ENETDOWN, _, _)
   2330      | Unix.Unix_error (Unix.EHOSTDOWN, _, _)
   2331      | Unix.Unix_error (Unix.ETIMEDOUT, _, _) as e ->
   2332          let errmsg = match e with
   2333            | Unix.Unix_error (err, _, _) -> Unix.error_message err
   2334            | _ -> Printexc.to_string e in
   2335          Util.msg "server: continuing after receiving an error \
   2336            when accepting client connection: %s\n" errmsg;
   2337          accept_retry l
   2338      (* Permanent errors *)
   2339      | e -> Lwt.fail e)
   2340 
   2341 (* Used by the socket mechanism: Create a socket on portNum and wait
   2342    for a request. Each request is processed by commandLoop. When a
   2343    session finishes, the server waits for another request. *)
   2344 let waitOnPort hosts port =
   2345   Util.convertUnixErrorsToFatal "waiting on port"
   2346     (fun () ->
   2347        let hosts = match hosts with [] -> [""] | _ -> hosts in
   2348        let listening = Lwt_unix.run (buildListenSocket hosts port) in
   2349        let accepting = Array.make (Safelist.length listening) None in
   2350        let rec accept i l =
   2351          match accepting.(i) with
   2352            | None ->
   2353                let st = accept_retry l >>= fun s -> Lwt.return (i, s) in
   2354                let () = accepting.(i) <- Some st in
   2355                st
   2356            | Some st -> st
   2357        and serve (i, s) = accepting.(i) <- None; setKeepalive s; s
   2358        and setKeepalive = function
   2359          | (_, Unix.ADDR_UNIX _) -> ()
   2360          | (c, ADDR_INET _) -> Lwt_unix.setsockopt c Unix.SO_KEEPALIVE true
   2361        in
   2362        Util.msg "server started\n";
   2363        let rec handleClients () =
   2364          let (connected, _) =
   2365            serve @@ Lwt_unix.run (Lwt.choose (List.mapi accept listening))
   2366          in
   2367          registerIOClose (connected, connected) (fun () -> doCleanup connected);
   2368          begin try
   2369            (* Accept a connection *)
   2370            let compatMode = Some (if is248Exe then "2.48" else "2.51") in
   2371            Lwt_unix.run (commandLoop ~compatMode connected connected)
   2372          with Util.Fatal "Lost connection with the server" -> () end;
   2373          (* The client has closed its end of the connection *)
   2374          if not (Prefs.read killServer) then handleClients ()
   2375        and doCleanup socket =
   2376          begin try Lwt_unix.close socket with Unix.Unix_error _ -> () end;
   2377          if not (Prefs.read killServer) then runConnCloseHandlers true
   2378        in
   2379        try
   2380          Sys.catch_break true;
   2381          handleClients ();
   2382          unixSocketCleanup ()
   2383        with
   2384        | Sys.Break ->
   2385            unixSocketCleanup ()
   2386        | (Util.Fatal _ | Unix.Unix_error _) as e ->
   2387            unixSocketCleanup ();
   2388            raise e
   2389     )
   2390 
   2391 let beAServer () =
   2392   begin try
   2393     let home = System.getenv "HOME" in
   2394     Util.convertUnixErrorsToFatal
   2395       "changing working directory"
   2396       (fun () -> System.chdir home)
   2397   with Not_found ->
   2398     Util.msg
   2399       "Environment variable HOME unbound: \
   2400        executing server in current directory\n"
   2401   end;
   2402   (* Let's start with 2.51-compatibility mode. Newer clients will add
   2403      a special override keyword in server args that will disable the
   2404      compatibility mode.
   2405 
   2406      FIX: It is a bit of a hack, so better not make it permanent.
   2407      It was added in 2021 and should be removed after a couple of years. *)
   2408   let compatMode =
   2409      try
   2410        not (Prefs.scanCmdLine "" |> Util.StringMap.find "rest"
   2411             |> Safelist.mem rpcServerCmdlineOverride)
   2412      with Not_found -> true
   2413   in
   2414   (* Additionally, do a best effort emulation of 2.48.
   2415      FIX: remove together with code above. *)
   2416   let compatMode =
   2417     match compatMode with
   2418     | true when is248Exe -> Some "2.48"
   2419     | true -> Some "2.51"
   2420     | false -> None
   2421   in
   2422   begin end;
   2423   Lwt_unix.run
   2424     (commandLoop ~compatMode
   2425        (Lwt_unix.of_unix_file_descr Unix.stdin)
   2426        (Lwt_unix.of_unix_file_descr Unix.stdout))