rumqttc/mqttbytes/v4/
mod.rs

1use super::*;
2
3mod connack;
4mod connect;
5mod disconnect;
6mod ping;
7mod puback;
8mod pubcomp;
9mod publish;
10mod pubrec;
11mod pubrel;
12mod suback;
13mod subscribe;
14mod unsuback;
15mod unsubscribe;
16
17pub use connack::*;
18pub use connect::*;
19pub use disconnect::*;
20pub use ping::*;
21pub use puback::*;
22pub use pubcomp::*;
23pub use publish::*;
24pub use pubrec::*;
25pub use pubrel::*;
26pub use suback::*;
27pub use subscribe::*;
28pub use unsuback::*;
29pub use unsubscribe::*;
30
31/// Encapsulates all MQTT packet types
32#[derive(Debug, Clone, PartialEq, Eq)]
33pub enum Packet {
34    Connect(Connect),
35    ConnAck(ConnAck),
36    Publish(Publish),
37    PubAck(PubAck),
38    PubRec(PubRec),
39    PubRel(PubRel),
40    PubComp(PubComp),
41    Subscribe(Subscribe),
42    SubAck(SubAck),
43    Unsubscribe(Unsubscribe),
44    UnsubAck(UnsubAck),
45    PingReq,
46    PingResp,
47    Disconnect,
48}
49
50/// Reads a stream of bytes and extracts next MQTT packet out of it
51pub fn read(stream: &mut BytesMut, max_size: usize) -> Result<Packet, Error> {
52    let fixed_header = check(stream.iter(), max_size)?;
53
54    // Test with a stream with exactly the size to check border panics
55    let packet = stream.split_to(fixed_header.frame_length());
56    let packet_type = fixed_header.packet_type()?;
57
58    if fixed_header.remaining_len == 0 {
59        // no payload packets
60        return match packet_type {
61            PacketType::PingReq => Ok(Packet::PingReq),
62            PacketType::PingResp => Ok(Packet::PingResp),
63            PacketType::Disconnect => Ok(Packet::Disconnect),
64            _ => Err(Error::PayloadRequired),
65        };
66    }
67
68    let packet = packet.freeze();
69    let packet = match packet_type {
70        PacketType::Connect => Packet::Connect(Connect::read(fixed_header, packet)?),
71        PacketType::ConnAck => Packet::ConnAck(ConnAck::read(fixed_header, packet)?),
72        PacketType::Publish => Packet::Publish(Publish::read(fixed_header, packet)?),
73        PacketType::PubAck => Packet::PubAck(PubAck::read(fixed_header, packet)?),
74        PacketType::PubRec => Packet::PubRec(PubRec::read(fixed_header, packet)?),
75        PacketType::PubRel => Packet::PubRel(PubRel::read(fixed_header, packet)?),
76        PacketType::PubComp => Packet::PubComp(PubComp::read(fixed_header, packet)?),
77        PacketType::Subscribe => Packet::Subscribe(Subscribe::read(fixed_header, packet)?),
78        PacketType::SubAck => Packet::SubAck(SubAck::read(fixed_header, packet)?),
79        PacketType::Unsubscribe => Packet::Unsubscribe(Unsubscribe::read(fixed_header, packet)?),
80        PacketType::UnsubAck => Packet::UnsubAck(UnsubAck::read(fixed_header, packet)?),
81        PacketType::PingReq => Packet::PingReq,
82        PacketType::PingResp => Packet::PingResp,
83        PacketType::Disconnect => Packet::Disconnect,
84    };
85
86    Ok(packet)
87}
88
89/// Return number of remaining length bytes required for encoding length
90fn len_len(len: usize) -> usize {
91    if len >= 2_097_152 {
92        4
93    } else if len >= 16_384 {
94        3
95    } else if len >= 128 {
96        2
97    } else {
98        1
99    }
100}