]> nmode's Git Repositories - signal-cli/blob - client/src/transports/stream_codec.rs
6f77306f689901c806a91073cc8af3b5cc746174
[signal-cli] / client / src / transports / stream_codec.rs
1 use bytes::BytesMut;
2 use std::{io, str};
3 use tokio_util::codec::{Decoder, Encoder};
4
/// A single byte used to mark the boundary between messages in a byte stream.
type Separator = u8;
6
7 /// Stream codec for streaming protocols (ipc, tcp)
8 #[derive(Debug, Default)]
9 pub struct StreamCodec {
10 incoming_separator: Separator,
11 outgoing_separator: Separator,
12 }
13
14 impl StreamCodec {
15 /// Default codec with streaming input data. Input can be both enveloped and not.
16 pub fn stream_incoming() -> Self {
17 StreamCodec::new(b'\n', b'\n')
18 }
19
20 /// New custom stream codec
21 pub fn new(incoming_separator: Separator, outgoing_separator: Separator) -> Self {
22 StreamCodec {
23 incoming_separator,
24 outgoing_separator,
25 }
26 }
27 }
28
29 impl Decoder for StreamCodec {
30 type Item = String;
31 type Error = io::Error;
32
33 fn decode(&mut self, buf: &mut BytesMut) -> io::Result<Option<Self::Item>> {
34 if let Some(i) = buf
35 .as_ref()
36 .iter()
37 .position(|&b| b == self.incoming_separator)
38 {
39 let line = buf.split_to(i);
40 let _ = buf.split_to(1);
41
42 match str::from_utf8(line.as_ref()) {
43 Ok(s) => Ok(Some(s.to_string())),
44 Err(_) => Err(io::Error::new(io::ErrorKind::Other, "invalid UTF-8")),
45 }
46 } else {
47 Ok(None)
48 }
49 }
50 }
51
52 impl Encoder<String> for StreamCodec {
53 type Error = io::Error;
54
55 fn encode(&mut self, msg: String, buf: &mut BytesMut) -> io::Result<()> {
56 let mut payload = msg.into_bytes();
57 payload.push(self.outgoing_separator);
58 buf.extend_from_slice(&payload);
59 Ok(())
60 }
61 }