nmode's Git Repositories - signal-cli/blobdiff - client/src/transports/stream_codec.rs
Switch to jsonrpsee
[signal-cli] / client / src / transports / stream_codec.rs
diff --git a/client/src/transports/stream_codec.rs b/client/src/transports/stream_codec.rs
new file mode 100644 (file)
index 0000000..6f77306
--- /dev/null
@@ -0,0 +1,61 @@
+use bytes::BytesMut;
+use std::{io, str};
+use tokio_util::codec::{Decoder, Encoder};
+
+type Separator = u8;
+
+/// Stream codec for streaming protocols (ipc, tcp)
+#[derive(Debug, Default)]
+pub struct StreamCodec {
+    incoming_separator: Separator,
+    outgoing_separator: Separator,
+}
+
+impl StreamCodec {
+    /// Default codec with streaming input data. Input can be both enveloped and not.
+    pub fn stream_incoming() -> Self {
+        StreamCodec::new(b'\n', b'\n')
+    }
+
+    /// New custom stream codec
+    pub fn new(incoming_separator: Separator, outgoing_separator: Separator) -> Self {
+        StreamCodec {
+            incoming_separator,
+            outgoing_separator,
+        }
+    }
+}
+
+impl Decoder for StreamCodec {
+    type Item = String;
+    type Error = io::Error;
+
+    fn decode(&mut self, buf: &mut BytesMut) -> io::Result<Option<Self::Item>> {
+        if let Some(i) = buf
+            .as_ref()
+            .iter()
+            .position(|&b| b == self.incoming_separator)
+        {
+            let line = buf.split_to(i);
+            let _ = buf.split_to(1);
+
+            match str::from_utf8(line.as_ref()) {
+                Ok(s) => Ok(Some(s.to_string())),
+                Err(_) => Err(io::Error::new(io::ErrorKind::Other, "invalid UTF-8")),
+            }
+        } else {
+            Ok(None)
+        }
+    }
+}
+
+impl Encoder<String> for StreamCodec {
+    type Error = io::Error;
+
+    fn encode(&mut self, msg: String, buf: &mut BytesMut) -> io::Result<()> {
+        let mut payload = msg.into_bytes();
+        payload.push(self.outgoing_separator);
+        buf.extend_from_slice(&payload);
+        Ok(())
+    }
+}