rumqttc/v5/mqttbytes/v5/
connack.rs

1use super::*;
2use bytes::{Buf, BufMut, Bytes, BytesMut};
3
/// Reason code carried in the second byte of a CONNACK variable header.
///
/// The enum mixes MQTT 3.1.1 return codes with MQTT 5 reason codes. The
/// numeric values quoted on the variants are the ones used by
/// `connect_return` and `connect_code` in this module.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ConnectReturnCode {
    /// `0` - the connection was accepted.
    Success,
    /// MQTT 3.1.1-era variant; `connect_return` never produces it.
    RefusedProtocolVersion,
    /// MQTT 3.1.1-era variant; `connect_return` never produces it.
    BadClientId,
    /// MQTT 3.1.1-era variant; `connect_return` never produces it.
    ServiceUnavailable,
    /// `128` - unspecified error.
    UnspecifiedError,
    /// `129` - malformed packet.
    MalformedPacket,
    /// `130` - protocol error.
    ProtocolError,
    /// `131` - implementation specific error.
    ImplementationSpecificError,
    /// `132` - unsupported protocol version.
    UnsupportedProtocolVersion,
    /// `133` - client identifier not valid.
    ClientIdentifierNotValid,
    /// `134` - bad user name or password.
    BadUserNamePassword,
    /// `135` - not authorized.
    NotAuthorized,
    /// `136` - server unavailable.
    ServerUnavailable,
    /// `137` - server busy.
    ServerBusy,
    /// `138` - banned.
    Banned,
    /// `140` - bad authentication method.
    BadAuthenticationMethod,
    /// `144` - topic name invalid.
    TopicNameInvalid,
    /// `149` - packet too large.
    PacketTooLarge,
    /// `151` - quota exceeded.
    QuotaExceeded,
    /// `153` - payload format invalid.
    PayloadFormatInvalid,
    /// `154` - retain not supported.
    RetainNotSupported,
    /// `155` - QoS not supported.
    QoSNotSupported,
    /// `156` - use another server.
    UseAnotherServer,
    /// `157` - server moved.
    ServerMoved,
    /// `159` - connection rate exceeded.
    ConnectionRateExceeded,
}
34
35/// Acknowledgement to connect packet
36#[derive(Debug, Clone, PartialEq, Eq)]
37pub struct ConnAck {
38    pub session_present: bool,
39    pub code: ConnectReturnCode,
40    pub properties: Option<ConnAckProperties>,
41}
42
43impl ConnAck {
44    fn len(&self) -> usize {
45        let mut len = 1  // session present
46                    + 1; // code
47
48        if let Some(p) = &self.properties {
49            let properties_len = p.len();
50            let properties_len_len = len_len(properties_len);
51            len += properties_len_len + properties_len;
52        } else {
53            len += 1;
54        }
55
56        len
57    }
58
59    pub fn size(&self) -> usize {
60        let len = self.len();
61        let remaining_len_size = len_len(len);
62
63        1 + remaining_len_size + len
64    }
65
66    pub fn read(fixed_header: FixedHeader, mut bytes: Bytes) -> Result<ConnAck, Error> {
67        let variable_header_index = fixed_header.fixed_header_len;
68        bytes.advance(variable_header_index);
69
70        let flags = read_u8(&mut bytes)?;
71        let return_code = read_u8(&mut bytes)?;
72        let properties = ConnAckProperties::read(&mut bytes)?;
73
74        let session_present = (flags & 0x01) == 1;
75        let code = connect_return(return_code)?;
76        let connack = ConnAck {
77            session_present,
78            code,
79            properties,
80        };
81
82        Ok(connack)
83    }
84
85    pub fn write(&self, buffer: &mut BytesMut) -> Result<usize, Error> {
86        let len = Self::len(self);
87        buffer.put_u8(0x20);
88
89        let count = write_remaining_length(buffer, len)?;
90        buffer.put_u8(self.session_present as u8);
91        buffer.put_u8(connect_code(self.code));
92
93        if let Some(p) = &self.properties {
94            p.write(buffer)?;
95        } else {
96            write_remaining_length(buffer, 0)?;
97        }
98
99        Ok(1 + count + len)
100    }
101}
102
103#[derive(Debug, Clone, PartialEq, Eq)]
104pub struct ConnAckProperties {
105    pub session_expiry_interval: Option<u32>,
106    pub receive_max: Option<u16>,
107    pub max_qos: Option<u8>,
108    pub retain_available: Option<u8>,
109    pub max_packet_size: Option<u32>,
110    pub assigned_client_identifier: Option<String>,
111    pub topic_alias_max: Option<u16>,
112    pub reason_string: Option<String>,
113    pub user_properties: Vec<(String, String)>,
114    pub wildcard_subscription_available: Option<u8>,
115    pub subscription_identifiers_available: Option<u8>,
116    pub shared_subscription_available: Option<u8>,
117    pub server_keep_alive: Option<u16>,
118    pub response_information: Option<String>,
119    pub server_reference: Option<String>,
120    pub authentication_method: Option<String>,
121    pub authentication_data: Option<Bytes>,
122}
123
124impl ConnAckProperties {
125    fn len(&self) -> usize {
126        let mut len = 0;
127
128        if self.session_expiry_interval.is_some() {
129            len += 1 + 4;
130        }
131
132        if self.receive_max.is_some() {
133            len += 1 + 2;
134        }
135
136        if self.max_qos.is_some() {
137            len += 1 + 1;
138        }
139
140        if self.retain_available.is_some() {
141            len += 1 + 1;
142        }
143
144        if self.max_packet_size.is_some() {
145            len += 1 + 4;
146        }
147
148        if let Some(id) = &self.assigned_client_identifier {
149            len += 1 + 2 + id.len();
150        }
151
152        if self.topic_alias_max.is_some() {
153            len += 1 + 2;
154        }
155
156        if let Some(reason) = &self.reason_string {
157            len += 1 + 2 + reason.len();
158        }
159
160        for (key, value) in self.user_properties.iter() {
161            len += 1 + 2 + key.len() + 2 + value.len();
162        }
163
164        if self.wildcard_subscription_available.is_some() {
165            len += 1 + 1;
166        }
167
168        if self.subscription_identifiers_available.is_some() {
169            len += 1 + 1;
170        }
171
172        if self.shared_subscription_available.is_some() {
173            len += 1 + 1;
174        }
175
176        if self.server_keep_alive.is_some() {
177            len += 1 + 2;
178        }
179
180        if let Some(info) = &self.response_information {
181            len += 1 + 2 + info.len();
182        }
183
184        if let Some(reference) = &self.server_reference {
185            len += 1 + 2 + reference.len();
186        }
187
188        if let Some(authentication_method) = &self.authentication_method {
189            len += 1 + 2 + authentication_method.len();
190        }
191
192        if let Some(authentication_data) = &self.authentication_data {
193            len += 1 + 2 + authentication_data.len();
194        }
195
196        len
197    }
198
199    pub fn read(bytes: &mut Bytes) -> Result<Option<ConnAckProperties>, Error> {
200        let mut session_expiry_interval = None;
201        let mut receive_max = None;
202        let mut max_qos = None;
203        let mut retain_available = None;
204        let mut max_packet_size = None;
205        let mut assigned_client_identifier = None;
206        let mut topic_alias_max = None;
207        let mut reason_string = None;
208        let mut user_properties = Vec::new();
209        let mut wildcard_subscription_available = None;
210        let mut subscription_identifiers_available = None;
211        let mut shared_subscription_available = None;
212        let mut server_keep_alive = None;
213        let mut response_information = None;
214        let mut server_reference = None;
215        let mut authentication_method = None;
216        let mut authentication_data = None;
217
218        let (properties_len_len, properties_len) = length(bytes.iter())?;
219        bytes.advance(properties_len_len);
220        if properties_len == 0 {
221            return Ok(None);
222        }
223
224        let mut cursor = 0;
225        // read until cursor reaches property length. properties_len = 0 will skip this loop
226        while cursor < properties_len {
227            let prop = read_u8(bytes)?;
228            cursor += 1;
229
230            match property(prop)? {
231                PropertyType::SessionExpiryInterval => {
232                    session_expiry_interval = Some(read_u32(bytes)?);
233                    cursor += 4;
234                }
235                PropertyType::ReceiveMaximum => {
236                    receive_max = Some(read_u16(bytes)?);
237                    cursor += 2;
238                }
239                PropertyType::MaximumQos => {
240                    max_qos = Some(read_u8(bytes)?);
241                    cursor += 1;
242                }
243                PropertyType::RetainAvailable => {
244                    retain_available = Some(read_u8(bytes)?);
245                    cursor += 1;
246                }
247                PropertyType::AssignedClientIdentifier => {
248                    let id = read_mqtt_string(bytes)?;
249                    cursor += 2 + id.len();
250                    assigned_client_identifier = Some(id);
251                }
252                PropertyType::MaximumPacketSize => {
253                    max_packet_size = Some(read_u32(bytes)?);
254                    cursor += 4;
255                }
256                PropertyType::TopicAliasMaximum => {
257                    topic_alias_max = Some(read_u16(bytes)?);
258                    cursor += 2;
259                }
260                PropertyType::ReasonString => {
261                    let reason = read_mqtt_string(bytes)?;
262                    cursor += 2 + reason.len();
263                    reason_string = Some(reason);
264                }
265                PropertyType::UserProperty => {
266                    let key = read_mqtt_string(bytes)?;
267                    let value = read_mqtt_string(bytes)?;
268                    cursor += 2 + key.len() + 2 + value.len();
269                    user_properties.push((key, value));
270                }
271                PropertyType::WildcardSubscriptionAvailable => {
272                    wildcard_subscription_available = Some(read_u8(bytes)?);
273                    cursor += 1;
274                }
275                PropertyType::SubscriptionIdentifierAvailable => {
276                    subscription_identifiers_available = Some(read_u8(bytes)?);
277                    cursor += 1;
278                }
279                PropertyType::SharedSubscriptionAvailable => {
280                    shared_subscription_available = Some(read_u8(bytes)?);
281                    cursor += 1;
282                }
283                PropertyType::ServerKeepAlive => {
284                    server_keep_alive = Some(read_u16(bytes)?);
285                    cursor += 2;
286                }
287                PropertyType::ResponseInformation => {
288                    let info = read_mqtt_string(bytes)?;
289                    cursor += 2 + info.len();
290                    response_information = Some(info);
291                }
292                PropertyType::ServerReference => {
293                    let reference = read_mqtt_string(bytes)?;
294                    cursor += 2 + reference.len();
295                    server_reference = Some(reference);
296                }
297                PropertyType::AuthenticationMethod => {
298                    let method = read_mqtt_string(bytes)?;
299                    cursor += 2 + method.len();
300                    authentication_method = Some(method);
301                }
302                PropertyType::AuthenticationData => {
303                    let data = read_mqtt_bytes(bytes)?;
304                    cursor += 2 + data.len();
305                    authentication_data = Some(data);
306                }
307                _ => return Err(Error::InvalidPropertyType(prop)),
308            }
309        }
310
311        Ok(Some(ConnAckProperties {
312            session_expiry_interval,
313            receive_max,
314            max_qos,
315            retain_available,
316            max_packet_size,
317            assigned_client_identifier,
318            topic_alias_max,
319            reason_string,
320            user_properties,
321            wildcard_subscription_available,
322            subscription_identifiers_available,
323            shared_subscription_available,
324            server_keep_alive,
325            response_information,
326            server_reference,
327            authentication_method,
328            authentication_data,
329        }))
330    }
331
332    pub fn write(&self, buffer: &mut BytesMut) -> Result<(), Error> {
333        let len = self.len();
334        write_remaining_length(buffer, len)?;
335
336        if let Some(session_expiry_interval) = self.session_expiry_interval {
337            buffer.put_u8(PropertyType::SessionExpiryInterval as u8);
338            buffer.put_u32(session_expiry_interval);
339        }
340
341        if let Some(receive_maximum) = self.receive_max {
342            buffer.put_u8(PropertyType::ReceiveMaximum as u8);
343            buffer.put_u16(receive_maximum);
344        }
345
346        if let Some(qos) = self.max_qos {
347            buffer.put_u8(PropertyType::MaximumQos as u8);
348            buffer.put_u8(qos);
349        }
350
351        if let Some(retain_available) = self.retain_available {
352            buffer.put_u8(PropertyType::RetainAvailable as u8);
353            buffer.put_u8(retain_available);
354        }
355
356        if let Some(max_packet_size) = self.max_packet_size {
357            buffer.put_u8(PropertyType::MaximumPacketSize as u8);
358            buffer.put_u32(max_packet_size);
359        }
360
361        if let Some(id) = &self.assigned_client_identifier {
362            buffer.put_u8(PropertyType::AssignedClientIdentifier as u8);
363            write_mqtt_string(buffer, id);
364        }
365
366        if let Some(topic_alias_max) = self.topic_alias_max {
367            buffer.put_u8(PropertyType::TopicAliasMaximum as u8);
368            buffer.put_u16(topic_alias_max);
369        }
370
371        if let Some(reason) = &self.reason_string {
372            buffer.put_u8(PropertyType::ReasonString as u8);
373            write_mqtt_string(buffer, reason);
374        }
375
376        for (key, value) in self.user_properties.iter() {
377            buffer.put_u8(PropertyType::UserProperty as u8);
378            write_mqtt_string(buffer, key);
379            write_mqtt_string(buffer, value);
380        }
381
382        if let Some(w) = self.wildcard_subscription_available {
383            buffer.put_u8(PropertyType::WildcardSubscriptionAvailable as u8);
384            buffer.put_u8(w);
385        }
386
387        if let Some(s) = self.subscription_identifiers_available {
388            buffer.put_u8(PropertyType::SubscriptionIdentifierAvailable as u8);
389            buffer.put_u8(s);
390        }
391
392        if let Some(s) = self.shared_subscription_available {
393            buffer.put_u8(PropertyType::SharedSubscriptionAvailable as u8);
394            buffer.put_u8(s);
395        }
396
397        if let Some(keep_alive) = self.server_keep_alive {
398            buffer.put_u8(PropertyType::ServerKeepAlive as u8);
399            buffer.put_u16(keep_alive);
400        }
401
402        if let Some(info) = &self.response_information {
403            buffer.put_u8(PropertyType::ResponseInformation as u8);
404            write_mqtt_string(buffer, info);
405        }
406
407        if let Some(reference) = &self.server_reference {
408            buffer.put_u8(PropertyType::ServerReference as u8);
409            write_mqtt_string(buffer, reference);
410        }
411
412        if let Some(authentication_method) = &self.authentication_method {
413            buffer.put_u8(PropertyType::AuthenticationMethod as u8);
414            write_mqtt_string(buffer, authentication_method);
415        }
416
417        if let Some(authentication_data) = &self.authentication_data {
418            buffer.put_u8(PropertyType::AuthenticationData as u8);
419            write_mqtt_bytes(buffer, authentication_data);
420        }
421
422        Ok(())
423    }
424}
425
426/// Connection return code type
427fn connect_return(num: u8) -> Result<ConnectReturnCode, Error> {
428    let code = match num {
429        0 => ConnectReturnCode::Success,
430        128 => ConnectReturnCode::UnspecifiedError,
431        129 => ConnectReturnCode::MalformedPacket,
432        130 => ConnectReturnCode::ProtocolError,
433        131 => ConnectReturnCode::ImplementationSpecificError,
434        132 => ConnectReturnCode::UnsupportedProtocolVersion,
435        133 => ConnectReturnCode::ClientIdentifierNotValid,
436        134 => ConnectReturnCode::BadUserNamePassword,
437        135 => ConnectReturnCode::NotAuthorized,
438        136 => ConnectReturnCode::ServerUnavailable,
439        137 => ConnectReturnCode::ServerBusy,
440        138 => ConnectReturnCode::Banned,
441        140 => ConnectReturnCode::BadAuthenticationMethod,
442        144 => ConnectReturnCode::TopicNameInvalid,
443        149 => ConnectReturnCode::PacketTooLarge,
444        151 => ConnectReturnCode::QuotaExceeded,
445        153 => ConnectReturnCode::PayloadFormatInvalid,
446        154 => ConnectReturnCode::RetainNotSupported,
447        155 => ConnectReturnCode::QoSNotSupported,
448        156 => ConnectReturnCode::UseAnotherServer,
449        157 => ConnectReturnCode::ServerMoved,
450        159 => ConnectReturnCode::ConnectionRateExceeded,
451        num => return Err(Error::InvalidConnectReturnCode(num)),
452    };
453
454    Ok(code)
455}
456
457fn connect_code(return_code: ConnectReturnCode) -> u8 {
458    match return_code {
459        ConnectReturnCode::Success => 0,
460        ConnectReturnCode::UnspecifiedError => 128,
461        ConnectReturnCode::MalformedPacket => 129,
462        ConnectReturnCode::ProtocolError => 130,
463        ConnectReturnCode::ImplementationSpecificError => 131,
464        ConnectReturnCode::UnsupportedProtocolVersion => 132,
465        ConnectReturnCode::ClientIdentifierNotValid => 133,
466        ConnectReturnCode::BadUserNamePassword => 134,
467        ConnectReturnCode::NotAuthorized => 135,
468        ConnectReturnCode::ServerUnavailable => 136,
469        ConnectReturnCode::ServerBusy => 137,
470        ConnectReturnCode::Banned => 138,
471        ConnectReturnCode::BadAuthenticationMethod => 140,
472        ConnectReturnCode::TopicNameInvalid => 144,
473        ConnectReturnCode::PacketTooLarge => 149,
474        ConnectReturnCode::QuotaExceeded => 151,
475        ConnectReturnCode::PayloadFormatInvalid => 153,
476        ConnectReturnCode::RetainNotSupported => 154,
477        ConnectReturnCode::QoSNotSupported => 155,
478        ConnectReturnCode::UseAnotherServer => 156,
479        ConnectReturnCode::ServerMoved => 157,
480        ConnectReturnCode::ConnectionRateExceeded => 159,
481        _ => unreachable!(),
482    }
483}
484
#[cfg(test)]
mod test {
    use super::super::test::{USER_PROP_KEY, USER_PROP_VAL};
    use super::*;
    use bytes::BytesMut;
    use pretty_assertions::assert_eq;

