lockwise_backend/
mqtt.rs

1//! Módulo para comunicação MQTT.
2//!
3//! Este módulo gerencia a conexão MQTT com dispositivos, incluindo publicação de comandos,
4//! recebimento de mensagens de status e processamento de heartbeats.
5use anyhow::Result;
6use chrono::{TimeZone, Utc};
7use rumqttc::{AsyncClient, Event, Incoming, QoS};
8use serde::{Deserialize, Serialize};
9use sqlx::PgPool;
10use uuid::Uuid;
11
12use super::device::LockStatusMessage;
13
14/// Estrutura de mensagem para relatórios de heartbeat de dispositivos via MQTT
15#[derive(Deserialize)]
16struct HeartbeatMessage {
17    /// Tipo de mensagem de heartbeat
18    heartbeat: String,
19    /// Tempo de atividade do dispositivo em milissegundos
20    uptime_ms: u64,
21    /// Timestamp da mensagem
22    #[allow(dead_code)]
23    timestamp: u64,
24    /// SSID WiFi ao qual o dispositivo está conectado
25    wifi_ssid: String,
26    /// URL do back-end configurada no dispositivo
27    backend_url: String,
28    /// URL do broker MQTT configurada no dispositivo
29    mqtt_broker_url: String,
30    /// Se o heartbeat MQTT está habilitado
31    mqtt_heartbeat_enable: bool,
32    /// Intervalo de heartbeat em segundos
33    mqtt_heartbeat_interval_sec: i32,
34    /// Tempo limite de gravação de áudio em segundos
35    audio_record_timeout_sec: i32,
36    /// Tempo limite de bloqueio em milissegundos
37    lock_timeout_ms: i32,
38    /// Tempo limite de pareamento em segundos
39    pairing_timeout_sec: i32,
40    /// ID do usuário associado ao dispositivo
41    user_id: String,
42    /// Estado atual de bloqueio
43    lock_state: Option<String>,
44    /// Se a detecção de voz está habilitada
45    voice_detection_enable: bool,
46    /// Limiar RMS para detecção de atividade de voz
47    vad_rms_threshold: i32,
48}
49
50/// Estrutura de mensagem para relatórios de eventos de dispositivos via MQTT
51#[derive(Deserialize)]
52struct EventMessage {
53    /// Tipo de evento (ex.: PONG, CONFIG_UPDATED, LOCKING_DOWN)
54    event: String,
55    /// Tempo de atividade do dispositivo em milissegundos
56    #[allow(dead_code)]
57    uptime_ms: u64,
58    /// Timestamp do evento
59    timestamp: u64,
60}
61
62/// Estrutura de mensagem para enviar comandos de controle aos dispositivos via MQTT
63#[derive(Serialize)]
64struct ControlMessage {
65    /// O comando a enviar (ex.: LOCK, UNLOCK, PING)
66    command: String,
67}
68
69/// Manipula eventos MQTT recebidos dos dispositivos.
70/// Processa mensagens de heartbeat, eventos (PONG, CONFIG_UPDATED, LOCKING_DOWN)
71/// e atualizações de status de bloqueio, atualizando o banco de dados conforme necessário.
72pub async fn handle_mqtt_events(db_pool: &PgPool, eventloop: &mut rumqttc::EventLoop) {
73    loop {
74        match eventloop.poll().await {
75            Ok(Event::Incoming(Incoming::Publish(publish))) => {
76                let topic = publish.topic;
77                if topic.starts_with("lockwise/") && topic.ends_with("/status") {
78                    let uuid_str = &topic[9..topic.len() - 7]; // extract UUID
79                    if let Ok(uuid) = Uuid::parse_str(uuid_str) {
80                        // Try to parse as HeartbeatMessage first (has heartbeat field)
81                        if let Ok(heartbeat_msg) =
82                            serde_cbor::from_slice::<HeartbeatMessage>(&publish.payload)
83                        {
84                            if heartbeat_msg.heartbeat == "HEARTBEAT" {
85                                // Handle HEARTBEAT
86                                let now = Utc::now();
87                                let lock_state =
88                                    heartbeat_msg.lock_state.as_deref().unwrap_or("UNKNOWN");
89
90                                // Check if device is in lockdown and heartbeat is at least 10 seconds after lockdown
91                                let should_clear_lockdown = {
92                                    let row: Option<(Option<chrono::DateTime<chrono::Utc>>,)> =
93                                        sqlx::query_as(
94                                            "SELECT locked_down_at FROM devices WHERE uuid = $1",
95                                        )
96                                        .bind(uuid)
97                                        .fetch_optional(db_pool)
98                                        .await
99                                        .unwrap_or(None);
100
101                                    if let Some((Some(locked_down_at),)) = row {
102                                        let duration_since_lockdown = now - locked_down_at;
103                                        duration_since_lockdown.num_seconds() >= 10
104                                    } else {
105                                        false
106                                    }
107                                };
108
109                                let update_query = if should_clear_lockdown {
110                                    "INSERT INTO devices (uuid, user_id, last_heard, uptime_ms, wifi_ssid, backend_url, mqtt_broker_url, mqtt_heartbeat_enable, mqtt_heartbeat_interval_sec, audio_record_timeout_sec, lock_timeout_ms, pairing_timeout_sec, lock_state, voice_detection_enable, vad_rms_threshold, hashed_passphrase, locked_down_at) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, NULL, NULL)
111                                          ON CONFLICT (uuid) DO UPDATE SET user_id = $2, last_heard = $3, uptime_ms = $4, wifi_ssid = $5, backend_url = $6, mqtt_broker_url = $7, mqtt_heartbeat_enable = $8, mqtt_heartbeat_interval_sec = $9, audio_record_timeout_sec = $10, lock_timeout_ms = $11, pairing_timeout_sec = $12, lock_state = $13, voice_detection_enable = $14, vad_rms_threshold = $15, locked_down_at = NULL"
112                                } else {
113                                    "INSERT INTO devices (uuid, user_id, last_heard, uptime_ms, wifi_ssid, backend_url, mqtt_broker_url, mqtt_heartbeat_enable, mqtt_heartbeat_interval_sec, audio_record_timeout_sec, lock_timeout_ms, pairing_timeout_sec, lock_state, voice_detection_enable, vad_rms_threshold, hashed_passphrase) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, NULL)
114                                          ON CONFLICT (uuid) DO UPDATE SET user_id = $2, last_heard = $3, uptime_ms = $4, wifi_ssid = $5, backend_url = $6, mqtt_broker_url = $7, mqtt_heartbeat_enable = $8, mqtt_heartbeat_interval_sec = $9, audio_record_timeout_sec = $10, lock_timeout_ms = $11, pairing_timeout_sec = $12, lock_state = $13, voice_detection_enable = $14, vad_rms_threshold = $15"
115                                };
116
117                                let _ = sqlx::query(update_query)
118                                    .bind(uuid)
119                                    .bind(&heartbeat_msg.user_id)
120                                    .bind(now)
121                                    .bind(heartbeat_msg.uptime_ms as i64)
122                                    .bind(&heartbeat_msg.wifi_ssid)
123                                    .bind(&heartbeat_msg.backend_url)
124                                    .bind(&heartbeat_msg.mqtt_broker_url)
125                                    .bind(heartbeat_msg.mqtt_heartbeat_enable)
126                                    .bind(heartbeat_msg.mqtt_heartbeat_interval_sec)
127                                    .bind(heartbeat_msg.audio_record_timeout_sec)
128                                    .bind(heartbeat_msg.lock_timeout_ms)
129                                    .bind(heartbeat_msg.pairing_timeout_sec)
130                                    .bind(lock_state)
131                                    .bind(heartbeat_msg.voice_detection_enable)
132                                    .bind(heartbeat_msg.vad_rms_threshold)
133                                    .execute(db_pool)
134                                    .await;
135                            }
136                        } else if let Ok(event_msg) =
137                            serde_cbor::from_slice::<EventMessage>(&publish.payload)
138                        {
139                            if event_msg.event == "PONG" {
140                                // Handle PONG
141                                let pings_mutex = super::PENDING_PINGS.get().unwrap();
142                                let mut pings = pings_mutex.lock().unwrap();
143                                if let Some((_, tx)) = pings.remove(&uuid_str.to_string()) {
144                                    tx.send(()).ok();
145                                }
146                            } else if event_msg.event == "CONFIG_UPDATED" {
147                                // Handle CONFIG_UPDATED
148                                let updates_mutex = super::PENDING_CONFIG_UPDATES.get().unwrap();
149                                let mut updates = updates_mutex.lock().unwrap();
150                                if let Some(tx) = updates.remove(&uuid_str.to_string()) {
151                                    tx.send(()).ok();
152                                }
153                            } else if event_msg.event == "LOCKING_DOWN" {
154                                // LOCKING_DOWN event - set locked_down_at
155                                let timestamp = Utc
156                                    .timestamp_millis_opt(event_msg.timestamp as i64 * 1000)
157                                    .unwrap();
158                                let result = sqlx::query(
159                                    "UPDATE devices SET locked_down_at = $1 WHERE uuid = $2",
160                                )
161                                .bind(timestamp)
162                                .bind(uuid)
163                                .execute(db_pool)
164                                .await;
165                                if result.is_ok() {}
166                            }
167                        } else if let Ok(lock_msg) =
168                            serde_cbor::from_slice::<LockStatusMessage>(&publish.payload)
169                        {
170                            // LOCK/UNLOCK event
171                            let event_type = if lock_msg.lock == "LOCKED" {
172                                "LOCK"
173                            } else {
174                                "UNLOCK"
175                            };
176                            let reason = &lock_msg.reason;
177                            let timestamp = Utc
178                                .timestamp_millis_opt(lock_msg.timestamp as i64 * 1000)
179                                .unwrap();
180
181                            // Check for recent command
182                            let user_id = {
183                                let commands_mutex = super::RECENT_COMMANDS.get().unwrap();
184                                let mut commands = commands_mutex.lock().unwrap();
185                                if let Some((uid, cmd_time)) = commands.get(&uuid_str.to_string()) {
186                                    let now = Utc::now().timestamp();
187                                    if now - cmd_time < 5 {
188                                        // within 5 seconds
189                                        let uid = uid.clone();
190                                        commands.remove(&uuid_str.to_string());
191                                        Some(uid)
192                                    } else {
193                                        None
194                                    }
195                                } else {
196                                    None
197                                }
198                            };
199
200                            // Insert log
201                            let _ = sqlx::query(
202                                       "INSERT INTO logs (device_id, timestamp, event_type, reason, user_id) VALUES ($1, $2, $3, $4, $5)"
203                                   )
204                                   .bind(uuid_str)
205                                   .bind(timestamp)
206                                   .bind(event_type)
207                                   .bind(reason)
208                                   .bind(&user_id)
209                                   .execute(db_pool)
210                                   .await;
211
212                            // Update lock_state
213                            let lock_state = if lock_msg.lock == "LOCKED" {
214                                "LOCKED"
215                            } else {
216                                "UNLOCKED"
217                            };
218                            let _ =
219                                sqlx::query("UPDATE devices SET lock_state = $1 WHERE uuid = $2")
220                                    .bind(lock_state)
221                                    .bind(uuid_str)
222                                    .execute(db_pool)
223                                    .await;
224                        }
225                    }
226                }
227            }
228            Ok(_) => {}
229            Err(_) => {}
230        }
231    }
232}
233
234/// Publica uma mensagem de controle para um dispositivo via MQTT.
235/// Envia um comando para o UUID do dispositivo especificado.
236pub async fn publish_control_message(
237    client: &AsyncClient,
238    uuid: Uuid,
239    command: String,
240) -> Result<()> {
241    let topic = format!("lockwise/{}/control", uuid);
242    let msg = ControlMessage { command };
243    let payload = serde_cbor::to_vec(&msg)?;
244    client
245        .publish(topic, QoS::AtMostOnce, false, payload)
246        .await?;
247    Ok(())
248}