3 use tokio_util::codec::{Decoder, Encoder};
7 /// Stream codec for streaming protocols (ipc, tcp).
///
/// Messages on the wire are delimited by a single separator byte: `decode`
/// cuts the input at `incoming_separator`, and `encode` appends
/// `outgoing_separator` after every message it writes.
// NOTE(review): `Default` is derived, so `StreamCodec::default()` takes both
// separators from `Separator::default()`. The definition of `Separator` is not
// visible here; if it is a plain byte type, the default is 0 (NUL) rather than
// the `b'\n'` used by `stream_incoming` — confirm this is intended.
8 #[derive(Debug, Default)]
9 pub struct StreamCodec {
/// Byte that marks the end of each incoming message.
10 incoming_separator: Separator,
/// Byte appended after each outgoing message.
11 outgoing_separator: Separator,
15 /// Default codec for streaming input: uses a newline (`b'\n'`) as both the
/// incoming and the outgoing separator.
// NOTE(review): the previous comment also stated that input "can be both
// enveloped and not"; nothing in the visible code handles envelopes —
// confirm whether that still applies.
16 pub fn stream_incoming() -> Self {
17 StreamCodec::new(b'\n', b'\n')
20 /// New custom stream codec with caller-chosen separators.
///
/// * `incoming_separator` - byte that ends each message when decoding.
/// * `outgoing_separator` - byte appended to each message when encoding.
// NOTE(review): the constructor body is not visible in this excerpt; the
// arguments are presumably stored in the fields of the same name — confirm.
21 pub fn new(incoming_separator: Separator, outgoing_separator: Separator) -> Self {
/// Decoder half: cuts the incoming bytes into messages at
/// `incoming_separator` and returns each message as an owned UTF-8 `String`.
29 impl Decoder for StreamCodec {
/// Decoding errors are reported as `io::Error`.
31 type Error = io::Error;
/// Extracts the next separator-terminated message from `buf`.
///
/// When a separator byte is found, the bytes before it are removed from
/// `buf` together with the separator and returned as `Ok(Some(message))`.
///
/// # Errors
///
/// Returns an `io::Error` with the message "invalid UTF-8" when the
/// extracted bytes are not valid UTF-8.
// NOTE(review): the start of the iterator chain and the branch taken when no
// separator is present are not visible in this excerpt — presumably the scan
// runs over the bytes of `buf` and the missing-separator case asks for more
// input; confirm against the full file.
33 fn decode(&mut self, buf: &mut BytesMut) -> io::Result<Option<Self::Item>> {
// Index of the first byte equal to the incoming separator.
37 .position(|&b| b == self.incoming_separator)
// Detach everything before the separator as the message payload...
39 let line = buf.split_to(i);
// ...then drop the one-byte separator itself so it is not scanned again.
40 let _ = buf.split_to(1);
// The payload must be valid UTF-8 to be handed out as a `String`.
42 match str::from_utf8(line.as_ref()) {
43 Ok(s) => Ok(Some(s.to_string())),
// The underlying `Utf8Error` is discarded; callers only see this fixed text.
// NOTE(review): `io::Error::other` reports `ErrorKind::Other`;
// `ErrorKind::InvalidData` may describe malformed input better — consider.
44 Err(_) => Err(io::Error::other("invalid UTF-8")),
/// Encoder half: writes each outgoing `String` followed by
/// `outgoing_separator`.
52 impl Encoder<String> for StreamCodec {
/// Encoding errors are reported as `io::Error`.
53 type Error = io::Error;
/// Appends the bytes of `msg`, then the outgoing separator byte, to `buf`.
// NOTE(review): the visible code does not escape separator bytes that occur
// inside `msg`; a peer splitting on the same byte would see such a message as
// several frames — confirm callers never send messages containing it.
// NOTE(review): the end of this function is not visible in this excerpt;
// none of the visible statements can fail.
55 fn encode(&mut self, msg: String, buf: &mut BytesMut) -> io::Result<()> {
// Take over the String's buffer as the raw payload (no copy at this point).
56 let mut payload = msg.into_bytes();
// Terminate the frame with the outgoing separator.
// NOTE(review): this push can reallocate when the String has no spare
// capacity; writing `msg` bytes and the separator straight into `buf` would
// avoid that extra copy — consider.
57 payload.push(self.outgoing_separator);
// Copy the finished frame to the end of the output buffer.
58 buf.extend_from_slice(&payload);