1use crate::{Event, Incoming, Outgoing, Request};
2
3use crate::mqttbytes::v4::*;
4use crate::mqttbytes::{self, *};
5use bytes::BytesMut;
6use std::collections::VecDeque;
7use std::{io, time::Instant};
8
/// Errors produced by [`MqttState`] while handling incoming or outgoing packets.
#[derive(Debug, thiserror::Error)]
pub enum StateError {
    /// Wraps an [`io::Error`]; converted automatically through `From`.
    #[error("Io error: {0:?}")]
    Io(#[from] io::Error),
    /// Generic invalid-state error. Not constructed by the code visible in this file.
    #[error("Invalid state for a given operation")]
    InvalidState,
    /// An acknowledgement (or an outgoing publish) referenced a packet id that
    /// is either outside the tracked range or has no matching tracked entry.
    #[error("Received unsolicited ack pkid: {0}")]
    Unsolicited(u16),
    /// A new ping was requested while the previous one is still unanswered.
    #[error("Last pingreq isn't acked")]
    AwaitPingResp,
    /// An incoming packet type that the state machine does not handle.
    #[error("Received a wrong packet while waiting for another packet")]
    WrongPacket,
    /// A packet id collision was still unresolved when the second ping since
    /// the collision was attempted.
    #[error("Timeout while waiting to resolve collision")]
    CollisionTimeout,
    /// A subscribe request was issued without any topic filter.
    #[error("A Subscribe packet must contain atleast one filter")]
    EmptySubscription,
    /// Wraps a [`mqttbytes::Error`]; converted automatically through `From`.
    #[error("Mqtt serialization/deserialization error: {0}")]
    Deserialization(#[from] mqttbytes::Error),
    /// The outgoing request is larger than the configured maximum packet size.
    #[error("Cannot send packet of size '{pkt_size:?}'. It's greater than the broker's maximum packet size of: '{max:?}'")]
    OutgoingPacketTooLarge { pkt_size: usize, max: usize },
}
36
/// Protocol state of one MQTT connection: inflight bookkeeping, ping status,
/// the queue of generated events and the buffer of serialized outgoing packets.
#[derive(Debug, Clone)]
pub struct MqttState {
    /// `true` after a pingreq has been written and until a pingresp is handled
    /// (or the state is cleaned).
    pub await_pingresp: bool,
    /// Number of pings requested while a packet id collision is pending;
    /// reset when the collision is resolved or the state is cleaned.
    pub collision_ping_count: usize,
    /// Time at which the last incoming packet was successfully handled.
    last_incoming: Instant,
    /// Time at which the last outgoing request was successfully handled.
    last_outgoing: Instant,
    /// Last packet id handed out by `next_pkid` (0 right after a wrap-around).
    pub(crate) last_pkid: u16,
    /// Packet id of the most recent in-range puback; `clean` uses it to decide
    /// from which slot pending publishes are collected first.
    pub(crate) last_puback: u16,
    /// Number of outgoing publishes / pubrels currently awaiting completion.
    pub(crate) inflight: u16,
    /// Inflight limit; sizes the outgoing tables and is the wrap-around point
    /// for generated packet ids.
    pub(crate) max_inflight: u16,
    /// Outgoing QoS 1/2 publishes awaiting puback/pubrec, indexed by packet id.
    pub(crate) outgoing_pub: Vec<Option<Publish>>,
    /// Packet ids of outgoing pubrels awaiting pubcomp, indexed by packet id.
    pub(crate) outgoing_rel: Vec<Option<u16>>,
    /// Packet ids of incoming QoS 2 publishes awaiting pubrel, indexed by packet id.
    pub(crate) incoming_pub: Vec<Option<u16>>,
    /// Publish held back because its packet id is still occupied in `outgoing_pub`.
    pub collision: Option<Publish>,
    /// Incoming and outgoing events generated while handling packets.
    pub events: VecDeque<Event>,
    /// Buffer into which outgoing packets are serialized.
    pub write: BytesMut,
    /// When `true`, incoming publishes are not acknowledged automatically.
    pub manual_acks: bool,
    /// Largest request size accepted by `handle_outgoing_packet`.
    pub max_outgoing_packet_size: usize,
}
80
81impl MqttState {
82 pub fn new(max_inflight: u16, manual_acks: bool, max_outgoing_packet_size: usize) -> Self {
86 MqttState {
87 await_pingresp: false,
88 collision_ping_count: 0,
89 last_incoming: Instant::now(),
90 last_outgoing: Instant::now(),
91 last_pkid: 0,
92 last_puback: 0,
93 inflight: 0,
94 max_inflight,
95 outgoing_pub: vec![None; max_inflight as usize + 1],
97 outgoing_rel: vec![None; max_inflight as usize + 1],
98 incoming_pub: vec![None; std::u16::MAX as usize + 1],
99 collision: None,
100 events: VecDeque::with_capacity(100),
102 write: BytesMut::with_capacity(10 * 1024),
103 manual_acks,
104 max_outgoing_packet_size,
105 }
106 }
107
108 pub fn clean(&mut self) -> Vec<Request> {
110 let mut pending = Vec::with_capacity(100);
111 let (first_half, second_half) = self
112 .outgoing_pub
113 .split_at_mut(self.last_puback as usize + 1);
114
115 for publish in second_half.iter_mut().chain(first_half) {
116 if let Some(publish) = publish.take() {
117 let request = Request::Publish(publish);
118 pending.push(request);
119 }
120 }
121
122 for rel in self.outgoing_rel.iter_mut() {
124 if let Some(pkid) = rel.take() {
125 let request = Request::PubRel(PubRel::new(pkid));
126 pending.push(request);
127 }
128 }
129
130 for id in self.incoming_pub.iter_mut() {
132 id.take();
133 }
134
135 self.await_pingresp = false;
136 self.collision_ping_count = 0;
137 self.inflight = 0;
138 self.write.clear();
139 pending
140 }
141
/// Returns the current value of the inflight counter.
pub fn inflight(&self) -> u16 {
    self.inflight
}
145
146 pub fn handle_outgoing_packet(&mut self, request: Request) -> Result<(), StateError> {
149 self.check_size(request.size())?;
151 match request {
152 Request::Publish(publish) => self.outgoing_publish(publish)?,
153 Request::PubRel(pubrel) => self.outgoing_pubrel(pubrel)?,
154 Request::Subscribe(subscribe) => self.outgoing_subscribe(subscribe)?,
155 Request::Unsubscribe(unsubscribe) => self.outgoing_unsubscribe(unsubscribe)?,
156 Request::PingReq(_) => self.outgoing_ping()?,
157 Request::Disconnect(_) => self.outgoing_disconnect()?,
158 Request::PubAck(puback) => self.outgoing_puback(puback)?,
159 Request::PubRec(pubrec) => self.outgoing_pubrec(pubrec)?,
160 _ => unimplemented!(),
161 };
162
163 self.last_outgoing = Instant::now();
164 Ok(())
165 }
166
167 pub fn handle_incoming_packet(&mut self, packet: Incoming) -> Result<(), StateError> {
172 let out = match &packet {
173 Incoming::PingResp => self.handle_incoming_pingresp(),
174 Incoming::Publish(publish) => self.handle_incoming_publish(publish),
175 Incoming::SubAck(_suback) => self.handle_incoming_suback(),
176 Incoming::UnsubAck(_unsuback) => self.handle_incoming_unsuback(),
177 Incoming::PubAck(puback) => self.handle_incoming_puback(puback),
178 Incoming::PubRec(pubrec) => self.handle_incoming_pubrec(pubrec),
179 Incoming::PubRel(pubrel) => self.handle_incoming_pubrel(pubrel),
180 Incoming::PubComp(pubcomp) => self.handle_incoming_pubcomp(pubcomp),
181 _ => {
182 error!("Invalid incoming packet = {:?}", packet);
183 return Err(StateError::WrongPacket);
184 }
185 };
186
187 out?;
188 self.events.push_back(Event::Incoming(packet));
189 self.last_incoming = Instant::now();
190 Ok(())
191 }
192
/// Handles an incoming suback. Currently a no-op that always succeeds.
fn handle_incoming_suback(&mut self) -> Result<(), StateError> {
    Ok(())
}
196
/// Handles an incoming unsuback. Currently a no-op that always succeeds.
fn handle_incoming_unsuback(&mut self) -> Result<(), StateError> {
    Ok(())
}
200
201 fn handle_incoming_publish(&mut self, publish: &Publish) -> Result<(), StateError> {
204 let qos = publish.qos;
205
206 match qos {
207 QoS::AtMostOnce => Ok(()),
208 QoS::AtLeastOnce => {
209 if !self.manual_acks {
210 let puback = PubAck::new(publish.pkid);
211 self.outgoing_puback(puback)?;
212 }
213 Ok(())
214 }
215 QoS::ExactlyOnce => {
216 let pkid = publish.pkid;
217 self.incoming_pub[pkid as usize] = Some(pkid);
218
219 if !self.manual_acks {
220 let pubrec = PubRec::new(pkid);
221 self.outgoing_pubrec(pubrec)?;
222 }
223 Ok(())
224 }
225 }
226 }
227
228 fn handle_incoming_puback(&mut self, puback: &PubAck) -> Result<(), StateError> {
229 let publish = self
230 .outgoing_pub
231 .get_mut(puback.pkid as usize)
232 .ok_or(StateError::Unsolicited(puback.pkid))?;
233
234 self.last_puback = puback.pkid;
235 let v = match publish.take() {
236 Some(_) => {
237 self.inflight -= 1;
238 Ok(())
239 }
240 None => {
241 error!("Unsolicited puback packet: {:?}", puback.pkid);
242 Err(StateError::Unsolicited(puback.pkid))
243 }
244 };
245
246 if let Some(publish) = self.check_collision(puback.pkid) {
247 self.outgoing_pub[publish.pkid as usize] = Some(publish.clone());
248 self.inflight += 1;
249
250 publish.write(&mut self.write)?;
251 let event = Event::Outgoing(Outgoing::Publish(publish.pkid));
252 self.events.push_back(event);
253 self.collision_ping_count = 0;
254 }
255
256 v
257 }
258
259 fn handle_incoming_pubrec(&mut self, pubrec: &PubRec) -> Result<(), StateError> {
260 let publish = self
261 .outgoing_pub
262 .get_mut(pubrec.pkid as usize)
263 .ok_or(StateError::Unsolicited(pubrec.pkid))?;
264 match publish.take() {
265 Some(_) => {
266 self.outgoing_rel[pubrec.pkid as usize] = Some(pubrec.pkid);
268 PubRel::new(pubrec.pkid).write(&mut self.write)?;
269
270 let event = Event::Outgoing(Outgoing::PubRel(pubrec.pkid));
271 self.events.push_back(event);
272 Ok(())
273 }
274 None => {
275 error!("Unsolicited pubrec packet: {:?}", pubrec.pkid);
276 Err(StateError::Unsolicited(pubrec.pkid))
277 }
278 }
279 }
280
281 fn handle_incoming_pubrel(&mut self, pubrel: &PubRel) -> Result<(), StateError> {
282 let publish = self
283 .incoming_pub
284 .get_mut(pubrel.pkid as usize)
285 .ok_or(StateError::Unsolicited(pubrel.pkid))?;
286 match publish.take() {
287 Some(_) => {
288 PubComp::new(pubrel.pkid).write(&mut self.write)?;
289 let event = Event::Outgoing(Outgoing::PubComp(pubrel.pkid));
290 self.events.push_back(event);
291 Ok(())
292 }
293 None => {
294 error!("Unsolicited pubrel packet: {:?}", pubrel.pkid);
295 Err(StateError::Unsolicited(pubrel.pkid))
296 }
297 }
298 }
299
300 fn handle_incoming_pubcomp(&mut self, pubcomp: &PubComp) -> Result<(), StateError> {
301 if let Some(publish) = self.check_collision(pubcomp.pkid) {
302 publish.write(&mut self.write)?;
303 let event = Event::Outgoing(Outgoing::Publish(publish.pkid));
304 self.events.push_back(event);
305 self.collision_ping_count = 0;
306 }
307
308 let pubrel = self
309 .outgoing_rel
310 .get_mut(pubcomp.pkid as usize)
311 .ok_or(StateError::Unsolicited(pubcomp.pkid))?;
312 match pubrel.take() {
313 Some(_) => {
314 self.inflight -= 1;
315 Ok(())
316 }
317 None => {
318 error!("Unsolicited pubcomp packet: {:?}", pubcomp.pkid);
319 Err(StateError::Unsolicited(pubcomp.pkid))
320 }
321 }
322 }
323
/// Handles a pingresp by clearing the "awaiting ping response" flag, which
/// allows the next ping to be sent. Always succeeds.
fn handle_incoming_pingresp(&mut self) -> Result<(), StateError> {
    self.await_pingresp = false;
    Ok(())
}
328
329 fn outgoing_publish(&mut self, mut publish: Publish) -> Result<(), StateError> {
332 if publish.qos != QoS::AtMostOnce {
333 if publish.pkid == 0 {
334 publish.pkid = self.next_pkid();
335 }
336
337 let pkid = publish.pkid;
338 if self
339 .outgoing_pub
340 .get(publish.pkid as usize)
341 .ok_or(StateError::Unsolicited(publish.pkid))?
342 .is_some()
343 {
344 info!("Collision on packet id = {:?}", publish.pkid);
345 self.collision = Some(publish);
346 let event = Event::Outgoing(Outgoing::AwaitAck(pkid));
347 self.events.push_back(event);
348 return Ok(());
349 }
350
351 self.outgoing_pub[pkid as usize] = Some(publish.clone());
354 self.inflight += 1;
355 };
356
357 debug!(
358 "Publish. Topic = {}, Pkid = {:?}, Payload Size = {:?}",
359 publish.topic,
360 publish.pkid,
361 publish.payload.len()
362 );
363
364 publish.write(&mut self.write)?;
365 let event = Event::Outgoing(Outgoing::Publish(publish.pkid));
366 self.events.push_back(event);
367 Ok(())
368 }
369
370 fn outgoing_pubrel(&mut self, pubrel: PubRel) -> Result<(), StateError> {
371 let pubrel = self.save_pubrel(pubrel)?;
372
373 debug!("Pubrel. Pkid = {}", pubrel.pkid);
374 PubRel::new(pubrel.pkid).write(&mut self.write)?;
375
376 let event = Event::Outgoing(Outgoing::PubRel(pubrel.pkid));
377 self.events.push_back(event);
378 Ok(())
379 }
380
381 fn outgoing_puback(&mut self, puback: PubAck) -> Result<(), StateError> {
382 puback.write(&mut self.write)?;
383 let event = Event::Outgoing(Outgoing::PubAck(puback.pkid));
384 self.events.push_back(event);
385 Ok(())
386 }
387
388 fn outgoing_pubrec(&mut self, pubrec: PubRec) -> Result<(), StateError> {
389 pubrec.write(&mut self.write)?;
390 let event = Event::Outgoing(Outgoing::PubRec(pubrec.pkid));
391 self.events.push_back(event);
392 Ok(())
393 }
394
/// Writes a pingreq and queues the matching outgoing event.
///
/// # Errors
///
/// * [`StateError::CollisionTimeout`] when a packet id collision is still
///   pending on the second ping since it started being counted.
/// * [`StateError::AwaitPingResp`] when the previous pingreq has not been
///   answered yet.
/// * Serialization failures from writing the pingreq.
fn outgoing_ping(&mut self) -> Result<(), StateError> {
    // Captured up front; only used for the debug log below.
    let elapsed_in = self.last_incoming.elapsed();
    let elapsed_out = self.last_outgoing.elapsed();

    // A pending collision is counted on every ping attempt (even one that is
    // rejected below) and is given up on at the second attempt.
    if self.collision.is_some() {
        self.collision_ping_count += 1;
        if self.collision_ping_count >= 2 {
            return Err(StateError::CollisionTimeout);
        }
    }

    // Only one ping may be outstanding at a time.
    if self.await_pingresp {
        return Err(StateError::AwaitPingResp);
    }

    self.await_pingresp = true;

    debug!(
        "Pingreq,
        last incoming packet before {} millisecs,
        last outgoing request before {} millisecs",
        elapsed_in.as_millis(),
        elapsed_out.as_millis()
    );

    PingReq.write(&mut self.write)?;
    let event = Event::Outgoing(Outgoing::PingReq);
    self.events.push_back(event);
    Ok(())
}
429
430 fn outgoing_subscribe(&mut self, mut subscription: Subscribe) -> Result<(), StateError> {
431 if subscription.filters.is_empty() {
432 return Err(StateError::EmptySubscription);
433 }
434
435 let pkid = self.next_pkid();
436 subscription.pkid = pkid;
437
438 debug!(
439 "Subscribe. Topics = {:?}, Pkid = {:?}",
440 subscription.filters, subscription.pkid
441 );
442
443 subscription.write(&mut self.write)?;
444 let event = Event::Outgoing(Outgoing::Subscribe(subscription.pkid));
445 self.events.push_back(event);
446 Ok(())
447 }
448
449 fn outgoing_unsubscribe(&mut self, mut unsub: Unsubscribe) -> Result<(), StateError> {
450 let pkid = self.next_pkid();
451 unsub.pkid = pkid;
452
453 debug!(
454 "Unsubscribe. Topics = {:?}, Pkid = {:?}",
455 unsub.topics, unsub.pkid
456 );
457
458 unsub.write(&mut self.write)?;
459 let event = Event::Outgoing(Outgoing::Unsubscribe(unsub.pkid));
460 self.events.push_back(event);
461 Ok(())
462 }
463
464 fn outgoing_disconnect(&mut self) -> Result<(), StateError> {
465 debug!("Disconnect");
466
467 Disconnect.write(&mut self.write)?;
468 let event = Event::Outgoing(Outgoing::Disconnect);
469 self.events.push_back(event);
470 Ok(())
471 }
472
473 fn check_collision(&mut self, pkid: u16) -> Option<Publish> {
474 if let Some(publish) = &self.collision {
475 if publish.pkid == pkid {
476 return self.collision.take();
477 }
478 }
479
480 None
481 }
482
483 fn check_size(&self, pkt_size: usize) -> Result<(), StateError> {
484 if pkt_size > self.max_outgoing_packet_size {
485 Err(StateError::OutgoingPacketTooLarge {
486 pkt_size,
487 max: self.max_outgoing_packet_size,
488 })
489 } else {
490 Ok(())
491 }
492 }
493
494 fn save_pubrel(&mut self, mut pubrel: PubRel) -> Result<PubRel, StateError> {
495 let pubrel = match pubrel.pkid {
496 0 => {
498 pubrel.pkid = self.next_pkid();
499 pubrel
500 }
501 _ => pubrel,
502 };
503
504 self.outgoing_rel[pubrel.pkid as usize] = Some(pubrel.pkid);
505 self.inflight += 1;
506 Ok(pubrel)
507 }
508
509 fn next_pkid(&mut self) -> u16 {
513 let next_pkid = self.last_pkid + 1;
514
515 if next_pkid == self.max_inflight {
520 self.last_pkid = 0;
521 return next_pkid;
522 }
523
524 self.last_pkid = next_pkid;
525 next_pkid
526 }
527}
528
529#[cfg(test)]
530mod test {
531 use super::{MqttState, StateError};
532 use crate::mqttbytes::v4::*;
533 use crate::mqttbytes::*;
534 use crate::{Event, Incoming, Outgoing, Request};
535 use bytes::BufMut;
536
537 fn build_outgoing_publish(qos: QoS) -> Publish {
538 let topic = "hello/world".to_owned();
539 let payload = vec![1, 2, 3];
540
541 let mut publish = Publish::new(topic, QoS::AtLeastOnce, payload);
542 publish.qos = qos;
543 publish
544 }
545
546 fn build_incoming_publish(qos: QoS, pkid: u16) -> Publish {
547 let topic = "hello/world".to_owned();
548 let payload = vec![1, 2, 3];
549
550 let mut publish = Publish::new(topic, QoS::AtLeastOnce, payload);
551 publish.pkid = pkid;
552 publish.qos = qos;
553 publish
554 }
555
556 fn build_mqttstate() -> MqttState {
557 MqttState::new(100, false, usize::MAX)
558 }
559
/// Packet ids must run 1..=max_inflight and then start over at 1.
#[test]
fn next_pkid_increments_as_expected() {
    let mut mqtt = build_mqttstate();

    // `build_mqttstate` uses 100 inflight slots, so ids 1..=100 are handed
    // out in order, including the boundary value itself.
    for expected in 1..=100u16 {
        assert_eq!(mqtt.next_pkid(), expected);
    }

    // Handing out the limit resets the counter, so the sequence wraps to 1.
    assert_eq!(mqtt.last_pkid, 0);
    assert_eq!(mqtt.next_pkid(), 1);
}
576
/// Requests within the size limit are accepted, larger ones are rejected.
#[test]
fn outgoing_max_packet_size_check() {
    let mut mqtt = MqttState::new(100, false, 200);

    let small_publish = Publish::new("hello/world", QoS::AtLeastOnce, vec![1; 100]);
    let small_outcome = mqtt.handle_outgoing_packet(Request::Publish(small_publish));
    assert!(small_outcome.is_ok());

    let large_publish = Publish::new("hello/world", QoS::AtLeastOnce, vec![1; 265]);
    let large_outcome = mqtt.handle_outgoing_packet(Request::Publish(large_publish));
    assert!(large_outcome.is_err());
}
595
/// QoS 0 publishes are untracked; every QoS 1/2 publish takes the next packet
/// id and one inflight slot.
#[test]
fn outgoing_publish_should_set_pkid_and_add_publish_to_queue() {
    let mut mqtt = build_mqttstate();

    // QoS 0: no packet id is consumed and nothing is counted as inflight.
    mqtt.outgoing_publish(build_outgoing_publish(QoS::AtMostOnce))
        .unwrap();
    assert_eq!(mqtt.last_pkid, 0);
    assert_eq!(mqtt.inflight, 0);

    // QoS 1 and QoS 2: the id and the inflight count grow in lockstep.
    let tracked = [
        QoS::AtLeastOnce,
        QoS::AtLeastOnce,
        QoS::ExactlyOnce,
        QoS::ExactlyOnce,
    ];
    for (&qos, expected) in tracked.iter().zip(1u16..) {
        mqtt.outgoing_publish(build_outgoing_publish(qos)).unwrap();
        assert_eq!(mqtt.last_pkid, expected);
        assert_eq!(mqtt.inflight, expected);
    }
}
634
/// Only the incoming QoS 2 publish is remembered in `incoming_pub`.
#[test]
fn incoming_publish_should_be_added_to_queue_correctly() {
    let mut mqtt = build_mqttstate();

    let incoming = [
        (QoS::AtMostOnce, 1),
        (QoS::AtLeastOnce, 2),
        (QoS::ExactlyOnce, 3),
    ];
    for &(qos, pkid) in incoming.iter() {
        let publish = build_incoming_publish(qos, pkid);
        mqtt.handle_incoming_publish(&publish).unwrap();
    }

    assert_eq!(mqtt.incoming_pub[3], Some(3));
}
653
/// With automatic acks, QoS 1 yields a puback and QoS 2 a pubrec, in order.
#[test]
fn incoming_publish_should_be_acked() {
    let mut mqtt = build_mqttstate();

    let qos0 = build_incoming_publish(QoS::AtMostOnce, 1);
    let qos1 = build_incoming_publish(QoS::AtLeastOnce, 2);
    let qos2 = build_incoming_publish(QoS::ExactlyOnce, 3);

    for publish in [&qos0, &qos1, &qos2].iter() {
        mqtt.handle_incoming_publish(publish).unwrap();
    }

    match mqtt.events[0] {
        Event::Outgoing(Outgoing::PubAck(pkid)) => assert_eq!(pkid, 2),
        _ => panic!("missing puback"),
    }

    match mqtt.events[1] {
        Event::Outgoing(Outgoing::PubRec(pkid)) => assert_eq!(pkid, 3),
        _ => panic!("missing PubRec"),
    }
}
679
/// With manual acks the QoS 2 id is still tracked but no ack event is queued.
#[test]
fn incoming_publish_should_not_be_acked_with_manual_acks() {
    let mut mqtt = build_mqttstate();
    mqtt.manual_acks = true;

    let publishes = vec![
        build_incoming_publish(QoS::AtMostOnce, 1),
        build_incoming_publish(QoS::AtLeastOnce, 2),
        build_incoming_publish(QoS::ExactlyOnce, 3),
    ];
    for publish in &publishes {
        mqtt.handle_incoming_publish(publish).unwrap();
    }

    assert_eq!(mqtt.incoming_pub[3], Some(3));
    assert!(mqtt.events.is_empty());
}
699
/// An incoming QoS 2 publish puts a pubrec with the same id on the wire.
#[test]
fn incoming_qos2_publish_should_send_rec_to_network_and_publish_to_user() {
    let mut mqtt = build_mqttstate();

    mqtt.handle_incoming_publish(&build_incoming_publish(QoS::ExactlyOnce, 1))
        .unwrap();

    let packet = read(&mut mqtt.write, 10 * 1024).unwrap();
    match &packet {
        Packet::PubRec(pubrec) => assert_eq!(pubrec.pkid, 1),
        other => panic!("Invalid network request: {:?}", other),
    }
}
712
/// Each puback frees exactly the slot of its packet id and one inflight count.
#[test]
fn incoming_puback_should_remove_correct_publish_from_queue() {
    let mut mqtt = build_mqttstate();

    for &qos in [QoS::AtLeastOnce, QoS::ExactlyOnce].iter() {
        mqtt.outgoing_publish(build_outgoing_publish(qos)).unwrap();
    }
    assert_eq!(mqtt.inflight, 2);

    // Acknowledge id 1, then id 2; the inflight count drops by one each time.
    for (pkid, remaining) in [(1u16, 1u16), (2, 0)].iter().copied() {
        mqtt.handle_incoming_puback(&PubAck::new(pkid)).unwrap();
        assert_eq!(mqtt.inflight, remaining);
    }

    assert!(mqtt.outgoing_pub[1].is_none());
    assert!(mqtt.outgoing_pub[2].is_none());
}
733
/// A puback whose id lies beyond the tracked range is reported as unsolicited
/// instead of panicking.
#[test]
fn incoming_puback_with_pkid_greater_than_max_inflight_should_be_handled_gracefully() {
    let mut mqtt = build_mqttstate();

    let err = mqtt.handle_incoming_puback(&PubAck::new(101)).unwrap_err();

    if let StateError::Unsolicited(pkid) = &err {
        assert_eq!(*pkid, 101);
    } else {
        panic!("Unexpected error: {}", err);
    }
}
745
/// A pubrec moves its packet id from the publish table to the release table
/// while leaving other tracked publishes and the inflight count untouched.
#[test]
fn incoming_pubrec_should_release_publish_from_queue_and_add_relid_to_rel_queue() {
    let mut mqtt = build_mqttstate();

    let publish1 = build_outgoing_publish(QoS::AtLeastOnce);
    let publish2 = build_outgoing_publish(QoS::ExactlyOnce);

    // Fail loudly if the setup itself breaks instead of discarding the result.
    mqtt.outgoing_publish(publish1).unwrap();
    mqtt.outgoing_publish(publish2).unwrap();

    mqtt.handle_incoming_pubrec(&PubRec::new(2)).unwrap();
    assert_eq!(mqtt.inflight, 2);

    // The QoS 1 publish is still waiting for its puback.
    let backup = mqtt.outgoing_pub[1].clone();
    assert_eq!(backup.unwrap().pkid, 1);

    // The QoS 2 publish left the publish table and is now awaiting pubcomp.
    assert!(mqtt.outgoing_pub[2].is_none());
    assert_eq!(mqtt.outgoing_rel[2].unwrap(), 2);
}
766
/// After a pubrec, a pubrel with the same id follows the publish on the wire.
#[test]
fn incoming_pubrec_should_send_release_to_network_and_nothing_to_user() {
    let mut mqtt = build_mqttstate();

    mqtt.outgoing_publish(build_outgoing_publish(QoS::ExactlyOnce))
        .unwrap();
    let first = read(&mut mqtt.write, 10 * 1024).unwrap();
    match &first {
        Packet::Publish(publish) => assert_eq!(publish.pkid, 1),
        other => panic!("Invalid network request: {:?}", other),
    }

    mqtt.handle_incoming_pubrec(&PubRec::new(1)).unwrap();
    let second = read(&mut mqtt.write, 10 * 1024).unwrap();
    match &second {
        Packet::PubRel(pubrel) => assert_eq!(pubrel.pkid, 1),
        other => panic!("Invalid network request: {:?}", other),
    }
}
786
/// After a pubrel, a pubcomp with the same id follows the pubrec on the wire.
#[test]
fn incoming_pubrel_should_send_comp_to_network_and_nothing_to_user() {
    let mut mqtt = build_mqttstate();

    mqtt.handle_incoming_publish(&build_incoming_publish(QoS::ExactlyOnce, 1))
        .unwrap();
    let first = read(&mut mqtt.write, 10 * 1024).unwrap();
    match &first {
        Packet::PubRec(pubrec) => assert_eq!(pubrec.pkid, 1),
        other => panic!("Invalid network request: {:?}", other),
    }

    mqtt.handle_incoming_pubrel(&PubRel::new(1)).unwrap();
    let second = read(&mut mqtt.write, 10 * 1024).unwrap();
    match &second {
        Packet::PubComp(pubcomp) => assert_eq!(pubcomp.pkid, 1),
        other => panic!("Invalid network request: {:?}", other),
    }
}
806
/// A full outgoing QoS 2 handshake leaves nothing inflight.
#[test]
fn incoming_pubcomp_should_release_correct_pkid_from_release_queue() {
    let mut mqtt = build_mqttstate();

    mqtt.outgoing_publish(build_outgoing_publish(QoS::ExactlyOnce))
        .unwrap();

    let pkid = 1;
    mqtt.handle_incoming_pubrec(&PubRec::new(pkid)).unwrap();
    mqtt.handle_incoming_pubcomp(&PubComp::new(pkid)).unwrap();

    assert_eq!(mqtt.inflight, 0);
}
818
/// A second ping is refused while the first is unanswered, even if other
/// traffic was exchanged in between.
#[test]
fn outgoing_ping_handle_should_throw_errors_for_no_pingresp() {
    let mut mqtt = build_mqttstate();
    mqtt.outgoing_ping().unwrap();

    // Unrelated traffic in both directions must not count as a ping response.
    let request = Request::Publish(build_outgoing_publish(QoS::AtLeastOnce));
    mqtt.handle_outgoing_packet(request).unwrap();
    let ack = Incoming::PubAck(PubAck::new(1));
    mqtt.handle_incoming_packet(ack).unwrap();

    let second_ping = mqtt.outgoing_ping();
    match second_ping {
        Err(StateError::AwaitPingResp) => (),
        Ok(_) => panic!("Should throw pingresp await error"),
        Err(e) => panic!("Should throw pingresp await error. Error = {:?}", e),
    }
}
838
/// Once the ping response arrives, the next ping is accepted again.
#[test]
fn outgoing_ping_handle_should_succeed_if_pingresp_is_received() {
    let mut mqtt = build_mqttstate();

    let first_ping = mqtt.outgoing_ping();
    first_ping.unwrap();

    let response = mqtt.handle_incoming_packet(Incoming::PingResp);
    response.unwrap();

    let second_ping = mqtt.outgoing_ping();
    second_ping.unwrap();
}
850
/// `clean` empties the write buffer.
#[test]
fn state_should_be_clean_properly() {
    let mut mqtt = build_mqttstate();

    let leftover: &[u8] = &b"test"[..];
    mqtt.write.put(leftover);

    let _pending = mqtt.clean();
    assert!(mqtt.write.is_empty());
}
859
/// `clean` must return the pending publishes starting right after the last
/// acknowledged slot and wrapping around to the beginning.
///
/// The full list of returned packet ids is compared against the expectation,
/// so a missing or extra request fails the test (zipping the two sequences
/// would silently stop at the shorter one).
#[test]
fn clean_is_calculating_pending_correctly() {
    /// A tracked publish occupying the slot of `pkid`.
    fn pending_publish(pkid: u16) -> Option<Publish> {
        Some(Publish {
            dup: false,
            qos: QoS::AtMostOnce,
            retain: false,
            topic: "test".to_string(),
            pkid,
            payload: "".into(),
        })
    }

    /// Publish table with ids 1, 2, 3 and 6 pending; slots 0, 4 and 5 empty.
    fn build_outgoing_pub() -> Vec<Option<Publish>> {
        vec![
            None,
            pending_publish(1),
            pending_publish(2),
            pending_publish(3),
            None,
            None,
            pending_publish(6),
        ]
    }

    /// Packet ids of the returned requests, which must all be publishes.
    fn pending_pkids(requests: &[Request]) -> Vec<u16> {
        requests
            .iter()
            .map(|request| match request {
                Request::Publish(publish) => publish.pkid,
                _ => unreachable!(),
            })
            .collect()
    }

    let mut mqtt = build_mqttstate();

    // (last acknowledged id, expected order of the pending ids)
    let cases: [(u16, [u16; 4]); 3] = [
        (3, [6, 1, 2, 3]),
        (0, [1, 2, 3, 6]),
        (6, [1, 2, 3, 6]),
    ];

    for (last_puback, expected) in cases.iter() {
        mqtt.outgoing_pub = build_outgoing_pub();
        mqtt.last_puback = *last_puback;

        let requests = mqtt.clean();
        assert_eq!(pending_pkids(&requests), expected.to_vec());
    }
}
940}