X-Git-Url: https://git.nmode.ca/signal-cli/blobdiff_plain/72611296095e8a91d39fa5929c8c61548d413c4f..abde122a35d85f5db956d652300d7a995d460863:/client/src/tcp.rs diff --git a/client/src/tcp.rs b/client/src/tcp.rs new file mode 100644 index 00000000..53650381 --- /dev/null +++ b/client/src/tcp.rs @@ -0,0 +1,29 @@ +use jsonrpc_client_transports::{transports::duplex, RpcChannel, RpcError}; +use jsonrpc_core::futures_util::{SinkExt, StreamExt, TryStreamExt}; +use jsonrpc_server_utils::{codecs::StreamCodec, tokio_util::codec::Decoder}; +use tokio::net::{TcpStream, ToSocketAddrs}; + +/// Connect to a JSON-RPC TCP server. +pub async fn connect>( + socket: S, +) -> Result { + let connection = TcpStream::connect(socket) + .await + .map_err(|e| RpcError::Other(Box::new(e)))?; + let (sink, stream) = StreamCodec::stream_incoming().framed(connection).split(); + let sink = sink.sink_map_err(|e| RpcError::Other(Box::new(e))); + let stream = stream.map_err(|e| log::error!("TCP stream error: {}", e)); + + let (client, sender) = duplex( + Box::pin(sink), + Box::pin( + stream + .take_while(|x| std::future::ready(x.is_ok())) + .map(|x| x.expect("Stream is closed upon first error.")), + ), + ); + + tokio::spawn(client); + + Ok(sender.into()) +}