rumqttc/v5/
client.rs

1//! This module offers a high level synchronous and asynchronous abstraction to
2//! async eventloop.
3use std::time::Duration;
4
5use super::mqttbytes::v5::{
6    Filter, PubAck, PubRec, Publish, PublishProperties, Subscribe, SubscribeProperties,
7    Unsubscribe, UnsubscribeProperties,
8};
9use super::mqttbytes::QoS;
10use super::{ConnectionError, Event, EventLoop, MqttOptions, Request};
11use crate::valid_topic;
12
13use bytes::Bytes;
14use flume::{SendError, Sender, TrySendError};
15use futures_util::FutureExt;
16use tokio::runtime::{self, Runtime};
17use tokio::time::timeout;
18
19/// Client Error
20#[derive(Debug, thiserror::Error)]
21pub enum ClientError {
22    #[error("Failed to send mqtt requests to eventloop")]
23    Request(Request),
24    #[error("Failed to send mqtt requests to eventloop")]
25    TryRequest(Request),
26}
27
28impl From<SendError<Request>> for ClientError {
29    fn from(e: SendError<Request>) -> Self {
30        Self::Request(e.into_inner())
31    }
32}
33
34impl From<TrySendError<Request>> for ClientError {
35    fn from(e: TrySendError<Request>) -> Self {
36        Self::TryRequest(e.into_inner())
37    }
38}
39
40/// An asynchronous client, communicates with MQTT `EventLoop`.
41///
42/// This is cloneable and can be used to asynchronously [`publish`](`AsyncClient::publish`),
43/// [`subscribe`](`AsyncClient::subscribe`) through the `EventLoop`, which is to be polled parallelly.
44///
45/// **NOTE**: The `EventLoop` must be regularly polled in order to send, receive and process packets
46/// from the broker, i.e. move ahead.
47#[derive(Clone, Debug)]
48pub struct AsyncClient {
49    request_tx: Sender<Request>,
50}
51
52impl AsyncClient {
53    /// Create a new `AsyncClient`.
54    ///
55    /// `cap` specifies the capacity of the bounded async channel.
56    pub fn new(options: MqttOptions, cap: usize) -> (AsyncClient, EventLoop) {
57        let eventloop = EventLoop::new(options, cap);
58        let request_tx = eventloop.requests_tx.clone();
59
60        let client = AsyncClient { request_tx };
61
62        (client, eventloop)
63    }
64
65    /// Create a new `AsyncClient` from a channel `Sender`.
66    ///
67    /// This is mostly useful for creating a test instance where you can
68    /// listen on the corresponding receiver.
69    pub fn from_senders(request_tx: Sender<Request>) -> AsyncClient {
70        AsyncClient { request_tx }
71    }
72
73    /// Sends a MQTT Publish to the `EventLoop`.
74    async fn handle_publish<S, P>(
75        &self,
76        topic: S,
77        qos: QoS,
78        retain: bool,
79        payload: P,
80        properties: Option<PublishProperties>,
81    ) -> Result<(), ClientError>
82    where
83        S: Into<String>,
84        P: Into<Bytes>,
85    {
86        let topic = topic.into();
87        let mut publish = Publish::new(&topic, qos, payload, properties);
88        publish.retain = retain;
89        let publish = Request::Publish(publish);
90        if !valid_topic(&topic) {
91            return Err(ClientError::Request(publish));
92        }
93        self.request_tx.send_async(publish).await?;
94        Ok(())
95    }
96
97    pub async fn publish_with_properties<S, P>(
98        &self,
99        topic: S,
100        qos: QoS,
101        retain: bool,
102        payload: P,
103        properties: PublishProperties,
104    ) -> Result<(), ClientError>
105    where
106        S: Into<String>,
107        P: Into<Bytes>,
108    {
109        self.handle_publish(topic, qos, retain, payload, Some(properties))
110            .await
111    }
112
113    pub async fn publish<S, P>(
114        &self,
115        topic: S,
116        qos: QoS,
117        retain: bool,
118        payload: P,
119    ) -> Result<(), ClientError>
120    where
121        S: Into<String>,
122        P: Into<Bytes>,
123    {
124        self.handle_publish(topic, qos, retain, payload, None).await
125    }
126
127    /// Attempts to send a MQTT Publish to the `EventLoop`.
128    fn handle_try_publish<S, P>(
129        &self,
130        topic: S,
131        qos: QoS,
132        retain: bool,
133        payload: P,
134        properties: Option<PublishProperties>,
135    ) -> Result<(), ClientError>
136    where
137        S: Into<String>,
138        P: Into<Bytes>,
139    {
140        let topic = topic.into();
141        let mut publish = Publish::new(&topic, qos, payload, properties);
142        publish.retain = retain;
143        let publish = Request::Publish(publish);
144        if !valid_topic(&topic) {
145            return Err(ClientError::TryRequest(publish));
146        }
147        self.request_tx.try_send(publish)?;
148        Ok(())
149    }
150
151    pub fn try_publish_with_properties<S, P>(
152        &self,
153        topic: S,
154        qos: QoS,
155        retain: bool,
156        payload: P,
157        properties: PublishProperties,
158    ) -> Result<(), ClientError>
159    where
160        S: Into<String>,
161        P: Into<Bytes>,
162    {
163        self.handle_try_publish(topic, qos, retain, payload, Some(properties))
164    }
165
166    pub fn try_publish<S, P>(
167        &self,
168        topic: S,
169        qos: QoS,
170        retain: bool,
171        payload: P,
172    ) -> Result<(), ClientError>
173    where
174        S: Into<String>,
175        P: Into<Bytes>,
176    {
177        self.handle_try_publish(topic, qos, retain, payload, None)
178    }
179
180    /// Sends a MQTT PubAck to the `EventLoop`. Only needed in if `manual_acks` flag is set.
181    pub async fn ack(&self, publish: &Publish) -> Result<(), ClientError> {
182        let ack = get_ack_req(publish);
183
184        if let Some(ack) = ack {
185            self.request_tx.send_async(ack).await?;
186        }
187        Ok(())
188    }
189
190    /// Attempts to send a MQTT PubAck to the `EventLoop`. Only needed in if `manual_acks` flag is set.
191    pub fn try_ack(&self, publish: &Publish) -> Result<(), ClientError> {
192        let ack = get_ack_req(publish);
193        if let Some(ack) = ack {
194            self.request_tx.try_send(ack)?;
195        }
196        Ok(())
197    }
198
199    /// Sends a MQTT Publish to the `EventLoop`
200    async fn handle_publish_bytes<S>(
201        &self,
202        topic: S,
203        qos: QoS,
204        retain: bool,
205        payload: Bytes,
206        properties: Option<PublishProperties>,
207    ) -> Result<(), ClientError>
208    where
209        S: Into<String>,
210    {
211        let topic = topic.into();
212        let mut publish = Publish::new(&topic, qos, payload, properties);
213        publish.retain = retain;
214        let publish = Request::Publish(publish);
215        if !valid_topic(&topic) {
216            return Err(ClientError::TryRequest(publish));
217        }
218        self.request_tx.send_async(publish).await?;
219        Ok(())
220    }
221
222    pub async fn publish_bytes_with_properties<S>(
223        &self,
224        topic: S,
225        qos: QoS,
226        retain: bool,
227        payload: Bytes,
228        properties: PublishProperties,
229    ) -> Result<(), ClientError>
230    where
231        S: Into<String>,
232    {
233        self.handle_publish_bytes(topic, qos, retain, payload, Some(properties))
234            .await
235    }
236
237    pub async fn publish_bytes<S>(
238        &self,
239        topic: S,
240        qos: QoS,
241        retain: bool,
242        payload: Bytes,
243    ) -> Result<(), ClientError>
244    where
245        S: Into<String>,
246    {
247        self.handle_publish_bytes(topic, qos, retain, payload, None)
248            .await
249    }
250
251    /// Sends a MQTT Subscribe to the `EventLoop`
252    async fn handle_subscribe<S: Into<String>>(
253        &self,
254        topic: S,
255        qos: QoS,
256        properties: Option<SubscribeProperties>,
257    ) -> Result<(), ClientError> {
258        let filter = Filter::new(topic, qos);
259        let subscribe = Subscribe::new(filter, properties);
260        let request: Request = Request::Subscribe(subscribe);
261        self.request_tx.send_async(request).await?;
262        Ok(())
263    }
264
265    pub async fn subscribe_with_properties<S: Into<String>>(
266        &self,
267        topic: S,
268        qos: QoS,
269        properties: SubscribeProperties,
270    ) -> Result<(), ClientError> {
271        self.handle_subscribe(topic, qos, Some(properties)).await
272    }
273
274    pub async fn subscribe<S: Into<String>>(&self, topic: S, qos: QoS) -> Result<(), ClientError> {
275        self.handle_subscribe(topic, qos, None).await
276    }
277
278    /// Attempts to send a MQTT Subscribe to the `EventLoop`
279    fn handle_try_subscribe<S: Into<String>>(
280        &self,
281        topic: S,
282        qos: QoS,
283        properties: Option<SubscribeProperties>,
284    ) -> Result<(), ClientError> {
285        let filter = Filter::new(topic, qos);
286        let subscribe = Subscribe::new(filter, properties);
287        let request = Request::Subscribe(subscribe);
288        self.request_tx.try_send(request)?;
289        Ok(())
290    }
291
292    pub fn try_subscribe_with_properties<S: Into<String>>(
293        &self,
294        topic: S,
295        qos: QoS,
296        properties: SubscribeProperties,
297    ) -> Result<(), ClientError> {
298        self.handle_try_subscribe(topic, qos, Some(properties))
299    }
300
301    pub fn try_subscribe<S: Into<String>>(&self, topic: S, qos: QoS) -> Result<(), ClientError> {
302        self.handle_try_subscribe(topic, qos, None)
303    }
304
305    /// Sends a MQTT Subscribe for multiple topics to the `EventLoop`
306    async fn handle_subscribe_many<T>(
307        &self,
308        topics: T,
309        properties: Option<SubscribeProperties>,
310    ) -> Result<(), ClientError>
311    where
312        T: IntoIterator<Item = Filter>,
313    {
314        let subscribe = Subscribe::new_many(topics, properties);
315        let request = Request::Subscribe(subscribe);
316        self.request_tx.send_async(request).await?;
317        Ok(())
318    }
319
320    pub async fn subscribe_many_with_properties<T>(
321        &self,
322        topics: T,
323        properties: SubscribeProperties,
324    ) -> Result<(), ClientError>
325    where
326        T: IntoIterator<Item = Filter>,
327    {
328        self.handle_subscribe_many(topics, Some(properties)).await
329    }
330
331    pub async fn subscribe_many<T>(&self, topics: T) -> Result<(), ClientError>
332    where
333        T: IntoIterator<Item = Filter>,
334    {
335        self.handle_subscribe_many(topics, None).await
336    }
337
338    /// Attempts to send a MQTT Subscribe for multiple topics to the `EventLoop`
339    fn handle_try_subscribe_many<T>(
340        &self,
341        topics: T,
342        properties: Option<SubscribeProperties>,
343    ) -> Result<(), ClientError>
344    where
345        T: IntoIterator<Item = Filter>,
346    {
347        let subscribe = Subscribe::new_many(topics, properties);
348        let request = Request::Subscribe(subscribe);
349        self.request_tx.try_send(request)?;
350        Ok(())
351    }
352
353    pub fn try_subscribe_many_with_properties<T>(
354        &self,
355        topics: T,
356        properties: SubscribeProperties,
357    ) -> Result<(), ClientError>
358    where
359        T: IntoIterator<Item = Filter>,
360    {
361        self.handle_try_subscribe_many(topics, Some(properties))
362    }
363
364    pub fn try_subscribe_many<T>(&self, topics: T) -> Result<(), ClientError>
365    where
366        T: IntoIterator<Item = Filter>,
367    {
368        self.handle_try_subscribe_many(topics, None)
369    }
370
371    /// Sends a MQTT Unsubscribe to the `EventLoop`
372    async fn handle_unsubscribe<S: Into<String>>(
373        &self,
374        topic: S,
375        properties: Option<UnsubscribeProperties>,
376    ) -> Result<(), ClientError> {
377        let unsubscribe = Unsubscribe::new(topic, properties);
378        let request = Request::Unsubscribe(unsubscribe);
379        self.request_tx.send_async(request).await?;
380        Ok(())
381    }
382
383    pub async fn unsubscribe_with_properties<S: Into<String>>(
384        &self,
385        topic: S,
386        properties: UnsubscribeProperties,
387    ) -> Result<(), ClientError> {
388        self.handle_unsubscribe(topic, Some(properties)).await
389    }
390
391    pub async fn unsubscribe<S: Into<String>>(&self, topic: S) -> Result<(), ClientError> {
392        self.handle_unsubscribe(topic, None).await
393    }
394
395    /// Attempts to send a MQTT Unsubscribe to the `EventLoop`
396    fn handle_try_unsubscribe<S: Into<String>>(
397        &self,
398        topic: S,
399        properties: Option<UnsubscribeProperties>,
400    ) -> Result<(), ClientError> {
401        let unsubscribe = Unsubscribe::new(topic, properties);
402        let request = Request::Unsubscribe(unsubscribe);
403        self.request_tx.try_send(request)?;
404        Ok(())
405    }
406
407    pub fn try_unsubscribe_with_properties<S: Into<String>>(
408        &self,
409        topic: S,
410        properties: UnsubscribeProperties,
411    ) -> Result<(), ClientError> {
412        self.handle_try_unsubscribe(topic, Some(properties))
413    }
414
415    pub fn try_unsubscribe<S: Into<String>>(&self, topic: S) -> Result<(), ClientError> {
416        self.handle_try_unsubscribe(topic, None)
417    }
418
419    /// Sends a MQTT disconnect to the `EventLoop`
420    pub async fn disconnect(&self) -> Result<(), ClientError> {
421        let request = Request::Disconnect;
422        self.request_tx.send_async(request).await?;
423        Ok(())
424    }
425
426    /// Attempts to send a MQTT disconnect to the `EventLoop`
427    pub fn try_disconnect(&self) -> Result<(), ClientError> {
428        let request = Request::Disconnect;
429        self.request_tx.try_send(request)?;
430        Ok(())
431    }
432}
433
434fn get_ack_req(publish: &Publish) -> Option<Request> {
435    let ack = match publish.qos {
436        QoS::AtMostOnce => return None,
437        QoS::AtLeastOnce => Request::PubAck(PubAck::new(publish.pkid, None)),
438        QoS::ExactlyOnce => Request::PubRec(PubRec::new(publish.pkid, None)),
439    };
440    Some(ack)
441}
442
443/// A synchronous client, communicates with MQTT `EventLoop`.
444///
445/// This is cloneable and can be used to synchronously [`publish`](`AsyncClient::publish`),
446/// [`subscribe`](`AsyncClient::subscribe`) through the `EventLoop`/`Connection`, which is to be polled in parallel
447/// by iterating over the object returned by [`Connection.iter()`](Connection::iter) in a separate thread.
448///
449/// **NOTE**: The `EventLoop`/`Connection` must be regularly polled(`.next()` in case of `Connection`) in order
450/// to send, receive and process packets from the broker, i.e. move ahead.
451///
452/// An asynchronous channel handle can also be extracted if necessary.
453#[derive(Clone)]
454pub struct Client {
455    client: AsyncClient,
456}
457
458impl Client {
459    /// Create a new `Client`
460    ///
461    /// `cap` specifies the capacity of the bounded async channel.
462    pub fn new(options: MqttOptions, cap: usize) -> (Client, Connection) {
463        let (client, eventloop) = AsyncClient::new(options, cap);
464        let client = Client { client };
465
466        let runtime = runtime::Builder::new_current_thread()
467            .enable_all()
468            .build()
469            .unwrap();
470
471        let connection = Connection::new(eventloop, runtime);
472        (client, connection)
473    }
474
475    /// Create a new `Client` from a channel `Sender`.
476    ///
477    /// This is mostly useful for creating a test instance where you can
478    /// listen on the corresponding receiver.
479    pub fn from_sender(request_tx: Sender<Request>) -> Client {
480        Client {
481            client: AsyncClient::from_senders(request_tx),
482        }
483    }
484
485    /// Sends a MQTT Publish to the `EventLoop`
486    fn handle_publish<S, P>(
487        &self,
488        topic: S,
489        qos: QoS,
490        retain: bool,
491        payload: P,
492        properties: Option<PublishProperties>,
493    ) -> Result<(), ClientError>
494    where
495        S: Into<String>,
496        P: Into<Bytes>,
497    {
498        let topic = topic.into();
499        let mut publish = Publish::new(&topic, qos, payload, properties);
500        publish.retain = retain;
501        let publish = Request::Publish(publish);
502        if !valid_topic(&topic) {
503            return Err(ClientError::Request(publish));
504        }
505        self.client.request_tx.send(publish)?;
506        Ok(())
507    }
508
509    pub fn publish_with_properties<S, P>(
510        &self,
511        topic: S,
512        qos: QoS,
513        retain: bool,
514        payload: P,
515        properties: PublishProperties,
516    ) -> Result<(), ClientError>
517    where
518        S: Into<String>,
519        P: Into<Bytes>,
520    {
521        self.handle_publish(topic, qos, retain, payload, Some(properties))
522    }
523
524    pub fn publish<S, P>(
525        &self,
526        topic: S,
527        qos: QoS,
528        retain: bool,
529        payload: P,
530    ) -> Result<(), ClientError>
531    where
532        S: Into<String>,
533        P: Into<Bytes>,
534    {
535        self.handle_publish(topic, qos, retain, payload, None)
536    }
537
538    pub fn try_publish_with_properties<S, P>(
539        &self,
540        topic: S,
541        qos: QoS,
542        retain: bool,
543        payload: P,
544        properties: PublishProperties,
545    ) -> Result<(), ClientError>
546    where
547        S: Into<String>,
548        P: Into<Bytes>,
549    {
550        self.client
551            .try_publish_with_properties(topic, qos, retain, payload, properties)
552    }
553
554    pub fn try_publish<S, P>(
555        &self,
556        topic: S,
557        qos: QoS,
558        retain: bool,
559        payload: P,
560    ) -> Result<(), ClientError>
561    where
562        S: Into<String>,
563        P: Into<Bytes>,
564    {
565        self.client.try_publish(topic, qos, retain, payload)
566    }
567
568    /// Sends a MQTT PubAck to the `EventLoop`. Only needed in if `manual_acks` flag is set.
569    pub fn ack(&self, publish: &Publish) -> Result<(), ClientError> {
570        let ack = get_ack_req(publish);
571
572        if let Some(ack) = ack {
573            self.client.request_tx.send(ack)?;
574        }
575        Ok(())
576    }
577
578    /// Sends a MQTT PubAck to the `EventLoop`. Only needed in if `manual_acks` flag is set.
579    pub fn try_ack(&self, publish: &Publish) -> Result<(), ClientError> {
580        self.client.try_ack(publish)?;
581        Ok(())
582    }
583
584    /// Sends a MQTT Subscribe to the `EventLoop`
585    fn handle_subscribe<S: Into<String>>(
586        &self,
587        topic: S,
588        qos: QoS,
589        properties: Option<SubscribeProperties>,
590    ) -> Result<(), ClientError> {
591        let filter = Filter::new(topic, qos);
592        let subscribe = Subscribe::new(filter, properties);
593        let request = Request::Subscribe(subscribe);
594        self.client.request_tx.send(request)?;
595        Ok(())
596    }
597
598    pub fn subscribe_with_properties<S: Into<String>>(
599        &self,
600        topic: S,
601        qos: QoS,
602        properties: SubscribeProperties,
603    ) -> Result<(), ClientError> {
604        self.handle_subscribe(topic, qos, Some(properties))
605    }
606
607    pub fn subscribe<S: Into<String>>(&self, topic: S, qos: QoS) -> Result<(), ClientError> {
608        self.handle_subscribe(topic, qos, None)
609    }
610
611    /// Sends a MQTT Subscribe to the `EventLoop`
612    pub fn try_subscribe_with_properties<S: Into<String>>(
613        &self,
614        topic: S,
615        qos: QoS,
616        properties: SubscribeProperties,
617    ) -> Result<(), ClientError> {
618        self.client
619            .try_subscribe_with_properties(topic, qos, properties)
620    }
621
622    pub fn try_subscribe<S: Into<String>>(&self, topic: S, qos: QoS) -> Result<(), ClientError> {
623        self.client.try_subscribe(topic, qos)
624    }
625
626    /// Sends a MQTT Subscribe for multiple topics to the `EventLoop`
627    fn handle_subscribe_many<T>(
628        &self,
629        topics: T,
630        properties: Option<SubscribeProperties>,
631    ) -> Result<(), ClientError>
632    where
633        T: IntoIterator<Item = Filter>,
634    {
635        let subscribe = Subscribe::new_many(topics, properties);
636        let request = Request::Subscribe(subscribe);
637        self.client.request_tx.send(request)?;
638        Ok(())
639    }
640
641    pub fn subscribe_many_with_properties<T>(
642        &self,
643        topics: T,
644        properties: SubscribeProperties,
645    ) -> Result<(), ClientError>
646    where
647        T: IntoIterator<Item = Filter>,
648    {
649        self.handle_subscribe_many(topics, Some(properties))
650    }
651
652    pub fn subscribe_many<T>(&self, topics: T) -> Result<(), ClientError>
653    where
654        T: IntoIterator<Item = Filter>,
655    {
656        self.handle_subscribe_many(topics, None)
657    }
658
659    pub fn try_subscribe_many_with_properties<T>(
660        &self,
661        topics: T,
662        properties: SubscribeProperties,
663    ) -> Result<(), ClientError>
664    where
665        T: IntoIterator<Item = Filter>,
666    {
667        self.client
668            .try_subscribe_many_with_properties(topics, properties)
669    }
670
671    pub fn try_subscribe_many<T>(&self, topics: T) -> Result<(), ClientError>
672    where
673        T: IntoIterator<Item = Filter>,
674    {
675        self.client.try_subscribe_many(topics)
676    }
677
678    /// Sends a MQTT Unsubscribe to the `EventLoop`
679    fn handle_unsubscribe<S: Into<String>>(
680        &self,
681        topic: S,
682        properties: Option<UnsubscribeProperties>,
683    ) -> Result<(), ClientError> {
684        let unsubscribe = Unsubscribe::new(topic, properties);
685        let request = Request::Unsubscribe(unsubscribe);
686        self.client.request_tx.send(request)?;
687        Ok(())
688    }
689
690    pub fn unsubscribe_with_properties<S: Into<String>>(
691        &self,
692        topic: S,
693        properties: UnsubscribeProperties,
694    ) -> Result<(), ClientError> {
695        self.handle_unsubscribe(topic, Some(properties))
696    }
697
698    pub fn unsubscribe<S: Into<String>>(&self, topic: S) -> Result<(), ClientError> {
699        self.handle_unsubscribe(topic, None)
700    }
701
702    /// Sends a MQTT Unsubscribe to the `EventLoop`
703    pub fn try_unsubscribe_with_properties<S: Into<String>>(
704        &self,
705        topic: S,
706        properties: UnsubscribeProperties,
707    ) -> Result<(), ClientError> {
708        self.client
709            .try_unsubscribe_with_properties(topic, properties)
710    }
711
712    pub fn try_unsubscribe<S: Into<String>>(&self, topic: S) -> Result<(), ClientError> {
713        self.client.try_unsubscribe(topic)
714    }
715
716    /// Sends a MQTT disconnect to the `EventLoop`
717    pub fn disconnect(&self) -> Result<(), ClientError> {
718        let request = Request::Disconnect;
719        self.client.request_tx.send(request)?;
720        Ok(())
721    }
722
723    /// Sends a MQTT disconnect to the `EventLoop`
724    pub fn try_disconnect(&self) -> Result<(), ClientError> {
725        self.client.try_disconnect()?;
726        Ok(())
727    }
728}
729
/// Error type returned by [`Connection::recv`]
///
/// Reported when the event loop signals that the requests channel has been
/// closed, i.e. there is nothing left to receive.
#[derive(Debug, Eq, PartialEq)]
pub struct RecvError;

