rumqttc/mqttbytes/v4/
publish.rs

1use super::*;
2use bytes::{Buf, Bytes};
3
/// Publish packet
///
/// Holds the flags taken from the fixed header together with the topic, packet
/// identifier and payload of a single publish.
#[derive(Clone, PartialEq, Eq)]
pub struct Publish {
    /// Duplicate flag; encoded in bit 3 of the first fixed-header byte.
    pub dup: bool,
    /// Quality of service; encoded in bits 1-2 of the first fixed-header byte.
    pub qos: QoS,
    /// Retain flag; encoded in bit 0 of the first fixed-header byte.
    pub retain: bool,
    /// Topic name the payload is published to.
    pub topic: String,
    /// Packet identifier. Left at `0` for `QoS::AtMostOnce`; for the other QoS
    /// levels both `read` and `write` reject `0` with `Error::PacketIdZero`.
    pub pkid: u16,
    /// Application payload: every byte that follows the variable header.
    pub payload: Bytes,
}
14
15impl Publish {
16    pub fn new<S: Into<String>, P: Into<Vec<u8>>>(topic: S, qos: QoS, payload: P) -> Publish {
17        Publish {
18            dup: false,
19            qos,
20            retain: false,
21            pkid: 0,
22            topic: topic.into(),
23            payload: Bytes::from(payload.into()),
24        }
25    }
26
27    pub fn from_bytes<S: Into<String>>(topic: S, qos: QoS, payload: Bytes) -> Publish {
28        Publish {
29            dup: false,
30            qos,
31            retain: false,
32            pkid: 0,
33            topic: topic.into(),
34            payload,
35        }
36    }
37
38    fn len(&self) -> usize {
39        let len = 2 + self.topic.len() + self.payload.len();
40        if self.qos != QoS::AtMostOnce && self.pkid != 0 {
41            len + 2
42        } else {
43            len
44        }
45    }
46
47    pub fn size(&self) -> usize {
48        let len = self.len();
49        let remaining_len_size = len_len(len);
50
51        1 + remaining_len_size + len
52    }
53
54    pub fn read(fixed_header: FixedHeader, mut bytes: Bytes) -> Result<Self, Error> {
55        let qos = qos((fixed_header.byte1 & 0b0110) >> 1)?;
56        let dup = (fixed_header.byte1 & 0b1000) != 0;
57        let retain = (fixed_header.byte1 & 0b0001) != 0;
58
59        let variable_header_index = fixed_header.fixed_header_len;
60        bytes.advance(variable_header_index);
61        let topic = read_mqtt_string(&mut bytes)?;
62
63        // Packet identifier exists where QoS > 0
64        let pkid = match qos {
65            QoS::AtMostOnce => 0,
66            QoS::AtLeastOnce | QoS::ExactlyOnce => read_u16(&mut bytes)?,
67        };
68
69        if qos != QoS::AtMostOnce && pkid == 0 {
70            return Err(Error::PacketIdZero);
71        }
72
73        let publish = Publish {
74            dup,
75            retain,
76            qos,
77            pkid,
78            topic,
79            payload: bytes,
80        };
81
82        Ok(publish)
83    }
84
85    pub fn write(&self, buffer: &mut BytesMut) -> Result<usize, Error> {
86        let len = self.len();
87
88        let dup = self.dup as u8;
89        let qos = self.qos as u8;
90        let retain = self.retain as u8;
91        buffer.put_u8(0b0011_0000 | retain | qos << 1 | dup << 3);
92
93        let count = write_remaining_length(buffer, len)?;
94        write_mqtt_string(buffer, self.topic.as_str());
95
96        if self.qos != QoS::AtMostOnce {
97            let pkid = self.pkid;
98            if pkid == 0 {
99                return Err(Error::PacketIdZero);
100            }
101
102            buffer.put_u16(pkid);
103        }
104
105        buffer.extend_from_slice(&self.payload);
106
107        // TODO: Returned length is wrong in other packets. Fix it
108        Ok(1 + count + len)
109    }
110}
111
112impl fmt::Debug for Publish {
113    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
114        write!(
115            f,
116            "Topic = {}, Qos = {:?}, Retain = {}, Pkid = {:?}, Payload Size = {}",
117            self.topic,
118            self.qos,
119            self.retain,
120            self.pkid,
121            self.payload.len()
122        )
123    }
124}
125
#[cfg(test)]
mod test {
    use super::*;
    use bytes::{Bytes, BytesMut};
    use pretty_assertions::assert_eq;

