rumqttc/
lib.rs

1//! A pure rust MQTT client which strives to be robust, efficient and easy to use.
2//! This library is backed by an async (tokio) eventloop which handles all the
3//! robustness and and efficiency parts of MQTT but naturally fits into both sync
4//! and async worlds as we'll see
5//!
6//! Let's jump into examples right away
7//!
8//! A simple synchronous publish and subscribe
9//! ----------------------------
10//!
11//! ```no_run
12//! use rumqttc::{MqttOptions, Client, QoS};
13//! use std::time::Duration;
14//! use std::thread;
15//!
16//! let mut mqttoptions = MqttOptions::new("rumqtt-sync", "test.mosquitto.org", 1883);
17//! mqttoptions.set_keep_alive(Duration::from_secs(5));
18//!
19//! let (mut client, mut connection) = Client::new(mqttoptions, 10);
20//! client.subscribe("hello/rumqtt", QoS::AtMostOnce).unwrap();
21//! thread::spawn(move || for i in 0..10 {
22//!    client.publish("hello/rumqtt", QoS::AtLeastOnce, false, vec![i; i as usize]).unwrap();
23//!    thread::sleep(Duration::from_millis(100));
24//! });
25//!
26//! // Iterate to poll the eventloop for connection progress
27//! for (i, notification) in connection.iter().enumerate() {
28//!     println!("Notification = {:?}", notification);
29//! }
30//! ```
31//!
32//! A simple asynchronous publish and subscribe
33//! ------------------------------
34//!
35//! ```no_run
36//! use rumqttc::{MqttOptions, AsyncClient, QoS};
37//! use tokio::{task, time};
38//! use std::time::Duration;
39//! use std::error::Error;
40//!
41//! # #[tokio::main(flavor = "current_thread")]
42//! # async fn main() {
43//! let mut mqttoptions = MqttOptions::new("rumqtt-async", "test.mosquitto.org", 1883);
44//! mqttoptions.set_keep_alive(Duration::from_secs(5));
45//!
46//! let (mut client, mut eventloop) = AsyncClient::new(mqttoptions, 10);
47//! client.subscribe("hello/rumqtt", QoS::AtMostOnce).await.unwrap();
48//!
49//! task::spawn(async move {
50//!     for i in 0..10 {
51//!         client.publish("hello/rumqtt", QoS::AtLeastOnce, false, vec![i; i as usize]).await.unwrap();
52//!         time::sleep(Duration::from_millis(100)).await;
53//!     }
54//! });
55//!
56//! loop {
57//!     let notification = eventloop.poll().await.unwrap();
58//!     println!("Received = {:?}", notification);
59//! }
60//! # }
61//! ```
62//!
63//! Quick overview of features
64//! - Eventloop orchestrates outgoing/incoming packets concurrently and handles the state
65//! - Pings the broker when necessary and detects client side half open connections as well
66//! - Throttling of outgoing packets (todo)
67//! - Queue size based flow control on outgoing packets
68//! - Automatic reconnections by just continuing the `eventloop.poll()`/`connection.iter()` loop
69//! - Natural backpressure to client APIs during bad network
70//!
71//! In short, everything necessary to maintain a robust connection
72//!
73//! Since the eventloop is externally polled (with `iter()/poll()` in a loop)
74//! out side the library and `Eventloop` is accessible, users can
75//! - Distribute incoming messages based on topics
76//! - Stop it when required
77//! - Access internal state for use cases like graceful shutdown or to modify options before reconnection
78//!
79//! ## Important notes
80//!
81//! - Looping on `connection.iter()`/`eventloop.poll()` is necessary to run the
82//!   event loop and make progress. It yields incoming and outgoing activity
83//!   notifications which allows customization as you see fit.
84//!
85//! - Blocking inside the `connection.iter()`/`eventloop.poll()` loop will block
86//!   connection progress.
87//!
88//! ## FAQ
89//! **Connecting to a broker using raw ip doesn't work**
90//!
91//! You cannot create a TLS connection to a bare IP address with a self-signed
92//! certificate. This is a [limitation of rustls](https://github.com/ctz/rustls/issues/184).
93//! One workaround, which only works under *nix/BSD-like systems, is to add an
94//! entry to wherever your DNS resolver looks (e.g. `/etc/hosts`) for the bare IP
95//! address and use that name in your code.
96#![cfg_attr(docsrs, feature(doc_cfg))]
97
98#[macro_use]
99extern crate log;
100
101use std::fmt::{self, Debug, Formatter};
102
103#[cfg(any(feature = "use-rustls", feature = "websocket"))]
104use std::sync::Arc;
105
106use std::time::Duration;
107
108mod client;
109mod eventloop;
110mod framed;
111pub mod mqttbytes;
112mod state;
113pub mod v5;
114
115#[cfg(any(feature = "use-rustls", feature = "use-native-tls"))]
116mod tls;
117
118#[cfg(feature = "websocket")]
119mod websockets;
120
121#[cfg(feature = "websocket")]
122use std::{
123    future::{Future, IntoFuture},
124    pin::Pin,
125};
126
127#[cfg(feature = "websocket")]
128type RequestModifierFn = Arc<
129    dyn Fn(http::Request<()>) -> Pin<Box<dyn Future<Output = http::Request<()>> + Send>>
130        + Send
131        + Sync,
132>;
133
134#[cfg(feature = "proxy")]
135mod proxy;
136
137pub use client::{
138    AsyncClient, Client, ClientError, Connection, Iter, RecvError, RecvTimeoutError, TryRecvError,
139};
140pub use eventloop::{ConnectionError, Event, EventLoop};
141pub use mqttbytes::v4::*;
142pub use mqttbytes::*;
143#[cfg(feature = "use-rustls")]
144use rustls_native_certs::load_native_certs;
145pub use state::{MqttState, StateError};
146#[cfg(any(feature = "use-rustls", feature = "use-native-tls"))]
147pub use tls::Error as TlsError;
148#[cfg(feature = "use-rustls")]
149pub use tokio_rustls;
150#[cfg(feature = "use-rustls")]
151use tokio_rustls::rustls::{ClientConfig, RootCertStore};
152
153#[cfg(feature = "proxy")]
154pub use proxy::{Proxy, ProxyAuth, ProxyType};
155
156pub type Incoming = Packet;
157
158/// Current outgoing activity on the eventloop
159#[derive(Debug, Clone, PartialEq, Eq)]
160pub enum Outgoing {
161    /// Publish packet with packet identifier. 0 implies QoS 0
162    Publish(u16),
163    /// Subscribe packet with packet identifier
164    Subscribe(u16),
165    /// Unsubscribe packet with packet identifier
166    Unsubscribe(u16),
167    /// PubAck packet
168    PubAck(u16),
169    /// PubRec packet
170    PubRec(u16),
171    /// PubRel packet
172    PubRel(u16),
173    /// PubComp packet
174    PubComp(u16),
175    /// Ping request packet
176    PingReq,
177    /// Ping response packet
178    PingResp,
179    /// Disconnect packet
180    Disconnect,
181    /// Await for an ack for more outgoing progress
182    AwaitAck(u16),
183}
184
185/// Requests by the client to mqtt event loop. Request are
186/// handled one by one.
187#[derive(Clone, Debug, PartialEq, Eq)]
188pub enum Request {
189    Publish(Publish),
190    PubAck(PubAck),
191    PubRec(PubRec),
192    PubComp(PubComp),
193    PubRel(PubRel),
194    PingReq(PingReq),
195    PingResp(PingResp),
196    Subscribe(Subscribe),
197    SubAck(SubAck),
198    Unsubscribe(Unsubscribe),
199    UnsubAck(UnsubAck),
200    Disconnect(Disconnect),
201}
202
203impl Request {
204    fn size(&self) -> usize {
205        match &self {
206            Request::Publish(publish) => publish.size(),
207            Request::PubAck(puback) => puback.size(),
208            Request::PubRec(pubrec) => pubrec.size(),
209            Request::PubComp(pubcomp) => pubcomp.size(),
210            Request::PubRel(pubrel) => pubrel.size(),
211            Request::PingReq(pingreq) => pingreq.size(),
212            Request::PingResp(pingresp) => pingresp.size(),
213            Request::Subscribe(subscribe) => subscribe.size(),
214            Request::SubAck(suback) => suback.size(),
215            Request::Unsubscribe(unsubscribe) => unsubscribe.size(),
216            Request::UnsubAck(unsuback) => unsuback.size(),
217            Request::Disconnect(disconn) => disconn.size(),
218        }
219    }
220}
221
222impl From<Publish> for Request {
223    fn from(publish: Publish) -> Request {
224        Request::Publish(publish)
225    }
226}
227
228impl From<Subscribe> for Request {
229    fn from(subscribe: Subscribe) -> Request {
230        Request::Subscribe(subscribe)
231    }
232}
233
234impl From<Unsubscribe> for Request {
235    fn from(unsubscribe: Unsubscribe) -> Request {
236        Request::Unsubscribe(unsubscribe)
237    }
238}
239
240/// Transport methods. Defaults to TCP.
241#[derive(Clone)]
242pub enum Transport {
243    Tcp,
244    #[cfg(any(feature = "use-rustls", feature = "use-native-tls"))]
245    Tls(TlsConfiguration),
246    #[cfg(unix)]
247    Unix,
248    #[cfg(feature = "websocket")]
249    #[cfg_attr(docsrs, doc(cfg(feature = "websocket")))]
250    Ws,
251    #[cfg(all(feature = "use-rustls", feature = "websocket"))]
252    #[cfg_attr(docsrs, doc(cfg(all(feature = "use-rustls", feature = "websocket"))))]
253    Wss(TlsConfiguration),
254}
255
256impl Default for Transport {
257    fn default() -> Self {
258        Self::tcp()
259    }
260}
261
262impl Transport {
263    /// Use regular tcp as transport (default)
264    pub fn tcp() -> Self {
265        Self::Tcp
266    }
267
268    #[cfg(feature = "use-rustls")]
269    pub fn tls_with_default_config() -> Self {
270        Self::tls_with_config(Default::default())
271    }
272
273    /// Use secure tcp with tls as transport
274    #[cfg(feature = "use-rustls")]
275    pub fn tls(
276        ca: Vec<u8>,
277        client_auth: Option<(Vec<u8>, Vec<u8>)>,
278        alpn: Option<Vec<Vec<u8>>>,
279    ) -> Self {
280        let config = TlsConfiguration::Simple {
281            ca,
282            alpn,
283            client_auth,
284        };
285
286        Self::tls_with_config(config)
287    }
288
289    #[cfg(any(feature = "use-rustls", feature = "use-native-tls"))]
290    pub fn tls_with_config(tls_config: TlsConfiguration) -> Self {
291        Self::Tls(tls_config)
292    }
293
294    #[cfg(unix)]
295    pub fn unix() -> Self {
296        Self::Unix
297    }
298
299    /// Use websockets as transport
300    #[cfg(feature = "websocket")]
301    #[cfg_attr(docsrs, doc(cfg(feature = "websocket")))]
302    pub fn ws() -> Self {
303        Self::Ws
304    }
305
306    /// Use secure websockets with tls as transport
307    #[cfg(all(feature = "use-rustls", feature = "websocket"))]
308    #[cfg_attr(docsrs, doc(cfg(all(feature = "use-rustls", feature = "websocket"))))]
309    pub fn wss(
310        ca: Vec<u8>,
311        client_auth: Option<(Vec<u8>, Vec<u8>)>,
312        alpn: Option<Vec<Vec<u8>>>,
313    ) -> Self {
314        let config = TlsConfiguration::Simple {
315            ca,
316            client_auth,
317            alpn,
318        };
319
320        Self::wss_with_config(config)
321    }
322
323    #[cfg(all(feature = "use-rustls", feature = "websocket"))]
324    #[cfg_attr(docsrs, doc(cfg(all(feature = "use-rustls", feature = "websocket"))))]
325    pub fn wss_with_config(tls_config: TlsConfiguration) -> Self {
326        Self::Wss(tls_config)
327    }
328
329    #[cfg(all(feature = "use-rustls", feature = "websocket"))]
330    #[cfg_attr(docsrs, doc(cfg(all(feature = "use-rustls", feature = "websocket"))))]
331    pub fn wss_with_default_config() -> Self {
332        Self::Wss(Default::default())
333    }
334}
335
336/// TLS configuration method
337#[derive(Clone, Debug)]
338#[cfg(any(feature = "use-rustls", feature = "use-native-tls"))]
339pub enum TlsConfiguration {
340    #[cfg(feature = "use-rustls")]
341    Simple {
342        /// connection method
343        ca: Vec<u8>,
344        /// alpn settings
345        alpn: Option<Vec<Vec<u8>>>,
346        /// tls client_authentication
347        client_auth: Option<(Vec<u8>, Vec<u8>)>,
348    },
349    #[cfg(feature = "use-native-tls")]
350    SimpleNative {
351        /// ca certificate
352        ca: Vec<u8>,
353        /// pkcs12 binary der and
354        /// password for use with der
355        client_auth: Option<(Vec<u8>, String)>,
356    },
357    #[cfg(feature = "use-rustls")]
358    /// Injected rustls ClientConfig for TLS, to allow more customisation.
359    Rustls(Arc<ClientConfig>),
360    #[cfg(feature = "use-native-tls")]
361    Native,
362}
363
364#[cfg(feature = "use-rustls")]
365impl Default for TlsConfiguration {
366    fn default() -> Self {
367        let mut root_cert_store = RootCertStore::empty();
368        for cert in load_native_certs().expect("could not load platform certs") {
369            root_cert_store.add(cert).unwrap();
370        }
371        let tls_config = ClientConfig::builder()
372            .with_root_certificates(root_cert_store)
373            .with_no_client_auth();
374
375        Self::Rustls(Arc::new(tls_config))
376    }
377}
378
379#[cfg(feature = "use-rustls")]
380impl From<ClientConfig> for TlsConfiguration {
381    fn from(config: ClientConfig) -> Self {
382        TlsConfiguration::Rustls(Arc::new(config))
383    }
384}
385
386/// Provides a way to configure low level network connection configurations
387#[derive(Clone, Default)]
388pub struct NetworkOptions {
389    tcp_send_buffer_size: Option<u32>,
390    tcp_recv_buffer_size: Option<u32>,
391    conn_timeout: u64,
392    #[cfg(any(target_os = "android", target_os = "fuchsia", target_os = "linux"))]
393    bind_device: Option<String>,
394}
395
396impl NetworkOptions {
397    pub fn new() -> Self {
398        NetworkOptions {
399            tcp_send_buffer_size: None,
400            tcp_recv_buffer_size: None,
401            conn_timeout: 5,
402            #[cfg(any(target_os = "android", target_os = "fuchsia", target_os = "linux"))]
403            bind_device: None,
404        }
405    }
406
407    pub fn set_tcp_send_buffer_size(&mut self, size: u32) {
408        self.tcp_send_buffer_size = Some(size);
409    }
410
411    pub fn set_tcp_recv_buffer_size(&mut self, size: u32) {
412        self.tcp_recv_buffer_size = Some(size);
413    }
414
415    /// set connection timeout in secs
416    pub fn set_connection_timeout(&mut self, timeout: u64) -> &mut Self {
417        self.conn_timeout = timeout;
418        self
419    }
420
421    /// get timeout in secs
422    pub fn connection_timeout(&self) -> u64 {
423        self.conn_timeout
424    }
425
426    /// bind connection to a specific network device by name
427    #[cfg(any(target_os = "android", target_os = "fuchsia", target_os = "linux"))]
428    #[cfg_attr(
429        docsrs,
430        doc(cfg(any(target_os = "android", target_os = "fuchsia", target_os = "linux")))
431    )]
432    pub fn set_bind_device(&mut self, bind_device: &str) -> &mut Self {
433        self.bind_device = Some(bind_device.to_string());
434        self
435    }
436}
437
438// TODO: Should all the options be exposed as public? Drawback
439// would be loosing the ability to panic when the user options
440// are wrong (e.g empty client id) or aggressive (keep alive time)
441/// Options to configure the behaviour of MQTT connection
442#[derive(Clone)]
443pub struct MqttOptions {
444    /// broker address that you want to connect to
445    broker_addr: String,
446    /// broker port
447    port: u16,
448    // What transport protocol to use
449    transport: Transport,
450    /// keep alive time to send pingreq to broker when the connection is idle
451    keep_alive: Duration,
452    /// clean (or) persistent session
453    clean_session: bool,
454    /// client identifier
455    client_id: String,
456    /// username and password
457    credentials: Option<(String, String)>,
458    /// maximum incoming packet size (verifies remaining length of the packet)
459    max_incoming_packet_size: usize,
460    /// Maximum outgoing packet size (only verifies publish payload size)
461    max_outgoing_packet_size: usize,
462    /// request (publish, subscribe) channel capacity
463    request_channel_capacity: usize,
464    /// Max internal request batching
465    max_request_batch: usize,
466    /// Minimum delay time between consecutive outgoing packets
467    /// while retransmitting pending packets
468    pending_throttle: Duration,
469    /// maximum number of outgoing inflight messages
470    inflight: u16,
471    /// Last will that will be issued on unexpected disconnect
472    last_will: Option<LastWill>,
473    /// If set to `true` MQTT acknowledgements are not sent automatically.
474    /// Every incoming publish packet must be manually acknowledged with `client.ack(...)` method.
475    manual_acks: bool,
476    #[cfg(feature = "proxy")]
477    /// Proxy configuration.
478    proxy: Option<Proxy>,
479    #[cfg(feature = "websocket")]
480    request_modifier: Option<RequestModifierFn>,
481}
482
483impl MqttOptions {
484    /// Create an [`MqttOptions`] object that contains default values for all settings other than
485    /// - id: A string to identify the device connecting to a broker
486    /// - host: The broker's domain name or IP address
487    /// - port: The port number on which broker must be listening for incoming connections
488    ///
489    /// ```
490    /// # use rumqttc::MqttOptions;
491    /// let options = MqttOptions::new("123", "localhost", 1883);
492    /// ```
493    pub fn new<S: Into<String>, T: Into<String>>(id: S, host: T, port: u16) -> MqttOptions {
494        MqttOptions {
495            broker_addr: host.into(),
496            port,
497            transport: Transport::tcp(),
498            keep_alive: Duration::from_secs(60),
499            clean_session: true,
500            client_id: id.into(),
501            credentials: None,
502            max_incoming_packet_size: 10 * 1024,
503            max_outgoing_packet_size: 10 * 1024,
504            request_channel_capacity: 10,
505            max_request_batch: 0,
506            pending_throttle: Duration::from_micros(0),
507            inflight: 100,
508            last_will: None,
509            manual_acks: false,
510            #[cfg(feature = "proxy")]
511            proxy: None,
512            #[cfg(feature = "websocket")]
513            request_modifier: None,
514        }
515    }
516
517    #[cfg(feature = "url")]
518    /// Creates an [`MqttOptions`] object by parsing provided string with the [url] crate's
519    /// [`Url::parse(url)`](url::Url::parse) method and is only enabled when run using the "url" feature.
520    ///
521    /// ```
522    /// # use rumqttc::MqttOptions;
523    /// let options = MqttOptions::parse_url("mqtt://example.com:1883?client_id=123").unwrap();
524    /// ```
525    ///
526    /// **NOTE:** A url must be prefixed with one of either `tcp://`, `mqtt://`, `ssl://`,`mqtts://`,
527    /// `ws://` or `wss://` to denote the protocol for establishing a connection with the broker.
528    ///
529    /// **NOTE:** Encrypted connections(i.e. `mqtts://`, `ssl://`, `wss://`) by default use the
530    /// system's root certificates. To configure with custom certificates, one may use the
531    /// [`set_transport`](MqttOptions::set_transport) method.
532    ///
533    /// ```ignore
534    /// # use rumqttc::{MqttOptions, Transport};
535    /// # use tokio_rustls::rustls::ClientConfig;
536    /// # let root_cert_store = rustls::RootCertStore::empty();
537    /// # let client_config = ClientConfig::builder()
538    /// #    .with_root_certificates(root_cert_store)
539    /// #    .with_no_client_auth();
540    /// let mut options = MqttOptions::parse_url("mqtts://example.com?client_id=123").unwrap();
541    /// options.set_transport(Transport::tls_with_config(client_config.into()));
542    /// ```
543    pub fn parse_url<S: Into<String>>(url: S) -> Result<MqttOptions, OptionError> {
544        let url = url::Url::parse(&url.into())?;
545        let options = MqttOptions::try_from(url)?;
546
547        Ok(options)
548    }
549
550    /// Broker address
551    pub fn broker_address(&self) -> (String, u16) {
552        (self.broker_addr.clone(), self.port)
553    }
554
555    pub fn set_last_will(&mut self, will: LastWill) -> &mut Self {
556        self.last_will = Some(will);
557        self
558    }
559
560    pub fn last_will(&self) -> Option<LastWill> {
561        self.last_will.clone()
562    }
563
564    pub fn set_transport(&mut self, transport: Transport) -> &mut Self {
565        self.transport = transport;
566        self
567    }
568
569    pub fn transport(&self) -> Transport {
570        self.transport.clone()
571    }
572
573    /// Set number of seconds after which client should ping the broker
574    /// if there is no other data exchange
575    pub fn set_keep_alive(&mut self, duration: Duration) -> &mut Self {
576        assert!(
577            duration.is_zero() || duration >= Duration::from_secs(1),
578            "Keep alives should be specified in seconds. Durations less than \
579            a second are not allowed, except for Duration::ZERO."
580        );
581
582        self.keep_alive = duration;
583        self
584    }
585
586    /// Keep alive time
587    pub fn keep_alive(&self) -> Duration {
588        self.keep_alive
589    }
590
591    /// Client identifier
592    pub fn client_id(&self) -> String {
593        self.client_id.clone()
594    }
595
596    /// Set packet size limit for outgoing an incoming packets
597    pub fn set_max_packet_size(&mut self, incoming: usize, outgoing: usize) -> &mut Self {
598        self.max_incoming_packet_size = incoming;
599        self.max_outgoing_packet_size = outgoing;
600        self
601    }
602
603    /// Maximum packet size
604    pub fn max_packet_size(&self) -> usize {
605        self.max_incoming_packet_size
606    }
607
608    /// `clean_session = true` removes all the state from queues & instructs the broker
609    /// to clean all the client state when client disconnects.
610    ///
611    /// When set `false`, broker will hold the client state and performs pending
612    /// operations on the client when reconnection with same `client_id`
613    /// happens. Local queue state is also held to retransmit packets after reconnection.
614    ///
615    /// # Panic
616    ///
617    /// Panics if `clean_session` is false when `client_id` is empty.
618    ///
619    /// ```should_panic
620    /// # use rumqttc::MqttOptions;
621    /// let mut options = MqttOptions::new("", "localhost", 1883);
622    /// options.set_clean_session(false);
623    /// ```
624    pub fn set_clean_session(&mut self, clean_session: bool) -> &mut Self {
625        assert!(
626            !self.client_id.is_empty() || clean_session,
627            "Cannot unset clean session when client id is empty"
628        );
629        self.clean_session = clean_session;
630        self
631    }
632
633    /// Clean session
634    pub fn clean_session(&self) -> bool {
635        self.clean_session
636    }
637
638    /// Username and password
639    pub fn set_credentials<U: Into<String>, P: Into<String>>(
640        &mut self,
641        username: U,
642        password: P,
643    ) -> &mut Self {
644        self.credentials = Some((username.into(), password.into()));
645        self
646    }
647
648    /// Security options
649    pub fn credentials(&self) -> Option<(String, String)> {
650        self.credentials.clone()
651    }
652
653    /// Set request channel capacity
654    pub fn set_request_channel_capacity(&mut self, capacity: usize) -> &mut Self {
655        self.request_channel_capacity = capacity;
656        self
657    }
658
659    /// Request channel capacity
660    pub fn request_channel_capacity(&self) -> usize {
661        self.request_channel_capacity
662    }
663
664    /// Enables throttling and sets outoing message rate to the specified 'rate'
665    pub fn set_pending_throttle(&mut self, duration: Duration) -> &mut Self {
666        self.pending_throttle = duration;
667        self
668    }
669
670    /// Outgoing message rate
671    pub fn pending_throttle(&self) -> Duration {
672        self.pending_throttle
673    }
674
675    /// Set number of concurrent in flight messages
676    pub fn set_inflight(&mut self, inflight: u16) -> &mut Self {
677        assert!(inflight != 0, "zero in flight is not allowed");
678
679        self.inflight = inflight;
680        self
681    }
682
683    /// Number of concurrent in flight messages
684    pub fn inflight(&self) -> u16 {
685        self.inflight
686    }
687
688    /// set manual acknowledgements
689    pub fn set_manual_acks(&mut self, manual_acks: bool) -> &mut Self {
690        self.manual_acks = manual_acks;
691        self
692    }
693
694    /// get manual acknowledgements
695    pub fn manual_acks(&self) -> bool {
696        self.manual_acks
697    }
698
699    #[cfg(feature = "proxy")]
700    pub fn set_proxy(&mut self, proxy: Proxy) -> &mut Self {
701        self.proxy = Some(proxy);
702        self
703    }
704
705    #[cfg(feature = "proxy")]
706    pub fn proxy(&self) -> Option<Proxy> {
707        self.proxy.clone()
708    }
709
710    #[cfg(feature = "websocket")]
711    pub fn set_request_modifier<F, O>(&mut self, request_modifier: F) -> &mut Self
712    where
713        F: Fn(http::Request<()>) -> O + Send + Sync + 'static,
714        O: IntoFuture<Output = http::Request<()>> + 'static,
715        O::IntoFuture: Send,
716    {
717        self.request_modifier = Some(Arc::new(move |request| {
718            let request_modifier = request_modifier(request).into_future();
719            Box::pin(request_modifier)
720        }));
721
722        self
723    }
724
725    #[cfg(feature = "websocket")]
726    pub fn request_modifier(&self) -> Option<RequestModifierFn> {
727        self.request_modifier.clone()
728    }
729}
730
731#[cfg(feature = "url")]
732#[derive(Debug, PartialEq, Eq, thiserror::Error)]
733pub enum OptionError {
734    #[error("Unsupported URL scheme.")]
735    Scheme,
736
737    #[error("Missing client ID.")]
738    ClientId,
739
740    #[error("Invalid keep-alive value.")]
741    KeepAlive,
742
743    #[error("Invalid clean-session value.")]
744    CleanSession,
745
746    #[error("Invalid max-incoming-packet-size value.")]
747    MaxIncomingPacketSize,
748
749    #[error("Invalid max-outgoing-packet-size value.")]
750    MaxOutgoingPacketSize,
751
752    #[error("Invalid request-channel-capacity value.")]
753    RequestChannelCapacity,
754
755    #[error("Invalid max-request-batch value.")]
756    MaxRequestBatch,
757
758    #[error("Invalid pending-throttle value.")]
759    PendingThrottle,
760
761    #[error("Invalid inflight value.")]
762    Inflight,
763
764    #[error("Unknown option: {0}")]
765    Unknown(String),
766
767    #[error("Couldn't parse option from url: {0}")]
768    Parse(#[from] url::ParseError),
769}
770
771#[cfg(feature = "url")]
772impl std::convert::TryFrom<url::Url> for MqttOptions {
773    type Error = OptionError;
774
775    fn try_from(url: url::Url) -> Result<Self, Self::Error> {
776        use std::collections::HashMap;
777
778        let host = url.host_str().unwrap_or_default().to_owned();
779
780        let (transport, default_port) = match url.scheme() {
781            // Encrypted connections are supported, but require explicit TLS configuration. We fall
782            // back to the unencrypted transport layer, so that `set_transport` can be used to
783            // configure the encrypted transport layer with the provided TLS configuration.
784            #[cfg(feature = "use-rustls")]
785            "mqtts" | "ssl" => (Transport::tls_with_default_config(), 8883),
786            "mqtt" | "tcp" => (Transport::Tcp, 1883),
787            #[cfg(feature = "websocket")]
788            "ws" => (Transport::Ws, 8000),
789            #[cfg(all(feature = "use-rustls", feature = "websocket"))]
790            "wss" => (Transport::wss_with_default_config(), 8000),
791            _ => return Err(OptionError::Scheme),
792        };
793
794        let port = url.port().unwrap_or(default_port);
795
796        let mut queries = url.query_pairs().collect::<HashMap<_, _>>();
797
798        let id = queries
799            .remove("client_id")
800            .ok_or(OptionError::ClientId)?
801            .into_owned();
802
803        let mut options = MqttOptions::new(id, host, port);
804        options.set_transport(transport);
805
806        if let Some(keep_alive) = queries
807            .remove("keep_alive_secs")
808            .map(|v| v.parse::<u64>().map_err(|_| OptionError::KeepAlive))
809            .transpose()?
810        {
811            options.set_keep_alive(Duration::from_secs(keep_alive));
812        }
813
814        if let Some(clean_session) = queries
815            .remove("clean_session")
816            .map(|v| v.parse::<bool>().map_err(|_| OptionError::CleanSession))
817            .transpose()?
818        {
819            options.set_clean_session(clean_session);
820        }
821
822        if let Some((username, password)) = {
823            match url.username() {
824                "" => None,
825                username => Some((
826                    username.to_owned(),
827                    url.password().unwrap_or_default().to_owned(),
828                )),
829            }
830        } {
831            options.set_credentials(username, password);
832        }
833
834        if let (Some(incoming), Some(outgoing)) = (
835            queries
836                .remove("max_incoming_packet_size_bytes")
837                .map(|v| {
838                    v.parse::<usize>()
839                        .map_err(|_| OptionError::MaxIncomingPacketSize)
840                })
841                .transpose()?,
842            queries
843                .remove("max_outgoing_packet_size_bytes")
844                .map(|v| {
845                    v.parse::<usize>()
846                        .map_err(|_| OptionError::MaxOutgoingPacketSize)
847                })
848                .transpose()?,
849        ) {
850            options.set_max_packet_size(incoming, outgoing);
851        }
852
853        if let Some(request_channel_capacity) = queries
854            .remove("request_channel_capacity_num")
855            .map(|v| {
856                v.parse::<usize>()
857                    .map_err(|_| OptionError::RequestChannelCapacity)
858            })
859            .transpose()?
860        {
861            options.request_channel_capacity = request_channel_capacity;
862        }
863
864        if let Some(max_request_batch) = queries
865            .remove("max_request_batch_num")
866            .map(|v| v.parse::<usize>().map_err(|_| OptionError::MaxRequestBatch))
867            .transpose()?
868        {
869            options.max_request_batch = max_request_batch;
870        }
871
872        if let Some(pending_throttle) = queries
873            .remove("pending_throttle_usecs")
874            .map(|v| v.parse::<u64>().map_err(|_| OptionError::PendingThrottle))
875            .transpose()?
876        {
877            options.set_pending_throttle(Duration::from_micros(pending_throttle));
878        }
879
880        if let Some(inflight) = queries
881            .remove("inflight_num")
882            .map(|v| v.parse::<u16>().map_err(|_| OptionError::Inflight))
883            .transpose()?
884        {
885            options.set_inflight(inflight);
886        }
887
888        if let Some((opt, _)) = queries.into_iter().next() {
889            return Err(OptionError::Unknown(opt.into_owned()));
890        }
891
892        Ok(options)
893    }
894}
895
896// Implement Debug manually because ClientConfig doesn't implement it, so derive(Debug) doesn't
897// work.
898impl Debug for MqttOptions {
899    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
900        f.debug_struct("MqttOptions")
901            .field("broker_addr", &self.broker_addr)
902            .field("port", &self.port)
903            .field("keep_alive", &self.keep_alive)
904            .field("clean_session", &self.clean_session)
905            .field("client_id", &self.client_id)
906            .field("credentials", &self.credentials)
907            .field("max_packet_size", &self.max_incoming_packet_size)
908            .field("request_channel_capacity", &self.request_channel_capacity)
909            .field("max_request_batch", &self.max_request_batch)
910            .field("pending_throttle", &self.pending_throttle)
911            .field("inflight", &self.inflight)
912            .field("last_will", &self.last_will)
913            .field("manual_acks", &self.manual_acks)
914            .finish()
915    }
916}
917
918#[cfg(test)]
919mod test {
920    use super::*;
921
922    #[test]
923    #[cfg(all(feature = "use-rustls", feature = "websocket"))]
924    fn no_scheme() {
925        let mut mqttoptions = MqttOptions::new("client_a", "a3f8czas.iot.eu-west-1.amazonaws.com/mqtt?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=MyCreds%2F20201001%2Feu-west-1%2Fiotdevicegateway%2Faws4_request&X-Amz-Date=20201001T130812Z&X-Amz-Expires=7200&X-Amz-Signature=9ae09b49896f44270f2707551581953e6cac71a4ccf34c7c3415555be751b2d1&X-Amz-SignedHeaders=host", 443);
926
927        mqttoptions.set_transport(crate::Transport::wss(Vec::from("Test CA"), None, None));
928
929        if let crate::Transport::Wss(TlsConfiguration::Simple {
930            ca,
931            client_auth,
932            alpn,
933        }) = mqttoptions.transport
934        {
935            assert_eq!(ca, Vec::from("Test CA"));
936            assert_eq!(client_auth, None);
937            assert_eq!(alpn, None);
938        } else {
939            panic!("Unexpected transport!");
940        }
941
942        assert_eq!(mqttoptions.broker_addr, "a3f8czas.iot.eu-west-1.amazonaws.com/mqtt?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=MyCreds%2F20201001%2Feu-west-1%2Fiotdevicegateway%2Faws4_request&X-Amz-Date=20201001T130812Z&X-Amz-Expires=7200&X-Amz-Signature=9ae09b49896f44270f2707551581953e6cac71a4ccf34c7c3415555be751b2d1&X-Amz-SignedHeaders=host");
943    }
944
945    #[test]
946    #[cfg(feature = "url")]
947    fn from_url() {
948        fn opt(s: &str) -> Result<MqttOptions, OptionError> {
949            MqttOptions::parse_url(s)
950        }
951        fn ok(s: &str) -> MqttOptions {
952            opt(s).expect("valid options")
953        }
954        fn err(s: &str) -> OptionError {
955            opt(s).expect_err("invalid options")
956        }
957
958        let v = ok("mqtt://host:42?client_id=foo");
959        assert_eq!(v.broker_address(), ("host".to_owned(), 42));
960        assert_eq!(v.client_id(), "foo".to_owned());
961
962        let v = ok("mqtt://host:42?client_id=foo&keep_alive_secs=5");
963        assert_eq!(v.keep_alive, Duration::from_secs(5));
964
965        assert_eq!(err("mqtt://host:42"), OptionError::ClientId);
966        assert_eq!(
967            err("mqtt://host:42?client_id=foo&foo=bar"),
968            OptionError::Unknown("foo".to_owned())
969        );
970        assert_eq!(err("mqt://host:42?client_id=foo"), OptionError::Scheme);
971        assert_eq!(
972            err("mqtt://host:42?client_id=foo&keep_alive_secs=foo"),
973            OptionError::KeepAlive
974        );
975        assert_eq!(
976            err("mqtt://host:42?client_id=foo&clean_session=foo"),
977            OptionError::CleanSession
978        );
979        assert_eq!(
980            err("mqtt://host:42?client_id=foo&max_incoming_packet_size_bytes=foo"),
981            OptionError::MaxIncomingPacketSize
982        );
983        assert_eq!(
984            err("mqtt://host:42?client_id=foo&max_outgoing_packet_size_bytes=foo"),
985            OptionError::MaxOutgoingPacketSize
986        );
987        assert_eq!(
988            err("mqtt://host:42?client_id=foo&request_channel_capacity_num=foo"),
989            OptionError::RequestChannelCapacity
990        );
991        assert_eq!(
992            err("mqtt://host:42?client_id=foo&max_request_batch_num=foo"),
993            OptionError::MaxRequestBatch
994        );
995        assert_eq!(
996            err("mqtt://host:42?client_id=foo&pending_throttle_usecs=foo"),
997            OptionError::PendingThrottle
998        );
999        assert_eq!(
1000            err("mqtt://host:42?client_id=foo&inflight_num=foo"),
1001            OptionError::Inflight
1002        );
1003    }
1004
1005    #[test]
1006    fn accept_empty_client_id() {
1007        let _mqtt_opts = MqttOptions::new("", "127.0.0.1", 1883).set_clean_session(true);
1008    }
1009
1010    #[test]
1011    fn set_clean_session_when_client_id_present() {
1012        let mut options = MqttOptions::new("client_id", "127.0.0.1", 1883);
1013        options.set_clean_session(false);
1014        options.set_clean_session(true);
1015    }
1016}