1#![cfg_attr(docsrs, feature(doc_cfg))]
97
98#[macro_use]
99extern crate log;
100
101use std::fmt::{self, Debug, Formatter};
102
103#[cfg(any(feature = "use-rustls", feature = "websocket"))]
104use std::sync::Arc;
105
106use std::time::Duration;
107
108mod client;
109mod eventloop;
110mod framed;
111pub mod mqttbytes;
112mod state;
113pub mod v5;
114
115#[cfg(any(feature = "use-rustls", feature = "use-native-tls"))]
116mod tls;
117
118#[cfg(feature = "websocket")]
119mod websockets;
120
121#[cfg(feature = "websocket")]
122use std::{
123 future::{Future, IntoFuture},
124 pin::Pin,
125};
126
/// Shared callback type that receives an `http::Request<()>` by value and
/// returns a boxed, `Send` future resolving to an `http::Request<()>`.
///
/// The callback itself is `Send + Sync` and reference counted, so it can be
/// cloned cheaply. Only compiled with the `websocket` feature.
#[cfg(feature = "websocket")]
type RequestModifierFn = Arc<
    dyn Fn(http::Request<()>) -> Pin<Box<dyn Future<Output = http::Request<()>> + Send>>
        + Send
        + Sync,
>;
133
134#[cfg(feature = "proxy")]
135mod proxy;
136
137pub use client::{
138 AsyncClient, Client, ClientError, Connection, Iter, RecvError, RecvTimeoutError, TryRecvError,
139};
140pub use eventloop::{ConnectionError, Event, EventLoop};
141pub use mqttbytes::v4::*;
142pub use mqttbytes::*;
143#[cfg(feature = "use-rustls")]
144use rustls_native_certs::load_native_certs;
145pub use state::{MqttState, StateError};
146#[cfg(any(feature = "use-rustls", feature = "use-native-tls"))]
147pub use tls::Error as TlsError;
148#[cfg(feature = "use-rustls")]
149pub use tokio_rustls;
150#[cfg(feature = "use-rustls")]
151use tokio_rustls::rustls::{ClientConfig, RootCertStore};
152
153#[cfg(feature = "proxy")]
154pub use proxy::{Proxy, ProxyAuth, ProxyType};
155
/// Type alias for [`Packet`].
///
/// NOTE(review): presumably names packets received from the broker, as the
/// counterpart of [`Outgoing`] — confirm against the event loop.
pub type Incoming = Packet;
157
/// Lightweight description of a packet on the outgoing side.
///
/// NOTE(review): the `u16` payloads are presumably MQTT packet identifiers —
/// confirm against the `state` module.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Outgoing {
    /// A publish, tagged with a `u16`.
    Publish(u16),
    /// A subscribe, tagged with a `u16`.
    Subscribe(u16),
    /// An unsubscribe, tagged with a `u16`.
    Unsubscribe(u16),
    /// A PubAck, tagged with a `u16`.
    PubAck(u16),
    /// A PubRec, tagged with a `u16`.
    PubRec(u16),
    /// A PubRel, tagged with a `u16`.
    PubRel(u16),
    /// A PubComp, tagged with a `u16`.
    PubComp(u16),
    /// A ping request.
    PingReq,
    /// A ping response.
    PingResp,
    /// A disconnect.
    Disconnect,
    /// NOTE(review): presumably signals that an ack for the given id is being
    /// awaited — confirm against the `state` module.
    AwaitAck(u16),
}
184
/// A request wrapping one MQTT packet; every variant carries the packet type
/// of the same name.
///
/// `From` conversions are provided for `Publish`, `Subscribe` and
/// `Unsubscribe`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum Request {
    /// Wraps a `Publish` packet.
    Publish(Publish),
    /// Wraps a `PubAck` packet.
    PubAck(PubAck),
    /// Wraps a `PubRec` packet.
    PubRec(PubRec),
    /// Wraps a `PubComp` packet.
    PubComp(PubComp),
    /// Wraps a `PubRel` packet.
    PubRel(PubRel),
    /// Wraps a `PingReq` packet.
    PingReq(PingReq),
    /// Wraps a `PingResp` packet.
    PingResp(PingResp),
    /// Wraps a `Subscribe` packet.
    Subscribe(Subscribe),
    /// Wraps a `SubAck` packet.
    SubAck(SubAck),
    /// Wraps an `Unsubscribe` packet.
    Unsubscribe(Unsubscribe),
    /// Wraps an `UnsubAck` packet.
    UnsubAck(UnsubAck),
    /// Wraps a `Disconnect` packet.
    Disconnect(Disconnect),
}
202
203impl Request {
204 fn size(&self) -> usize {
205 match &self {
206 Request::Publish(publish) => publish.size(),
207 Request::PubAck(puback) => puback.size(),
208 Request::PubRec(pubrec) => pubrec.size(),
209 Request::PubComp(pubcomp) => pubcomp.size(),
210 Request::PubRel(pubrel) => pubrel.size(),
211 Request::PingReq(pingreq) => pingreq.size(),
212 Request::PingResp(pingresp) => pingresp.size(),
213 Request::Subscribe(subscribe) => subscribe.size(),
214 Request::SubAck(suback) => suback.size(),
215 Request::Unsubscribe(unsubscribe) => unsubscribe.size(),
216 Request::UnsubAck(unsuback) => unsuback.size(),
217 Request::Disconnect(disconn) => disconn.size(),
218 }
219 }
220}
221
222impl From<Publish> for Request {
223 fn from(publish: Publish) -> Request {
224 Request::Publish(publish)
225 }
226}
227
228impl From<Subscribe> for Request {
229 fn from(subscribe: Subscribe) -> Request {
230 Request::Subscribe(subscribe)
231 }
232}
233
234impl From<Unsubscribe> for Request {
235 fn from(unsubscribe: Unsubscribe) -> Request {
236 Request::Unsubscribe(unsubscribe)
237 }
238}
239
/// Transport used to reach the broker.
///
/// Which variants exist depends on the enabled cargo features and on the
/// target platform.
#[derive(Clone)]
pub enum Transport {
    /// TCP without a TLS configuration.
    Tcp,
    /// TLS using the given configuration (`use-rustls` or `use-native-tls`).
    #[cfg(any(feature = "use-rustls", feature = "use-native-tls"))]
    Tls(TlsConfiguration),
    /// Only compiled on `cfg(unix)` targets.
    /// NOTE(review): presumably a Unix domain socket — confirm in the event loop.
    #[cfg(unix)]
    Unix,
    /// WebSocket transport (`websocket` feature).
    #[cfg(feature = "websocket")]
    #[cfg_attr(docsrs, doc(cfg(feature = "websocket")))]
    Ws,
    /// WebSocket transport carrying a TLS configuration
    /// (`use-rustls` and `websocket` features).
    #[cfg(all(feature = "use-rustls", feature = "websocket"))]
    #[cfg_attr(docsrs, doc(cfg(all(feature = "use-rustls", feature = "websocket"))))]
    Wss(TlsConfiguration),
}
255
256impl Default for Transport {
257 fn default() -> Self {
258 Self::tcp()
259 }
260}
261
262impl Transport {
263 pub fn tcp() -> Self {
265 Self::Tcp
266 }
267
268 #[cfg(feature = "use-rustls")]
269 pub fn tls_with_default_config() -> Self {
270 Self::tls_with_config(Default::default())
271 }
272
273 #[cfg(feature = "use-rustls")]
275 pub fn tls(
276 ca: Vec<u8>,
277 client_auth: Option<(Vec<u8>, Vec<u8>)>,
278 alpn: Option<Vec<Vec<u8>>>,
279 ) -> Self {
280 let config = TlsConfiguration::Simple {
281 ca,
282 alpn,
283 client_auth,
284 };
285
286 Self::tls_with_config(config)
287 }
288
289 #[cfg(any(feature = "use-rustls", feature = "use-native-tls"))]
290 pub fn tls_with_config(tls_config: TlsConfiguration) -> Self {
291 Self::Tls(tls_config)
292 }
293
294 #[cfg(unix)]
295 pub fn unix() -> Self {
296 Self::Unix
297 }
298
299 #[cfg(feature = "websocket")]
301 #[cfg_attr(docsrs, doc(cfg(feature = "websocket")))]
302 pub fn ws() -> Self {
303 Self::Ws
304 }
305
306 #[cfg(all(feature = "use-rustls", feature = "websocket"))]
308 #[cfg_attr(docsrs, doc(cfg(all(feature = "use-rustls", feature = "websocket"))))]
309 pub fn wss(
310 ca: Vec<u8>,
311 client_auth: Option<(Vec<u8>, Vec<u8>)>,
312 alpn: Option<Vec<Vec<u8>>>,
313 ) -> Self {
314 let config = TlsConfiguration::Simple {
315 ca,
316 client_auth,
317 alpn,
318 };
319
320 Self::wss_with_config(config)
321 }
322
323 #[cfg(all(feature = "use-rustls", feature = "websocket"))]
324 #[cfg_attr(docsrs, doc(cfg(all(feature = "use-rustls", feature = "websocket"))))]
325 pub fn wss_with_config(tls_config: TlsConfiguration) -> Self {
326 Self::Wss(tls_config)
327 }
328
329 #[cfg(all(feature = "use-rustls", feature = "websocket"))]
330 #[cfg_attr(docsrs, doc(cfg(all(feature = "use-rustls", feature = "websocket"))))]
331 pub fn wss_with_default_config() -> Self {
332 Self::Wss(Default::default())
333 }
334}
335
/// TLS configuration carried by the TLS-capable [`Transport`] variants.
///
/// The available variants depend on whether `use-rustls`, `use-native-tls`,
/// or both are enabled.
#[derive(Clone, Debug)]
#[cfg(any(feature = "use-rustls", feature = "use-native-tls"))]
pub enum TlsConfiguration {
    /// Configuration from raw byte buffers (`use-rustls`).
    #[cfg(feature = "use-rustls")]
    Simple {
        /// CA certificate bytes.
        /// NOTE(review): encoding (PEM vs DER) is decided in the `tls` module — confirm.
        ca: Vec<u8>,
        /// Optional list of ALPN protocol identifiers.
        alpn: Option<Vec<Vec<u8>>>,
        /// Optional pair of byte buffers for client authentication.
        /// NOTE(review): presumably (certificate chain, private key) — confirm.
        client_auth: Option<(Vec<u8>, Vec<u8>)>,
    },
    /// Configuration from raw byte buffers (`use-native-tls`).
    #[cfg(feature = "use-native-tls")]
    SimpleNative {
        /// CA certificate bytes.
        ca: Vec<u8>,
        /// Optional byte buffer plus string for client authentication.
        /// NOTE(review): presumably (PKCS#12 archive, password) — confirm.
        client_auth: Option<(Vec<u8>, String)>,
    },
    /// A ready-made, shared rustls `ClientConfig` (`use-rustls`).
    #[cfg(feature = "use-rustls")]
    Rustls(Arc<ClientConfig>),
    /// Carries no data (`use-native-tls`).
    /// NOTE(review): presumably selects native-tls defaults — confirm.
    #[cfg(feature = "use-native-tls")]
    Native,
}
363
#[cfg(feature = "use-rustls")]
impl Default for TlsConfiguration {
    /// Builds a shared rustls `ClientConfig` that trusts the platform's native
    /// root certificates and performs no client authentication.
    ///
    /// Platform certificates that rustls cannot parse are skipped rather than
    /// aborting the whole configuration.
    ///
    /// # Panics
    ///
    /// Panics if the platform certificate store cannot be loaded at all.
    fn default() -> Self {
        let native_certs = load_native_certs().expect("could not load platform certs");

        // Platform stores routinely contain ancient or malformed entries; a
        // single bad certificate must not panic the caller, so add whatever
        // parses and ignore the rest.
        let mut root_cert_store = RootCertStore::empty();
        let (_added, _ignored) = root_cert_store.add_parsable_certificates(native_certs);

        let tls_config = ClientConfig::builder()
            .with_root_certificates(root_cert_store)
            .with_no_client_auth();

        Self::Rustls(Arc::new(tls_config))
    }
}
378
#[cfg(feature = "use-rustls")]
impl From<ClientConfig> for TlsConfiguration {
    /// Moves the rustls configuration behind an `Arc` and wraps it in
    /// [`TlsConfiguration::Rustls`].
    fn from(config: ClientConfig) -> Self {
        let shared = Arc::new(config);
        Self::Rustls(shared)
    }
}
385
/// Low-level network settings: optional TCP buffer sizes, a connection
/// timeout and, on Android/Fuchsia/Linux, an optional device to bind to.
///
/// `NetworkOptions::default()` and [`NetworkOptions::new`] produce the same
/// value.
#[derive(Clone)]
pub struct NetworkOptions {
    /// Requested TCP send buffer size, if any.
    tcp_send_buffer_size: Option<u32>,
    /// Requested TCP receive buffer size, if any.
    tcp_recv_buffer_size: Option<u32>,
    /// Connection timeout. NOTE(review): presumably in seconds — confirm
    /// where the event loop consumes it.
    conn_timeout: u64,
    /// Name of the device to bind to, if any.
    #[cfg(any(target_os = "android", target_os = "fuchsia", target_os = "linux"))]
    bind_device: Option<String>,
}

