rumqttc/
state.rs

1use crate::{Event, Incoming, Outgoing, Request};
2
3use crate::mqttbytes::v4::*;
4use crate::mqttbytes::{self, *};
5use bytes::BytesMut;
6use std::collections::VecDeque;
7use std::{io, time::Instant};
8
9/// Errors during state handling
10#[derive(Debug, thiserror::Error)]
11pub enum StateError {
12    /// Io Error while state is passed to network
13    #[error("Io error: {0:?}")]
14    Io(#[from] io::Error),
15    /// Invalid state for a given operation
16    #[error("Invalid state for a given operation")]
17    InvalidState,
18    /// Received a packet (ack) which isn't asked for
19    #[error("Received unsolicited ack pkid: {0}")]
20    Unsolicited(u16),
21    /// Last pingreq isn't acked
22    #[error("Last pingreq isn't acked")]
23    AwaitPingResp,
24    /// Received a wrong packet while waiting for another packet
25    #[error("Received a wrong packet while waiting for another packet")]
26    WrongPacket,
27    #[error("Timeout while waiting to resolve collision")]
28    CollisionTimeout,
29    #[error("A Subscribe packet must contain atleast one filter")]
30    EmptySubscription,
31    #[error("Mqtt serialization/deserialization error: {0}")]
32    Deserialization(#[from] mqttbytes::Error),
33    #[error("Cannot send packet of size '{pkt_size:?}'. It's greater than the broker's maximum packet size of: '{max:?}'")]
34    OutgoingPacketTooLarge { pkt_size: usize, max: usize },
35}
36
37/// State of the mqtt connection.
38// Design: Methods will just modify the state of the object without doing any network operations
39// Design: All inflight queues are maintained in a pre initialized vec with index as packet id.
40// This is done for 2 reasons
41// Bad acks or out of order acks aren't O(n) causing cpu spikes
42// Any missing acks from the broker are detected during the next recycled use of packet ids
43#[derive(Debug, Clone)]
44pub struct MqttState {
45    /// Status of last ping
46    pub await_pingresp: bool,
47    /// Collision ping count. Collisions stop user requests
48    /// which inturn trigger pings. Multiple pings without
49    /// resolving collisions will result in error
50    pub collision_ping_count: usize,
51    /// Last incoming packet time
52    last_incoming: Instant,
53    /// Last outgoing packet time
54    last_outgoing: Instant,
55    /// Packet id of the last outgoing packet
56    pub(crate) last_pkid: u16,
57    /// Packet id of the last acked publish
58    pub(crate) last_puback: u16,
59    /// Number of outgoing inflight publishes
60    pub(crate) inflight: u16,
61    /// Maximum number of allowed inflight
62    pub(crate) max_inflight: u16,
63    /// Outgoing QoS 1, 2 publishes which aren't acked yet
64    pub(crate) outgoing_pub: Vec<Option<Publish>>,
65    /// Packet ids of released QoS 2 publishes
66    pub(crate) outgoing_rel: Vec<Option<u16>>,
67    /// Packet ids on incoming QoS 2 publishes
68    pub(crate) incoming_pub: Vec<Option<u16>>,
69    /// Last collision due to broker not acking in order
70    pub collision: Option<Publish>,
71    /// Buffered incoming packets
72    pub events: VecDeque<Event>,
73    /// Write buffer
74    pub write: BytesMut,
75    /// Indicates if acknowledgements should be send immediately
76    pub manual_acks: bool,
77    /// Maximum outgoing packet size, set via MqttOptions
78    pub max_outgoing_packet_size: usize,
79}
80
81impl MqttState {
82    /// Creates new mqtt state. Same state should be used during a
83    /// connection for persistent sessions while new state should
84    /// instantiated for clean sessions
85    pub fn new(max_inflight: u16, manual_acks: bool, max_outgoing_packet_size: usize) -> Self {
86        MqttState {
87            await_pingresp: false,
88            collision_ping_count: 0,
89            last_incoming: Instant::now(),
90            last_outgoing: Instant::now(),
91            last_pkid: 0,
92            last_puback: 0,
93            inflight: 0,
94            max_inflight,
95            // index 0 is wasted as 0 is not a valid packet id
96            outgoing_pub: vec![None; max_inflight as usize + 1],
97            outgoing_rel: vec![None; max_inflight as usize + 1],
98            incoming_pub: vec![None; std::u16::MAX as usize + 1],
99            collision: None,
100            // TODO: Optimize these sizes later
101            events: VecDeque::with_capacity(100),
102            write: BytesMut::with_capacity(10 * 1024),
103            manual_acks,
104            max_outgoing_packet_size,
105        }
106    }
107
108    /// Returns inflight outgoing packets and clears internal queues
109    pub fn clean(&mut self) -> Vec<Request> {
110        let mut pending = Vec::with_capacity(100);
111        let (first_half, second_half) = self
112            .outgoing_pub
113            .split_at_mut(self.last_puback as usize + 1);
114
115        for publish in second_half.iter_mut().chain(first_half) {
116            if let Some(publish) = publish.take() {
117                let request = Request::Publish(publish);
118                pending.push(request);
119            }
120        }
121
122        // remove and collect pending releases
123        for rel in self.outgoing_rel.iter_mut() {
124            if let Some(pkid) = rel.take() {
125                let request = Request::PubRel(PubRel::new(pkid));
126                pending.push(request);
127            }
128        }
129
130        // remove packed ids of incoming qos2 publishes
131        for id in self.incoming_pub.iter_mut() {
132            id.take();
133        }
134
135        self.await_pingresp = false;
136        self.collision_ping_count = 0;
137        self.inflight = 0;
138        self.write.clear();
139        pending
140    }
141
142    pub fn inflight(&self) -> u16 {
143        self.inflight
144    }
145
146    /// Consolidates handling of all outgoing mqtt packet logic. Returns a packet which should
147    /// be put on to the network by the eventloop
148    pub fn handle_outgoing_packet(&mut self, request: Request) -> Result<(), StateError> {
149        // Enforce max outgoing packet size
150        self.check_size(request.size())?;
151        match request {
152            Request::Publish(publish) => self.outgoing_publish(publish)?,
153            Request::PubRel(pubrel) => self.outgoing_pubrel(pubrel)?,
154            Request::Subscribe(subscribe) => self.outgoing_subscribe(subscribe)?,
155            Request::Unsubscribe(unsubscribe) => self.outgoing_unsubscribe(unsubscribe)?,
156            Request::PingReq(_) => self.outgoing_ping()?,
157            Request::Disconnect(_) => self.outgoing_disconnect()?,
158            Request::PubAck(puback) => self.outgoing_puback(puback)?,
159            Request::PubRec(pubrec) => self.outgoing_pubrec(pubrec)?,
160            _ => unimplemented!(),
161        };
162
163        self.last_outgoing = Instant::now();
164        Ok(())
165    }
166
167    /// Consolidates handling of all incoming mqtt packets. Returns a `Notification` which for the
168    /// user to consume and `Packet` which for the eventloop to put on the network
169    /// E.g For incoming QoS1 publish packet, this method returns (Publish, Puback). Publish packet will
170    /// be forwarded to user and Pubck packet will be written to network
171    pub fn handle_incoming_packet(&mut self, packet: Incoming) -> Result<(), StateError> {
172        let out = match &packet {
173            Incoming::PingResp => self.handle_incoming_pingresp(),
174            Incoming::Publish(publish) => self.handle_incoming_publish(publish),
175            Incoming::SubAck(_suback) => self.handle_incoming_suback(),
176            Incoming::UnsubAck(_unsuback) => self.handle_incoming_unsuback(),
177            Incoming::PubAck(puback) => self.handle_incoming_puback(puback),
178            Incoming::PubRec(pubrec) => self.handle_incoming_pubrec(pubrec),
179            Incoming::PubRel(pubrel) => self.handle_incoming_pubrel(pubrel),
180            Incoming::PubComp(pubcomp) => self.handle_incoming_pubcomp(pubcomp),
181            _ => {
182                error!("Invalid incoming packet = {:?}", packet);
183                return Err(StateError::WrongPacket);
184            }
185        };
186
187        out?;
188        self.events.push_back(Event::Incoming(packet));
189        self.last_incoming = Instant::now();
190        Ok(())
191    }
192
193    fn handle_incoming_suback(&mut self) -> Result<(), StateError> {
194        Ok(())
195    }
196
197    fn handle_incoming_unsuback(&mut self) -> Result<(), StateError> {
198        Ok(())
199    }
200
201    /// Results in a publish notification in all the QoS cases. Replys with an ack
202    /// in case of QoS1 and Replys rec in case of QoS while also storing the message
203    fn handle_incoming_publish(&mut self, publish: &Publish) -> Result<(), StateError> {
204        let qos = publish.qos;
205
206        match qos {
207            QoS::AtMostOnce => Ok(()),
208            QoS::AtLeastOnce => {
209                if !self.manual_acks {
210                    let puback = PubAck::new(publish.pkid);
211                    self.outgoing_puback(puback)?;
212                }
213                Ok(())
214            }
215            QoS::ExactlyOnce => {
216                let pkid = publish.pkid;
217                self.incoming_pub[pkid as usize] = Some(pkid);
218
219                if !self.manual_acks {
220                    let pubrec = PubRec::new(pkid);
221                    self.outgoing_pubrec(pubrec)?;
222                }
223                Ok(())
224            }
225        }
226    }
227
228    fn handle_incoming_puback(&mut self, puback: &PubAck) -> Result<(), StateError> {
229        let publish = self
230            .outgoing_pub
231            .get_mut(puback.pkid as usize)
232            .ok_or(StateError::Unsolicited(puback.pkid))?;
233
234        self.last_puback = puback.pkid;
235        let v = match publish.take() {
236            Some(_) => {
237                self.inflight -= 1;
238                Ok(())
239            }
240            None => {
241                error!("Unsolicited puback packet: {:?}", puback.pkid);
242                Err(StateError::Unsolicited(puback.pkid))
243            }
244        };
245
246        if let Some(publish) = self.check_collision(puback.pkid) {
247            self.outgoing_pub[publish.pkid as usize] = Some(publish.clone());
248            self.inflight += 1;
249
250            publish.write(&mut self.write)?;
251            let event = Event::Outgoing(Outgoing::Publish(publish.pkid));
252            self.events.push_back(event);
253            self.collision_ping_count = 0;
254        }
255
256        v
257    }
258
259    fn handle_incoming_pubrec(&mut self, pubrec: &PubRec) -> Result<(), StateError> {
260        let publish = self
261            .outgoing_pub
262            .get_mut(pubrec.pkid as usize)
263            .ok_or(StateError::Unsolicited(pubrec.pkid))?;
264        match publish.take() {
265            Some(_) => {
266                // NOTE: Inflight - 1 for qos2 in comp
267                self.outgoing_rel[pubrec.pkid as usize] = Some(pubrec.pkid);
268                PubRel::new(pubrec.pkid).write(&mut self.write)?;
269
270                let event = Event::Outgoing(Outgoing::PubRel(pubrec.pkid));
271                self.events.push_back(event);
272                Ok(())
273            }
274            None => {
275                error!("Unsolicited pubrec packet: {:?}", pubrec.pkid);
276                Err(StateError::Unsolicited(pubrec.pkid))
277            }
278        }
279    }
280
281    fn handle_incoming_pubrel(&mut self, pubrel: &PubRel) -> Result<(), StateError> {
282        let publish = self
283            .incoming_pub
284            .get_mut(pubrel.pkid as usize)
285            .ok_or(StateError::Unsolicited(pubrel.pkid))?;
286        match publish.take() {
287            Some(_) => {
288                PubComp::new(pubrel.pkid).write(&mut self.write)?;
289                let event = Event::Outgoing(Outgoing::PubComp(pubrel.pkid));
290                self.events.push_back(event);
291                Ok(())
292            }
293            None => {
294                error!("Unsolicited pubrel packet: {:?}", pubrel.pkid);
295                Err(StateError::Unsolicited(pubrel.pkid))
296            }
297        }
298    }
299
300    fn handle_incoming_pubcomp(&mut self, pubcomp: &PubComp) -> Result<(), StateError> {
301        if let Some(publish) = self.check_collision(pubcomp.pkid) {
302            publish.write(&mut self.write)?;
303            let event = Event::Outgoing(Outgoing::Publish(publish.pkid));
304            self.events.push_back(event);
305            self.collision_ping_count = 0;
306        }
307
308        let pubrel = self
309            .outgoing_rel
310            .get_mut(pubcomp.pkid as usize)
311            .ok_or(StateError::Unsolicited(pubcomp.pkid))?;
312        match pubrel.take() {
313            Some(_) => {
314                self.inflight -= 1;
315                Ok(())
316            }
317            None => {
318                error!("Unsolicited pubcomp packet: {:?}", pubcomp.pkid);
319                Err(StateError::Unsolicited(pubcomp.pkid))
320            }
321        }
322    }
323
324    fn handle_incoming_pingresp(&mut self) -> Result<(), StateError> {
325        self.await_pingresp = false;
326        Ok(())
327    }
328
329    /// Adds next packet identifier to QoS 1 and 2 publish packets and returns
330    /// it buy wrapping publish in packet
331    fn outgoing_publish(&mut self, mut publish: Publish) -> Result<(), StateError> {
332        if publish.qos != QoS::AtMostOnce {
333            if publish.pkid == 0 {
334                publish.pkid = self.next_pkid();
335            }
336
337            let pkid = publish.pkid;
338            if self
339                .outgoing_pub
340                .get(publish.pkid as usize)
341                .ok_or(StateError::Unsolicited(publish.pkid))?
342                .is_some()
343            {
344                info!("Collision on packet id = {:?}", publish.pkid);
345                self.collision = Some(publish);
346                let event = Event::Outgoing(Outgoing::AwaitAck(pkid));
347                self.events.push_back(event);
348                return Ok(());
349            }
350
351            // if there is an existing publish at this pkid, this implies that broker hasn't acked this
352            // packet yet. This error is possible only when broker isn't acking sequentially
353            self.outgoing_pub[pkid as usize] = Some(publish.clone());
354            self.inflight += 1;
355        };
356
357        debug!(
358            "Publish. Topic = {}, Pkid = {:?}, Payload Size = {:?}",
359            publish.topic,
360            publish.pkid,
361            publish.payload.len()
362        );
363
364        publish.write(&mut self.write)?;
365        let event = Event::Outgoing(Outgoing::Publish(publish.pkid));
366        self.events.push_back(event);
367        Ok(())
368    }
369
370    fn outgoing_pubrel(&mut self, pubrel: PubRel) -> Result<(), StateError> {
371        let pubrel = self.save_pubrel(pubrel)?;
372
373        debug!("Pubrel. Pkid = {}", pubrel.pkid);
374        PubRel::new(pubrel.pkid).write(&mut self.write)?;
375
376        let event = Event::Outgoing(Outgoing::PubRel(pubrel.pkid));
377        self.events.push_back(event);
378        Ok(())
379    }
380
381    fn outgoing_puback(&mut self, puback: PubAck) -> Result<(), StateError> {
382        puback.write(&mut self.write)?;
383        let event = Event::Outgoing(Outgoing::PubAck(puback.pkid));
384        self.events.push_back(event);
385        Ok(())
386    }
387
388    fn outgoing_pubrec(&mut self, pubrec: PubRec) -> Result<(), StateError> {
389        pubrec.write(&mut self.write)?;
390        let event = Event::Outgoing(Outgoing::PubRec(pubrec.pkid));
391        self.events.push_back(event);
392        Ok(())
393    }
394
395    /// check when the last control packet/pingreq packet is received and return
396    /// the status which tells if keep alive time has exceeded
397    /// NOTE: status will be checked for zero keepalive times also
398    fn outgoing_ping(&mut self) -> Result<(), StateError> {
399        let elapsed_in = self.last_incoming.elapsed();
400        let elapsed_out = self.last_outgoing.elapsed();
401
402        if self.collision.is_some() {
403            self.collision_ping_count += 1;
404            if self.collision_ping_count >= 2 {
405                return Err(StateError::CollisionTimeout);
406            }
407        }
408
409        // raise error if last ping didn't receive ack
410        if self.await_pingresp {
411            return Err(StateError::AwaitPingResp);
412        }
413
414        self.await_pingresp = true;
415
416        debug!(
417            "Pingreq,
418            last incoming packet before {} millisecs,
419            last outgoing request before {} millisecs",
420            elapsed_in.as_millis(),
421            elapsed_out.as_millis()
422        );
423
424        PingReq.write(&mut self.write)?;
425        let event = Event::Outgoing(Outgoing::PingReq);
426        self.events.push_back(event);
427        Ok(())
428    }
429
430    fn outgoing_subscribe(&mut self, mut subscription: Subscribe) -> Result<(), StateError> {
431        if subscription.filters.is_empty() {
432            return Err(StateError::EmptySubscription);
433        }
434
435        let pkid = self.next_pkid();
436        subscription.pkid = pkid;
437
438        debug!(
439            "Subscribe. Topics = {:?}, Pkid = {:?}",
440            subscription.filters, subscription.pkid
441        );
442
443        subscription.write(&mut self.write)?;
444        let event = Event::Outgoing(Outgoing::Subscribe(subscription.pkid));
445        self.events.push_back(event);
446        Ok(())
447    }
448
449    fn outgoing_unsubscribe(&mut self, mut unsub: Unsubscribe) -> Result<(), StateError> {
450        let pkid = self.next_pkid();
451        unsub.pkid = pkid;
452
453        debug!(
454            "Unsubscribe. Topics = {:?}, Pkid = {:?}",
455            unsub.topics, unsub.pkid
456        );
457
458        unsub.write(&mut self.write)?;
459        let event = Event::Outgoing(Outgoing::Unsubscribe(unsub.pkid));
460        self.events.push_back(event);
461        Ok(())
462    }
463
464    fn outgoing_disconnect(&mut self) -> Result<(), StateError> {
465        debug!("Disconnect");
466
467        Disconnect.write(&mut self.write)?;
468        let event = Event::Outgoing(Outgoing::Disconnect);
469        self.events.push_back(event);
470        Ok(())
471    }
472
473    fn check_collision(&mut self, pkid: u16) -> Option<Publish> {
474        if let Some(publish) = &self.collision {
475            if publish.pkid == pkid {
476                return self.collision.take();
477            }
478        }
479
480        None
481    }
482
483    fn check_size(&self, pkt_size: usize) -> Result<(), StateError> {
484        if pkt_size > self.max_outgoing_packet_size {
485            Err(StateError::OutgoingPacketTooLarge {
486                pkt_size,
487                max: self.max_outgoing_packet_size,
488            })
489        } else {
490            Ok(())
491        }
492    }
493
494    fn save_pubrel(&mut self, mut pubrel: PubRel) -> Result<PubRel, StateError> {
495        let pubrel = match pubrel.pkid {
496            // consider PacketIdentifier(0) as uninitialized packets
497            0 => {
498                pubrel.pkid = self.next_pkid();
499                pubrel
500            }
501            _ => pubrel,
502        };
503
504        self.outgoing_rel[pubrel.pkid as usize] = Some(pubrel.pkid);
505        self.inflight += 1;
506        Ok(pubrel)
507    }
508
509    /// http://stackoverflow.com/questions/11115364/mqtt-messageid-practical-implementation
510    /// Packet ids are incremented till maximum set inflight messages and reset to 1 after that.
511    ///
512    fn next_pkid(&mut self) -> u16 {
513        let next_pkid = self.last_pkid + 1;
514
515        // When next packet id is at the edge of inflight queue,
516        // set await flag. This instructs eventloop to stop
517        // processing requests until all the inflight publishes
518        // are acked
519        if next_pkid == self.max_inflight {
520            self.last_pkid = 0;
521            return next_pkid;
522        }
523
524        self.last_pkid = next_pkid;
525        next_pkid
526    }
527}
528
529#[cfg(test)]
530mod test {
531    use super::{MqttState, StateError};
532    use crate::mqttbytes::v4::*;
533    use crate::mqttbytes::*;
534    use crate::{Event, Incoming, Outgoing, Request};
535    use bytes::BufMut;
536
537    fn build_outgoing_publish(qos: QoS) -> Publish {
538        let topic = "hello/world".to_owned();
539        let payload = vec![1, 2, 3];
540
541        let mut publish = Publish::new(topic, QoS::AtLeastOnce, payload);
542        publish.qos = qos;
543        publish
544    }
545
546    fn build_incoming_publish(qos: QoS, pkid: u16) -> Publish {
547        let topic = "hello/world".to_owned();
548        let payload = vec![1, 2, 3];
549
550        let mut publish = Publish::new(topic, QoS::AtLeastOnce, payload);
551        publish.pkid = pkid;
552        publish.qos = qos;
553        publish
554    }
555
556    fn build_mqttstate() -> MqttState {
557        MqttState::new(100, false, usize::MAX)
558    }
559
560    #[test]
561    fn next_pkid_increments_as_expected() {
562        let mut mqtt = build_mqttstate();
563
564        for i in 1..=100 {
565            let pkid = mqtt.next_pkid();
566
567            // loops between 0-99. % 100 == 0 implies border
568            let expected = i % 100;
569            if expected == 0 {
570                break;
571            }
572
573            assert_eq!(expected, pkid);
574        }
575    }
576
577    #[test]
578    fn outgoing_max_packet_size_check() {
579        let mut mqtt = MqttState::new(100, false, 200);
580
581        let small_publish = Publish::new("hello/world", QoS::AtLeastOnce, vec![1; 100]);
582        assert_eq!(
583            mqtt.handle_outgoing_packet(Request::Publish(small_publish))
584                .is_ok(),
585            true
586        );
587
588        let large_publish = Publish::new("hello/world", QoS::AtLeastOnce, vec![1; 265]);
589        assert_eq!(
590            mqtt.handle_outgoing_packet(Request::Publish(large_publish))
591                .is_ok(),
592            false
593        );
594    }
595
596    #[test]
597    fn outgoing_publish_should_set_pkid_and_add_publish_to_queue() {
598        let mut mqtt = build_mqttstate();
599
600        // QoS0 Publish
601        let publish = build_outgoing_publish(QoS::AtMostOnce);
602
603        // QoS 0 publish shouldn't be saved in queue
604        mqtt.outgoing_publish(publish).unwrap();
605        assert_eq!(mqtt.last_pkid, 0);
606        assert_eq!(mqtt.inflight, 0);
607
608        // QoS1 Publish
609        let publish = build_outgoing_publish(QoS::AtLeastOnce);
610
611        // Packet id should be set and publish should be saved in queue
612        mqtt.outgoing_publish(publish.clone()).unwrap();
613        assert_eq!(mqtt.last_pkid, 1);
614        assert_eq!(mqtt.inflight, 1);
615
616        // Packet id should be incremented and publish should be saved in queue
617        mqtt.outgoing_publish(publish).unwrap();
618        assert_eq!(mqtt.last_pkid, 2);
619        assert_eq!(mqtt.inflight, 2);
620
621        // QoS1 Publish
622        let publish = build_outgoing_publish(QoS::ExactlyOnce);
623
624        // Packet id should be set and publish should be saved in queue
625        mqtt.outgoing_publish(publish.clone()).unwrap();
626        assert_eq!(mqtt.last_pkid, 3);
627        assert_eq!(mqtt.inflight, 3);
628
629        // Packet id should be incremented and publish should be saved in queue
630        mqtt.outgoing_publish(publish).unwrap();
631        assert_eq!(mqtt.last_pkid, 4);
632        assert_eq!(mqtt.inflight, 4);
633    }
634
635    #[test]
636    fn incoming_publish_should_be_added_to_queue_correctly() {
637        let mut mqtt = build_mqttstate();
638
639        // QoS0, 1, 2 Publishes
640        let publish1 = build_incoming_publish(QoS::AtMostOnce, 1);
641        let publish2 = build_incoming_publish(QoS::AtLeastOnce, 2);
642        let publish3 = build_incoming_publish(QoS::ExactlyOnce, 3);
643
644        mqtt.handle_incoming_publish(&publish1).unwrap();
645        mqtt.handle_incoming_publish(&publish2).unwrap();
646        mqtt.handle_incoming_publish(&publish3).unwrap();
647
648        let pkid = mqtt.incoming_pub[3].unwrap();
649
650        // only qos2 publish should be add to queue
651        assert_eq!(pkid, 3);
652    }
653
654    #[test]
655    fn incoming_publish_should_be_acked() {
656        let mut mqtt = build_mqttstate();
657
658        // QoS0, 1, 2 Publishes
659        let publish1 = build_incoming_publish(QoS::AtMostOnce, 1);
660        let publish2 = build_incoming_publish(QoS::AtLeastOnce, 2);
661        let publish3 = build_incoming_publish(QoS::ExactlyOnce, 3);
662
663        mqtt.handle_incoming_publish(&publish1).unwrap();
664        mqtt.handle_incoming_publish(&publish2).unwrap();
665        mqtt.handle_incoming_publish(&publish3).unwrap();
666
667        if let Event::Outgoing(Outgoing::PubAck(pkid)) = mqtt.events[0] {
668            assert_eq!(pkid, 2);
669        } else {
670            panic!("missing puback");
671        }
672
673        if let Event::Outgoing(Outgoing::PubRec(pkid)) = mqtt.events[1] {
674            assert_eq!(pkid, 3);
675        } else {
676            panic!("missing PubRec");
677        }
678    }
679
680    #[test]
681    fn incoming_publish_should_not_be_acked_with_manual_acks() {
682        let mut mqtt = build_mqttstate();
683        mqtt.manual_acks = true;
684
685        // QoS0, 1, 2 Publishes
686        let publish1 = build_incoming_publish(QoS::AtMostOnce, 1);
687        let publish2 = build_incoming_publish(QoS::AtLeastOnce, 2);
688        let publish3 = build_incoming_publish(QoS::ExactlyOnce, 3);
689
690        mqtt.handle_incoming_publish(&publish1).unwrap();
691        mqtt.handle_incoming_publish(&publish2).unwrap();
692        mqtt.handle_incoming_publish(&publish3).unwrap();
693
694        let pkid = mqtt.incoming_pub[3].unwrap();
695        assert_eq!(pkid, 3);
696
697        assert!(mqtt.events.is_empty());
698    }
699
700    #[test]
701    fn incoming_qos2_publish_should_send_rec_to_network_and_publish_to_user() {
702        let mut mqtt = build_mqttstate();
703        let publish = build_incoming_publish(QoS::ExactlyOnce, 1);
704
705        mqtt.handle_incoming_publish(&publish).unwrap();
706        let packet = read(&mut mqtt.write, 10 * 1024).unwrap();
707        match packet {
708            Packet::PubRec(pubrec) => assert_eq!(pubrec.pkid, 1),
709            _ => panic!("Invalid network request: {:?}", packet),
710        }
711    }
712
713    #[test]
714    fn incoming_puback_should_remove_correct_publish_from_queue() {
715        let mut mqtt = build_mqttstate();
716
717        let publish1 = build_outgoing_publish(QoS::AtLeastOnce);
718        let publish2 = build_outgoing_publish(QoS::ExactlyOnce);
719
720        mqtt.outgoing_publish(publish1).unwrap();
721        mqtt.outgoing_publish(publish2).unwrap();
722        assert_eq!(mqtt.inflight, 2);
723
724        mqtt.handle_incoming_puback(&PubAck::new(1)).unwrap();
725        assert_eq!(mqtt.inflight, 1);
726
727        mqtt.handle_incoming_puback(&PubAck::new(2)).unwrap();
728        assert_eq!(mqtt.inflight, 0);
729
730        assert!(mqtt.outgoing_pub[1].is_none());
731        assert!(mqtt.outgoing_pub[2].is_none());
732    }
733
734    #[test]
735    fn incoming_puback_with_pkid_greater_than_max_inflight_should_be_handled_gracefully() {
736        let mut mqtt = build_mqttstate();
737
738        let got = mqtt.handle_incoming_puback(&PubAck::new(101)).unwrap_err();
739
740        match got {
741            StateError::Unsolicited(pkid) => assert_eq!(pkid, 101),
742            e => panic!("Unexpected error: {}", e),
743        }
744    }
745
746    #[test]
747    fn incoming_pubrec_should_release_publish_from_queue_and_add_relid_to_rel_queue() {
748        let mut mqtt = build_mqttstate();
749
750        let publish1 = build_outgoing_publish(QoS::AtLeastOnce);
751        let publish2 = build_outgoing_publish(QoS::ExactlyOnce);
752
753        let _publish_out = mqtt.outgoing_publish(publish1);
754        let _publish_out = mqtt.outgoing_publish(publish2);
755
756        mqtt.handle_incoming_pubrec(&PubRec::new(2)).unwrap();
757        assert_eq!(mqtt.inflight, 2);
758
759        // check if the remaining element's pkid is 1
760        let backup = mqtt.outgoing_pub[1].clone();
761        assert_eq!(backup.unwrap().pkid, 1);
762
763        // check if the qos2 element's release pkid is 2
764        assert_eq!(mqtt.outgoing_rel[2].unwrap(), 2);
765    }
766
767    #[test]
768    fn incoming_pubrec_should_send_release_to_network_and_nothing_to_user() {
769        let mut mqtt = build_mqttstate();
770
771        let publish = build_outgoing_publish(QoS::ExactlyOnce);
772        mqtt.outgoing_publish(publish).unwrap();
773        let packet = read(&mut mqtt.write, 10 * 1024).unwrap();
774        match packet {
775            Packet::Publish(publish) => assert_eq!(publish.pkid, 1),
776            packet => panic!("Invalid network request: {:?}", packet),
777        }
778
779        mqtt.handle_incoming_pubrec(&PubRec::new(1)).unwrap();
780        let packet = read(&mut mqtt.write, 10 * 1024).unwrap();
781        match packet {
782            Packet::PubRel(pubrel) => assert_eq!(pubrel.pkid, 1),
783            packet => panic!("Invalid network request: {:?}", packet),
784        }
785    }
786
787    #[test]
788    fn incoming_pubrel_should_send_comp_to_network_and_nothing_to_user() {
789        let mut mqtt = build_mqttstate();
790        let publish = build_incoming_publish(QoS::ExactlyOnce, 1);
791
792        mqtt.handle_incoming_publish(&publish).unwrap();
793        let packet = read(&mut mqtt.write, 10 * 1024).unwrap();
794        match packet {
795            Packet::PubRec(pubrec) => assert_eq!(pubrec.pkid, 1),
796            packet => panic!("Invalid network request: {:?}", packet),
797        }
798
799        mqtt.handle_incoming_pubrel(&PubRel::new(1)).unwrap();
800        let packet = read(&mut mqtt.write, 10 * 1024).unwrap();
801        match packet {
802            Packet::PubComp(pubcomp) => assert_eq!(pubcomp.pkid, 1),
803            packet => panic!("Invalid network request: {:?}", packet),
804        }
805    }
806
807    #[test]
808    fn incoming_pubcomp_should_release_correct_pkid_from_release_queue() {
809        let mut mqtt = build_mqttstate();
810        let publish = build_outgoing_publish(QoS::ExactlyOnce);
811
812        mqtt.outgoing_publish(publish).unwrap();
813        mqtt.handle_incoming_pubrec(&PubRec::new(1)).unwrap();
814
815        mqtt.handle_incoming_pubcomp(&PubComp::new(1)).unwrap();
816        assert_eq!(mqtt.inflight, 0);
817    }
818
819    #[test]
820    fn outgoing_ping_handle_should_throw_errors_for_no_pingresp() {
821        let mut mqtt = build_mqttstate();
822        mqtt.outgoing_ping().unwrap();
823
824        // network activity other than pingresp
825        let publish = build_outgoing_publish(QoS::AtLeastOnce);
826        mqtt.handle_outgoing_packet(Request::Publish(publish))
827            .unwrap();
828        mqtt.handle_incoming_packet(Incoming::PubAck(PubAck::new(1)))
829            .unwrap();
830
831        // should throw error because we didn't get pingresp for previous ping
832        match mqtt.outgoing_ping() {
833            Ok(_) => panic!("Should throw pingresp await error"),
834            Err(StateError::AwaitPingResp) => (),
835            Err(e) => panic!("Should throw pingresp await error. Error = {:?}", e),
836        }
837    }
838
839    #[test]
840    fn outgoing_ping_handle_should_succeed_if_pingresp_is_received() {
841        let mut mqtt = build_mqttstate();
842
843        // should ping
844        mqtt.outgoing_ping().unwrap();
845        mqtt.handle_incoming_packet(Incoming::PingResp).unwrap();
846
847        // should ping
848        mqtt.outgoing_ping().unwrap();
849    }
850
851    #[test]
852    fn state_should_be_clean_properly() {
853        let mut mqtt = build_mqttstate();
854        mqtt.write.put(&b"test"[..]);
855        // After this clean state.write should be empty
856        mqtt.clean();
857        assert!(mqtt.write.is_empty());
858    }
859
860    #[test]
861    fn clean_is_calculating_pending_correctly() {
862        let mut mqtt = build_mqttstate();
863
864        fn build_outgoing_pub() -> Vec<Option<Publish>> {
865            vec![
866                None,
867                Some(Publish {
868                    dup: false,
869                    qos: QoS::AtMostOnce,
870                    retain: false,
871                    topic: "test".to_string(),
872                    pkid: 1,
873                    payload: "".into(),
874                }),
875                Some(Publish {
876                    dup: false,
877                    qos: QoS::AtMostOnce,
878                    retain: false,
879                    topic: "test".to_string(),
880                    pkid: 2,
881                    payload: "".into(),
882                }),
883                Some(Publish {
884                    dup: false,
885                    qos: QoS::AtMostOnce,
886                    retain: false,
887                    topic: "test".to_string(),
888                    pkid: 3,
889                    payload: "".into(),
890                }),
891                None,
892                None,
893                Some(Publish {
894                    dup: false,
895                    qos: QoS::AtMostOnce,
896                    retain: false,
897                    topic: "test".to_string(),
898                    pkid: 6,
899                    payload: "".into(),
900                }),
901            ]
902        }
903
904        mqtt.outgoing_pub = build_outgoing_pub();
905        mqtt.last_puback = 3;
906        let requests = mqtt.clean();
907        let res = vec![6, 1, 2, 3];
908        for (req, idx) in requests.iter().zip(res) {
909            if let Request::Publish(publish) = req {
910                assert_eq!(publish.pkid, idx);
911            } else {
912                unreachable!()
913            }
914        }
915
916        mqtt.outgoing_pub = build_outgoing_pub();
917        mqtt.last_puback = 0;
918        let requests = mqtt.clean();
919        let res = vec![1, 2, 3, 6];
920        for (req, idx) in requests.iter().zip(res) {
921            if let Request::Publish(publish) = req {
922                assert_eq!(publish.pkid, idx);
923            } else {
924                unreachable!()
925            }
926        }
927
928        mqtt.outgoing_pub = build_outgoing_pub();
929        mqtt.last_puback = 6;
930        let requests = mqtt.clean();
931        let res = vec![1, 2, 3, 6];
932        for (req, idx) in requests.iter().zip(res) {
933            if let Request::Publish(publish) = req {
934                assert_eq!(publish.pkid, idx);
935            } else {
936                unreachable!()
937            }
938        }
939    }
940}