impl std::fmt::Display for RecvError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("requests channel has been closed")
    }
}

// Lets callers propagate this with `?` into `Box<dyn Error>` and similar.
impl std::error::Error for RecvError {}
733
/// Error type returned by [`Connection::try_recv`]
#[derive(Debug, Eq, PartialEq)]
pub enum TryRecvError {
    /// User has closed requests channel
    Disconnected,
    /// The poll did not resolve immediately, so no event is available yet
    Empty,
}

impl std::fmt::Display for TryRecvError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let msg = match self {
            Self::Disconnected => "requests channel has been closed",
            Self::Empty => "no event is immediately available",
        };
        f.write_str(msg)
    }
}

// Lets callers propagate this with `?` into `Box<dyn Error>` and similar.
impl std::error::Error for TryRecvError {}
742
/// Error type returned by [`Connection::recv_timeout`]
#[derive(Debug, Eq, PartialEq)]
pub enum RecvTimeoutError {
    /// User has closed requests channel
    Disconnected,
    /// The receive request timed out before an event was available
    Timeout,
}

impl std::fmt::Display for RecvTimeoutError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let msg = match self {
            Self::Disconnected => "requests channel has been closed",
            Self::Timeout => "timed out waiting for an event",
        };
        f.write_str(msg)
    }
}