impl Default for NetworkOptions {
    /// Same as [`NetworkOptions::new`].
    ///
    /// Implemented by hand because a derived `Default` would zero the
    /// connection timeout instead of using the documented default of `5`.
    fn default() -> Self {
        Self::new()
    }
}

impl NetworkOptions {
    /// Creates options with no buffer sizes set, a connection timeout of `5`
    /// and (where supported) no bind device.
    pub fn new() -> Self {
        NetworkOptions {
            tcp_send_buffer_size: None,
            tcp_recv_buffer_size: None,
            conn_timeout: 5,
            #[cfg(any(target_os = "android", target_os = "fuchsia", target_os = "linux"))]
            bind_device: None,
        }
    }

    /// Sets the requested TCP send buffer size.
    pub fn set_tcp_send_buffer_size(&mut self, size: u32) {
        self.tcp_send_buffer_size = Some(size);
    }

    /// Sets the requested TCP receive buffer size.
    pub fn set_tcp_recv_buffer_size(&mut self, size: u32) {
        self.tcp_recv_buffer_size = Some(size);
    }

    /// Sets the connection timeout and returns `self` for chaining.
    pub fn set_connection_timeout(&mut self, timeout: u64) -> &mut Self {
        self.conn_timeout = timeout;
        self
    }

