1use super::*;
2use bytes::{Buf, BufMut, Bytes, BytesMut};
3
4#[derive(Debug, Clone, Copy, PartialEq, Eq)]
7pub enum ConnectReturnCode {
8 Success,
9 RefusedProtocolVersion,
10 BadClientId,
11 ServiceUnavailable,
12 UnspecifiedError,
13 MalformedPacket,
14 ProtocolError,
15 ImplementationSpecificError,
16 UnsupportedProtocolVersion,
17 ClientIdentifierNotValid,
18 BadUserNamePassword,
19 NotAuthorized,
20 ServerUnavailable,
21 ServerBusy,
22 Banned,
23 BadAuthenticationMethod,
24 TopicNameInvalid,
25 PacketTooLarge,
26 QuotaExceeded,
27 PayloadFormatInvalid,
28 RetainNotSupported,
29 QoSNotSupported,
30 UseAnotherServer,
31 ServerMoved,
32 ConnectionRateExceeded,
33}
34
35#[derive(Debug, Clone, PartialEq, Eq)]
37pub struct ConnAck {
38 pub session_present: bool,
39 pub code: ConnectReturnCode,
40 pub properties: Option<ConnAckProperties>,
41}
42
43impl ConnAck {
44 fn len(&self) -> usize {
45 let mut len = 1 + 1; if let Some(p) = &self.properties {
49 let properties_len = p.len();
50 let properties_len_len = len_len(properties_len);
51 len += properties_len_len + properties_len;
52 } else {
53 len += 1;
54 }
55
56 len
57 }
58
59 pub fn size(&self) -> usize {
60 let len = self.len();
61 let remaining_len_size = len_len(len);
62
63 1 + remaining_len_size + len
64 }
65
66 pub fn read(fixed_header: FixedHeader, mut bytes: Bytes) -> Result<ConnAck, Error> {
67 let variable_header_index = fixed_header.fixed_header_len;
68 bytes.advance(variable_header_index);
69
70 let flags = read_u8(&mut bytes)?;
71 let return_code = read_u8(&mut bytes)?;
72 let properties = ConnAckProperties::read(&mut bytes)?;
73
74 let session_present = (flags & 0x01) == 1;
75 let code = connect_return(return_code)?;
76 let connack = ConnAck {
77 session_present,
78 code,
79 properties,
80 };
81
82 Ok(connack)
83 }
84
85 pub fn write(&self, buffer: &mut BytesMut) -> Result<usize, Error> {
86 let len = Self::len(self);
87 buffer.put_u8(0x20);
88
89 let count = write_remaining_length(buffer, len)?;
90 buffer.put_u8(self.session_present as u8);
91 buffer.put_u8(connect_code(self.code));
92
93 if let Some(p) = &self.properties {
94 p.write(buffer)?;
95 } else {
96 write_remaining_length(buffer, 0)?;
97 }
98
99 Ok(1 + count + len)
100 }
101}
102
103#[derive(Debug, Clone, PartialEq, Eq)]
104pub struct ConnAckProperties {
105 pub session_expiry_interval: Option<u32>,
106 pub receive_max: Option<u16>,
107 pub max_qos: Option<u8>,
108 pub retain_available: Option<u8>,
109 pub max_packet_size: Option<u32>,
110 pub assigned_client_identifier: Option<String>,
111 pub topic_alias_max: Option<u16>,
112 pub reason_string: Option<String>,
113 pub user_properties: Vec<(String, String)>,
114 pub wildcard_subscription_available: Option<u8>,
115 pub subscription_identifiers_available: Option<u8>,
116 pub shared_subscription_available: Option<u8>,
117 pub server_keep_alive: Option<u16>,
118 pub response_information: Option<String>,
119 pub server_reference: Option<String>,
120 pub authentication_method: Option<String>,
121 pub authentication_data: Option<Bytes>,
122}
123
124impl ConnAckProperties {
125 fn len(&self) -> usize {
126 let mut len = 0;
127
128 if self.session_expiry_interval.is_some() {
129 len += 1 + 4;
130 }
131
132 if self.receive_max.is_some() {
133 len += 1 + 2;
134 }
135
136 if self.max_qos.is_some() {
137 len += 1 + 1;
138 }
139
140 if self.retain_available.is_some() {
141 len += 1 + 1;
142 }
143
144 if self.max_packet_size.is_some() {
145 len += 1 + 4;
146 }
147
148 if let Some(id) = &self.assigned_client_identifier {
149 len += 1 + 2 + id.len();
150 }
151
152 if self.topic_alias_max.is_some() {
153 len += 1 + 2;
154 }
155
156 if let Some(reason) = &self.reason_string {
157 len += 1 + 2 + reason.len();
158 }
159
160 for (key, value) in self.user_properties.iter() {
161 len += 1 + 2 + key.len() + 2 + value.len();
162 }
163
164 if self.wildcard_subscription_available.is_some() {
165 len += 1 + 1;
166 }
167
168 if self.subscription_identifiers_available.is_some() {
169 len += 1 + 1;
170 }
171
172 if self.shared_subscription_available.is_some() {
173 len += 1 + 1;
174 }
175
176 if self.server_keep_alive.is_some() {
177 len += 1 + 2;
178 }
179
180 if let Some(info) = &self.response_information {
181 len += 1 + 2 + info.len();
182 }
183
184 if let Some(reference) = &self.server_reference {
185 len += 1 + 2 + reference.len();
186 }
187
188 if let Some(authentication_method) = &self.authentication_method {
189 len += 1 + 2 + authentication_method.len();
190 }
191
192 if let Some(authentication_data) = &self.authentication_data {
193 len += 1 + 2 + authentication_data.len();
194 }
195
196 len
197 }
198
199 pub fn read(bytes: &mut Bytes) -> Result<Option<ConnAckProperties>, Error> {
200 let mut session_expiry_interval = None;
201 let mut receive_max = None;
202 let mut max_qos = None;
203 let mut retain_available = None;
204 let mut max_packet_size = None;
205 let mut assigned_client_identifier = None;
206 let mut topic_alias_max = None;
207 let mut reason_string = None;
208 let mut user_properties = Vec::new();
209 let mut wildcard_subscription_available = None;
210 let mut subscription_identifiers_available = None;
211 let mut shared_subscription_available = None;
212 let mut server_keep_alive = None;
213 let mut response_information = None;
214 let mut server_reference = None;
215 let mut authentication_method = None;
216 let mut authentication_data = None;
217
218 let (properties_len_len, properties_len) = length(bytes.iter())?;
219 bytes.advance(properties_len_len);
220 if properties_len == 0 {
221 return Ok(None);
222 }
223
224 let mut cursor = 0;
225 while cursor < properties_len {
227 let prop = read_u8(bytes)?;
228 cursor += 1;
229
230 match property(prop)? {
231 PropertyType::SessionExpiryInterval => {
232 session_expiry_interval = Some(read_u32(bytes)?);
233 cursor += 4;
234 }
235 PropertyType::ReceiveMaximum => {
236 receive_max = Some(read_u16(bytes)?);
237 cursor += 2;
238 }
239 PropertyType::MaximumQos => {
240 max_qos = Some(read_u8(bytes)?);
241 cursor += 1;
242 }
243 PropertyType::RetainAvailable => {
244 retain_available = Some(read_u8(bytes)?);
245 cursor += 1;
246 }
247 PropertyType::AssignedClientIdentifier => {
248 let id = read_mqtt_string(bytes)?;
249 cursor += 2 + id.len();
250 assigned_client_identifier = Some(id);
251 }
252 PropertyType::MaximumPacketSize => {
253 max_packet_size = Some(read_u32(bytes)?);
254 cursor += 4;
255 }
256 PropertyType::TopicAliasMaximum => {
257 topic_alias_max = Some(read_u16(bytes)?);
258 cursor += 2;
259 }
260 PropertyType::ReasonString => {
261 let reason = read_mqtt_string(bytes)?;
262 cursor += 2 + reason.len();
263 reason_string = Some(reason);
264 }
265 PropertyType::UserProperty => {
266 let key = read_mqtt_string(bytes)?;
267 let value = read_mqtt_string(bytes)?;
268 cursor += 2 + key.len() + 2 + value.len();
269 user_properties.push((key, value));
270 }
271 PropertyType::WildcardSubscriptionAvailable => {
272 wildcard_subscription_available = Some(read_u8(bytes)?);
273 cursor += 1;
274 }
275 PropertyType::SubscriptionIdentifierAvailable => {
276 subscription_identifiers_available = Some(read_u8(bytes)?);
277 cursor += 1;
278 }
279 PropertyType::SharedSubscriptionAvailable => {
280 shared_subscription_available = Some(read_u8(bytes)?);
281 cursor += 1;
282 }
283 PropertyType::ServerKeepAlive => {
284 server_keep_alive = Some(read_u16(bytes)?);
285 cursor += 2;
286 }
287 PropertyType::ResponseInformation => {
288 let info = read_mqtt_string(bytes)?;
289 cursor += 2 + info.len();
290 response_information = Some(info);
291 }
292 PropertyType::ServerReference => {
293 let reference = read_mqtt_string(bytes)?;
294 cursor += 2 + reference.len();
295 server_reference = Some(reference);
296 }
297 PropertyType::AuthenticationMethod => {
298 let method = read_mqtt_string(bytes)?;
299 cursor += 2 + method.len();
300 authentication_method = Some(method);
301 }
302 PropertyType::AuthenticationData => {
303 let data = read_mqtt_bytes(bytes)?;
304 cursor += 2 + data.len();
305 authentication_data = Some(data);
306 }
307 _ => return Err(Error::InvalidPropertyType(prop)),
308 }
309 }
310
311 Ok(Some(ConnAckProperties {
312 session_expiry_interval,
313 receive_max,
314 max_qos,
315 retain_available,
316 max_packet_size,
317 assigned_client_identifier,
318 topic_alias_max,
319 reason_string,
320 user_properties,
321 wildcard_subscription_available,
322 subscription_identifiers_available,
323 shared_subscription_available,
324 server_keep_alive,
325 response_information,
326 server_reference,
327 authentication_method,
328 authentication_data,
329 }))
330 }
331
332 pub fn write(&self, buffer: &mut BytesMut) -> Result<(), Error> {
333 let len = self.len();
334 write_remaining_length(buffer, len)?;
335
336 if let Some(session_expiry_interval) = self.session_expiry_interval {
337 buffer.put_u8(PropertyType::SessionExpiryInterval as u8);
338 buffer.put_u32(session_expiry_interval);
339 }
340
341 if let Some(receive_maximum) = self.receive_max {
342 buffer.put_u8(PropertyType::ReceiveMaximum as u8);
343 buffer.put_u16(receive_maximum);
344 }
345
346 if let Some(qos) = self.max_qos {
347 buffer.put_u8(PropertyType::MaximumQos as u8);
348 buffer.put_u8(qos);
349 }
350
351 if let Some(retain_available) = self.retain_available {
352 buffer.put_u8(PropertyType::RetainAvailable as u8);
353 buffer.put_u8(retain_available);
354 }
355
356 if let Some(max_packet_size) = self.max_packet_size {
357 buffer.put_u8(PropertyType::MaximumPacketSize as u8);
358 buffer.put_u32(max_packet_size);
359 }
360
361 if let Some(id) = &self.assigned_client_identifier {
362 buffer.put_u8(PropertyType::AssignedClientIdentifier as u8);
363 write_mqtt_string(buffer, id);
364 }
365
366 if let Some(topic_alias_max) = self.topic_alias_max {
367 buffer.put_u8(PropertyType::TopicAliasMaximum as u8);
368 buffer.put_u16(topic_alias_max);
369 }
370
371 if let Some(reason) = &self.reason_string {
372 buffer.put_u8(PropertyType::ReasonString as u8);
373 write_mqtt_string(buffer, reason);
374 }
375
376 for (key, value) in self.user_properties.iter() {
377 buffer.put_u8(PropertyType::UserProperty as u8);
378 write_mqtt_string(buffer, key);
379 write_mqtt_string(buffer, value);
380 }
381
382 if let Some(w) = self.wildcard_subscription_available {
383 buffer.put_u8(PropertyType::WildcardSubscriptionAvailable as u8);
384 buffer.put_u8(w);
385 }
386
387 if let Some(s) = self.subscription_identifiers_available {
388 buffer.put_u8(PropertyType::SubscriptionIdentifierAvailable as u8);
389 buffer.put_u8(s);
390 }
391
392 if let Some(s) = self.shared_subscription_available {
393 buffer.put_u8(PropertyType::SharedSubscriptionAvailable as u8);
394 buffer.put_u8(s);
395 }
396
397 if let Some(keep_alive) = self.server_keep_alive {
398 buffer.put_u8(PropertyType::ServerKeepAlive as u8);
399 buffer.put_u16(keep_alive);
400 }
401
402 if let Some(info) = &self.response_information {
403 buffer.put_u8(PropertyType::ResponseInformation as u8);
404 write_mqtt_string(buffer, info);
405 }
406
407 if let Some(reference) = &self.server_reference {
408 buffer.put_u8(PropertyType::ServerReference as u8);
409 write_mqtt_string(buffer, reference);
410 }
411
412 if let Some(authentication_method) = &self.authentication_method {
413 buffer.put_u8(PropertyType::AuthenticationMethod as u8);
414 write_mqtt_string(buffer, authentication_method);
415 }
416
417 if let Some(authentication_data) = &self.authentication_data {
418 buffer.put_u8(PropertyType::AuthenticationData as u8);
419 write_mqtt_bytes(buffer, authentication_data);
420 }
421
422 Ok(())
423 }
424}
425
426fn connect_return(num: u8) -> Result<ConnectReturnCode, Error> {
428 let code = match num {
429 0 => ConnectReturnCode::Success,
430 128 => ConnectReturnCode::UnspecifiedError,
431 129 => ConnectReturnCode::MalformedPacket,
432 130 => ConnectReturnCode::ProtocolError,
433 131 => ConnectReturnCode::ImplementationSpecificError,
434 132 => ConnectReturnCode::UnsupportedProtocolVersion,
435 133 => ConnectReturnCode::ClientIdentifierNotValid,
436 134 => ConnectReturnCode::BadUserNamePassword,
437 135 => ConnectReturnCode::NotAuthorized,
438 136 => ConnectReturnCode::ServerUnavailable,
439 137 => ConnectReturnCode::ServerBusy,
440 138 => ConnectReturnCode::Banned,
441 140 => ConnectReturnCode::BadAuthenticationMethod,
442 144 => ConnectReturnCode::TopicNameInvalid,
443 149 => ConnectReturnCode::PacketTooLarge,
444 151 => ConnectReturnCode::QuotaExceeded,
445 153 => ConnectReturnCode::PayloadFormatInvalid,
446 154 => ConnectReturnCode::RetainNotSupported,
447 155 => ConnectReturnCode::QoSNotSupported,
448 156 => ConnectReturnCode::UseAnotherServer,
449 157 => ConnectReturnCode::ServerMoved,
450 159 => ConnectReturnCode::ConnectionRateExceeded,
451 num => return Err(Error::InvalidConnectReturnCode(num)),
452 };
453
454 Ok(code)
455}
456
457fn connect_code(return_code: ConnectReturnCode) -> u8 {
458 match return_code {
459 ConnectReturnCode::Success => 0,
460 ConnectReturnCode::UnspecifiedError => 128,
461 ConnectReturnCode::MalformedPacket => 129,
462 ConnectReturnCode::ProtocolError => 130,
463 ConnectReturnCode::ImplementationSpecificError => 131,
464 ConnectReturnCode::UnsupportedProtocolVersion => 132,
465 ConnectReturnCode::ClientIdentifierNotValid => 133,
466 ConnectReturnCode::BadUserNamePassword => 134,
467 ConnectReturnCode::NotAuthorized => 135,
468 ConnectReturnCode::ServerUnavailable => 136,
469 ConnectReturnCode::ServerBusy => 137,
470 ConnectReturnCode::Banned => 138,
471 ConnectReturnCode::BadAuthenticationMethod => 140,
472 ConnectReturnCode::TopicNameInvalid => 144,
473 ConnectReturnCode::PacketTooLarge => 149,
474 ConnectReturnCode::QuotaExceeded => 151,
475 ConnectReturnCode::PayloadFormatInvalid => 153,
476 ConnectReturnCode::RetainNotSupported => 154,
477 ConnectReturnCode::QoSNotSupported => 155,
478 ConnectReturnCode::UseAnotherServer => 156,
479 ConnectReturnCode::ServerMoved => 157,
480 ConnectReturnCode::ConnectionRateExceeded => 159,
481 _ => unreachable!(),
482 }
483}
484
485#[cfg(test)]
486mod test {
487 use super::super::test::{USER_PROP_KEY, USER_PROP_VAL};
488 use super::*;
489 use bytes::BytesMut;
490 use pretty_assertions::assert_eq;
491
492 #[test]
493 fn length_calculation() {
494 let mut dummy_bytes = BytesMut::new();
495 let connack_props = ConnAckProperties {
498 session_expiry_interval: None,
499 receive_max: None,
500 max_qos: None,
501 retain_available: None,
502 max_packet_size: None,
503 assigned_client_identifier: None,
504 topic_alias_max: None,
505 reason_string: None,
506 user_properties: vec![(USER_PROP_KEY.into(), USER_PROP_VAL.into())],
507 wildcard_subscription_available: None,
508 subscription_identifiers_available: None,
509 shared_subscription_available: None,
510 server_keep_alive: None,
511 response_information: None,
512 server_reference: None,
513 authentication_method: None,
514 authentication_data: None,
515 };
516
517 let connack_pkt = ConnAck {
518 session_present: false,
519 code: ConnectReturnCode::Success,
520 properties: Some(connack_props),
521 };
522
523 let size_from_size = connack_pkt.size();
524 let size_from_write = connack_pkt.write(&mut dummy_bytes).unwrap();
525 let size_from_bytes = dummy_bytes.len();
526
527 assert_eq!(size_from_write, size_from_bytes);
528 assert_eq!(size_from_size, size_from_bytes);
529 }
530}