1use super::*;
2use bytes::{Buf, Bytes};
3
4#[derive(Clone, PartialEq, Eq)]
6pub struct Publish {
7 pub dup: bool,
8 pub qos: QoS,
9 pub retain: bool,
10 pub topic: String,
11 pub pkid: u16,
12 pub payload: Bytes,
13}
14
15impl Publish {
16 pub fn new<S: Into<String>, P: Into<Vec<u8>>>(topic: S, qos: QoS, payload: P) -> Publish {
17 Publish {
18 dup: false,
19 qos,
20 retain: false,
21 pkid: 0,
22 topic: topic.into(),
23 payload: Bytes::from(payload.into()),
24 }
25 }
26
27 pub fn from_bytes<S: Into<String>>(topic: S, qos: QoS, payload: Bytes) -> Publish {
28 Publish {
29 dup: false,
30 qos,
31 retain: false,
32 pkid: 0,
33 topic: topic.into(),
34 payload,
35 }
36 }
37
38 fn len(&self) -> usize {
39 let len = 2 + self.topic.len() + self.payload.len();
40 if self.qos != QoS::AtMostOnce && self.pkid != 0 {
41 len + 2
42 } else {
43 len
44 }
45 }
46
47 pub fn size(&self) -> usize {
48 let len = self.len();
49 let remaining_len_size = len_len(len);
50
51 1 + remaining_len_size + len
52 }
53
54 pub fn read(fixed_header: FixedHeader, mut bytes: Bytes) -> Result<Self, Error> {
55 let qos = qos((fixed_header.byte1 & 0b0110) >> 1)?;
56 let dup = (fixed_header.byte1 & 0b1000) != 0;
57 let retain = (fixed_header.byte1 & 0b0001) != 0;
58
59 let variable_header_index = fixed_header.fixed_header_len;
60 bytes.advance(variable_header_index);
61 let topic = read_mqtt_string(&mut bytes)?;
62
63 let pkid = match qos {
65 QoS::AtMostOnce => 0,
66 QoS::AtLeastOnce | QoS::ExactlyOnce => read_u16(&mut bytes)?,
67 };
68
69 if qos != QoS::AtMostOnce && pkid == 0 {
70 return Err(Error::PacketIdZero);
71 }
72
73 let publish = Publish {
74 dup,
75 retain,
76 qos,
77 pkid,
78 topic,
79 payload: bytes,
80 };
81
82 Ok(publish)
83 }
84
85 pub fn write(&self, buffer: &mut BytesMut) -> Result<usize, Error> {
86 let len = self.len();
87
88 let dup = self.dup as u8;
89 let qos = self.qos as u8;
90 let retain = self.retain as u8;
91 buffer.put_u8(0b0011_0000 | retain | qos << 1 | dup << 3);
92
93 let count = write_remaining_length(buffer, len)?;
94 write_mqtt_string(buffer, self.topic.as_str());
95
96 if self.qos != QoS::AtMostOnce {
97 let pkid = self.pkid;
98 if pkid == 0 {
99 return Err(Error::PacketIdZero);
100 }
101
102 buffer.put_u16(pkid);
103 }
104
105 buffer.extend_from_slice(&self.payload);
106
107 Ok(1 + count + len)
109 }
110}
111
112impl fmt::Debug for Publish {
113 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
114 write!(
115 f,
116 "Topic = {}, Qos = {:?}, Retain = {}, Pkid = {:?}, Payload Size = {}",
117 self.topic,
118 self.qos,
119 self.retain,
120 self.pkid,
121 self.payload.len()
122 )
123 }
124}
125
126#[cfg(test)]
127mod test {
128 use super::*;
129 use bytes::{Bytes, BytesMut};
130 use pretty_assertions::assert_eq;
131
132 #[test]
133 fn qos1_publish_parsing_works() {
134 let stream = &[
135 0b0011_0010,
136 11, 0x00,
138 0x03,
139 b'a',
140 b'/',
141 b'b', 0x00,
143 0x0a, 0xF1,
145 0xF2,
146 0xF3,
147 0xF4, 0xDE,
149 0xAD,
150 0xBE,
151 0xEF, ];
153
154 let mut stream = BytesMut::from(&stream[..]);
155 let fixed_header = parse_fixed_header(stream.iter()).unwrap();
156 let publish_bytes = stream.split_to(fixed_header.frame_length()).freeze();
157 let packet = Publish::read(fixed_header, publish_bytes).unwrap();
158
159 let payload = &[0xF1, 0xF2, 0xF3, 0xF4];
160 assert_eq!(
161 packet,
162 Publish {
163 dup: false,
164 qos: QoS::AtLeastOnce,
165 retain: false,
166 topic: "a/b".to_owned(),
167 pkid: 10,
168 payload: Bytes::from(&payload[..]),
169 }
170 );
171 }
172
173 #[test]
174 fn qos0_publish_parsing_works() {
175 let stream = &[
176 0b0011_0000,
177 7, 0x00,
179 0x03,
180 b'a',
181 b'/',
182 b'b', 0x01,
184 0x02, 0xDE,
186 0xAD,
187 0xBE,
188 0xEF, ];
190
191 let mut stream = BytesMut::from(&stream[..]);
192 let fixed_header = parse_fixed_header(stream.iter()).unwrap();
193 let publish_bytes = stream.split_to(fixed_header.frame_length()).freeze();
194 let packet = Publish::read(fixed_header, publish_bytes).unwrap();
195
196 assert_eq!(
197 packet,
198 Publish {
199 dup: false,
200 qos: QoS::AtMostOnce,
201 retain: false,
202 topic: "a/b".to_owned(),
203 pkid: 0,
204 payload: Bytes::from(&[0x01, 0x02][..]),
205 }
206 );
207 }
208
209 #[test]
210 fn qos1_publish_encoding_works() {
211 let publish = Publish {
212 dup: false,
213 qos: QoS::AtLeastOnce,
214 retain: false,
215 topic: "a/b".to_owned(),
216 pkid: 10,
217 payload: Bytes::from(vec![0xF1, 0xF2, 0xF3, 0xF4]),
218 };
219
220 let mut buf = BytesMut::new();
221 publish.write(&mut buf).unwrap();
222
223 assert_eq!(
224 buf,
225 vec![
226 0b0011_0010,
227 11,
228 0x00,
229 0x03,
230 b'a',
231 b'/',
232 b'b',
233 0x00,
234 0x0a,
235 0xF1,
236 0xF2,
237 0xF3,
238 0xF4
239 ]
240 );
241 }
242
243 #[test]
244 fn qos0_publish_encoding_works() {
245 let publish = Publish {
246 dup: false,
247 qos: QoS::AtMostOnce,
248 retain: false,
249 topic: "a/b".to_owned(),
250 pkid: 0,
251 payload: Bytes::from(vec![0xE1, 0xE2, 0xE3, 0xE4]),
252 };
253
254 let mut buf = BytesMut::new();
255 publish.write(&mut buf).unwrap();
256
257 assert_eq!(
258 buf,
259 vec![
260 0b0011_0000,
261 9,
262 0x00,
263 0x03,
264 b'a',
265 b'/',
266 b'b',
267 0xE1,
268 0xE2,
269 0xE3,
270 0xE4
271 ]
272 );
273 }
274}