    /// Returns the configured connection timeout.
    pub fn connection_timeout(&self) -> u64 {
        self.conn_timeout
    }

    /// Sets the name of the device to bind to and returns `self` for chaining.
    ///
    /// Only available on Android, Fuchsia and Linux.
    #[cfg(any(target_os = "android", target_os = "fuchsia", target_os = "linux"))]
    #[cfg_attr(
        docsrs,
        doc(cfg(any(target_os = "android", target_os = "fuchsia", target_os = "linux")))
    )]
    pub fn set_bind_device(&mut self, bind_device: &str) -> &mut Self {
        self.bind_device = Some(bind_device.to_string());
        self
    }
}
437
/// Options used to configure an MQTT connection.
///
/// Construct with `MqttOptions::new` (or `MqttOptions::parse_url` with the
/// `url` feature) and adjust through the `set_*` methods.
#[derive(Clone)]
pub struct MqttOptions {
    /// Broker host, stored exactly as passed to `MqttOptions::new`.
    broker_addr: String,
    /// Broker port.
    port: u16,
    /// Transport used to reach the broker.
    transport: Transport,
    /// Keep-alive interval; `set_keep_alive` accepts only zero or at least one second.
    keep_alive: Duration,
    /// Clean-session flag; `set_clean_session` refuses `false` while
    /// `client_id` is empty.
    clean_session: bool,
    /// Client identifier.
    client_id: String,
    /// Optional `(username, password)` pair.
    credentials: Option<(String, String)>,
    /// Maximum incoming packet size, in bytes.
    max_incoming_packet_size: usize,
    /// Maximum outgoing packet size, in bytes.
    max_outgoing_packet_size: usize,
    /// Capacity of the request channel.
    request_channel_capacity: usize,
    /// Maximum request batch.
    /// NOTE(review): exact batching semantics live in the event loop — confirm.
    max_request_batch: usize,
    /// Pending throttle duration.
    /// NOTE(review): presumably the delay between retransmitted pending
    /// requests — confirm in the event loop.
    pending_throttle: Duration,
    /// Inflight limit; `set_inflight` refuses zero.
    inflight: u16,
    /// Optional last will.
    last_will: Option<LastWill>,
    /// Whether manual acknowledgements are enabled.
    manual_acks: bool,
    /// Optional proxy (`proxy` feature).
    #[cfg(feature = "proxy")]
    proxy: Option<Proxy>,
    /// Optional callback applied to the HTTP request (`websocket` feature).
    #[cfg(feature = "websocket")]
    request_modifier: Option<RequestModifierFn>,
}
482
483impl MqttOptions {
484 pub fn new<S: Into<String>, T: Into<String>>(id: S, host: T, port: u16) -> MqttOptions {
494 MqttOptions {
495 broker_addr: host.into(),
496 port,
497 transport: Transport::tcp(),
498 keep_alive: Duration::from_secs(60),
499 clean_session: true,
500 client_id: id.into(),
501 credentials: None,
502 max_incoming_packet_size: 10 * 1024,
503 max_outgoing_packet_size: 10 * 1024,
504 request_channel_capacity: 10,
505 max_request_batch: 0,
506 pending_throttle: Duration::from_micros(0),
507 inflight: 100,
508 last_will: None,
509 manual_acks: false,
510 #[cfg(feature = "proxy")]
511 proxy: None,
512 #[cfg(feature = "websocket")]
513 request_modifier: None,
514 }
515 }
516
517 #[cfg(feature = "url")]
518 pub fn parse_url<S: Into<String>>(url: S) -> Result<MqttOptions, OptionError> {
544 let url = url::Url::parse(&url.into())?;
545 let options = MqttOptions::try_from(url)?;
546
547 Ok(options)
548 }
549
550 pub fn broker_address(&self) -> (String, u16) {
552 (self.broker_addr.clone(), self.port)
553 }
554
555 pub fn set_last_will(&mut self, will: LastWill) -> &mut Self {
556 self.last_will = Some(will);
557 self
558 }
559
560 pub fn last_will(&self) -> Option<LastWill> {
561 self.last_will.clone()
562 }
563
564 pub fn set_transport(&mut self, transport: Transport) -> &mut Self {
565 self.transport = transport;
566 self
567 }
568
569 pub fn transport(&self) -> Transport {
570 self.transport.clone()
571 }
572
573 pub fn set_keep_alive(&mut self, duration: Duration) -> &mut Self {
576 assert!(
577 duration.is_zero() || duration >= Duration::from_secs(1),
578 "Keep alives should be specified in seconds. Durations less than \
579 a second are not allowed, except for Duration::ZERO."
580 );
581
582 self.keep_alive = duration;
583 self
584 }
585
586 pub fn keep_alive(&self) -> Duration {
588 self.keep_alive
589 }
590
591 pub fn client_id(&self) -> String {
593 self.client_id.clone()
594 }
595
596 pub fn set_max_packet_size(&mut self, incoming: usize, outgoing: usize) -> &mut Self {
598 self.max_incoming_packet_size = incoming;
599 self.max_outgoing_packet_size = outgoing;
600 self
601 }
602
603 pub fn max_packet_size(&self) -> usize {
605 self.max_incoming_packet_size
606 }
607
608 pub fn set_clean_session(&mut self, clean_session: bool) -> &mut Self {
625 assert!(
626 !self.client_id.is_empty() || clean_session,
627 "Cannot unset clean session when client id is empty"
628 );
629 self.clean_session = clean_session;
630 self
631 }
632
633 pub fn clean_session(&self) -> bool {
635 self.clean_session
636 }
637
638 pub fn set_credentials<U: Into<String>, P: Into<String>>(
640 &mut self,
641 username: U,
642 password: P,
643 ) -> &mut Self {
644 self.credentials = Some((username.into(), password.into()));
645 self
646 }
647
648 pub fn credentials(&self) -> Option<(String, String)> {
650 self.credentials.clone()
651 }
652
653 pub fn set_request_channel_capacity(&mut self, capacity: usize) -> &mut Self {
655 self.request_channel_capacity = capacity;
656 self
657 }
658
659 pub fn request_channel_capacity(&self) -> usize {
661 self.request_channel_capacity
662 }
663
664 pub fn set_pending_throttle(&mut self, duration: Duration) -> &mut Self {
666 self.pending_throttle = duration;
667 self
668 }
669
670 pub fn pending_throttle(&self) -> Duration {
672 self.pending_throttle
673 }
674
675 pub fn set_inflight(&mut self, inflight: u16) -> &mut Self {
677 assert!(inflight != 0, "zero in flight is not allowed");
678
679 self.inflight = inflight;
680 self
681 }
682
683 pub fn inflight(&self) -> u16 {
685 self.inflight
686 }
687
688 pub fn set_manual_acks(&mut self, manual_acks: bool) -> &mut Self {
690 self.manual_acks = manual_acks;
691 self
692 }
693
694 pub fn manual_acks(&self) -> bool {
696 self.manual_acks
697 }
698
699 #[cfg(feature = "proxy")]
700 pub fn set_proxy(&mut self, proxy: Proxy) -> &mut Self {
701 self.proxy = Some(proxy);
702 self
703 }
704
705 #[cfg(feature = "proxy")]
706 pub fn proxy(&self) -> Option<Proxy> {
707 self.proxy.clone()
708 }
709
710 #[cfg(feature = "websocket")]
711 pub fn set_request_modifier<F, O>(&mut self, request_modifier: F) -> &mut Self
712 where
713 F: Fn(http::Request<()>) -> O + Send + Sync + 'static,
714 O: IntoFuture<Output = http::Request<()>> + 'static,
715 O::IntoFuture: Send,
716 {
717 self.request_modifier = Some(Arc::new(move |request| {
718 let request_modifier = request_modifier(request).into_future();
719 Box::pin(request_modifier)
720 }));
721
722 self
723 }
724
725 #[cfg(feature = "websocket")]
726 pub fn request_modifier(&self) -> Option<RequestModifierFn> {
727 self.request_modifier.clone()
728 }
729}
730
/// Errors produced while building `MqttOptions` from a URL (`url` feature).
#[cfg(feature = "url")]
#[derive(Debug, PartialEq, Eq, thiserror::Error)]
pub enum OptionError {
    /// The URL scheme is not one of the supported schemes.
    #[error("Unsupported URL scheme.")]
    Scheme,