// Lets callers propagate this with `?` into `Box<dyn Error>` and similar.
impl std::error::Error for RecvTimeoutError {}
751
752///  MQTT connection. Maintains all the necessary state
753pub struct Connection {
754    pub eventloop: EventLoop,
755    runtime: Runtime,
756}
757impl Connection {
758    fn new(eventloop: EventLoop, runtime: Runtime) -> Connection {
759        Connection { eventloop, runtime }
760    }
761
762    /// Returns an iterator over this connection. Iterating over this is all that's
763    /// necessary to make connection progress and maintain a robust connection.
764    /// Just continuing to loop will reconnect
765    /// **NOTE** Don't block this while iterating
766    // ideally this should be named iter_mut because it requires a mutable reference
767    // Also we can implement IntoIter for this to make it easy to iterate over it
768    #[must_use = "Connection should be iterated over a loop to make progress"]
769    pub fn iter(&mut self) -> Iter<'_> {
770        Iter { connection: self }
771    }
772
773    /// Attempt to fetch an incoming [`Event`] on the [`EvenLoop`], returning an error
774    /// if all clients/users have closed requests channel.
775    ///
776    /// [`EvenLoop`]: super::EventLoop
777    pub fn recv(&mut self) -> Result<Result<Event, ConnectionError>, RecvError> {
778        let f = self.eventloop.poll();
779        let event = self.runtime.block_on(f);
780
781        resolve_event(event).ok_or(RecvError)
782    }
783
784    /// Attempt to fetch an incoming [`Event`] on the [`EvenLoop`], returning an error
785    /// if none immediately present or all clients/users have closed requests channel.
786    ///
787    /// [`EvenLoop`]: super::EventLoop
788    pub fn try_recv(&mut self) -> Result<Result<Event, ConnectionError>, TryRecvError> {
789        let f = self.eventloop.poll();
790        // Enters the runtime context so we can poll the future, as required by `now_or_never()`.
791        // ref: https://docs.rs/tokio/latest/tokio/runtime/struct.Runtime.html#method.enter
792        let _guard = self.runtime.enter();
793        let event = f.now_or_never().ok_or(TryRecvError::Empty)?;
794
795        resolve_event(event).ok_or(TryRecvError::Disconnected)
796    }
797
798    /// Attempt to fetch an incoming [`Event`] on the [`EvenLoop`], returning an error
799    /// if all clients/users have closed requests channel or the timeout has expired.
800    ///
801    /// [`EvenLoop`]: super::EventLoop
802    pub fn recv_timeout(
803        &mut self,
804        duration: Duration,
805    ) -> Result<Result<Event, ConnectionError>, RecvTimeoutError> {
806        let f = self.eventloop.poll();
807        let event = self
808            .runtime
809            .block_on(async { timeout(duration, f).await })
810            .map_err(|_| RecvTimeoutError::Timeout)?;
811
812        resolve_event(event).ok_or(RecvTimeoutError::Disconnected)
813    }
814}
815
816fn resolve_event(event: Result<Event, ConnectionError>) -> Option<Result<Event, ConnectionError>> {
817    match event {
818        Ok(v) => Some(Ok(v)),
819        // closing of request channel should stop the iterator
820        Err(ConnectionError::RequestsDone) => {
821            trace!("Done with requests");
822            None
823        }
824        Err(e) => Some(Err(e)),
825    }
826}
827
828/// Iterator which polls the `EventLoop` for connection progress
829pub struct Iter<'a> {
830    connection: &'a mut Connection,
831}
832
833impl Iterator for Iter<'_> {
834    type Item = Result<Event, ConnectionError>;
835
836    fn next(&mut self) -> Option<Self::Item> {
837        self.connection.recv().ok()
838    }
839}
840
#[cfg(test)]
mod test {
    use std::time::Duration;

    use super::*;
    use crate::v5::mqttbytes::v5::LastWill;

    /// `Connection::iter` only borrows the connection, so requesting an
    /// iterator a second time must work.
    #[test]
    fn calling_iter_twice_on_connection_shouldnt_panic() {
        let mut mqttoptions = MqttOptions::new("test-1", "localhost", 1883);
        let will = LastWill::new("hello/world", "good bye", QoS::AtMostOnce, false, None);
        mqttoptions
            .set_keep_alive(Duration::from_secs(5))
            .set_last_will(will);

        let (_, mut connection) = Client::new(mqttoptions, 10);
        for _ in 0..2 {
            let _ = connection.iter();
        }
    }

    /// A client built from a bare sender must deliver requests to the matching
    /// receiver.
    #[test]
    fn should_be_able_to_build_test_client_from_channel() {
        let (tx, rx) = flume::bounded(1);
        let client = Client::from_sender(tx);

        let published = client.publish("hello/world", QoS::ExactlyOnce, false, "good bye");
        published.expect("Should be able to publish");

        let _ = rx.try_recv().expect("Should have message");
    }
}
869        let _ = rx.try_recv().expect("Should have message");
870    }
871}