rumqttc/v5/
mod.rs

1use bytes::Bytes;
2use std::fmt::{self, Debug, Formatter};
3use std::time::Duration;
4#[cfg(feature = "websocket")]
5use std::{
6    future::{Future, IntoFuture},
7    pin::Pin,
8    sync::Arc,
9};
10
11mod client;
12mod eventloop;
13mod framed;
14pub mod mqttbytes;
15mod state;
16
17use crate::Outgoing;
18use crate::{NetworkOptions, Transport};
19
20use mqttbytes::v5::*;
21
22pub use client::{AsyncClient, Client, ClientError, Connection, Iter};
23pub use eventloop::{ConnectionError, Event, EventLoop};
24pub use state::{MqttState, StateError};
25
26#[cfg(feature = "use-rustls")]
27pub use crate::tls::Error as TlsError;
28
29#[cfg(feature = "proxy")]
30pub use crate::proxy::{Proxy, ProxyAuth, ProxyType};
31
32pub type Incoming = Packet;
33
34/// Requests by the client to mqtt event loop. Request are
35/// handled one by one.
36#[derive(Clone, Debug, PartialEq, Eq)]
37pub enum Request {
38    Publish(Publish),
39    PubAck(PubAck),
40    PubRec(PubRec),
41    PubComp(PubComp),
42    PubRel(PubRel),
43    PingReq,
44    PingResp,
45    Subscribe(Subscribe),
46    SubAck(SubAck),
47    Unsubscribe(Unsubscribe),
48    UnsubAck(UnsubAck),
49    Disconnect,
50}
51
52#[cfg(feature = "websocket")]
53type RequestModifierFn = Arc<
54    dyn Fn(http::Request<()>) -> Pin<Box<dyn Future<Output = http::Request<()>> + Send>>
55        + Send
56        + Sync,
57>;
58
59// TODO: Should all the options be exposed as public? Drawback
60// would be loosing the ability to panic when the user options
61// are wrong (e.g empty client id) or aggressive (keep alive time)
62/// Options to configure the behaviour of MQTT connection
63#[derive(Clone)]
64pub struct MqttOptions {
65    /// broker address that you want to connect to
66    broker_addr: String,
67    /// broker port
68    port: u16,
69    // What transport protocol to use
70    transport: Transport,
71    /// keep alive time to send pingreq to broker when the connection is idle
72    keep_alive: Duration,
73    /// clean (or) persistent session
74    clean_start: bool,
75    /// client identifier
76    client_id: String,
77    /// username and password
78    credentials: Option<(String, String)>,
79    /// request (publish, subscribe) channel capacity
80    request_channel_capacity: usize,
81    /// Max internal request batching
82    max_request_batch: usize,
83    /// Minimum delay time between consecutive outgoing packets
84    /// while retransmitting pending packets
85    pending_throttle: Duration,
86    /// Last will that will be issued on unexpected disconnect
87    last_will: Option<LastWill>,
88    /// Connection timeout
89    conn_timeout: u64,
90    /// Default value of for maximum incoming packet size.
91    /// Used when `max_incomming_size` in `connect_properties` is NOT available.
92    default_max_incoming_size: usize,
93    /// Connect Properties
94    connect_properties: Option<ConnectProperties>,
95    /// If set to `true` MQTT acknowledgements are not sent automatically.
96    /// Every incoming publish packet must be manually acknowledged with `client.ack(...)` method.
97    manual_acks: bool,
98    network_options: NetworkOptions,
99    #[cfg(feature = "proxy")]
100    /// Proxy configuration.
101    proxy: Option<Proxy>,
102    /// Upper limit on maximum number of inflight requests.
103    /// The server may set its own maximum inflight limit, the smaller of the two will be used.
104    outgoing_inflight_upper_limit: Option<u16>,
105    #[cfg(feature = "websocket")]
106    request_modifier: Option<RequestModifierFn>,
107}
108
109impl MqttOptions {
110    /// Create an [`MqttOptions`] object that contains default values for all settings other than
111    /// - id: A string to identify the device connecting to a broker
112    /// - host: The broker's domain name or IP address
113    /// - port: The port number on which broker must be listening for incoming connections
114    ///
115    /// ```
116    /// # use rumqttc::v5::MqttOptions;
117    /// let options = MqttOptions::new("123", "localhost", 1883);
118    /// ```
119    pub fn new<S: Into<String>, T: Into<String>>(id: S, host: T, port: u16) -> MqttOptions {
120        MqttOptions {
121            broker_addr: host.into(),
122            port,
123            transport: Transport::tcp(),
124            keep_alive: Duration::from_secs(60),
125            clean_start: true,
126            client_id: id.into(),
127            credentials: None,
128            request_channel_capacity: 10,
129            max_request_batch: 0,
130            pending_throttle: Duration::from_micros(0),
131            last_will: None,
132            conn_timeout: 5,
133            default_max_incoming_size: 10 * 1024,
134            connect_properties: None,
135            manual_acks: false,
136            network_options: NetworkOptions::new(),
137            #[cfg(feature = "proxy")]
138            proxy: None,
139            outgoing_inflight_upper_limit: None,
140            #[cfg(feature = "websocket")]
141            request_modifier: None,
142        }
143    }
144
145    #[cfg(feature = "url")]
146    /// Creates an [`MqttOptions`] object by parsing provided string with the [url] crate's
147    /// [`Url::parse(url)`](url::Url::parse) method and is only enabled when run using the "url" feature.
148    ///
149    /// ```
150    /// # use rumqttc::MqttOptions;
151    /// let options = MqttOptions::parse_url("mqtt://example.com:1883?client_id=123").unwrap();
152    /// ```
153    ///
154    /// **NOTE:** A url must be prefixed with one of either `tcp://`, `mqtt://`, `ssl://`,`mqtts://`,
155    /// `ws://` or `wss://` to denote the protocol for establishing a connection with the broker.
156    ///
157    /// **NOTE:** Encrypted connections(i.e. `mqtts://`, `ssl://`, `wss://`) by default use the
158    /// system's root certificates. To configure with custom certificates, one may use the
159    /// [`set_transport`](MqttOptions::set_transport) method.
160    ///
161    /// ```ignore
162    /// # use rumqttc::{MqttOptions, Transport};
163    /// # use tokio_rustls::rustls::ClientConfig;
164    /// # let root_cert_store = rustls::RootCertStore::empty();
165    /// # let client_config = ClientConfig::builder()
166    /// #    .with_root_certificates(root_cert_store)
167    /// #    .with_no_client_auth();
168    /// let mut options = MqttOptions::parse_url("mqtts://example.com?client_id=123").unwrap();
169    /// options.set_transport(Transport::tls_with_config(client_config.into()));
170    /// ```
171    pub fn parse_url<S: Into<String>>(url: S) -> Result<MqttOptions, OptionError> {
172        let url = url::Url::parse(&url.into())?;
173        let options = MqttOptions::try_from(url)?;
174
175        Ok(options)
176    }
177
178    /// Broker address
179    pub fn broker_address(&self) -> (String, u16) {
180        (self.broker_addr.clone(), self.port)
181    }
182
183    pub fn set_last_will(&mut self, will: LastWill) -> &mut Self {
184        self.last_will = Some(will);
185        self
186    }
187
188    pub fn last_will(&self) -> Option<LastWill> {
189        self.last_will.clone()
190    }
191
192    #[cfg(feature = "websocket")]
193    pub fn set_request_modifier<F, O>(&mut self, request_modifier: F) -> &mut Self
194    where
195        F: Fn(http::Request<()>) -> O + Send + Sync + 'static,
196        O: IntoFuture<Output = http::Request<()>> + 'static,
197        O::IntoFuture: Send,
198    {
199        self.request_modifier = Some(Arc::new(move |request| {
200            let request_modifier = request_modifier(request).into_future();
201            Box::pin(request_modifier)
202        }));
203
204        self
205    }
206
207    #[cfg(feature = "websocket")]
208    pub fn request_modifier(&self) -> Option<RequestModifierFn> {
209        self.request_modifier.clone()
210    }
211
212    pub fn set_transport(&mut self, transport: Transport) -> &mut Self {
213        self.transport = transport;
214        self
215    }
216
217    pub fn transport(&self) -> Transport {
218        self.transport.clone()
219    }
220
221    /// Set number of seconds after which client should ping the broker
222    /// if there is no other data exchange
223    pub fn set_keep_alive(&mut self, duration: Duration) -> &mut Self {
224        assert!(duration.as_secs() >= 5, "Keep alives should be >= 5 secs");
225
226        self.keep_alive = duration;
227        self
228    }
229
230    /// Keep alive time
231    pub fn keep_alive(&self) -> Duration {
232        self.keep_alive
233    }
234
235    /// Client identifier
236    pub fn client_id(&self) -> String {
237        self.client_id.clone()
238    }
239
240    /// `clean_start = true` removes all the state from queues & instructs the broker
241    /// to clean all the client state when client disconnects.
242    ///
243    /// When set `false`, broker will hold the client state and performs pending
244    /// operations on the client when reconnection with same `client_id`
245    /// happens. Local queue state is also held to retransmit packets after reconnection.
246    pub fn set_clean_start(&mut self, clean_start: bool) -> &mut Self {
247        self.clean_start = clean_start;
248        self
249    }
250
251    /// Clean session
252    pub fn clean_start(&self) -> bool {
253        self.clean_start
254    }
255
256    /// Username and password
257    pub fn set_credentials<U: Into<String>, P: Into<String>>(
258        &mut self,
259        username: U,
260        password: P,
261    ) -> &mut Self {
262        self.credentials = Some((username.into(), password.into()));
263        self
264    }
265
266    /// Security options
267    pub fn credentials(&self) -> Option<(String, String)> {
268        self.credentials.clone()
269    }
270
271    /// Set request channel capacity
272    pub fn set_request_channel_capacity(&mut self, capacity: usize) -> &mut Self {
273        self.request_channel_capacity = capacity;
274        self
275    }
276
277    /// Request channel capacity
278    pub fn request_channel_capacity(&self) -> usize {
279        self.request_channel_capacity
280    }
281
282    /// Enables throttling and sets outoing message rate to the specified 'rate'
283    pub fn set_pending_throttle(&mut self, duration: Duration) -> &mut Self {
284        self.pending_throttle = duration;
285        self
286    }
287
288    /// Outgoing message rate
289    pub fn pending_throttle(&self) -> Duration {
290        self.pending_throttle
291    }
292
293    /// set connection timeout in secs
294    pub fn set_connection_timeout(&mut self, timeout: u64) -> &mut Self {
295        self.conn_timeout = timeout;
296        self
297    }
298
299    /// get timeout in secs
300    pub fn connection_timeout(&self) -> u64 {
301        self.conn_timeout
302    }
303
304    /// set connection properties
305    pub fn set_connect_properties(&mut self, properties: ConnectProperties) -> &mut Self {
306        self.connect_properties = Some(properties);
307        self
308    }
309
310    /// get connection properties
311    pub fn connect_properties(&self) -> Option<ConnectProperties> {
312        self.connect_properties.clone()
313    }
314
315    /// set receive maximum on connection properties
316    pub fn set_receive_maximum(&mut self, recv_max: Option<u16>) -> &mut Self {
317        if let Some(conn_props) = &mut self.connect_properties {
318            conn_props.receive_maximum = recv_max;
319            self
320        } else {
321            let mut conn_props = ConnectProperties::new();
322            conn_props.receive_maximum = recv_max;
323            self.set_connect_properties(conn_props)
324        }
325    }
326
327    /// get receive maximum from connection properties
328    pub fn receive_maximum(&self) -> Option<u16> {
329        if let Some(conn_props) = &self.connect_properties {
330            conn_props.receive_maximum
331        } else {
332            None
333        }
334    }
335
336    /// set max packet size on connection properties
337    pub fn set_max_packet_size(&mut self, max_size: Option<u32>) -> &mut Self {
338        if let Some(conn_props) = &mut self.connect_properties {
339            conn_props.max_packet_size = max_size;
340            self
341        } else {
342            let mut conn_props = ConnectProperties::new();
343            conn_props.max_packet_size = max_size;
344            self.set_connect_properties(conn_props)
345        }
346    }
347
348    /// get max packet size from connection properties
349    pub fn max_packet_size(&self) -> Option<u32> {
350        if let Some(conn_props) = &self.connect_properties {
351            conn_props.max_packet_size
352        } else {
353            None
354        }
355    }
356
357    /// set max topic alias on connection properties
358    pub fn set_topic_alias_max(&mut self, topic_alias_max: Option<u16>) -> &mut Self {
359        if let Some(conn_props) = &mut self.connect_properties {
360            conn_props.topic_alias_max = topic_alias_max;
361            self
362        } else {
363            let mut conn_props = ConnectProperties::new();
364            conn_props.topic_alias_max = topic_alias_max;
365            self.set_connect_properties(conn_props)
366        }
367    }
368
369    /// get max topic alias from connection properties
370    pub fn topic_alias_max(&self) -> Option<u16> {
371        if let Some(conn_props) = &self.connect_properties {
372            conn_props.topic_alias_max
373        } else {
374            None
375        }
376    }
377
378    /// set request response info on connection properties
379    pub fn set_request_response_info(&mut self, request_response_info: Option<u8>) -> &mut Self {
380        if let Some(conn_props) = &mut self.connect_properties {
381            conn_props.request_response_info = request_response_info;
382            self
383        } else {
384            let mut conn_props = ConnectProperties::new();
385            conn_props.request_response_info = request_response_info;
386            self.set_connect_properties(conn_props)
387        }
388    }
389
390    /// get request response info from connection properties
391    pub fn request_response_info(&self) -> Option<u8> {
392        if let Some(conn_props) = &self.connect_properties {
393            conn_props.request_response_info
394        } else {
395            None
396        }
397    }
398
399    /// set request problem info on connection properties
400    pub fn set_request_problem_info(&mut self, request_problem_info: Option<u8>) -> &mut Self {
401        if let Some(conn_props) = &mut self.connect_properties {
402            conn_props.request_problem_info = request_problem_info;
403            self
404        } else {
405            let mut conn_props = ConnectProperties::new();
406            conn_props.request_problem_info = request_problem_info;
407            self.set_connect_properties(conn_props)
408        }
409    }
410
411    /// get request problem info from connection properties
412    pub fn request_problem_info(&self) -> Option<u8> {
413        if let Some(conn_props) = &self.connect_properties {
414            conn_props.request_problem_info
415        } else {
416            None
417        }
418    }
419
420    /// set user properties on connection properties
421    pub fn set_user_properties(&mut self, user_properties: Vec<(String, String)>) -> &mut Self {
422        if let Some(conn_props) = &mut self.connect_properties {
423            conn_props.user_properties = user_properties;
424            self
425        } else {
426            let mut conn_props = ConnectProperties::new();
427            conn_props.user_properties = user_properties;
428            self.set_connect_properties(conn_props)
429        }
430    }
431
432    /// get user properties from connection properties
433    pub fn user_properties(&self) -> Vec<(String, String)> {
434        if let Some(conn_props) = &self.connect_properties {
435            conn_props.user_properties.clone()
436        } else {
437            Vec::new()
438        }
439    }
440
441    /// set authentication method on connection properties
442    pub fn set_authentication_method(
443        &mut self,
444        authentication_method: Option<String>,
445    ) -> &mut Self {
446        if let Some(conn_props) = &mut self.connect_properties {
447            conn_props.authentication_method = authentication_method;
448            self
449        } else {
450            let mut conn_props = ConnectProperties::new();
451            conn_props.authentication_method = authentication_method;
452            self.set_connect_properties(conn_props)
453        }
454    }
455
456    /// get authentication method from connection properties
457    pub fn authentication_method(&self) -> Option<String> {
458        if let Some(conn_props) = &self.connect_properties {
459            conn_props.authentication_method.clone()
460        } else {
461            None
462        }
463    }
464
465    /// set authentication data on connection properties
466    pub fn set_authentication_data(&mut self, authentication_data: Option<Bytes>) -> &mut Self {
467        if let Some(conn_props) = &mut self.connect_properties {
468            conn_props.authentication_data = authentication_data;
469            self
470        } else {
471            let mut conn_props = ConnectProperties::new();
472            conn_props.authentication_data = authentication_data;
473            self.set_connect_properties(conn_props)
474        }
475    }
476
477    /// get authentication data from connection properties
478    pub fn authentication_data(&self) -> Option<Bytes> {
479        if let Some(conn_props) = &self.connect_properties {
480            conn_props.authentication_data.clone()
481        } else {
482            None
483        }
484    }
485
486    /// set manual acknowledgements
487    pub fn set_manual_acks(&mut self, manual_acks: bool) -> &mut Self {
488        self.manual_acks = manual_acks;
489        self
490    }
491
492    /// get manual acknowledgements
493    pub fn manual_acks(&self) -> bool {
494        self.manual_acks
495    }
496
497    pub fn network_options(&self) -> NetworkOptions {
498        self.network_options.clone()
499    }
500
501    pub fn set_network_options(&mut self, network_options: NetworkOptions) -> &mut Self {
502        self.network_options = network_options;
503        self
504    }
505
506    #[cfg(feature = "proxy")]
507    pub fn set_proxy(&mut self, proxy: Proxy) -> &mut Self {
508        self.proxy = Some(proxy);
509        self
510    }
511
512    #[cfg(feature = "proxy")]
513    pub fn proxy(&self) -> Option<Proxy> {
514        self.proxy.clone()
515    }
516
517    /// Get the upper limit on maximum number of inflight outgoing publishes.
518    /// The server may set its own maximum inflight limit, the smaller of the two will be used.
519    pub fn set_outgoing_inflight_upper_limit(&mut self, limit: u16) -> &mut Self {
520        self.outgoing_inflight_upper_limit = Some(limit);
521        self
522    }
523
524    /// Set the upper limit on maximum number of inflight outgoing publishes.
525    /// The server may set its own maximum inflight limit, the smaller of the two will be used.
526    pub fn get_outgoing_inflight_upper_limit(&self) -> Option<u16> {
527        self.outgoing_inflight_upper_limit
528    }
529}
530
531#[cfg(feature = "url")]
532#[derive(Debug, PartialEq, Eq, thiserror::Error)]
533pub enum OptionError {
534    #[error("Unsupported URL scheme.")]
535    Scheme,
536
537    #[error("Missing client ID.")]
538    ClientId,
539
540    #[error("Invalid keep-alive value.")]
541    KeepAlive,
542
543    #[error("Invalid clean-start value.")]
544    CleanStart,
545
546    #[error("Invalid max-incoming-packet-size value.")]
547    MaxIncomingPacketSize,
548
549    #[error("Invalid max-outgoing-packet-size value.")]
550    MaxOutgoingPacketSize,
551
552    #[error("Invalid request-channel-capacity value.")]
553    RequestChannelCapacity,
554
555    #[error("Invalid max-request-batch value.")]
556    MaxRequestBatch,
557
558    #[error("Invalid pending-throttle value.")]
559    PendingThrottle,
560
561    #[error("Invalid inflight value.")]
562    Inflight,
563
564    #[error("Invalid conn-timeout value.")]
565    ConnTimeout,
566
567    #[error("Unknown option: {0}")]
568    Unknown(String),
569
570    #[error("Couldn't parse option from url: {0}")]
571    Parse(#[from] url::ParseError),
572}
573
574#[cfg(feature = "url")]
575impl std::convert::TryFrom<url::Url> for MqttOptions {
576    type Error = OptionError;
577
578    fn try_from(url: url::Url) -> Result<Self, Self::Error> {
579        use std::collections::HashMap;
580
581        let host = url.host_str().unwrap_or_default().to_owned();
582
583        let (transport, default_port) = match url.scheme() {
584            // Encrypted connections are supported, but require explicit TLS configuration. We fall
585            // back to the unencrypted transport layer, so that `set_transport` can be used to
586            // configure the encrypted transport layer with the provided TLS configuration.
587            #[cfg(feature = "use-rustls")]
588            "mqtts" | "ssl" => (Transport::tls_with_default_config(), 8883),
589            "mqtt" | "tcp" => (Transport::Tcp, 1883),
590            #[cfg(feature = "websocket")]
591            "ws" => (Transport::Ws, 8000),
592            #[cfg(all(feature = "use-rustls", feature = "websocket"))]
593            "wss" => (Transport::wss_with_default_config(), 8000),
594            _ => return Err(OptionError::Scheme),
595        };
596
597        let port = url.port().unwrap_or(default_port);
598
599        let mut queries = url.query_pairs().collect::<HashMap<_, _>>();
600
601        let id = queries
602            .remove("client_id")
603            .ok_or(OptionError::ClientId)?
604            .into_owned();
605
606        let mut options = MqttOptions::new(id, host, port);
607        let mut connect_props = ConnectProperties::new();
608        options.set_transport(transport);
609
610        if let Some(keep_alive) = queries
611            .remove("keep_alive_secs")
612            .map(|v| v.parse::<u64>().map_err(|_| OptionError::KeepAlive))
613            .transpose()?
614        {
615            options.set_keep_alive(Duration::from_secs(keep_alive));
616        }
617
618        if let Some(clean_start) = queries
619            .remove("clean_start")
620            .map(|v| v.parse::<bool>().map_err(|_| OptionError::CleanStart))
621            .transpose()?
622        {
623            options.set_clean_start(clean_start);
624        }
625
626        if let Some((username, password)) = {
627            match url.username() {
628                "" => None,
629                username => Some((
630                    username.to_owned(),
631                    url.password().unwrap_or_default().to_owned(),
632                )),
633            }
634        } {
635            options.set_credentials(username, password);
636        }
637
638        connect_props.max_packet_size = queries
639            .remove("max_incoming_packet_size_bytes")
640            .map(|v| {
641                v.parse::<u32>()
642                    .map_err(|_| OptionError::MaxIncomingPacketSize)
643            })
644            .transpose()?;
645
646        if let Some(request_channel_capacity) = queries
647            .remove("request_channel_capacity_num")
648            .map(|v| {
649                v.parse::<usize>()
650                    .map_err(|_| OptionError::RequestChannelCapacity)
651            })
652            .transpose()?
653        {
654            options.request_channel_capacity = request_channel_capacity;
655        }
656
657        if let Some(max_request_batch) = queries
658            .remove("max_request_batch_num")
659            .map(|v| v.parse::<usize>().map_err(|_| OptionError::MaxRequestBatch))
660            .transpose()?
661        {
662            options.max_request_batch = max_request_batch;
663        }
664
665        if let Some(pending_throttle) = queries
666            .remove("pending_throttle_usecs")
667            .map(|v| v.parse::<u64>().map_err(|_| OptionError::PendingThrottle))
668            .transpose()?
669        {
670            options.set_pending_throttle(Duration::from_micros(pending_throttle));
671        }
672
673        connect_props.receive_maximum = queries
674            .remove("inflight_num")
675            .map(|v| v.parse::<u16>().map_err(|_| OptionError::Inflight))
676            .transpose()?;
677
678        if let Some(conn_timeout) = queries
679            .remove("conn_timeout_secs")
680            .map(|v| v.parse::<u64>().map_err(|_| OptionError::ConnTimeout))
681            .transpose()?
682        {
683            options.set_connection_timeout(conn_timeout);
684        }
685
686        if let Some((opt, _)) = queries.into_iter().next() {
687            return Err(OptionError::Unknown(opt.into_owned()));
688        }
689
690        options.connect_properties = Some(connect_props);
691        Ok(options)
692    }
693}
694
695// Implement Debug manually because ClientConfig doesn't implement it, so derive(Debug) doesn't
696// work.
697impl Debug for MqttOptions {
698    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
699        f.debug_struct("MqttOptions")
700            .field("broker_addr", &self.broker_addr)
701            .field("port", &self.port)
702            .field("keep_alive", &self.keep_alive)
703            .field("clean_start", &self.clean_start)
704            .field("client_id", &self.client_id)
705            .field("credentials", &self.credentials)
706            .field("request_channel_capacity", &self.request_channel_capacity)
707            .field("max_request_batch", &self.max_request_batch)
708            .field("pending_throttle", &self.pending_throttle)
709            .field("last_will", &self.last_will)
710            .field("conn_timeout", &self.conn_timeout)
711            .field("manual_acks", &self.manual_acks)
712            .field("connect properties", &self.connect_properties)
713            .finish()
714    }
715}
716
717#[cfg(test)]
718mod test {
719    use super::*;
720
721    #[test]
722    #[cfg(all(feature = "use-rustls", feature = "websocket"))]
723    fn no_scheme() {
724        use crate::{TlsConfiguration, Transport};
725        let mut mqttoptions = MqttOptions::new("client_a", "a3f8czas.iot.eu-west-1.amazonaws.com/mqtt?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=MyCreds%2F20201001%2Feu-west-1%2Fiotdevicegateway%2Faws4_request&X-Amz-Date=20201001T130812Z&X-Amz-Expires=7200&X-Amz-Signature=9ae09b49896f44270f2707551581953e6cac71a4ccf34c7c3415555be751b2d1&X-Amz-SignedHeaders=host", 443);
726
727        mqttoptions.set_transport(Transport::wss(Vec::from("Test CA"), None, None));
728
729        if let Transport::Wss(TlsConfiguration::Simple {
730            ca,
731            client_auth,
732            alpn,
733        }) = mqttoptions.transport
734        {
735            assert_eq!(ca, Vec::from("Test CA"));
736            assert_eq!(client_auth, None);
737            assert_eq!(alpn, None);
738        } else {
739            panic!("Unexpected transport!");
740        }
741
742        assert_eq!(mqttoptions.broker_addr, "a3f8czas.iot.eu-west-1.amazonaws.com/mqtt?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=MyCreds%2F20201001%2Feu-west-1%2Fiotdevicegateway%2Faws4_request&X-Amz-Date=20201001T130812Z&X-Amz-Expires=7200&X-Amz-Signature=9ae09b49896f44270f2707551581953e6cac71a4ccf34c7c3415555be751b2d1&X-Amz-SignedHeaders=host");
743    }
744
745    #[test]
746    #[cfg(feature = "url")]
747    fn from_url() {
748        fn opt(s: &str) -> Result<MqttOptions, OptionError> {
749            MqttOptions::parse_url(s)
750        }
751        fn ok(s: &str) -> MqttOptions {
752            opt(s).expect("valid options")
753        }
754        fn err(s: &str) -> OptionError {
755            opt(s).expect_err("invalid options")
756        }
757
758        let v = ok("mqtt://host:42?client_id=foo");
759        assert_eq!(v.broker_address(), ("host".to_owned(), 42));
760        assert_eq!(v.client_id(), "foo".to_owned());
761
762        let v = ok("mqtt://host:42?client_id=foo&keep_alive_secs=5");
763        assert_eq!(v.keep_alive, Duration::from_secs(5));
764
765        assert_eq!(err("mqtt://host:42"), OptionError::ClientId);
766        assert_eq!(
767            err("mqtt://host:42?client_id=foo&foo=bar"),
768            OptionError::Unknown("foo".to_owned())
769        );
770        assert_eq!(err("mqt://host:42?client_id=foo"), OptionError::Scheme);
771        assert_eq!(
772            err("mqtt://host:42?client_id=foo&keep_alive_secs=foo"),
773            OptionError::KeepAlive
774        );
775        assert_eq!(
776            err("mqtt://host:42?client_id=foo&clean_start=foo"),
777            OptionError::CleanStart
778        );
779        assert_eq!(
780            err("mqtt://host:42?client_id=foo&max_incoming_packet_size_bytes=foo"),
781            OptionError::MaxIncomingPacketSize
782        );
783        assert_eq!(
784            err("mqtt://host:42?client_id=foo&request_channel_capacity_num=foo"),
785            OptionError::RequestChannelCapacity
786        );
787        assert_eq!(
788            err("mqtt://host:42?client_id=foo&max_request_batch_num=foo"),
789            OptionError::MaxRequestBatch
790        );
791        assert_eq!(
792            err("mqtt://host:42?client_id=foo&pending_throttle_usecs=foo"),
793            OptionError::PendingThrottle
794        );
795        assert_eq!(
796            err("mqtt://host:42?client_id=foo&inflight_num=foo"),
797            OptionError::Inflight
798        );
799        assert_eq!(
800            err("mqtt://host:42?client_id=foo&conn_timeout_secs=foo"),
801            OptionError::ConnTimeout
802        );
803    }
804
805    #[test]
806    fn allow_empty_client_id() {
807        let _mqtt_opts = MqttOptions::new("", "127.0.0.1", 1883).set_clean_start(true);
808    }
809}