    /// The `client_id` query parameter is absent.
    #[error("Missing client ID.")]
    ClientId,

    /// The `keep_alive_secs` query parameter could not be parsed.
    #[error("Invalid keep-alive value.")]
    KeepAlive,

    /// The `clean_session` query parameter could not be parsed.
    #[error("Invalid clean-session value.")]
    CleanSession,

    /// The `max_incoming_packet_size_bytes` query parameter could not be parsed.
    #[error("Invalid max-incoming-packet-size value.")]
    MaxIncomingPacketSize,

    /// The `max_outgoing_packet_size_bytes` query parameter could not be parsed.
    #[error("Invalid max-outgoing-packet-size value.")]
    MaxOutgoingPacketSize,

    /// The `request_channel_capacity_num` query parameter could not be parsed.
    #[error("Invalid request-channel-capacity value.")]
    RequestChannelCapacity,

    /// The `max_request_batch_num` query parameter could not be parsed.
    #[error("Invalid max-request-batch value.")]
    MaxRequestBatch,

    /// The `pending_throttle_usecs` query parameter could not be parsed.
    #[error("Invalid pending-throttle value.")]
    PendingThrottle,

    /// The `inflight_num` query parameter could not be parsed.
    #[error("Invalid inflight value.")]
    Inflight,

