rumqttc/
client.rs

1//! This module offers a high level synchronous and asynchronous abstraction to
2//! async eventloop.
3use std::time::Duration;
4
5use crate::mqttbytes::{v4::*, QoS};
6use crate::{valid_topic, ConnectionError, Event, EventLoop, MqttOptions, Request};
7
8use bytes::Bytes;
9use flume::{SendError, Sender, TrySendError};
10use futures_util::FutureExt;
11use tokio::runtime::{self, Runtime};
12use tokio::time::timeout;
13
14/// Client Error
15#[derive(Debug, thiserror::Error)]
16pub enum ClientError {
17    #[error("Failed to send mqtt requests to eventloop")]
18    Request(Request),
19    #[error("Failed to send mqtt requests to eventloop")]
20    TryRequest(Request),
21}
22
23impl From<SendError<Request>> for ClientError {
24    fn from(e: SendError<Request>) -> Self {
25        Self::Request(e.into_inner())
26    }
27}
28
29impl From<TrySendError<Request>> for ClientError {
30    fn from(e: TrySendError<Request>) -> Self {
31        Self::TryRequest(e.into_inner())
32    }
33}
34
35/// An asynchronous client, communicates with MQTT `EventLoop`.
36///
37/// This is cloneable and can be used to asynchronously [`publish`](`AsyncClient::publish`),
38/// [`subscribe`](`AsyncClient::subscribe`) through the `EventLoop`, which is to be polled parallelly.
39///
40/// **NOTE**: The `EventLoop` must be regularly polled in order to send, receive and process packets
41/// from the broker, i.e. move ahead.
42#[derive(Clone, Debug)]
43pub struct AsyncClient {
44    request_tx: Sender<Request>,
45}
46
47impl AsyncClient {
48    /// Create a new `AsyncClient`.
49    ///
50    /// `cap` specifies the capacity of the bounded async channel.
51    pub fn new(options: MqttOptions, cap: usize) -> (AsyncClient, EventLoop) {
52        let eventloop = EventLoop::new(options, cap);
53        let request_tx = eventloop.requests_tx.clone();
54
55        let client = AsyncClient { request_tx };
56
57        (client, eventloop)
58    }
59
60    /// Create a new `AsyncClient` from a channel `Sender`.
61    ///
62    /// This is mostly useful for creating a test instance where you can
63    /// listen on the corresponding receiver.
64    pub fn from_senders(request_tx: Sender<Request>) -> AsyncClient {
65        AsyncClient { request_tx }
66    }
67
68    /// Sends a MQTT Publish to the `EventLoop`.
69    pub async fn publish<S, V>(
70        &self,
71        topic: S,
72        qos: QoS,
73        retain: bool,
74        payload: V,
75    ) -> Result<(), ClientError>
76    where
77        S: Into<String>,
78        V: Into<Vec<u8>>,
79    {
80        let topic = topic.into();
81        let mut publish = Publish::new(&topic, qos, payload);
82        publish.retain = retain;
83        let publish = Request::Publish(publish);
84        if !valid_topic(&topic) {
85            return Err(ClientError::Request(publish));
86        }
87        self.request_tx.send_async(publish).await?;
88        Ok(())
89    }
90
91    /// Attempts to send a MQTT Publish to the `EventLoop`.
92    pub fn try_publish<S, V>(
93        &self,
94        topic: S,
95        qos: QoS,
96        retain: bool,
97        payload: V,
98    ) -> Result<(), ClientError>
99    where
100        S: Into<String>,
101        V: Into<Vec<u8>>,
102    {
103        let topic = topic.into();
104        let mut publish = Publish::new(&topic, qos, payload);
105        publish.retain = retain;
106        let publish = Request::Publish(publish);
107        if !valid_topic(&topic) {
108            return Err(ClientError::TryRequest(publish));
109        }
110        self.request_tx.try_send(publish)?;
111        Ok(())
112    }
113
114    /// Sends a MQTT PubAck to the `EventLoop`. Only needed in if `manual_acks` flag is set.
115    pub async fn ack(&self, publish: &Publish) -> Result<(), ClientError> {
116        let ack = get_ack_req(publish);
117
118        if let Some(ack) = ack {
119            self.request_tx.send_async(ack).await?;
120        }
121        Ok(())
122    }
123
124    /// Attempts to send a MQTT PubAck to the `EventLoop`. Only needed in if `manual_acks` flag is set.
125    pub fn try_ack(&self, publish: &Publish) -> Result<(), ClientError> {
126        let ack = get_ack_req(publish);
127        if let Some(ack) = ack {
128            self.request_tx.try_send(ack)?;
129        }
130        Ok(())
131    }
132
133    /// Sends a MQTT Publish to the `EventLoop`
134    pub async fn publish_bytes<S>(
135        &self,
136        topic: S,
137        qos: QoS,
138        retain: bool,
139        payload: Bytes,
140    ) -> Result<(), ClientError>
141    where
142        S: Into<String>,
143    {
144        let mut publish = Publish::from_bytes(topic, qos, payload);
145        publish.retain = retain;
146        let publish = Request::Publish(publish);
147        self.request_tx.send_async(publish).await?;
148        Ok(())
149    }
150
151    /// Sends a MQTT Subscribe to the `EventLoop`
152    pub async fn subscribe<S: Into<String>>(&self, topic: S, qos: QoS) -> Result<(), ClientError> {
153        let subscribe = Subscribe::new(topic.into(), qos);
154        let request = Request::Subscribe(subscribe);
155        self.request_tx.send_async(request).await?;
156        Ok(())
157    }
158
159    /// Attempts to send a MQTT Subscribe to the `EventLoop`
160    pub fn try_subscribe<S: Into<String>>(&self, topic: S, qos: QoS) -> Result<(), ClientError> {
161        let subscribe = Subscribe::new(topic.into(), qos);
162        let request = Request::Subscribe(subscribe);
163        self.request_tx.try_send(request)?;
164        Ok(())
165    }
166
167    /// Sends a MQTT Subscribe for multiple topics to the `EventLoop`
168    pub async fn subscribe_many<T>(&self, topics: T) -> Result<(), ClientError>
169    where
170        T: IntoIterator<Item = SubscribeFilter>,
171    {
172        let subscribe = Subscribe::new_many(topics);
173        let request = Request::Subscribe(subscribe);
174        self.request_tx.send_async(request).await?;
175        Ok(())
176    }
177
178    /// Attempts to send a MQTT Subscribe for multiple topics to the `EventLoop`
179    pub fn try_subscribe_many<T>(&self, topics: T) -> Result<(), ClientError>
180    where
181        T: IntoIterator<Item = SubscribeFilter>,
182    {
183        let subscribe = Subscribe::new_many(topics);
184        let request = Request::Subscribe(subscribe);
185        self.request_tx.try_send(request)?;
186        Ok(())
187    }
188
189    /// Sends a MQTT Unsubscribe to the `EventLoop`
190    pub async fn unsubscribe<S: Into<String>>(&self, topic: S) -> Result<(), ClientError> {
191        let unsubscribe = Unsubscribe::new(topic.into());
192        let request = Request::Unsubscribe(unsubscribe);
193        self.request_tx.send_async(request).await?;
194        Ok(())
195    }
196
197    /// Attempts to send a MQTT Unsubscribe to the `EventLoop`
198    pub fn try_unsubscribe<S: Into<String>>(&self, topic: S) -> Result<(), ClientError> {
199        let unsubscribe = Unsubscribe::new(topic.into());
200        let request = Request::Unsubscribe(unsubscribe);
201        self.request_tx.try_send(request)?;
202        Ok(())
203    }
204
205    /// Sends a MQTT disconnect to the `EventLoop`
206    pub async fn disconnect(&self) -> Result<(), ClientError> {
207        let request = Request::Disconnect(Disconnect);
208        self.request_tx.send_async(request).await?;
209        Ok(())
210    }
211
212    /// Attempts to send a MQTT disconnect to the `EventLoop`
213    pub fn try_disconnect(&self) -> Result<(), ClientError> {
214        let request = Request::Disconnect(Disconnect);
215        self.request_tx.try_send(request)?;
216        Ok(())
217    }
218}
219
220fn get_ack_req(publish: &Publish) -> Option<Request> {
221    let ack = match publish.qos {
222        QoS::AtMostOnce => return None,
223        QoS::AtLeastOnce => Request::PubAck(PubAck::new(publish.pkid)),
224        QoS::ExactlyOnce => Request::PubRec(PubRec::new(publish.pkid)),
225    };
226    Some(ack)
227}
228
229/// A synchronous client, communicates with MQTT `EventLoop`.
230///
231/// This is cloneable and can be used to synchronously [`publish`](`AsyncClient::publish`),
232/// [`subscribe`](`AsyncClient::subscribe`) through the `EventLoop`/`Connection`, which is to be polled in parallel
233/// by iterating over the object returned by [`Connection.iter()`](Connection::iter) in a separate thread.
234///
235/// **NOTE**: The `EventLoop`/`Connection` must be regularly polled(`.next()` in case of `Connection`) in order
236/// to send, receive and process packets from the broker, i.e. move ahead.
237///
238/// An asynchronous channel handle can also be extracted if necessary.
239#[derive(Clone)]
240pub struct Client {
241    client: AsyncClient,
242}
243
244impl Client {
245    /// Create a new `Client`
246    ///
247    /// `cap` specifies the capacity of the bounded async channel.
248    pub fn new(options: MqttOptions, cap: usize) -> (Client, Connection) {
249        let (client, eventloop) = AsyncClient::new(options, cap);
250        let client = Client { client };
251        let runtime = runtime::Builder::new_current_thread()
252            .enable_all()
253            .build()
254            .unwrap();
255
256        let connection = Connection::new(eventloop, runtime);
257        (client, connection)
258    }
259
260    /// Create a new `Client` from a channel `Sender`.
261    ///
262    /// This is mostly useful for creating a test instance where you can
263    /// listen on the corresponding receiver.
264    pub fn from_sender(request_tx: Sender<Request>) -> Client {
265        Client {
266            client: AsyncClient::from_senders(request_tx),
267        }
268    }
269
270    /// Sends a MQTT Publish to the `EventLoop`
271    pub fn publish<S, V>(
272        &self,
273        topic: S,
274        qos: QoS,
275        retain: bool,
276        payload: V,
277    ) -> Result<(), ClientError>
278    where
279        S: Into<String>,
280        V: Into<Vec<u8>>,
281    {
282        let topic = topic.into();
283        let mut publish = Publish::new(&topic, qos, payload);
284        publish.retain = retain;
285        let publish = Request::Publish(publish);
286        if !valid_topic(&topic) {
287            return Err(ClientError::Request(publish));
288        }
289        self.client.request_tx.send(publish)?;
290        Ok(())
291    }
292
293    pub fn try_publish<S, V>(
294        &self,
295        topic: S,
296        qos: QoS,
297        retain: bool,
298        payload: V,
299    ) -> Result<(), ClientError>
300    where
301        S: Into<String>,
302        V: Into<Vec<u8>>,
303    {
304        self.client.try_publish(topic, qos, retain, payload)?;
305        Ok(())
306    }
307
308    /// Sends a MQTT PubAck to the `EventLoop`. Only needed in if `manual_acks` flag is set.
309    pub fn ack(&self, publish: &Publish) -> Result<(), ClientError> {
310        let ack = get_ack_req(publish);
311
312        if let Some(ack) = ack {
313            self.client.request_tx.send(ack)?;
314        }
315        Ok(())
316    }
317
318    /// Sends a MQTT PubAck to the `EventLoop`. Only needed in if `manual_acks` flag is set.
319    pub fn try_ack(&self, publish: &Publish) -> Result<(), ClientError> {
320        self.client.try_ack(publish)?;
321        Ok(())
322    }
323
324    /// Sends a MQTT Subscribe to the `EventLoop`
325    pub fn subscribe<S: Into<String>>(&self, topic: S, qos: QoS) -> Result<(), ClientError> {
326        let subscribe = Subscribe::new(topic.into(), qos);
327        let request = Request::Subscribe(subscribe);
328        self.client.request_tx.send(request)?;
329        Ok(())
330    }
331
332    /// Sends a MQTT Subscribe to the `EventLoop`
333    pub fn try_subscribe<S: Into<String>>(&self, topic: S, qos: QoS) -> Result<(), ClientError> {
334        self.client.try_subscribe(topic, qos)?;
335        Ok(())
336    }
337
338    /// Sends a MQTT Subscribe for multiple topics to the `EventLoop`
339    pub fn subscribe_many<T>(&self, topics: T) -> Result<(), ClientError>
340    where
341        T: IntoIterator<Item = SubscribeFilter>,
342    {
343        let subscribe = Subscribe::new_many(topics);
344        let request = Request::Subscribe(subscribe);
345        self.client.request_tx.send(request)?;
346        Ok(())
347    }
348
349    pub fn try_subscribe_many<T>(&self, topics: T) -> Result<(), ClientError>
350    where
351        T: IntoIterator<Item = SubscribeFilter>,
352    {
353        self.client.try_subscribe_many(topics)
354    }
355
356    /// Sends a MQTT Unsubscribe to the `EventLoop`
357    pub fn unsubscribe<S: Into<String>>(&self, topic: S) -> Result<(), ClientError> {
358        let unsubscribe = Unsubscribe::new(topic.into());
359        let request = Request::Unsubscribe(unsubscribe);
360        self.client.request_tx.send(request)?;
361        Ok(())
362    }
363
364    /// Sends a MQTT Unsubscribe to the `EventLoop`
365    pub fn try_unsubscribe<S: Into<String>>(&self, topic: S) -> Result<(), ClientError> {
366        self.client.try_unsubscribe(topic)?;
367        Ok(())
368    }
369
370    /// Sends a MQTT disconnect to the `EventLoop`
371    pub fn disconnect(&self) -> Result<(), ClientError> {
372        let request = Request::Disconnect(Disconnect);
373        self.client.request_tx.send(request)?;
374        Ok(())
375    }
376
377    /// Sends a MQTT disconnect to the `EventLoop`
378    pub fn try_disconnect(&self) -> Result<(), ClientError> {
379        self.client.try_disconnect()?;
380        Ok(())
381    }
382}
383
384/// Error type returned by [`Connection::recv`]
385#[derive(Debug, Eq, PartialEq)]
386pub struct RecvError;
387
388/// Error type returned by [`Connection::try_recv`]
389#[derive(Debug, Eq, PartialEq)]
390pub enum TryRecvError {
391    /// User has closed requests channel
392    Disconnected,
393    /// Did not resolve
394    Empty,
395}
396
397/// Error type returned by [`Connection::recv_timeout`]
398#[derive(Debug, Eq, PartialEq)]
399pub enum RecvTimeoutError {
400    /// User has closed requests channel
401    Disconnected,
402    /// Recv request timedout
403    Timeout,
404}
405
406///  MQTT connection. Maintains all the necessary state
407pub struct Connection {
408    pub eventloop: EventLoop,
409    runtime: Runtime,
410}
411impl Connection {
412    fn new(eventloop: EventLoop, runtime: Runtime) -> Connection {
413        Connection { eventloop, runtime }
414    }
415
416    /// Returns an iterator over this connection. Iterating over this is all that's
417    /// necessary to make connection progress and maintain a robust connection.
418    /// Just continuing to loop will reconnect
419    /// **NOTE** Don't block this while iterating
420    // ideally this should be named iter_mut because it requires a mutable reference
421    // Also we can implement IntoIter for this to make it easy to iterate over it
422    #[must_use = "Connection should be iterated over a loop to make progress"]
423    pub fn iter(&mut self) -> Iter<'_> {
424        Iter { connection: self }
425    }
426
427    /// Attempt to fetch an incoming [`Event`] on the [`EvenLoop`], returning an error
428    /// if all clients/users have closed requests channel.
429    ///
430    /// [`EvenLoop`]: super::EventLoop
431    pub fn recv(&mut self) -> Result<Result<Event, ConnectionError>, RecvError> {
432        let f = self.eventloop.poll();
433        let event = self.runtime.block_on(f);
434
435        resolve_event(event).ok_or(RecvError)
436    }
437
438    /// Attempt to fetch an incoming [`Event`] on the [`EvenLoop`], returning an error
439    /// if none immediately present or all clients/users have closed requests channel.
440    ///
441    /// [`EvenLoop`]: super::EventLoop
442    pub fn try_recv(&mut self) -> Result<Result<Event, ConnectionError>, TryRecvError> {
443        let f = self.eventloop.poll();
444        // Enters the runtime context so we can poll the future, as required by `now_or_never()`.
445        // ref: https://docs.rs/tokio/latest/tokio/runtime/struct.Runtime.html#method.enter
446        let _guard = self.runtime.enter();
447        let event = f.now_or_never().ok_or(TryRecvError::Empty)?;
448
449        resolve_event(event).ok_or(TryRecvError::Disconnected)
450    }
451
452    /// Attempt to fetch an incoming [`Event`] on the [`EvenLoop`], returning an error
453    /// if all clients/users have closed requests channel or the timeout has expired.
454    ///
455    /// [`EvenLoop`]: super::EventLoop
456    pub fn recv_timeout(
457        &mut self,
458        duration: Duration,
459    ) -> Result<Result<Event, ConnectionError>, RecvTimeoutError> {
460        let f = self.eventloop.poll();
461        let event = self
462            .runtime
463            .block_on(async { timeout(duration, f).await })
464            .map_err(|_| RecvTimeoutError::Timeout)?;
465
466        resolve_event(event).ok_or(RecvTimeoutError::Disconnected)
467    }
468}
469
470fn resolve_event(event: Result<Event, ConnectionError>) -> Option<Result<Event, ConnectionError>> {
471    match event {
472        Ok(v) => Some(Ok(v)),
473        // closing of request channel should stop the iterator
474        Err(ConnectionError::RequestsDone) => {
475            trace!("Done with requests");
476            None
477        }
478        Err(e) => Some(Err(e)),
479    }
480}
481
482/// Iterator which polls the `EventLoop` for connection progress
483pub struct Iter<'a> {
484    connection: &'a mut Connection,
485}
486
487impl Iterator for Iter<'_> {
488    type Item = Result<Event, ConnectionError>;
489
490    fn next(&mut self) -> Option<Self::Item> {
491        self.connection.recv().ok()
492    }
493}
494
495#[cfg(test)]
496mod test {
497    use super::*;
498
499    #[test]
500    fn calling_iter_twice_on_connection_shouldnt_panic() {
501        use std::time::Duration;
502
503        let mut mqttoptions = MqttOptions::new("test-1", "localhost", 1883);
504        let will = LastWill::new("hello/world", "good bye", QoS::AtMostOnce, false);
505        mqttoptions
506            .set_keep_alive(Duration::from_secs(5))
507            .set_last_will(will);
508
509        let (_, mut connection) = Client::new(mqttoptions, 10);
510        let _ = connection.iter();
511        let _ = connection.iter();
512    }
513
514    #[test]
515    fn should_be_able_to_build_test_client_from_channel() {
516        let (tx, rx) = flume::bounded(1);
517        let client = Client::from_sender(tx);
518        client
519            .publish("hello/world", QoS::ExactlyOnce, false, "good bye")
520            .expect("Should be able to publish");
521        let _ = rx.try_recv().expect("Should have message");
522    }
523}