    /// `size()`, the value returned by `write()` and the number of bytes
    /// actually appended to the buffer must all agree.
    #[test]
    fn length_calculation() {
        let mut dummy_bytes = BytesMut::new();
        // A user property is used to pad the packet; the intent is to push it
        // past ~128 bytes so that the remaining-length field needs 2 bytes
        // (this depends on the lengths of USER_PROP_KEY / USER_PROP_VAL,
        // which are defined in the shared test module).
        let connack_props = ConnAckProperties {
            session_expiry_interval: None,
            receive_max: None,
            max_qos: None,
            retain_available: None,
            max_packet_size: None,
            assigned_client_identifier: None,
            topic_alias_max: None,
            reason_string: None,
            user_properties: vec![(USER_PROP_KEY.into(), USER_PROP_VAL.into())],
            wildcard_subscription_available: None,
            subscription_identifiers_available: None,
            shared_subscription_available: None,
            server_keep_alive: None,
            response_information: None,
            server_reference: None,
            authentication_method: None,
            authentication_data: None,
        };

        let connack_pkt = ConnAck {
            session_present: false,
            code: ConnectReturnCode::Success,
            properties: Some(connack_props),
        };

        let size_from_size = connack_pkt.size();
        let size_from_write = connack_pkt.write(&mut dummy_bytes).unwrap();
        let size_from_bytes = dummy_bytes.len();

        assert_eq!(size_from_write, size_from_bytes);
        assert_eq!(size_from_size, size_from_bytes);
    }
}