    /// Splits the first complete frame off `raw` and decodes it as a publish packet.
    fn decode_first_frame(raw: &[u8]) -> Publish {
        let mut stream = BytesMut::from(raw);
        let fixed_header = parse_fixed_header(stream.iter()).unwrap();
        let frame = stream.split_to(fixed_header.frame_length()).freeze();
        Publish::read(fixed_header, frame).unwrap()
    }

    /// Encodes `publish` into a fresh buffer and returns the buffer.
    fn encode(publish: &Publish) -> BytesMut {
        let mut buf = BytesMut::new();
        publish.write(&mut buf).unwrap();
        buf
    }

    #[test]
    fn qos1_publish_parsing_works() {
        #[rustfmt::skip]
        let raw: &[u8] = &[
            0b0011_0010, 11,              // packet type, flags and remaining len
            0x00, 0x03, b'a', b'/', b'b', // variable header. topic name = 'a/b'
            0x00, 0x0a,                   // variable header. pkid = 10
            0xF1, 0xF2, 0xF3, 0xF4,       // publish payload
            0xDE, 0xAD, 0xBE, 0xEF,       // extra packets in the stream
        ];

        let packet = decode_first_frame(raw);

        let expected = Publish {
            dup: false,
            qos: QoS::AtLeastOnce,
            retain: false,
            topic: "a/b".to_owned(),
            pkid: 10,
            payload: Bytes::from_static(&[0xF1, 0xF2, 0xF3, 0xF4]),
        };
        assert_eq!(packet, expected);
    }

    #[test]
    fn qos0_publish_parsing_works() {
        #[rustfmt::skip]
        let raw: &[u8] = &[
            0b0011_0000, 7,               // packet type, flags and remaining len
            0x00, 0x03, b'a', b'/', b'b', // variable header. topic name = 'a/b'
            0x01, 0x02,                   // payload
            0xDE, 0xAD, 0xBE, 0xEF,       // extra packets in the stream
        ];

        let packet = decode_first_frame(raw);

        let expected = Publish {
            dup: false,
            qos: QoS::AtMostOnce,
            retain: false,
            topic: "a/b".to_owned(),
            pkid: 0,
            payload: Bytes::from_static(&[0x01, 0x02]),
        };
        assert_eq!(packet, expected);
    }

    #[test]
    fn qos1_publish_encoding_works() {
        let publish = Publish {
            dup: false,
            qos: QoS::AtLeastOnce,
            retain: false,
            topic: "a/b".to_owned(),
            pkid: 10,
            payload: Bytes::from(vec![0xF1, 0xF2, 0xF3, 0xF4]),
        };

        let buf = encode(&publish);

        #[rustfmt::skip]
        let expected: Vec<u8> = vec![
            0b0011_0010, 11,              // packet type, flags and remaining len
            0x00, 0x03, b'a', b'/', b'b', // topic name = 'a/b'
            0x00, 0x0a,                   // pkid = 10
            0xF1, 0xF2, 0xF3, 0xF4,       // payload
        ];
        assert_eq!(buf, expected);
    }

    #[test]
    fn qos0_publish_encoding_works() {
        let publish = Publish {
            dup: false,
            qos: QoS::AtMostOnce,
            retain: false,
            topic: "a/b".to_owned(),
            pkid: 0,
            payload: Bytes::from(vec![0xE1, 0xE2, 0xE3, 0xE4]),
        };

        let buf = encode(&publish);

        #[rustfmt::skip]
        let expected: Vec<u8> = vec![
            0b0011_0000, 9,               // packet type, flags and remaining len
            0x00, 0x03, b'a', b'/', b'b', // topic name = 'a/b'
            0xE1, 0xE2, 0xE3, 0xE4,       // payload
        ];
        assert_eq!(buf, expected);
    }
}