1use std::time::Duration;
4
5use crate::mqttbytes::{v4::*, QoS};
6use crate::{valid_topic, ConnectionError, Event, EventLoop, MqttOptions, Request};
7
8use bytes::Bytes;
9use flume::{SendError, Sender, TrySendError};
10use futures_util::FutureExt;
11use tokio::runtime::{self, Runtime};
12use tokio::time::timeout;
13
14#[derive(Debug, thiserror::Error)]
16pub enum ClientError {
17 #[error("Failed to send mqtt requests to eventloop")]
18 Request(Request),
19 #[error("Failed to send mqtt requests to eventloop")]
20 TryRequest(Request),
21}
22
23impl From<SendError<Request>> for ClientError {
24 fn from(e: SendError<Request>) -> Self {
25 Self::Request(e.into_inner())
26 }
27}
28
29impl From<TrySendError<Request>> for ClientError {
30 fn from(e: TrySendError<Request>) -> Self {
31 Self::TryRequest(e.into_inner())
32 }
33}
34
35#[derive(Clone, Debug)]
43pub struct AsyncClient {
44 request_tx: Sender<Request>,
45}
46
47impl AsyncClient {
48 pub fn new(options: MqttOptions, cap: usize) -> (AsyncClient, EventLoop) {
52 let eventloop = EventLoop::new(options, cap);
53 let request_tx = eventloop.requests_tx.clone();
54
55 let client = AsyncClient { request_tx };
56
57 (client, eventloop)
58 }
59
60 pub fn from_senders(request_tx: Sender<Request>) -> AsyncClient {
65 AsyncClient { request_tx }
66 }
67
68 pub async fn publish<S, V>(
70 &self,
71 topic: S,
72 qos: QoS,
73 retain: bool,
74 payload: V,
75 ) -> Result<(), ClientError>
76 where
77 S: Into<String>,
78 V: Into<Vec<u8>>,
79 {
80 let topic = topic.into();
81 let mut publish = Publish::new(&topic, qos, payload);
82 publish.retain = retain;
83 let publish = Request::Publish(publish);
84 if !valid_topic(&topic) {
85 return Err(ClientError::Request(publish));
86 }
87 self.request_tx.send_async(publish).await?;
88 Ok(())
89 }
90
91 pub fn try_publish<S, V>(
93 &self,
94 topic: S,
95 qos: QoS,
96 retain: bool,
97 payload: V,
98 ) -> Result<(), ClientError>
99 where
100 S: Into<String>,
101 V: Into<Vec<u8>>,
102 {
103 let topic = topic.into();
104 let mut publish = Publish::new(&topic, qos, payload);
105 publish.retain = retain;
106 let publish = Request::Publish(publish);
107 if !valid_topic(&topic) {
108 return Err(ClientError::TryRequest(publish));
109 }
110 self.request_tx.try_send(publish)?;
111 Ok(())
112 }
113
114 pub async fn ack(&self, publish: &Publish) -> Result<(), ClientError> {
116 let ack = get_ack_req(publish);
117
118 if let Some(ack) = ack {
119 self.request_tx.send_async(ack).await?;
120 }
121 Ok(())
122 }
123
124 pub fn try_ack(&self, publish: &Publish) -> Result<(), ClientError> {
126 let ack = get_ack_req(publish);
127 if let Some(ack) = ack {
128 self.request_tx.try_send(ack)?;
129 }
130 Ok(())
131 }
132
133 pub async fn publish_bytes<S>(
135 &self,
136 topic: S,
137 qos: QoS,
138 retain: bool,
139 payload: Bytes,
140 ) -> Result<(), ClientError>
141 where
142 S: Into<String>,
143 {
144 let mut publish = Publish::from_bytes(topic, qos, payload);
145 publish.retain = retain;
146 let publish = Request::Publish(publish);
147 self.request_tx.send_async(publish).await?;
148 Ok(())
149 }
150
151 pub async fn subscribe<S: Into<String>>(&self, topic: S, qos: QoS) -> Result<(), ClientError> {
153 let subscribe = Subscribe::new(topic.into(), qos);
154 let request = Request::Subscribe(subscribe);
155 self.request_tx.send_async(request).await?;
156 Ok(())
157 }
158
159 pub fn try_subscribe<S: Into<String>>(&self, topic: S, qos: QoS) -> Result<(), ClientError> {
161 let subscribe = Subscribe::new(topic.into(), qos);
162 let request = Request::Subscribe(subscribe);
163 self.request_tx.try_send(request)?;
164 Ok(())
165 }
166
167 pub async fn subscribe_many<T>(&self, topics: T) -> Result<(), ClientError>
169 where
170 T: IntoIterator<Item = SubscribeFilter>,
171 {
172 let subscribe = Subscribe::new_many(topics);
173 let request = Request::Subscribe(subscribe);
174 self.request_tx.send_async(request).await?;
175 Ok(())
176 }
177
178 pub fn try_subscribe_many<T>(&self, topics: T) -> Result<(), ClientError>
180 where
181 T: IntoIterator<Item = SubscribeFilter>,
182 {
183 let subscribe = Subscribe::new_many(topics);
184 let request = Request::Subscribe(subscribe);
185 self.request_tx.try_send(request)?;
186 Ok(())
187 }
188
189 pub async fn unsubscribe<S: Into<String>>(&self, topic: S) -> Result<(), ClientError> {
191 let unsubscribe = Unsubscribe::new(topic.into());
192 let request = Request::Unsubscribe(unsubscribe);
193 self.request_tx.send_async(request).await?;
194 Ok(())
195 }
196
197 pub fn try_unsubscribe<S: Into<String>>(&self, topic: S) -> Result<(), ClientError> {
199 let unsubscribe = Unsubscribe::new(topic.into());
200 let request = Request::Unsubscribe(unsubscribe);
201 self.request_tx.try_send(request)?;
202 Ok(())
203 }
204
205 pub async fn disconnect(&self) -> Result<(), ClientError> {
207 let request = Request::Disconnect(Disconnect);
208 self.request_tx.send_async(request).await?;
209 Ok(())
210 }
211
212 pub fn try_disconnect(&self) -> Result<(), ClientError> {
214 let request = Request::Disconnect(Disconnect);
215 self.request_tx.try_send(request)?;
216 Ok(())
217 }
218}
219
220fn get_ack_req(publish: &Publish) -> Option<Request> {
221 let ack = match publish.qos {
222 QoS::AtMostOnce => return None,
223 QoS::AtLeastOnce => Request::PubAck(PubAck::new(publish.pkid)),
224 QoS::ExactlyOnce => Request::PubRec(PubRec::new(publish.pkid)),
225 };
226 Some(ack)
227}
228
229#[derive(Clone)]
240pub struct Client {
241 client: AsyncClient,
242}
243
244impl Client {
245 pub fn new(options: MqttOptions, cap: usize) -> (Client, Connection) {
249 let (client, eventloop) = AsyncClient::new(options, cap);
250 let client = Client { client };
251 let runtime = runtime::Builder::new_current_thread()
252 .enable_all()
253 .build()
254 .unwrap();
255
256 let connection = Connection::new(eventloop, runtime);
257 (client, connection)
258 }
259
260 pub fn from_sender(request_tx: Sender<Request>) -> Client {
265 Client {
266 client: AsyncClient::from_senders(request_tx),
267 }
268 }
269
270 pub fn publish<S, V>(
272 &self,
273 topic: S,
274 qos: QoS,
275 retain: bool,
276 payload: V,
277 ) -> Result<(), ClientError>
278 where
279 S: Into<String>,
280 V: Into<Vec<u8>>,
281 {
282 let topic = topic.into();
283 let mut publish = Publish::new(&topic, qos, payload);
284 publish.retain = retain;
285 let publish = Request::Publish(publish);
286 if !valid_topic(&topic) {
287 return Err(ClientError::Request(publish));
288 }
289 self.client.request_tx.send(publish)?;
290 Ok(())
291 }
292
293 pub fn try_publish<S, V>(
294 &self,
295 topic: S,
296 qos: QoS,
297 retain: bool,
298 payload: V,
299 ) -> Result<(), ClientError>
300 where
301 S: Into<String>,
302 V: Into<Vec<u8>>,
303 {
304 self.client.try_publish(topic, qos, retain, payload)?;
305 Ok(())
306 }
307
308 pub fn ack(&self, publish: &Publish) -> Result<(), ClientError> {
310 let ack = get_ack_req(publish);
311
312 if let Some(ack) = ack {
313 self.client.request_tx.send(ack)?;
314 }
315 Ok(())
316 }
317
318 pub fn try_ack(&self, publish: &Publish) -> Result<(), ClientError> {
320 self.client.try_ack(publish)?;
321 Ok(())
322 }
323
324 pub fn subscribe<S: Into<String>>(&self, topic: S, qos: QoS) -> Result<(), ClientError> {
326 let subscribe = Subscribe::new(topic.into(), qos);
327 let request = Request::Subscribe(subscribe);
328 self.client.request_tx.send(request)?;
329 Ok(())
330 }
331
332 pub fn try_subscribe<S: Into<String>>(&self, topic: S, qos: QoS) -> Result<(), ClientError> {
334 self.client.try_subscribe(topic, qos)?;
335 Ok(())
336 }
337
338 pub fn subscribe_many<T>(&self, topics: T) -> Result<(), ClientError>
340 where
341 T: IntoIterator<Item = SubscribeFilter>,
342 {
343 let subscribe = Subscribe::new_many(topics);
344 let request = Request::Subscribe(subscribe);
345 self.client.request_tx.send(request)?;
346 Ok(())
347 }
348
349 pub fn try_subscribe_many<T>(&self, topics: T) -> Result<(), ClientError>
350 where
351 T: IntoIterator<Item = SubscribeFilter>,
352 {
353 self.client.try_subscribe_many(topics)
354 }
355
356 pub fn unsubscribe<S: Into<String>>(&self, topic: S) -> Result<(), ClientError> {
358 let unsubscribe = Unsubscribe::new(topic.into());
359 let request = Request::Unsubscribe(unsubscribe);
360 self.client.request_tx.send(request)?;
361 Ok(())
362 }
363
364 pub fn try_unsubscribe<S: Into<String>>(&self, topic: S) -> Result<(), ClientError> {
366 self.client.try_unsubscribe(topic)?;
367 Ok(())
368 }
369
370 pub fn disconnect(&self) -> Result<(), ClientError> {
372 let request = Request::Disconnect(Disconnect);
373 self.client.request_tx.send(request)?;
374 Ok(())
375 }
376
377 pub fn try_disconnect(&self) -> Result<(), ClientError> {
379 self.client.try_disconnect()?;
380 Ok(())
381 }
382}
383
/// Error returned by [`Connection::recv`] once the event loop reports that
/// it is done with requests and no further event will be produced.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub struct RecvError;