    /// A query parameter that is not recognised; carries its name.
    #[error("Unknown option: {0}")]
    Unknown(String),

    /// The string is not a valid URL.
    #[error("Couldn't parse option from url: {0}")]
    Parse(#[from] url::ParseError),
}
770
#[cfg(feature = "url")]
impl std::convert::TryFrom<url::Url> for MqttOptions {
    type Error = OptionError;

    /// Builds `MqttOptions` from a URL.
    ///
    /// * Scheme selects the transport and default port: `mqtt`/`tcp` (1883),
    ///   `mqtts`/`ssl` (8883, `use-rustls`), `ws` (8000, `websocket`),
    ///   `wss` (8000, `use-rustls` + `websocket`).
    /// * Host and port come from the authority; username/password, when
    ///   present, become the credentials.
    /// * Query parameters: `client_id` (required), `keep_alive_secs`,
    ///   `clean_session`, `max_incoming_packet_size_bytes`,
    ///   `max_outgoing_packet_size_bytes`, `request_channel_capacity_num`,
    ///   `max_request_batch_num`, `pending_throttle_usecs`, `inflight_num`.
    ///
    /// # Errors
    ///
    /// Returns the matching `OptionError` when the scheme is unsupported,
    /// `client_id` is missing, a value fails to parse or is not acceptable
    /// (`inflight_num=0`, or `clean_session=false` with an empty client id),
    /// or an unknown query parameter is present. This conversion never panics
    /// on such input.
    fn try_from(url: url::Url) -> Result<Self, Self::Error> {
        use std::collections::HashMap;

        let host = url.host_str().unwrap_or_default().to_owned();

        let (transport, default_port) = match url.scheme() {
            #[cfg(feature = "use-rustls")]
            "mqtts" | "ssl" => (Transport::tls_with_default_config(), 8883),
            "mqtt" | "tcp" => (Transport::Tcp, 1883),
            #[cfg(feature = "websocket")]
            "ws" => (Transport::Ws, 8000),
            #[cfg(all(feature = "use-rustls", feature = "websocket"))]
            "wss" => (Transport::wss_with_default_config(), 8000),
            _ => return Err(OptionError::Scheme),
        };

        let port = url.port().unwrap_or(default_port);

        // Recognised keys are removed as they are consumed; whatever is left
        // at the end is reported as unknown.
        let mut queries = url.query_pairs().collect::<HashMap<_, _>>();

        let id = queries
            .remove("client_id")
            .ok_or(OptionError::ClientId)?
            .into_owned();

        let mut options = MqttOptions::new(id, host, port);
        options.set_transport(transport);

        if let Some(keep_alive) = queries
            .remove("keep_alive_secs")
            .map(|v| v.parse::<u64>().map_err(|_| OptionError::KeepAlive))
            .transpose()?
        {
            // Whole seconds are always zero or >= 1s, so the setter's
            // assertion cannot fire here.
            options.set_keep_alive(Duration::from_secs(keep_alive));
        }

        if let Some(clean_session) = queries
            .remove("clean_session")
            .map(|v| v.parse::<bool>().map_err(|_| OptionError::CleanSession))
            .transpose()?
        {
            // `set_clean_session` asserts on this combination; report it as
            // an error instead of panicking on user-supplied input.
            if !clean_session && options.client_id.is_empty() {
                return Err(OptionError::CleanSession);
            }
            options.set_clean_session(clean_session);
        }

        // NOTE(review): `username()`/`password()` are used as returned by the
        // `url` crate; confirm whether percent-decoding is wanted here.
        if let Some((username, password)) = {
            match url.username() {
                "" => None,
                username => Some((
                    username.to_owned(),
                    url.password().unwrap_or_default().to_owned(),
                )),
            }
        } {
            options.set_credentials(username, password);
        }

        let incoming = queries
            .remove("max_incoming_packet_size_bytes")
            .map(|v| {
                v.parse::<usize>()
                    .map_err(|_| OptionError::MaxIncomingPacketSize)
            })
            .transpose()?;
        let outgoing = queries
            .remove("max_outgoing_packet_size_bytes")
            .map(|v| {
                v.parse::<usize>()
                    .map_err(|_| OptionError::MaxOutgoingPacketSize)
            })
            .transpose()?;
        // Apply whichever limits were supplied; a limit given on its own must
        // not be silently discarded, so the other side keeps its current value.
        if incoming.is_some() || outgoing.is_some() {
            let incoming = incoming.unwrap_or(options.max_incoming_packet_size);
            let outgoing = outgoing.unwrap_or(options.max_outgoing_packet_size);
            options.set_max_packet_size(incoming, outgoing);
        }

        if let Some(request_channel_capacity) = queries
            .remove("request_channel_capacity_num")
            .map(|v| {
                v.parse::<usize>()
                    .map_err(|_| OptionError::RequestChannelCapacity)
            })
            .transpose()?
        {
            options.request_channel_capacity = request_channel_capacity;
        }

        if let Some(max_request_batch) = queries
            .remove("max_request_batch_num")
            .map(|v| v.parse::<usize>().map_err(|_| OptionError::MaxRequestBatch))
            .transpose()?
        {
            options.max_request_batch = max_request_batch;
        }

        if let Some(pending_throttle) = queries
            .remove("pending_throttle_usecs")
            .map(|v| v.parse::<u64>().map_err(|_| OptionError::PendingThrottle))
            .transpose()?
        {
            options.set_pending_throttle(Duration::from_micros(pending_throttle));
        }

        if let Some(inflight) = queries
            .remove("inflight_num")
            .map(|v| v.parse::<u16>().map_err(|_| OptionError::Inflight))
            .transpose()?
        {
            // `set_inflight` asserts on zero; report it as an error instead
            // of panicking on user-supplied input.
            if inflight == 0 {
                return Err(OptionError::Inflight);
            }
            options.set_inflight(inflight);
        }

        if let Some((opt, _)) = queries.into_iter().next() {
            return Err(OptionError::Unknown(opt.into_owned()));
        }

        Ok(options)
    }
}
895
896impl Debug for MqttOptions {
899 fn fmt(&self, f: &mut Formatter) -> fmt::Result {
900 f.debug_struct("MqttOptions")
901 .field("broker_addr", &self.broker_addr)
902 .field("port", &self.port)
903 .field("keep_alive", &self.keep_alive)
904 .field("clean_session", &self.clean_session)
905 .field("client_id", &self.client_id)
906 .field("credentials", &self.credentials)
907 .field("max_packet_size", &self.max_incoming_packet_size)
908 .field("request_channel_capacity", &self.request_channel_capacity)
909 .field("max_request_batch", &self.max_request_batch)
910 .field("pending_throttle", &self.pending_throttle)
911 .field("inflight", &self.inflight)
912 .field("last_will", &self.last_will)
913 .field("manual_acks", &self.manual_acks)
914 .finish()
915 }
916}
917
// Unit tests for option construction, URL parsing and the clean-session guard.
#[cfg(test)]
mod test {
    use super::*;

