1use bytes::Bytes;
2use std::fmt::{self, Debug, Formatter};
3use std::time::Duration;
4#[cfg(feature = "websocket")]
5use std::{
6 future::{Future, IntoFuture},
7 pin::Pin,
8 sync::Arc,
9};
10
11mod client;
12mod eventloop;
13mod framed;
14pub mod mqttbytes;
15mod state;
16
17use crate::Outgoing;
18use crate::{NetworkOptions, Transport};
19
20use mqttbytes::v5::*;
21
22pub use client::{AsyncClient, Client, ClientError, Connection, Iter};
23pub use eventloop::{ConnectionError, Event, EventLoop};
24pub use state::{MqttState, StateError};
25
26#[cfg(feature = "use-rustls")]
27pub use crate::tls::Error as TlsError;
28
29#[cfg(feature = "proxy")]
30pub use crate::proxy::{Proxy, ProxyAuth, ProxyType};
31
32pub type Incoming = Packet;
33
34#[derive(Clone, Debug, PartialEq, Eq)]
37pub enum Request {
38 Publish(Publish),
39 PubAck(PubAck),
40 PubRec(PubRec),
41 PubComp(PubComp),
42 PubRel(PubRel),
43 PingReq,
44 PingResp,
45 Subscribe(Subscribe),
46 SubAck(SubAck),
47 Unsubscribe(Unsubscribe),
48 UnsubAck(UnsubAck),
49 Disconnect,
50}
51
52#[cfg(feature = "websocket")]
53type RequestModifierFn = Arc<
54 dyn Fn(http::Request<()>) -> Pin<Box<dyn Future<Output = http::Request<()>> + Send>>
55 + Send
56 + Sync,
57>;
58
59#[derive(Clone)]
64pub struct MqttOptions {
65 broker_addr: String,
67 port: u16,
69 transport: Transport,
71 keep_alive: Duration,
73 clean_start: bool,
75 client_id: String,
77 credentials: Option<(String, String)>,
79 request_channel_capacity: usize,
81 max_request_batch: usize,
83 pending_throttle: Duration,
86 last_will: Option<LastWill>,
88 conn_timeout: u64,
90 default_max_incoming_size: usize,
93 connect_properties: Option<ConnectProperties>,
95 manual_acks: bool,
98 network_options: NetworkOptions,
99 #[cfg(feature = "proxy")]
100 proxy: Option<Proxy>,
102 outgoing_inflight_upper_limit: Option<u16>,
105 #[cfg(feature = "websocket")]
106 request_modifier: Option<RequestModifierFn>,
107}
108
109impl MqttOptions {
110 pub fn new<S: Into<String>, T: Into<String>>(id: S, host: T, port: u16) -> MqttOptions {
120 MqttOptions {
121 broker_addr: host.into(),
122 port,
123 transport: Transport::tcp(),
124 keep_alive: Duration::from_secs(60),
125 clean_start: true,
126 client_id: id.into(),
127 credentials: None,
128 request_channel_capacity: 10,
129 max_request_batch: 0,
130 pending_throttle: Duration::from_micros(0),
131 last_will: None,
132 conn_timeout: 5,
133 default_max_incoming_size: 10 * 1024,
134 connect_properties: None,
135 manual_acks: false,
136 network_options: NetworkOptions::new(),
137 #[cfg(feature = "proxy")]
138 proxy: None,
139 outgoing_inflight_upper_limit: None,
140 #[cfg(feature = "websocket")]
141 request_modifier: None,
142 }
143 }
144
145 #[cfg(feature = "url")]
146 pub fn parse_url<S: Into<String>>(url: S) -> Result<MqttOptions, OptionError> {
172 let url = url::Url::parse(&url.into())?;
173 let options = MqttOptions::try_from(url)?;
174
175 Ok(options)
176 }
177
178 pub fn broker_address(&self) -> (String, u16) {
180 (self.broker_addr.clone(), self.port)
181 }
182
183 pub fn set_last_will(&mut self, will: LastWill) -> &mut Self {
184 self.last_will = Some(will);
185 self
186 }
187
188 pub fn last_will(&self) -> Option<LastWill> {
189 self.last_will.clone()
190 }
191
192 #[cfg(feature = "websocket")]
193 pub fn set_request_modifier<F, O>(&mut self, request_modifier: F) -> &mut Self
194 where
195 F: Fn(http::Request<()>) -> O + Send + Sync + 'static,
196 O: IntoFuture<Output = http::Request<()>> + 'static,
197 O::IntoFuture: Send,
198 {
199 self.request_modifier = Some(Arc::new(move |request| {
200 let request_modifier = request_modifier(request).into_future();
201 Box::pin(request_modifier)
202 }));
203
204 self
205 }
206
207 #[cfg(feature = "websocket")]
208 pub fn request_modifier(&self) -> Option<RequestModifierFn> {
209 self.request_modifier.clone()
210 }
211
212 pub fn set_transport(&mut self, transport: Transport) -> &mut Self {
213 self.transport = transport;
214 self
215 }
216
217 pub fn transport(&self) -> Transport {
218 self.transport.clone()
219 }
220
221 pub fn set_keep_alive(&mut self, duration: Duration) -> &mut Self {
224 assert!(duration.as_secs() >= 5, "Keep alives should be >= 5 secs");
225
226 self.keep_alive = duration;
227 self
228 }
229
230 pub fn keep_alive(&self) -> Duration {
232 self.keep_alive
233 }
234
235 pub fn client_id(&self) -> String {
237 self.client_id.clone()
238 }
239
240 pub fn set_clean_start(&mut self, clean_start: bool) -> &mut Self {
247 self.clean_start = clean_start;
248 self
249 }
250
251 pub fn clean_start(&self) -> bool {
253 self.clean_start
254 }
255
256 pub fn set_credentials<U: Into<String>, P: Into<String>>(
258 &mut self,
259 username: U,
260 password: P,
261 ) -> &mut Self {
262 self.credentials = Some((username.into(), password.into()));
263 self
264 }
265
266 pub fn credentials(&self) -> Option<(String, String)> {
268 self.credentials.clone()
269 }
270
271 pub fn set_request_channel_capacity(&mut self, capacity: usize) -> &mut Self {
273 self.request_channel_capacity = capacity;
274 self
275 }
276
277 pub fn request_channel_capacity(&self) -> usize {
279 self.request_channel_capacity
280 }
281
282 pub fn set_pending_throttle(&mut self, duration: Duration) -> &mut Self {
284 self.pending_throttle = duration;
285 self
286 }
287
288 pub fn pending_throttle(&self) -> Duration {
290 self.pending_throttle
291 }
292
293 pub fn set_connection_timeout(&mut self, timeout: u64) -> &mut Self {
295 self.conn_timeout = timeout;
296 self
297 }
298
299 pub fn connection_timeout(&self) -> u64 {
301 self.conn_timeout
302 }
303
304 pub fn set_connect_properties(&mut self, properties: ConnectProperties) -> &mut Self {
306 self.connect_properties = Some(properties);
307 self
308 }
309
310 pub fn connect_properties(&self) -> Option<ConnectProperties> {
312 self.connect_properties.clone()
313 }
314
315 pub fn set_receive_maximum(&mut self, recv_max: Option<u16>) -> &mut Self {
317 if let Some(conn_props) = &mut self.connect_properties {
318 conn_props.receive_maximum = recv_max;
319 self
320 } else {
321 let mut conn_props = ConnectProperties::new();
322 conn_props.receive_maximum = recv_max;
323 self.set_connect_properties(conn_props)
324 }
325 }
326
327 pub fn receive_maximum(&self) -> Option<u16> {
329 if let Some(conn_props) = &self.connect_properties {
330 conn_props.receive_maximum
331 } else {
332 None
333 }
334 }
335
336 pub fn set_max_packet_size(&mut self, max_size: Option<u32>) -> &mut Self {
338 if let Some(conn_props) = &mut self.connect_properties {
339 conn_props.max_packet_size = max_size;
340 self
341 } else {
342 let mut conn_props = ConnectProperties::new();
343 conn_props.max_packet_size = max_size;
344 self.set_connect_properties(conn_props)
345 }
346 }
347
348 pub fn max_packet_size(&self) -> Option<u32> {
350 if let Some(conn_props) = &self.connect_properties {
351 conn_props.max_packet_size
352 } else {
353 None
354 }
355 }
356
357 pub fn set_topic_alias_max(&mut self, topic_alias_max: Option<u16>) -> &mut Self {
359 if let Some(conn_props) = &mut self.connect_properties {
360 conn_props.topic_alias_max = topic_alias_max;
361 self
362 } else {
363 let mut conn_props = ConnectProperties::new();
364 conn_props.topic_alias_max = topic_alias_max;
365 self.set_connect_properties(conn_props)
366 }
367 }
368
369 pub fn topic_alias_max(&self) -> Option<u16> {
371 if let Some(conn_props) = &self.connect_properties {
372 conn_props.topic_alias_max
373 } else {
374 None
375 }
376 }
377
378 pub fn set_request_response_info(&mut self, request_response_info: Option<u8>) -> &mut Self {
380 if let Some(conn_props) = &mut self.connect_properties {
381 conn_props.request_response_info = request_response_info;
382 self
383 } else {
384 let mut conn_props = ConnectProperties::new();
385 conn_props.request_response_info = request_response_info;
386 self.set_connect_properties(conn_props)
387 }
388 }
389
390 pub fn request_response_info(&self) -> Option<u8> {
392 if let Some(conn_props) = &self.connect_properties {
393 conn_props.request_response_info
394 } else {
395 None
396 }
397 }
398
399 pub fn set_request_problem_info(&mut self, request_problem_info: Option<u8>) -> &mut Self {
401 if let Some(conn_props) = &mut self.connect_properties {
402 conn_props.request_problem_info = request_problem_info;
403 self
404 } else {
405 let mut conn_props = ConnectProperties::new();
406 conn_props.request_problem_info = request_problem_info;
407 self.set_connect_properties(conn_props)
408 }
409 }
410
411 pub fn request_problem_info(&self) -> Option<u8> {
413 if let Some(conn_props) = &self.connect_properties {
414 conn_props.request_problem_info
415 } else {
416 None
417 }
418 }
419
420 pub fn set_user_properties(&mut self, user_properties: Vec<(String, String)>) -> &mut Self {
422 if let Some(conn_props) = &mut self.connect_properties {
423 conn_props.user_properties = user_properties;
424 self
425 } else {
426 let mut conn_props = ConnectProperties::new();
427 conn_props.user_properties = user_properties;
428 self.set_connect_properties(conn_props)
429 }
430 }
431
432 pub fn user_properties(&self) -> Vec<(String, String)> {
434 if let Some(conn_props) = &self.connect_properties {
435 conn_props.user_properties.clone()
436 } else {
437 Vec::new()
438 }
439 }
440
441 pub fn set_authentication_method(
443 &mut self,
444 authentication_method: Option<String>,
445 ) -> &mut Self {
446 if let Some(conn_props) = &mut self.connect_properties {
447 conn_props.authentication_method = authentication_method;
448 self
449 } else {
450 let mut conn_props = ConnectProperties::new();
451 conn_props.authentication_method = authentication_method;
452 self.set_connect_properties(conn_props)
453 }
454 }
455
456 pub fn authentication_method(&self) -> Option<String> {
458 if let Some(conn_props) = &self.connect_properties {
459 conn_props.authentication_method.clone()
460 } else {
461 None
462 }
463 }
464
465 pub fn set_authentication_data(&mut self, authentication_data: Option<Bytes>) -> &mut Self {
467 if let Some(conn_props) = &mut self.connect_properties {
468 conn_props.authentication_data = authentication_data;
469 self
470 } else {
471 let mut conn_props = ConnectProperties::new();
472 conn_props.authentication_data = authentication_data;
473 self.set_connect_properties(conn_props)
474 }
475 }
476
477 pub fn authentication_data(&self) -> Option<Bytes> {
479 if let Some(conn_props) = &self.connect_properties {
480 conn_props.authentication_data.clone()
481 } else {
482 None
483 }
484 }
485
486 pub fn set_manual_acks(&mut self, manual_acks: bool) -> &mut Self {
488 self.manual_acks = manual_acks;
489 self
490 }
491
492 pub fn manual_acks(&self) -> bool {
494 self.manual_acks
495 }
496
497 pub fn network_options(&self) -> NetworkOptions {
498 self.network_options.clone()
499 }
500
501 pub fn set_network_options(&mut self, network_options: NetworkOptions) -> &mut Self {
502 self.network_options = network_options;
503 self
504 }
505
506 #[cfg(feature = "proxy")]
507 pub fn set_proxy(&mut self, proxy: Proxy) -> &mut Self {
508 self.proxy = Some(proxy);
509 self
510 }
511
512 #[cfg(feature = "proxy")]
513 pub fn proxy(&self) -> Option<Proxy> {
514 self.proxy.clone()
515 }
516
517 pub fn set_outgoing_inflight_upper_limit(&mut self, limit: u16) -> &mut Self {
520 self.outgoing_inflight_upper_limit = Some(limit);
521 self
522 }
523
524 pub fn get_outgoing_inflight_upper_limit(&self) -> Option<u16> {
527 self.outgoing_inflight_upper_limit
528 }
529}
530
531#[cfg(feature = "url")]
532#[derive(Debug, PartialEq, Eq, thiserror::Error)]
533pub enum OptionError {
534 #[error("Unsupported URL scheme.")]
535 Scheme,
536
537 #[error("Missing client ID.")]
538 ClientId,
539
540 #[error("Invalid keep-alive value.")]
541 KeepAlive,
542
543 #[error("Invalid clean-start value.")]
544 CleanStart,
545
546 #[error("Invalid max-incoming-packet-size value.")]
547 MaxIncomingPacketSize,
548
549 #[error("Invalid max-outgoing-packet-size value.")]
550 MaxOutgoingPacketSize,
551
552 #[error("Invalid request-channel-capacity value.")]
553 RequestChannelCapacity,
554
555 #[error("Invalid max-request-batch value.")]
556 MaxRequestBatch,
557
558 #[error("Invalid pending-throttle value.")]
559 PendingThrottle,
560
561 #[error("Invalid inflight value.")]
562 Inflight,
563
564 #[error("Invalid conn-timeout value.")]
565 ConnTimeout,
566
567 #[error("Unknown option: {0}")]
568 Unknown(String),
569
570 #[error("Couldn't parse option from url: {0}")]
571 Parse(#[from] url::ParseError),
572}
573
574#[cfg(feature = "url")]
575impl std::convert::TryFrom<url::Url> for MqttOptions {
576 type Error = OptionError;
577
578 fn try_from(url: url::Url) -> Result<Self, Self::Error> {
579 use std::collections::HashMap;
580
581 let host = url.host_str().unwrap_or_default().to_owned();
582
583 let (transport, default_port) = match url.scheme() {
584 #[cfg(feature = "use-rustls")]
588 "mqtts" | "ssl" => (Transport::tls_with_default_config(), 8883),
589 "mqtt" | "tcp" => (Transport::Tcp, 1883),
590 #[cfg(feature = "websocket")]
591 "ws" => (Transport::Ws, 8000),
592 #[cfg(all(feature = "use-rustls", feature = "websocket"))]
593 "wss" => (Transport::wss_with_default_config(), 8000),
594 _ => return Err(OptionError::Scheme),
595 };
596
597 let port = url.port().unwrap_or(default_port);
598
599 let mut queries = url.query_pairs().collect::<HashMap<_, _>>();
600
601 let id = queries
602 .remove("client_id")
603 .ok_or(OptionError::ClientId)?
604 .into_owned();
605
606 let mut options = MqttOptions::new(id, host, port);
607 let mut connect_props = ConnectProperties::new();
608 options.set_transport(transport);
609
610 if let Some(keep_alive) = queries
611 .remove("keep_alive_secs")
612 .map(|v| v.parse::<u64>().map_err(|_| OptionError::KeepAlive))
613 .transpose()?
614 {
615 options.set_keep_alive(Duration::from_secs(keep_alive));
616 }
617
618 if let Some(clean_start) = queries
619 .remove("clean_start")
620 .map(|v| v.parse::<bool>().map_err(|_| OptionError::CleanStart))
621 .transpose()?
622 {
623 options.set_clean_start(clean_start);
624 }
625
626 if let Some((username, password)) = {
627 match url.username() {
628 "" => None,
629 username => Some((
630 username.to_owned(),
631 url.password().unwrap_or_default().to_owned(),
632 )),
633 }
634 } {
635 options.set_credentials(username, password);
636 }
637
638 connect_props.max_packet_size = queries
639 .remove("max_incoming_packet_size_bytes")
640 .map(|v| {
641 v.parse::<u32>()
642 .map_err(|_| OptionError::MaxIncomingPacketSize)
643 })
644 .transpose()?;
645
646 if let Some(request_channel_capacity) = queries
647 .remove("request_channel_capacity_num")
648 .map(|v| {
649 v.parse::<usize>()
650 .map_err(|_| OptionError::RequestChannelCapacity)
651 })
652 .transpose()?
653 {
654 options.request_channel_capacity = request_channel_capacity;
655 }
656
657 if let Some(max_request_batch) = queries
658 .remove("max_request_batch_num")
659 .map(|v| v.parse::<usize>().map_err(|_| OptionError::MaxRequestBatch))
660 .transpose()?
661 {
662 options.max_request_batch = max_request_batch;
663 }
664
665 if let Some(pending_throttle) = queries
666 .remove("pending_throttle_usecs")
667 .map(|v| v.parse::<u64>().map_err(|_| OptionError::PendingThrottle))
668 .transpose()?
669 {
670 options.set_pending_throttle(Duration::from_micros(pending_throttle));
671 }
672
673 connect_props.receive_maximum = queries
674 .remove("inflight_num")
675 .map(|v| v.parse::<u16>().map_err(|_| OptionError::Inflight))
676 .transpose()?;
677
678 if let Some(conn_timeout) = queries
679 .remove("conn_timeout_secs")
680 .map(|v| v.parse::<u64>().map_err(|_| OptionError::ConnTimeout))
681 .transpose()?
682 {
683 options.set_connection_timeout(conn_timeout);
684 }
685
686 if let Some((opt, _)) = queries.into_iter().next() {
687 return Err(OptionError::Unknown(opt.into_owned()));
688 }
689
690 options.connect_properties = Some(connect_props);
691 Ok(options)
692 }
693}
694
695impl Debug for MqttOptions {
698 fn fmt(&self, f: &mut Formatter) -> fmt::Result {
699 f.debug_struct("MqttOptions")
700 .field("broker_addr", &self.broker_addr)
701 .field("port", &self.port)
702 .field("keep_alive", &self.keep_alive)
703 .field("clean_start", &self.clean_start)
704 .field("client_id", &self.client_id)
705 .field("credentials", &self.credentials)
706 .field("request_channel_capacity", &self.request_channel_capacity)
707 .field("max_request_batch", &self.max_request_batch)
708 .field("pending_throttle", &self.pending_throttle)
709 .field("last_will", &self.last_will)
710 .field("conn_timeout", &self.conn_timeout)
711 .field("manual_acks", &self.manual_acks)
712 .field("connect properties", &self.connect_properties)
713 .finish()
714 }
715}
716
717#[cfg(test)]
718mod test {
719 use super::*;
720
721 #[test]
722 #[cfg(all(feature = "use-rustls", feature = "websocket"))]
723 fn no_scheme() {
724 use crate::{TlsConfiguration, Transport};
725 let mut mqttoptions = MqttOptions::new("client_a", "a3f8czas.iot.eu-west-1.amazonaws.com/mqtt?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=MyCreds%2F20201001%2Feu-west-1%2Fiotdevicegateway%2Faws4_request&X-Amz-Date=20201001T130812Z&X-Amz-Expires=7200&X-Amz-Signature=9ae09b49896f44270f2707551581953e6cac71a4ccf34c7c3415555be751b2d1&X-Amz-SignedHeaders=host", 443);
726
727 mqttoptions.set_transport(Transport::wss(Vec::from("Test CA"), None, None));
728
729 if let Transport::Wss(TlsConfiguration::Simple {
730 ca,
731 client_auth,
732 alpn,
733 }) = mqttoptions.transport
734 {
735 assert_eq!(ca, Vec::from("Test CA"));
736 assert_eq!(client_auth, None);
737 assert_eq!(alpn, None);
738 } else {
739 panic!("Unexpected transport!");
740 }
741
742 assert_eq!(mqttoptions.broker_addr, "a3f8czas.iot.eu-west-1.amazonaws.com/mqtt?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=MyCreds%2F20201001%2Feu-west-1%2Fiotdevicegateway%2Faws4_request&X-Amz-Date=20201001T130812Z&X-Amz-Expires=7200&X-Amz-Signature=9ae09b49896f44270f2707551581953e6cac71a4ccf34c7c3415555be751b2d1&X-Amz-SignedHeaders=host");
743 }
744
745 #[test]
746 #[cfg(feature = "url")]
747 fn from_url() {
748 fn opt(s: &str) -> Result<MqttOptions, OptionError> {
749 MqttOptions::parse_url(s)
750 }
751 fn ok(s: &str) -> MqttOptions {
752 opt(s).expect("valid options")
753 }
754 fn err(s: &str) -> OptionError {
755 opt(s).expect_err("invalid options")
756 }
757
758 let v = ok("mqtt://host:42?client_id=foo");
759 assert_eq!(v.broker_address(), ("host".to_owned(), 42));
760 assert_eq!(v.client_id(), "foo".to_owned());
761
762 let v = ok("mqtt://host:42?client_id=foo&keep_alive_secs=5");
763 assert_eq!(v.keep_alive, Duration::from_secs(5));
764
765 assert_eq!(err("mqtt://host:42"), OptionError::ClientId);
766 assert_eq!(
767 err("mqtt://host:42?client_id=foo&foo=bar"),
768 OptionError::Unknown("foo".to_owned())
769 );
770 assert_eq!(err("mqt://host:42?client_id=foo"), OptionError::Scheme);
771 assert_eq!(
772 err("mqtt://host:42?client_id=foo&keep_alive_secs=foo"),
773 OptionError::KeepAlive
774 );
775 assert_eq!(
776 err("mqtt://host:42?client_id=foo&clean_start=foo"),
777 OptionError::CleanStart
778 );
779 assert_eq!(
780 err("mqtt://host:42?client_id=foo&max_incoming_packet_size_bytes=foo"),
781 OptionError::MaxIncomingPacketSize
782 );
783 assert_eq!(
784 err("mqtt://host:42?client_id=foo&request_channel_capacity_num=foo"),
785 OptionError::RequestChannelCapacity
786 );
787 assert_eq!(
788 err("mqtt://host:42?client_id=foo&max_request_batch_num=foo"),
789 OptionError::MaxRequestBatch
790 );
791 assert_eq!(
792 err("mqtt://host:42?client_id=foo&pending_throttle_usecs=foo"),
793 OptionError::PendingThrottle
794 );
795 assert_eq!(
796 err("mqtt://host:42?client_id=foo&inflight_num=foo"),
797 OptionError::Inflight
798 );
799 assert_eq!(
800 err("mqtt://host:42?client_id=foo&conn_timeout_secs=foo"),
801 OptionError::ConnTimeout
802 );
803 }
804
805 #[test]
806 fn allow_empty_client_id() {
807 let _mqtt_opts = MqttOptions::new("", "127.0.0.1", 1883).set_clean_start(true);
808 }
809}