1use std::time::Duration;
4
5use super::mqttbytes::v5::{
6 Filter, PubAck, PubRec, Publish, PublishProperties, Subscribe, SubscribeProperties,
7 Unsubscribe, UnsubscribeProperties,
8};
9use super::mqttbytes::QoS;
10use super::{ConnectionError, Event, EventLoop, MqttOptions, Request};
11use crate::valid_topic;
12
13use bytes::Bytes;
14use flume::{SendError, Sender, TrySendError};
15use futures_util::FutureExt;
16use tokio::runtime::{self, Runtime};
17use tokio::time::timeout;
18
19#[derive(Debug, thiserror::Error)]
21pub enum ClientError {
22 #[error("Failed to send mqtt requests to eventloop")]
23 Request(Request),
24 #[error("Failed to send mqtt requests to eventloop")]
25 TryRequest(Request),
26}
27
28impl From<SendError<Request>> for ClientError {
29 fn from(e: SendError<Request>) -> Self {
30 Self::Request(e.into_inner())
31 }
32}
33
34impl From<TrySendError<Request>> for ClientError {
35 fn from(e: TrySendError<Request>) -> Self {
36 Self::TryRequest(e.into_inner())
37 }
38}
39
40#[derive(Clone, Debug)]
48pub struct AsyncClient {
49 request_tx: Sender<Request>,
50}
51
52impl AsyncClient {
53 pub fn new(options: MqttOptions, cap: usize) -> (AsyncClient, EventLoop) {
57 let eventloop = EventLoop::new(options, cap);
58 let request_tx = eventloop.requests_tx.clone();
59
60 let client = AsyncClient { request_tx };
61
62 (client, eventloop)
63 }
64
65 pub fn from_senders(request_tx: Sender<Request>) -> AsyncClient {
70 AsyncClient { request_tx }
71 }
72
73 async fn handle_publish<S, P>(
75 &self,
76 topic: S,
77 qos: QoS,
78 retain: bool,
79 payload: P,
80 properties: Option<PublishProperties>,
81 ) -> Result<(), ClientError>
82 where
83 S: Into<String>,
84 P: Into<Bytes>,
85 {
86 let topic = topic.into();
87 let mut publish = Publish::new(&topic, qos, payload, properties);
88 publish.retain = retain;
89 let publish = Request::Publish(publish);
90 if !valid_topic(&topic) {
91 return Err(ClientError::Request(publish));
92 }
93 self.request_tx.send_async(publish).await?;
94 Ok(())
95 }
96
97 pub async fn publish_with_properties<S, P>(
98 &self,
99 topic: S,
100 qos: QoS,
101 retain: bool,
102 payload: P,
103 properties: PublishProperties,
104 ) -> Result<(), ClientError>
105 where
106 S: Into<String>,
107 P: Into<Bytes>,
108 {
109 self.handle_publish(topic, qos, retain, payload, Some(properties))
110 .await
111 }
112
113 pub async fn publish<S, P>(
114 &self,
115 topic: S,
116 qos: QoS,
117 retain: bool,
118 payload: P,
119 ) -> Result<(), ClientError>
120 where
121 S: Into<String>,
122 P: Into<Bytes>,
123 {
124 self.handle_publish(topic, qos, retain, payload, None).await
125 }
126
127 fn handle_try_publish<S, P>(
129 &self,
130 topic: S,
131 qos: QoS,
132 retain: bool,
133 payload: P,
134 properties: Option<PublishProperties>,
135 ) -> Result<(), ClientError>
136 where
137 S: Into<String>,
138 P: Into<Bytes>,
139 {
140 let topic = topic.into();
141 let mut publish = Publish::new(&topic, qos, payload, properties);
142 publish.retain = retain;
143 let publish = Request::Publish(publish);
144 if !valid_topic(&topic) {
145 return Err(ClientError::TryRequest(publish));
146 }
147 self.request_tx.try_send(publish)?;
148 Ok(())
149 }
150
151 pub fn try_publish_with_properties<S, P>(
152 &self,
153 topic: S,
154 qos: QoS,
155 retain: bool,
156 payload: P,
157 properties: PublishProperties,
158 ) -> Result<(), ClientError>
159 where
160 S: Into<String>,
161 P: Into<Bytes>,
162 {
163 self.handle_try_publish(topic, qos, retain, payload, Some(properties))
164 }
165
166 pub fn try_publish<S, P>(
167 &self,
168 topic: S,
169 qos: QoS,
170 retain: bool,
171 payload: P,
172 ) -> Result<(), ClientError>
173 where
174 S: Into<String>,
175 P: Into<Bytes>,
176 {
177 self.handle_try_publish(topic, qos, retain, payload, None)
178 }
179
180 pub async fn ack(&self, publish: &Publish) -> Result<(), ClientError> {
182 let ack = get_ack_req(publish);
183
184 if let Some(ack) = ack {
185 self.request_tx.send_async(ack).await?;
186 }
187 Ok(())
188 }
189
190 pub fn try_ack(&self, publish: &Publish) -> Result<(), ClientError> {
192 let ack = get_ack_req(publish);
193 if let Some(ack) = ack {
194 self.request_tx.try_send(ack)?;
195 }
196 Ok(())
197 }
198
199 async fn handle_publish_bytes<S>(
201 &self,
202 topic: S,
203 qos: QoS,
204 retain: bool,
205 payload: Bytes,
206 properties: Option<PublishProperties>,
207 ) -> Result<(), ClientError>
208 where
209 S: Into<String>,
210 {
211 let topic = topic.into();
212 let mut publish = Publish::new(&topic, qos, payload, properties);
213 publish.retain = retain;
214 let publish = Request::Publish(publish);
215 if !valid_topic(&topic) {
216 return Err(ClientError::TryRequest(publish));
217 }
218 self.request_tx.send_async(publish).await?;
219 Ok(())
220 }
221
222 pub async fn publish_bytes_with_properties<S>(
223 &self,
224 topic: S,
225 qos: QoS,
226 retain: bool,
227 payload: Bytes,
228 properties: PublishProperties,
229 ) -> Result<(), ClientError>
230 where
231 S: Into<String>,
232 {
233 self.handle_publish_bytes(topic, qos, retain, payload, Some(properties))
234 .await
235 }
236
237 pub async fn publish_bytes<S>(
238 &self,
239 topic: S,
240 qos: QoS,
241 retain: bool,
242 payload: Bytes,
243 ) -> Result<(), ClientError>
244 where
245 S: Into<String>,
246 {
247 self.handle_publish_bytes(topic, qos, retain, payload, None)
248 .await
249 }
250
251 async fn handle_subscribe<S: Into<String>>(
253 &self,
254 topic: S,
255 qos: QoS,
256 properties: Option<SubscribeProperties>,
257 ) -> Result<(), ClientError> {
258 let filter = Filter::new(topic, qos);
259 let subscribe = Subscribe::new(filter, properties);
260 let request: Request = Request::Subscribe(subscribe);
261 self.request_tx.send_async(request).await?;
262 Ok(())
263 }
264
265 pub async fn subscribe_with_properties<S: Into<String>>(
266 &self,
267 topic: S,
268 qos: QoS,
269 properties: SubscribeProperties,
270 ) -> Result<(), ClientError> {
271 self.handle_subscribe(topic, qos, Some(properties)).await
272 }
273
274 pub async fn subscribe<S: Into<String>>(&self, topic: S, qos: QoS) -> Result<(), ClientError> {
275 self.handle_subscribe(topic, qos, None).await
276 }
277
278 fn handle_try_subscribe<S: Into<String>>(
280 &self,
281 topic: S,
282 qos: QoS,
283 properties: Option<SubscribeProperties>,
284 ) -> Result<(), ClientError> {
285 let filter = Filter::new(topic, qos);
286 let subscribe = Subscribe::new(filter, properties);
287 let request = Request::Subscribe(subscribe);
288 self.request_tx.try_send(request)?;
289 Ok(())
290 }
291
292 pub fn try_subscribe_with_properties<S: Into<String>>(
293 &self,
294 topic: S,
295 qos: QoS,
296 properties: SubscribeProperties,
297 ) -> Result<(), ClientError> {
298 self.handle_try_subscribe(topic, qos, Some(properties))
299 }
300
301 pub fn try_subscribe<S: Into<String>>(&self, topic: S, qos: QoS) -> Result<(), ClientError> {
302 self.handle_try_subscribe(topic, qos, None)
303 }
304
305 async fn handle_subscribe_many<T>(
307 &self,
308 topics: T,
309 properties: Option<SubscribeProperties>,
310 ) -> Result<(), ClientError>
311 where
312 T: IntoIterator<Item = Filter>,
313 {
314 let subscribe = Subscribe::new_many(topics, properties);
315 let request = Request::Subscribe(subscribe);
316 self.request_tx.send_async(request).await?;
317 Ok(())
318 }
319
320 pub async fn subscribe_many_with_properties<T>(
321 &self,
322 topics: T,
323 properties: SubscribeProperties,
324 ) -> Result<(), ClientError>
325 where
326 T: IntoIterator<Item = Filter>,
327 {
328 self.handle_subscribe_many(topics, Some(properties)).await
329 }
330
331 pub async fn subscribe_many<T>(&self, topics: T) -> Result<(), ClientError>
332 where
333 T: IntoIterator<Item = Filter>,
334 {
335 self.handle_subscribe_many(topics, None).await
336 }
337
338 fn handle_try_subscribe_many<T>(
340 &self,
341 topics: T,
342 properties: Option<SubscribeProperties>,
343 ) -> Result<(), ClientError>
344 where
345 T: IntoIterator<Item = Filter>,
346 {
347 let subscribe = Subscribe::new_many(topics, properties);
348 let request = Request::Subscribe(subscribe);
349 self.request_tx.try_send(request)?;
350 Ok(())
351 }
352
353 pub fn try_subscribe_many_with_properties<T>(
354 &self,
355 topics: T,
356 properties: SubscribeProperties,
357 ) -> Result<(), ClientError>
358 where
359 T: IntoIterator<Item = Filter>,
360 {
361 self.handle_try_subscribe_many(topics, Some(properties))
362 }
363
364 pub fn try_subscribe_many<T>(&self, topics: T) -> Result<(), ClientError>
365 where
366 T: IntoIterator<Item = Filter>,
367 {
368 self.handle_try_subscribe_many(topics, None)
369 }
370
371 async fn handle_unsubscribe<S: Into<String>>(
373 &self,
374 topic: S,
375 properties: Option<UnsubscribeProperties>,
376 ) -> Result<(), ClientError> {
377 let unsubscribe = Unsubscribe::new(topic, properties);
378 let request = Request::Unsubscribe(unsubscribe);
379 self.request_tx.send_async(request).await?;
380 Ok(())
381 }
382
383 pub async fn unsubscribe_with_properties<S: Into<String>>(
384 &self,
385 topic: S,
386 properties: UnsubscribeProperties,
387 ) -> Result<(), ClientError> {
388 self.handle_unsubscribe(topic, Some(properties)).await
389 }
390
391 pub async fn unsubscribe<S: Into<String>>(&self, topic: S) -> Result<(), ClientError> {
392 self.handle_unsubscribe(topic, None).await
393 }
394
395 fn handle_try_unsubscribe<S: Into<String>>(
397 &self,
398 topic: S,
399 properties: Option<UnsubscribeProperties>,
400 ) -> Result<(), ClientError> {
401 let unsubscribe = Unsubscribe::new(topic, properties);
402 let request = Request::Unsubscribe(unsubscribe);
403 self.request_tx.try_send(request)?;
404 Ok(())
405 }
406
407 pub fn try_unsubscribe_with_properties<S: Into<String>>(
408 &self,
409 topic: S,
410 properties: UnsubscribeProperties,
411 ) -> Result<(), ClientError> {
412 self.handle_try_unsubscribe(topic, Some(properties))
413 }
414
415 pub fn try_unsubscribe<S: Into<String>>(&self, topic: S) -> Result<(), ClientError> {
416 self.handle_try_unsubscribe(topic, None)
417 }
418
419 pub async fn disconnect(&self) -> Result<(), ClientError> {
421 let request = Request::Disconnect;
422 self.request_tx.send_async(request).await?;
423 Ok(())
424 }
425
426 pub fn try_disconnect(&self) -> Result<(), ClientError> {
428 let request = Request::Disconnect;
429 self.request_tx.try_send(request)?;
430 Ok(())
431 }
432}
433
434fn get_ack_req(publish: &Publish) -> Option<Request> {
435 let ack = match publish.qos {
436 QoS::AtMostOnce => return None,
437 QoS::AtLeastOnce => Request::PubAck(PubAck::new(publish.pkid, None)),
438 QoS::ExactlyOnce => Request::PubRec(PubRec::new(publish.pkid, None)),
439 };
440 Some(ack)
441}
442
443#[derive(Clone)]
454pub struct Client {
455 client: AsyncClient,
456}
457
458impl Client {
459 pub fn new(options: MqttOptions, cap: usize) -> (Client, Connection) {
463 let (client, eventloop) = AsyncClient::new(options, cap);
464 let client = Client { client };
465
466 let runtime = runtime::Builder::new_current_thread()
467 .enable_all()
468 .build()
469 .unwrap();
470
471 let connection = Connection::new(eventloop, runtime);
472 (client, connection)
473 }
474
475 pub fn from_sender(request_tx: Sender<Request>) -> Client {
480 Client {
481 client: AsyncClient::from_senders(request_tx),
482 }
483 }
484
485 fn handle_publish<S, P>(
487 &self,
488 topic: S,
489 qos: QoS,
490 retain: bool,
491 payload: P,
492 properties: Option<PublishProperties>,
493 ) -> Result<(), ClientError>
494 where
495 S: Into<String>,
496 P: Into<Bytes>,
497 {
498 let topic = topic.into();
499 let mut publish = Publish::new(&topic, qos, payload, properties);
500 publish.retain = retain;
501 let publish = Request::Publish(publish);
502 if !valid_topic(&topic) {
503 return Err(ClientError::Request(publish));
504 }
505 self.client.request_tx.send(publish)?;
506 Ok(())
507 }
508
509 pub fn publish_with_properties<S, P>(
510 &self,
511 topic: S,
512 qos: QoS,
513 retain: bool,
514 payload: P,
515 properties: PublishProperties,
516 ) -> Result<(), ClientError>
517 where
518 S: Into<String>,
519 P: Into<Bytes>,
520 {
521 self.handle_publish(topic, qos, retain, payload, Some(properties))
522 }
523
524 pub fn publish<S, P>(
525 &self,
526 topic: S,
527 qos: QoS,
528 retain: bool,
529 payload: P,
530 ) -> Result<(), ClientError>
531 where
532 S: Into<String>,
533 P: Into<Bytes>,
534 {
535 self.handle_publish(topic, qos, retain, payload, None)
536 }
537
538 pub fn try_publish_with_properties<S, P>(
539 &self,
540 topic: S,
541 qos: QoS,
542 retain: bool,
543 payload: P,
544 properties: PublishProperties,
545 ) -> Result<(), ClientError>
546 where
547 S: Into<String>,
548 P: Into<Bytes>,
549 {
550 self.client
551 .try_publish_with_properties(topic, qos, retain, payload, properties)
552 }
553
554 pub fn try_publish<S, P>(
555 &self,
556 topic: S,
557 qos: QoS,
558 retain: bool,
559 payload: P,
560 ) -> Result<(), ClientError>
561 where
562 S: Into<String>,
563 P: Into<Bytes>,
564 {
565 self.client.try_publish(topic, qos, retain, payload)
566 }
567
568 pub fn ack(&self, publish: &Publish) -> Result<(), ClientError> {
570 let ack = get_ack_req(publish);
571
572 if let Some(ack) = ack {
573 self.client.request_tx.send(ack)?;
574 }
575 Ok(())
576 }
577
578 pub fn try_ack(&self, publish: &Publish) -> Result<(), ClientError> {
580 self.client.try_ack(publish)?;
581 Ok(())
582 }
583
584 fn handle_subscribe<S: Into<String>>(
586 &self,
587 topic: S,
588 qos: QoS,
589 properties: Option<SubscribeProperties>,
590 ) -> Result<(), ClientError> {
591 let filter = Filter::new(topic, qos);
592 let subscribe = Subscribe::new(filter, properties);
593 let request = Request::Subscribe(subscribe);
594 self.client.request_tx.send(request)?;
595 Ok(())
596 }
597
598 pub fn subscribe_with_properties<S: Into<String>>(
599 &self,
600 topic: S,
601 qos: QoS,
602 properties: SubscribeProperties,
603 ) -> Result<(), ClientError> {
604 self.handle_subscribe(topic, qos, Some(properties))
605 }
606
607 pub fn subscribe<S: Into<String>>(&self, topic: S, qos: QoS) -> Result<(), ClientError> {
608 self.handle_subscribe(topic, qos, None)
609 }
610
611 pub fn try_subscribe_with_properties<S: Into<String>>(
613 &self,
614 topic: S,
615 qos: QoS,
616 properties: SubscribeProperties,
617 ) -> Result<(), ClientError> {
618 self.client
619 .try_subscribe_with_properties(topic, qos, properties)
620 }
621
622 pub fn try_subscribe<S: Into<String>>(&self, topic: S, qos: QoS) -> Result<(), ClientError> {
623 self.client.try_subscribe(topic, qos)
624 }
625
626 fn handle_subscribe_many<T>(
628 &self,
629 topics: T,
630 properties: Option<SubscribeProperties>,
631 ) -> Result<(), ClientError>
632 where
633 T: IntoIterator<Item = Filter>,
634 {
635 let subscribe = Subscribe::new_many(topics, properties);
636 let request = Request::Subscribe(subscribe);
637 self.client.request_tx.send(request)?;
638 Ok(())
639 }
640
641 pub fn subscribe_many_with_properties<T>(
642 &self,
643 topics: T,
644 properties: SubscribeProperties,
645 ) -> Result<(), ClientError>
646 where
647 T: IntoIterator<Item = Filter>,
648 {
649 self.handle_subscribe_many(topics, Some(properties))
650 }
651
652 pub fn subscribe_many<T>(&self, topics: T) -> Result<(), ClientError>
653 where
654 T: IntoIterator<Item = Filter>,
655 {
656 self.handle_subscribe_many(topics, None)
657 }
658
659 pub fn try_subscribe_many_with_properties<T>(
660 &self,
661 topics: T,
662 properties: SubscribeProperties,
663 ) -> Result<(), ClientError>
664 where
665 T: IntoIterator<Item = Filter>,
666 {
667 self.client
668 .try_subscribe_many_with_properties(topics, properties)
669 }
670
671 pub fn try_subscribe_many<T>(&self, topics: T) -> Result<(), ClientError>
672 where
673 T: IntoIterator<Item = Filter>,
674 {
675 self.client.try_subscribe_many(topics)
676 }
677
678 fn handle_unsubscribe<S: Into<String>>(
680 &self,
681 topic: S,
682 properties: Option<UnsubscribeProperties>,
683 ) -> Result<(), ClientError> {
684 let unsubscribe = Unsubscribe::new(topic, properties);
685 let request = Request::Unsubscribe(unsubscribe);
686 self.client.request_tx.send(request)?;
687 Ok(())
688 }
689
690 pub fn unsubscribe_with_properties<S: Into<String>>(
691 &self,
692 topic: S,
693 properties: UnsubscribeProperties,
694 ) -> Result<(), ClientError> {
695 self.handle_unsubscribe(topic, Some(properties))
696 }
697
698 pub fn unsubscribe<S: Into<String>>(&self, topic: S) -> Result<(), ClientError> {
699 self.handle_unsubscribe(topic, None)
700 }
701
702 pub fn try_unsubscribe_with_properties<S: Into<String>>(
704 &self,
705 topic: S,
706 properties: UnsubscribeProperties,
707 ) -> Result<(), ClientError> {
708 self.client
709 .try_unsubscribe_with_properties(topic, properties)
710 }
711
712 pub fn try_unsubscribe<S: Into<String>>(&self, topic: S) -> Result<(), ClientError> {
713 self.client.try_unsubscribe(topic)
714 }
715
716 pub fn disconnect(&self) -> Result<(), ClientError> {
718 let request = Request::Disconnect;
719 self.client.request_tx.send(request)?;
720 Ok(())
721 }
722
723 pub fn try_disconnect(&self) -> Result<(), ClientError> {
725 self.client.try_disconnect()?;
726 Ok(())
727 }
728}
729
/// Error returned by `Connection::recv` when the event loop reports that it
/// is done with requests, so no further event will be produced.
#[derive(Debug, Eq, PartialEq)]
pub struct RecvError;
733
/// Error returned by `Connection::try_recv`.
#[derive(Debug, Eq, PartialEq)]
pub enum TryRecvError {
    /// The event loop reported that it is done with requests.
    Disconnected,
    /// No event was ready when the event loop was polled once.
    Empty,
}
742
/// Error returned by `Connection::recv_timeout`.
#[derive(Debug, Eq, PartialEq)]
pub enum RecvTimeoutError {
    /// The event loop reported that it is done with requests.
    Disconnected,
    /// The given duration elapsed before the event loop produced an event.
    Timeout,
}
751
752pub struct Connection {
754 pub eventloop: EventLoop,
755 runtime: Runtime,
756}
757impl Connection {
758 fn new(eventloop: EventLoop, runtime: Runtime) -> Connection {
759 Connection { eventloop, runtime }
760 }
761
762 #[must_use = "Connection should be iterated over a loop to make progress"]
769 pub fn iter(&mut self) -> Iter<'_> {
770 Iter { connection: self }
771 }
772
773 pub fn recv(&mut self) -> Result<Result<Event, ConnectionError>, RecvError> {
778 let f = self.eventloop.poll();
779 let event = self.runtime.block_on(f);
780
781 resolve_event(event).ok_or(RecvError)
782 }
783
784 pub fn try_recv(&mut self) -> Result<Result<Event, ConnectionError>, TryRecvError> {
789 let f = self.eventloop.poll();
790 let _guard = self.runtime.enter();
793 let event = f.now_or_never().ok_or(TryRecvError::Empty)?;
794
795 resolve_event(event).ok_or(TryRecvError::Disconnected)
796 }
797
798 pub fn recv_timeout(
803 &mut self,
804 duration: Duration,
805 ) -> Result<Result<Event, ConnectionError>, RecvTimeoutError> {
806 let f = self.eventloop.poll();
807 let event = self
808 .runtime
809 .block_on(async { timeout(duration, f).await })
810 .map_err(|_| RecvTimeoutError::Timeout)?;
811
812 resolve_event(event).ok_or(RecvTimeoutError::Disconnected)
813 }
814}
815
816fn resolve_event(event: Result<Event, ConnectionError>) -> Option<Result<Event, ConnectionError>> {
817 match event {
818 Ok(v) => Some(Ok(v)),
819 Err(ConnectionError::RequestsDone) => {
821 trace!("Done with requests");
822 None
823 }
824 Err(e) => Some(Err(e)),
825 }
826}
827
828pub struct Iter<'a> {
830 connection: &'a mut Connection,
831}
832
833impl Iterator for Iter<'_> {
834 type Item = Result<Event, ConnectionError>;
835
836 fn next(&mut self) -> Option<Self::Item> {
837 self.connection.recv().ok()
838 }
839}
840
#[cfg(test)]
mod test {
    use crate::v5::mqttbytes::v5::LastWill;

    use super::*;

    /// `Connection::iter` only borrows the connection, so requesting a second
    /// iterator after the first one is dropped must not panic.
    #[test]
    fn calling_iter_twice_on_connection_shouldnt_panic() {
        use std::time::Duration;

        let last_will = LastWill::new("hello/world", "good bye", QoS::AtMostOnce, false, None);
        let mut options = MqttOptions::new("test-1", "localhost", 1883);
        options
            .set_keep_alive(Duration::from_secs(5))
            .set_last_will(last_will);

        let (_, mut connection) = Client::new(options, 10);
        for _ in 0..2 {
            let _ = connection.iter();
        }
    }

    /// A client built from a bare channel sender must deliver its publish
    /// request to the matching receiver.
    #[test]
    fn should_be_able_to_build_test_client_from_channel() {
        let (request_tx, request_rx) = flume::bounded(1);
        let client = Client::from_sender(request_tx);

        let published = client.publish("hello/world", QoS::ExactlyOnce, false, "good bye");
        published.expect("Should be able to publish");

        let _ = request_rx.try_recv().expect("Should have message");
    }
}