rumqttc/v5/mqttbytes/
mod.rs

1use std::{str::Utf8Error, vec};
2
/// This module is the place where all the protocol specifics get abstracted
/// out into structures which are common across protocols. Since
/// MQTT is the core protocol that this broker supports, a lot of structs closely
/// map to what MQTT specifies in its protocol.
7pub mod v5;
8
/// Quality of service
///
/// `#[repr(u8)]` together with the explicit discriminants means a variant
/// can be converted to its numeric level with `as u8`.
#[repr(u8)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd)]
// every variant ends in `Once`, which clippy would otherwise flag
#[allow(clippy::enum_variant_names)]
pub enum QoS {
    /// Level 0
    AtMostOnce = 0,
    /// Level 1
    AtLeastOnce = 1,
    /// Level 2
    ExactlyOnce = 2,
}
18
19impl Default for QoS {
20    fn default() -> Self {
21        Self::AtMostOnce
22    }
23}
24
25/// Maps a number to QoS
26pub fn qos(num: u8) -> Option<QoS> {
27    match num {
28        0 => Some(QoS::AtMostOnce),
29        1 => Some(QoS::AtLeastOnce),
30        2 => Some(QoS::ExactlyOnce),
31        _ => None,
32    }
33}
34
/// Returns `true` when the topic or topic filter contains a `+` or `#` character
pub fn has_wildcards(s: &str) -> bool {
    s.chars().any(|c| c == '+' || c == '#')
}
39
/// Returns `true` when the topic is valid, i.e. it contains no wildcard characters
pub fn valid_topic(topic: &str) -> bool {
    // neither '+' nor '#' may appear anywhere in a topic
    let wildcard_found = topic.chars().any(|c| c == '+' || c == '#');

    !wildcard_found
}
49
/// Checks whether a topic filter is well formed.
///
/// An empty filter is rejected. `#` may only appear alone in the last level
/// and `+` must always occupy a level on its own.
///
/// <https://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718106>
pub fn valid_filter(filter: &str) -> bool {
    if filter.is_empty() {
        return false;
    }

    // peel the final level off; whatever precedes the last '/' (if there is
    // one) holds the leading levels
    let (leading, tail) = match filter.rsplit_once('/') {
        Some((head, tail)) => (Some(head), tail),
        None => (None, filter),
    };

    // the last level may hold a wildcard only when it is exactly that wildcard
    // invalid: sport/tennis#
    // invalid: sport/++
    let tail_has_wildcard = tail.contains(|c: char| c == '#' || c == '+');
    if tail_has_wildcard && tail.len() != 1 {
        return false;
    }

    match leading {
        // single level filter, nothing else to inspect
        None => true,
        Some(head) => head.split('/').all(|level| {
            // '#' is never allowed before the last level
            // invalid: sport/tennis#/player
            // invalid: sport/tennis/#/ranking
            let hash_free = !level.contains('#');

            // '+' has to be the whole level
            // invalid: sport+
            let plus_ok = level.len() <= 1 || !level.contains('+');

            hash_free && plus_ok
        }),
    }
}
91
/// Checks if topic matches a filter. topic and filter validation isn't done here.
///
/// A topic whose first character is `$` never matches. Otherwise topic and
/// filter are compared level by level (split on `/`): a `+` level in the
/// filter accepts any single topic level, and a `#` level accepts everything
/// that remains.
///
/// **NOTE**: 'topic' is a misnomer in the arg. this can also be used to match 2 wild subscriptions
/// **NOTE**: make sure a topic is validated during a publish and filter is validated
/// during a subscribe
pub fn matches(topic: &str, filter: &str) -> bool {
    // `starts_with` compares whole chars. Slicing with `topic[..1]` would panic
    // when the topic begins with a multi-byte UTF-8 character.
    if topic.starts_with('$') {
        return false;
    }

    let mut topics = topic.split('/');

    for f in filter.split('/') {
        // "#" being the last element is validated by the broker with 'valid_filter'
        if f == "#" {
            return true;
        }

        // filter still has remaining elements
        // filter = a/b/c/# should match topic = a/b/c
        // filter = a/b/c/d should not match topic = a/b/c
        match topics.next() {
            // topic ran out of levels, or carries a multi-level wildcard the
            // filter doesn't have at this position
            None | Some("#") => return false,
            // single-level wildcard accepts whatever the topic has here
            Some(_) if f == "+" => {}
            Some(t) if f != t => return false,
            Some(_) => {}
        }
    }

    // topic has remaining elements and filter's last element isn't "#"
    topics.next().is_none()
}
131
132/// Error during serialization and deserialization
133#[derive(Debug, Clone, Copy, PartialEq, Eq, thiserror::Error)]
134pub enum Error {
135    #[error("Invalid return code received as response for connect = {0}")]
136    InvalidConnectReturnCode(u8),
137    #[error("Invalid reason = {0}")]
138    InvalidReason(u8),
139    #[error("Invalid remaining length = {0}")]
140    InvalidRemainingLength(usize),
141    #[error("Invalid protocol used")]
142    InvalidProtocol,
143    #[error("Invalid protocol level")]
144    InvalidProtocolLevel(u8),
145    #[error("Invalid packet format")]
146    IncorrectPacketFormat,
147    #[error("Invalid packet type = {0}")]
148    InvalidPacketType(u8),
149    #[error("Invalid retain forward rule = {0}")]
150    InvalidRetainForwardRule(u8),
151    #[error("Invalid QoS level = {0}")]
152    InvalidQoS(u8),
153    #[error("Invalid subscribe reason code = {0}")]
154    InvalidSubscribeReasonCode(u8),
155    #[error("Packet received has id Zero")]
156    PacketIdZero,
157    #[error("Empty Subscription")]
158    EmptySubscription,
159    #[error("Subscription had id Zero")]
160    SubscriptionIdZero,
161    #[error("Payload size is incorrect")]
162    PayloadSizeIncorrect,
163    #[error("Payload is too long")]
164    PayloadTooLong,
165    #[error("Max Payload size of {max:?} has been exceeded by packet of {pkt_size:?} bytes")]
166    PayloadSizeLimitExceeded { pkt_size: usize, max: usize },
167    #[error("Payload is required")]
168    PayloadRequired,
169    #[error("Payload is required = {0}")]
170    PayloadNotUtf8(#[from] Utf8Error),
171    #[error("Topic not utf-8")]
172    TopicNotUtf8,
173    #[error("Promised boundary crossed, contains {0} bytes")]
174    BoundaryCrossed(usize),
175    #[error("Packet is malformed")]
176    MalformedPacket,
177    #[error("Remaining length is malformed")]
178    MalformedRemainingLength,
179    #[error("Invalid property type = {0}")]
180    InvalidPropertyType(u8),
181    /// More bytes required to frame packet. Argument
182    /// implies minimum additional bytes required to
183    /// proceed further
184    #[error("Insufficient number of bytes to frame packet, {0} more bytes required")]
185    InsufficientBytes(usize),
186}