impl std::fmt::Display for RecvError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("connection is done with requests")
    }
}

// Lets callers propagate this error with `?` into `Box<dyn Error>` and friends.
impl std::error::Error for RecvError {}
387
/// Error returned by [`Connection::try_recv`].
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub enum TryRecvError {
    /// The event loop reported that it is done with requests.
    Disconnected,
    /// The event loop poll did not complete immediately; no event is ready.
    Empty,
}

impl std::fmt::Display for TryRecvError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let message = match self {
            TryRecvError::Disconnected => "connection is done with requests",
            TryRecvError::Empty => "no event is ready",
        };
        f.write_str(message)
    }
}

// Lets callers propagate this error with `?` into `Box<dyn Error>` and friends.
impl std::error::Error for TryRecvError {}
396
/// Error returned by [`Connection::recv_timeout`].
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub enum RecvTimeoutError {
    /// The event loop reported that it is done with requests.
    Disconnected,
    /// The requested duration elapsed before the event loop produced a result.
    Timeout,
}

impl std::fmt::Display for RecvTimeoutError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let message = match self {
            RecvTimeoutError::Disconnected => "connection is done with requests",
            RecvTimeoutError::Timeout => "timed out waiting for an event",
        };
        f.write_str(message)
    }
}

// Lets callers propagate this error with `?` into `Box<dyn Error>` and friends.
impl std::error::Error for RecvTimeoutError {}
405
406pub struct Connection {
408 pub eventloop: EventLoop,
409 runtime: Runtime,
410}
411impl Connection {
412 fn new(eventloop: EventLoop, runtime: Runtime) -> Connection {
413 Connection { eventloop, runtime }
414 }
415
416 #[must_use = "Connection should be iterated over a loop to make progress"]
423 pub fn iter(&mut self) -> Iter<'_> {
424 Iter { connection: self }
425 }
426
427 pub fn recv(&mut self) -> Result<Result<Event, ConnectionError>, RecvError> {
432 let f = self.eventloop.poll();
433 let event = self.runtime.block_on(f);
434
435 resolve_event(event).ok_or(RecvError)
436 }
437
438 pub fn try_recv(&mut self) -> Result<Result<Event, ConnectionError>, TryRecvError> {
443 let f = self.eventloop.poll();
444 let _guard = self.runtime.enter();
447 let event = f.now_or_never().ok_or(TryRecvError::Empty)?;
448
449 resolve_event(event).ok_or(TryRecvError::Disconnected)
450 }
451
452 pub fn recv_timeout(
457 &mut self,
458 duration: Duration,
459 ) -> Result<Result<Event, ConnectionError>, RecvTimeoutError> {
460 let f = self.eventloop.poll();
461 let event = self
462 .runtime
463 .block_on(async { timeout(duration, f).await })
464 .map_err(|_| RecvTimeoutError::Timeout)?;
465
466 resolve_event(event).ok_or(RecvTimeoutError::Disconnected)
467 }
468}
469
470fn resolve_event(event: Result<Event, ConnectionError>) -> Option<Result<Event, ConnectionError>> {
471 match event {
472 Ok(v) => Some(Ok(v)),
473 Err(ConnectionError::RequestsDone) => {
475 trace!("Done with requests");
476 None
477 }
478 Err(e) => Some(Err(e)),
479 }
480}
481
482pub struct Iter<'a> {
484 connection: &'a mut Connection,
485}
486
487impl Iterator for Iter<'_> {
488 type Item = Result<Event, ConnectionError>;
489
490 fn next(&mut self) -> Option<Self::Item> {
491 self.connection.recv().ok()
492 }
493}
494
#[cfg(test)]
mod test {
    use super::*;

    /// Creating the blocking iterator more than once on the same connection
    /// must not panic.
    #[test]
    fn calling_iter_twice_on_connection_shouldnt_panic() {
        use std::time::Duration;

        let last_will = LastWill::new("hello/world", "good bye", QoS::AtMostOnce, false);
        let mut options = MqttOptions::new("test-1", "localhost", 1883);
        options
            .set_keep_alive(Duration::from_secs(5))
            .set_last_will(last_will);

        let (_, mut connection) = Client::new(options, 10);
        // The iterators are only created, never advanced.
        let _ = connection.iter();
        let _ = connection.iter();
    }

    /// A client built from a bare channel sender delivers its requests to
    /// the matching receiver.
    #[test]
    fn should_be_able_to_build_test_client_from_channel() {
        let (sender, receiver) = flume::bounded(1);
        let client = Client::from_sender(sender);

        let published = client.publish("hello/world", QoS::ExactlyOnce, false, "good bye");
        published.expect("Should be able to publish");

        let _ = receiver.try_recv().expect("Should have message");
    }
}
523}