rumqttc/mqttbytes/v4/
subscribe.rs

1use super::*;
2use bytes::{Buf, Bytes};
3
4/// Subscription packet
5#[derive(Clone, PartialEq, Eq)]
6pub struct Subscribe {
7    pub pkid: u16,
8    pub filters: Vec<SubscribeFilter>,
9}
10
11impl Subscribe {
12    pub fn new<S: Into<String>>(path: S, qos: QoS) -> Subscribe {
13        let filter = SubscribeFilter {
14            path: path.into(),
15            qos,
16        };
17
18        Subscribe {
19            pkid: 0,
20            filters: vec![filter],
21        }
22    }
23
24    pub fn new_many<T>(topics: T) -> Subscribe
25    where
26        T: IntoIterator<Item = SubscribeFilter>,
27    {
28        let filters: Vec<SubscribeFilter> = topics.into_iter().collect();
29
30        Subscribe { pkid: 0, filters }
31    }
32
33    pub fn add(&mut self, path: String, qos: QoS) -> &mut Self {
34        let filter = SubscribeFilter { path, qos };
35
36        self.filters.push(filter);
37        self
38    }
39
40    fn len(&self) -> usize {
41        // len of pkid + vec![subscribe filter len]
42        2 + self.filters.iter().fold(0, |s, t| s + t.len())
43    }
44
45    pub fn size(&self) -> usize {
46        let len = self.len();
47        let remaining_len_size = len_len(len);
48
49        1 + remaining_len_size + len
50    }
51
52    pub fn read(fixed_header: FixedHeader, mut bytes: Bytes) -> Result<Self, Error> {
53        let variable_header_index = fixed_header.fixed_header_len;
54        bytes.advance(variable_header_index);
55
56        let pkid = read_u16(&mut bytes)?;
57
58        // variable header size = 2 (packet identifier)
59        let mut filters = Vec::new();
60
61        while bytes.has_remaining() {
62            let path = read_mqtt_string(&mut bytes)?;
63            let options = read_u8(&mut bytes)?;
64            let requested_qos = options & 0b0000_0011;
65
66            filters.push(SubscribeFilter {
67                path,
68                qos: qos(requested_qos)?,
69            });
70        }
71
72        match filters.len() {
73            0 => Err(Error::EmptySubscription),
74            _ => Ok(Subscribe { pkid, filters }),
75        }
76    }
77
78    pub fn write(&self, buffer: &mut BytesMut) -> Result<usize, Error> {
79        // write packet type
80        buffer.put_u8(0x82);
81
82        // write remaining length
83        let remaining_len = self.len();
84        let remaining_len_bytes = write_remaining_length(buffer, remaining_len)?;
85
86        // write packet id
87        buffer.put_u16(self.pkid);
88
89        // write filters
90        for filter in self.filters.iter() {
91            filter.write(buffer);
92        }
93
94        Ok(1 + remaining_len_bytes + remaining_len)
95    }
96}
97
98///  Subscription filter
99#[derive(Clone, PartialEq, Eq)]
100pub struct SubscribeFilter {
101    pub path: String,
102    pub qos: QoS,
103}
104
105impl SubscribeFilter {
106    pub fn new(path: String, qos: QoS) -> SubscribeFilter {
107        SubscribeFilter { path, qos }
108    }
109
110    fn len(&self) -> usize {
111        // filter len + filter + options
112        2 + self.path.len() + 1
113    }
114
115    fn write(&self, buffer: &mut BytesMut) {
116        let mut options = 0;
117        options |= self.qos as u8;
118
119        write_mqtt_string(buffer, self.path.as_str());
120        buffer.put_u8(options);
121    }
122}
123
124#[derive(Debug, Clone, PartialEq, Eq)]
125pub enum RetainForwardRule {
126    OnEverySubscribe,
127    OnNewSubscribe,
128    Never,
129}
130
131impl fmt::Debug for Subscribe {
132    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
133        write!(
134            f,
135            "Filters = {:?}, Packet id = {:?}",
136            self.filters, self.pkid
137        )
138    }
139}
140
141impl fmt::Debug for SubscribeFilter {
142    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
143        write!(f, "Filter = {}, Qos = {:?}", self.path, self.qos)
144    }
145}
146
147#[cfg(test)]
148mod test {
149    use super::*;
150    use bytes::BytesMut;
151    use pretty_assertions::assert_eq;
152
153    #[test]
154    fn subscribe_parsing_works() {
155        let stream = &[
156            0b1000_0010,
157            20, // packet type, flags and remaining len
158            0x01,
159            0x04, // variable header. pkid = 260
160            0x00,
161            0x03,
162            b'a',
163            b'/',
164            b'+', // payload. topic filter = 'a/+'
165            0x00, // payload. qos = 0
166            0x00,
167            0x01,
168            b'#', // payload. topic filter = '#'
169            0x01, // payload. qos = 1
170            0x00,
171            0x05,
172            b'a',
173            b'/',
174            b'b',
175            b'/',
176            b'c', // payload. topic filter = 'a/b/c'
177            0x02, // payload. qos = 2
178            0xDE,
179            0xAD,
180            0xBE,
181            0xEF, // extra packets in the stream
182        ];
183        let mut stream = BytesMut::from(&stream[..]);
184        let fixed_header = parse_fixed_header(stream.iter()).unwrap();
185        let subscribe_bytes = stream.split_to(fixed_header.frame_length()).freeze();
186        let packet = Subscribe::read(fixed_header, subscribe_bytes).unwrap();
187
188        assert_eq!(
189            packet,
190            Subscribe {
191                pkid: 260,
192                filters: vec![
193                    SubscribeFilter::new("a/+".to_owned(), QoS::AtMostOnce),
194                    SubscribeFilter::new("#".to_owned(), QoS::AtLeastOnce),
195                    SubscribeFilter::new("a/b/c".to_owned(), QoS::ExactlyOnce)
196                ],
197            }
198        );
199    }
200
201    #[test]
202    fn subscribe_encoding_works() {
203        let subscribe = Subscribe {
204            pkid: 260,
205            filters: vec![
206                SubscribeFilter::new("a/+".to_owned(), QoS::AtMostOnce),
207                SubscribeFilter::new("#".to_owned(), QoS::AtLeastOnce),
208                SubscribeFilter::new("a/b/c".to_owned(), QoS::ExactlyOnce),
209            ],
210        };
211
212        let mut buf = BytesMut::new();
213        subscribe.write(&mut buf).unwrap();
214        assert_eq!(
215            buf,
216            vec![
217                0b1000_0010,
218                20,
219                0x01,
220                0x04, // pkid = 260
221                0x00,
222                0x03,
223                b'a',
224                b'/',
225                b'+', // topic filter = 'a/+'
226                0x00, // qos = 0
227                0x00,
228                0x01,
229                b'#', // topic filter = '#'
230                0x01, // qos = 1
231                0x00,
232                0x05,
233                b'a',
234                b'/',
235                b'b',
236                b'/',
237                b'c', // topic filter = 'a/b/c'
238                0x02  // qos = 2
239            ]
240        );
241    }
242}