    // `MqttOptions::new` stores the host string verbatim (here a host with a
    // path and query appended), and `Transport::wss(..)` is stored as
    // `Transport::Wss(TlsConfiguration::Simple { .. })` with the given parts.
    #[test]
    #[cfg(all(feature = "use-rustls", feature = "websocket"))]
    fn no_scheme() {
        let mut mqttoptions = MqttOptions::new("client_a", "a3f8czas.iot.eu-west-1.amazonaws.com/mqtt?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=MyCreds%2F20201001%2Feu-west-1%2Fiotdevicegateway%2Faws4_request&X-Amz-Date=20201001T130812Z&X-Amz-Expires=7200&X-Amz-Signature=9ae09b49896f44270f2707551581953e6cac71a4ccf34c7c3415555be751b2d1&X-Amz-SignedHeaders=host", 443);

        mqttoptions.set_transport(crate::Transport::wss(Vec::from("Test CA"), None, None));

        if let crate::Transport::Wss(TlsConfiguration::Simple {
            ca,
            client_auth,
            alpn,
        }) = mqttoptions.transport
        {
            assert_eq!(ca, Vec::from("Test CA"));
            assert_eq!(client_auth, None);
            assert_eq!(alpn, None);
        } else {
            panic!("Unexpected transport!");
        }

        assert_eq!(mqttoptions.broker_addr, "a3f8czas.iot.eu-west-1.amazonaws.com/mqtt?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=MyCreds%2F20201001%2Feu-west-1%2Fiotdevicegateway%2Faws4_request&X-Amz-Date=20201001T130812Z&X-Amz-Expires=7200&X-Amz-Signature=9ae09b49896f44270f2707551581953e6cac71a4ccf34c7c3415555be751b2d1&X-Amz-SignedHeaders=host");
    }

