1use super::*;
2use bytes::{Buf, Bytes};
3
4#[derive(Clone, PartialEq, Eq)]
6pub struct Subscribe {
7 pub pkid: u16,
8 pub filters: Vec<SubscribeFilter>,
9}
10
11impl Subscribe {
12 pub fn new<S: Into<String>>(path: S, qos: QoS) -> Subscribe {
13 let filter = SubscribeFilter {
14 path: path.into(),
15 qos,
16 };
17
18 Subscribe {
19 pkid: 0,
20 filters: vec![filter],
21 }
22 }
23
24 pub fn new_many<T>(topics: T) -> Subscribe
25 where
26 T: IntoIterator<Item = SubscribeFilter>,
27 {
28 let filters: Vec<SubscribeFilter> = topics.into_iter().collect();
29
30 Subscribe { pkid: 0, filters }
31 }
32
33 pub fn add(&mut self, path: String, qos: QoS) -> &mut Self {
34 let filter = SubscribeFilter { path, qos };
35
36 self.filters.push(filter);
37 self
38 }
39
40 fn len(&self) -> usize {
41 2 + self.filters.iter().fold(0, |s, t| s + t.len())
43 }
44
45 pub fn size(&self) -> usize {
46 let len = self.len();
47 let remaining_len_size = len_len(len);
48
49 1 + remaining_len_size + len
50 }
51
52 pub fn read(fixed_header: FixedHeader, mut bytes: Bytes) -> Result<Self, Error> {
53 let variable_header_index = fixed_header.fixed_header_len;
54 bytes.advance(variable_header_index);
55
56 let pkid = read_u16(&mut bytes)?;
57
58 let mut filters = Vec::new();
60
61 while bytes.has_remaining() {
62 let path = read_mqtt_string(&mut bytes)?;
63 let options = read_u8(&mut bytes)?;
64 let requested_qos = options & 0b0000_0011;
65
66 filters.push(SubscribeFilter {
67 path,
68 qos: qos(requested_qos)?,
69 });
70 }
71
72 match filters.len() {
73 0 => Err(Error::EmptySubscription),
74 _ => Ok(Subscribe { pkid, filters }),
75 }
76 }
77
78 pub fn write(&self, buffer: &mut BytesMut) -> Result<usize, Error> {
79 buffer.put_u8(0x82);
81
82 let remaining_len = self.len();
84 let remaining_len_bytes = write_remaining_length(buffer, remaining_len)?;
85
86 buffer.put_u16(self.pkid);
88
89 for filter in self.filters.iter() {
91 filter.write(buffer);
92 }
93
94 Ok(1 + remaining_len_bytes + remaining_len)
95 }
96}
97
98#[derive(Clone, PartialEq, Eq)]
100pub struct SubscribeFilter {
101 pub path: String,
102 pub qos: QoS,
103}
104
105impl SubscribeFilter {
106 pub fn new(path: String, qos: QoS) -> SubscribeFilter {
107 SubscribeFilter { path, qos }
108 }
109
110 fn len(&self) -> usize {
111 2 + self.path.len() + 1
113 }
114
115 fn write(&self, buffer: &mut BytesMut) {
116 let mut options = 0;
117 options |= self.qos as u8;
118
119 write_mqtt_string(buffer, self.path.as_str());
120 buffer.put_u8(options);
121 }
122}
123
124#[derive(Debug, Clone, PartialEq, Eq)]
125pub enum RetainForwardRule {
126 OnEverySubscribe,
127 OnNewSubscribe,
128 Never,
129}
130
131impl fmt::Debug for Subscribe {
132 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
133 write!(
134 f,
135 "Filters = {:?}, Packet id = {:?}",
136 self.filters, self.pkid
137 )
138 }
139}
140
141impl fmt::Debug for SubscribeFilter {
142 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
143 write!(f, "Filter = {}, Qos = {:?}", self.path, self.qos)
144 }
145}
146
147#[cfg(test)]
148mod test {
149 use super::*;
150 use bytes::BytesMut;
151 use pretty_assertions::assert_eq;
152
153 #[test]
154 fn subscribe_parsing_works() {
155 let stream = &[
156 0b1000_0010,
157 20, 0x01,
159 0x04, 0x00,
161 0x03,
162 b'a',
163 b'/',
164 b'+', 0x00, 0x00,
167 0x01,
168 b'#', 0x01, 0x00,
171 0x05,
172 b'a',
173 b'/',
174 b'b',
175 b'/',
176 b'c', 0x02, 0xDE,
179 0xAD,
180 0xBE,
181 0xEF, ];
183 let mut stream = BytesMut::from(&stream[..]);
184 let fixed_header = parse_fixed_header(stream.iter()).unwrap();
185 let subscribe_bytes = stream.split_to(fixed_header.frame_length()).freeze();
186 let packet = Subscribe::read(fixed_header, subscribe_bytes).unwrap();
187
188 assert_eq!(
189 packet,
190 Subscribe {
191 pkid: 260,
192 filters: vec![
193 SubscribeFilter::new("a/+".to_owned(), QoS::AtMostOnce),
194 SubscribeFilter::new("#".to_owned(), QoS::AtLeastOnce),
195 SubscribeFilter::new("a/b/c".to_owned(), QoS::ExactlyOnce)
196 ],
197 }
198 );
199 }
200
201 #[test]
202 fn subscribe_encoding_works() {
203 let subscribe = Subscribe {
204 pkid: 260,
205 filters: vec![
206 SubscribeFilter::new("a/+".to_owned(), QoS::AtMostOnce),
207 SubscribeFilter::new("#".to_owned(), QoS::AtLeastOnce),
208 SubscribeFilter::new("a/b/c".to_owned(), QoS::ExactlyOnce),
209 ],
210 };
211
212 let mut buf = BytesMut::new();
213 subscribe.write(&mut buf).unwrap();
214 assert_eq!(
215 buf,
216 vec![
217 0b1000_0010,
218 20,
219 0x01,
220 0x04, 0x00,
222 0x03,
223 b'a',
224 b'/',
225 b'+', 0x00, 0x00,
228 0x01,
229 b'#', 0x01, 0x00,
232 0x05,
233 b'a',
234 b'/',
235 b'b',
236 b'/',
237 b'c', 0x02 ]
240 );
241 }
242}