    // URL parsing: valid URLs populate host, port, client id and keep-alive;
    // each malformed or unknown option maps to its own `OptionError` variant.
    #[test]
    #[cfg(feature = "url")]
    fn from_url() {
        // Parse a URL string into options.
        fn opt(s: &str) -> Result<MqttOptions, OptionError> {
            MqttOptions::parse_url(s)
        }
        // Parse and require success.
        fn ok(s: &str) -> MqttOptions {
            opt(s).expect("valid options")
        }
        // Parse and require failure.
        fn err(s: &str) -> OptionError {
            opt(s).expect_err("invalid options")
        }

        let v = ok("mqtt://host:42?client_id=foo");
        assert_eq!(v.broker_address(), ("host".to_owned(), 42));
        assert_eq!(v.client_id(), "foo".to_owned());

        let v = ok("mqtt://host:42?client_id=foo&keep_alive_secs=5");
        assert_eq!(v.keep_alive, Duration::from_secs(5));

        assert_eq!(err("mqtt://host:42"), OptionError::ClientId);
        assert_eq!(
            err("mqtt://host:42?client_id=foo&foo=bar"),
            OptionError::Unknown("foo".to_owned())
        );
        assert_eq!(err("mqt://host:42?client_id=foo"), OptionError::Scheme);
        assert_eq!(
            err("mqtt://host:42?client_id=foo&keep_alive_secs=foo"),
            OptionError::KeepAlive
        );
        assert_eq!(
            err("mqtt://host:42?client_id=foo&clean_session=foo"),
            OptionError::CleanSession
        );
        assert_eq!(
            err("mqtt://host:42?client_id=foo&max_incoming_packet_size_bytes=foo"),
            OptionError::MaxIncomingPacketSize
        );
        assert_eq!(
            err("mqtt://host:42?client_id=foo&max_outgoing_packet_size_bytes=foo"),
            OptionError::MaxOutgoingPacketSize
        );
        assert_eq!(
            err("mqtt://host:42?client_id=foo&request_channel_capacity_num=foo"),
            OptionError::RequestChannelCapacity
        );
        assert_eq!(
            err("mqtt://host:42?client_id=foo&max_request_batch_num=foo"),
            OptionError::MaxRequestBatch
        );
        assert_eq!(
            err("mqtt://host:42?client_id=foo&pending_throttle_usecs=foo"),
            OptionError::PendingThrottle
        );
        assert_eq!(
            err("mqtt://host:42?client_id=foo&inflight_num=foo"),
            OptionError::Inflight
        );
    }

    // An empty client id is accepted as long as clean session stays enabled
    // (the call must not panic).
    #[test]
    fn accept_empty_client_id() {
        let _mqtt_opts = MqttOptions::new("", "127.0.0.1", 1883).set_clean_session(true);
    }

    // With a non-empty client id, clean session can be switched off and on
    // again without panicking.
    #[test]
    fn set_clean_session_when_client_id_present() {
        let mut options = MqttOptions::new("client_id", "127.0.0.1", 1883);
        options.set_clean_session(false);
        options.set_clean_session(true);
    }
}