lockwise_backend/
mqtt.rs

1//! Módulo para comunicação MQTT.
2//!
3//! Este módulo gerencia a conexão MQTT com dispositivos, incluindo publicação de comandos,
4//! recebimento de mensagens de status e processamento de heartbeats.
5use anyhow::Result;
6use chrono::{TimeZone, Utc};
7use rumqttc::{AsyncClient, Event, Incoming, QoS};
8use serde::{Deserialize, Serialize};
9use sqlx::PgPool;
10use uuid::Uuid;
11
12use super::device::LockStatusMessage;
13
14/// Estrutura de mensagem para relatórios de heartbeat de dispositivos via MQTT
15#[derive(Deserialize)]
16struct HeartbeatMessage {
17    /// Tipo de mensagem de heartbeat
18    heartbeat: String,
19    /// Tempo de atividade do dispositivo em milissegundos
20    uptime_ms: u64,
21    /// Timestamp da mensagem
22    #[allow(dead_code)]
23    timestamp: u64,
24    /// SSID WiFi ao qual o dispositivo está conectado
25    wifi_ssid: String,
26    /// URL do back-end configurada no dispositivo
27    backend_url: String,
28    /// URL do broker MQTT configurada no dispositivo
29    mqtt_broker_url: String,
30    /// Se o heartbeat MQTT está habilitado
31    mqtt_heartbeat_enable: bool,
32    /// Intervalo de heartbeat em segundos
33    mqtt_heartbeat_interval_sec: i32,
34    /// Tempo limite de gravação de áudio em segundos
35    audio_record_timeout_sec: i32,
36    /// Tempo limite de bloqueio em milissegundos
37    lock_timeout_ms: i32,
38    /// Tempo limite de pareamento em segundos
39    pairing_timeout_sec: i32,
40    /// ID do usuário associado ao dispositivo
41    user_id: String,
42    /// Estado atual de bloqueio
43    lock_state: Option<String>,
44    /// Se a detecção de voz está habilitada
45    voice_detection_enable: bool,
46    /// Limiar RMS para detecção de atividade de voz
47    vad_rms_threshold: i32,
48}
49
50/// Estrutura de mensagem para relatórios de eventos de dispositivos via MQTT
51#[derive(Deserialize)]
52struct EventMessage {
53    /// Tipo de evento (ex.: PONG, CONFIG_UPDATED, LOCKING_DOWN)
54    event: String,
55    /// Tempo de atividade do dispositivo em milissegundos
56    #[allow(dead_code)]
57    uptime_ms: u64,
58    /// Timestamp do evento
59    timestamp: u64,
60}
61
62/// Estrutura de mensagem para enviar comandos de controle aos dispositivos via MQTT
63#[derive(Serialize)]
64struct ControlMessage {
65    /// O comando a enviar (ex.: LOCK, UNLOCK, PING)
66    command: String,
67}
68
69/// Manipula eventos MQTT recebidos dos dispositivos.
70/// Processa mensagens de heartbeat, eventos (PONG, CONFIG_UPDATED, LOCKING_DOWN)
71/// e atualizações de status de bloqueio, atualizando o banco de dados conforme necessário.
72/// Também envia atualizações em tempo real via WebSocket para usuários conectados.
73pub async fn handle_mqtt_events(db_pool: &PgPool, eventloop: &mut rumqttc::EventLoop) {
74    loop {
75        match eventloop.poll().await {
76            Ok(Event::Incoming(Incoming::Publish(publish))) => {
77                let topic = publish.topic;
78                if topic.starts_with("lockwise/") && topic.ends_with("/status") {
79                    let uuid_str = &topic[9..topic.len() - 7]; // extract UUID
80                    if let Ok(uuid) = Uuid::parse_str(uuid_str) {
81                        // Try to parse as HeartbeatMessage first (has heartbeat field)
82                        if let Ok(heartbeat_msg) =
83                            serde_cbor::from_slice::<HeartbeatMessage>(&publish.payload)
84                        {
85                            if heartbeat_msg.heartbeat == "HEARTBEAT" {
86                                // Handle HEARTBEAT
87                                let now = Utc::now();
88                                let lock_state =
89                                    heartbeat_msg.lock_state.as_deref().unwrap_or("UNKNOWN");
90
91                                // Check if device is in lockdown and heartbeat is at least 10 seconds after lockdown
92                                let should_clear_lockdown = {
93                                    let row: Option<(Option<chrono::DateTime<chrono::Utc>>,)> =
94                                        sqlx::query_as(
95                                            "SELECT locked_down_at FROM devices WHERE uuid = $1",
96                                        )
97                                        .bind(uuid)
98                                        .fetch_optional(db_pool)
99                                        .await
100                                        .unwrap_or(None);
101
102                                    if let Some((Some(locked_down_at),)) = row {
103                                        let duration_since_lockdown = now - locked_down_at;
104                                        duration_since_lockdown.num_seconds() >= 10
105                                    } else {
106                                        false
107                                    }
108                                };
109
110                                let update_query = if should_clear_lockdown {
111                                    "INSERT INTO devices (uuid, user_id, last_heard, uptime_ms, wifi_ssid, backend_url, mqtt_broker_url, mqtt_heartbeat_enable, mqtt_heartbeat_interval_sec, audio_record_timeout_sec, lock_timeout_ms, pairing_timeout_sec, lock_state, voice_detection_enable, vad_rms_threshold, hashed_passphrase, locked_down_at) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, NULL, NULL)
112                                          ON CONFLICT (uuid) DO UPDATE SET user_id = $2, last_heard = $3, uptime_ms = $4, wifi_ssid = $5, backend_url = $6, mqtt_broker_url = $7, mqtt_heartbeat_enable = $8, mqtt_heartbeat_interval_sec = $9, audio_record_timeout_sec = $10, lock_timeout_ms = $11, pairing_timeout_sec = $12, lock_state = $13, voice_detection_enable = $14, vad_rms_threshold = $15, locked_down_at = NULL"
113                                } else {
114                                    "INSERT INTO devices (uuid, user_id, last_heard, uptime_ms, wifi_ssid, backend_url, mqtt_broker_url, mqtt_heartbeat_enable, mqtt_heartbeat_interval_sec, audio_record_timeout_sec, lock_timeout_ms, pairing_timeout_sec, lock_state, voice_detection_enable, vad_rms_threshold, hashed_passphrase) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, NULL)
115                                          ON CONFLICT (uuid) DO UPDATE SET user_id = $2, last_heard = $3, uptime_ms = $4, wifi_ssid = $5, backend_url = $6, mqtt_broker_url = $7, mqtt_heartbeat_enable = $8, mqtt_heartbeat_interval_sec = $9, audio_record_timeout_sec = $10, lock_timeout_ms = $11, pairing_timeout_sec = $12, lock_state = $13, voice_detection_enable = $14, vad_rms_threshold = $15"
116                                };
117
118                                let _ = sqlx::query(update_query)
119                                    .bind(uuid)
120                                    .bind(&heartbeat_msg.user_id)
121                                    .bind(now)
122                                    .bind(heartbeat_msg.uptime_ms as i64)
123                                    .bind(&heartbeat_msg.wifi_ssid)
124                                    .bind(&heartbeat_msg.backend_url)
125                                    .bind(&heartbeat_msg.mqtt_broker_url)
126                                    .bind(heartbeat_msg.mqtt_heartbeat_enable)
127                                    .bind(heartbeat_msg.mqtt_heartbeat_interval_sec)
128                                    .bind(heartbeat_msg.audio_record_timeout_sec)
129                                    .bind(heartbeat_msg.lock_timeout_ms)
130                                    .bind(heartbeat_msg.pairing_timeout_sec)
131                                    .bind(lock_state)
132                                    .bind(heartbeat_msg.voice_detection_enable)
133                                    .bind(heartbeat_msg.vad_rms_threshold)
134                                    .execute(db_pool)
135                                    .await;
136
137                                // Broadcast device online update to owner and invited users
138                                if let Some(user_broadcasts) = super::USER_BROADCASTS.get() {
139                                    let update = serde_json::json!({
140                                        "type": "device_online",
141                                        "device_id": uuid_str,
142                                        "last_heard": now.timestamp_millis(),
143                                        "lock_state": lock_state,
144                                        "locked_down_at": null
145                                    })
146                                    .to_string();
147
148                                    // Get owner
149                                    let owner_row: Option<(String,)> = sqlx::query_as(
150                                        "SELECT user_id FROM devices WHERE uuid = $1",
151                                    )
152                                    .bind(uuid)
153                                    .fetch_optional(db_pool)
154                                    .await
155                                    .unwrap_or(None);
156
157                                    let mut recipients = Vec::new();
158                                    if let Some((owner_id,)) = owner_row {
159                                        recipients.push(owner_id);
160                                    }
161
162                                    // Get invited users with status = 1
163                                    let invited_rows: Vec<(String,)> = sqlx::query_as(
164                                         "SELECT receiver_id FROM invites WHERE device_id = $1 AND status = 1"
165                                     )
166                                     .bind(uuid)
167                                     .fetch_all(db_pool)
168                                     .await
169                                     .unwrap_or_default();
170
171                                    for (receiver_id,) in invited_rows {
172                                        recipients.push(receiver_id);
173                                    }
174
175                                    // Send to each recipient
176                                    let broadcasts = user_broadcasts.lock().unwrap();
177                                    for recipient in recipients {
178                                        if let Some(tx) = broadcasts.get(&recipient) {
179                                            let _ = tx.send(update.clone());
180                                        }
181                                    }
182                                }
183                            }
184                        } else if let Ok(event_msg) =
185                            serde_cbor::from_slice::<EventMessage>(&publish.payload)
186                        {
187                            if event_msg.event == "PONG" {
188                                // Handle PONG
189                                let pings_mutex = super::PENDING_PINGS.get().unwrap();
190                                let mut pings = pings_mutex.lock().unwrap();
191                                if let Some((_, tx)) = pings.remove(&uuid_str.to_string()) {
192                                    tx.send(()).ok();
193                                }
194                            } else if event_msg.event == "CONFIG_UPDATED" {
195                                // Handle CONFIG_UPDATED
196                                let updates_mutex = super::PENDING_CONFIG_UPDATES.get().unwrap();
197                                let mut updates = updates_mutex.lock().unwrap();
198                                if let Some(tx) = updates.remove(&uuid_str.to_string()) {
199                                    tx.send(()).ok();
200                                }
201                            } else if event_msg.event == "LOCKING_DOWN" {
202                                // LOCKING_DOWN event - set locked_down_at
203                                let timestamp = Utc
204                                    .timestamp_millis_opt(event_msg.timestamp as i64 * 1000)
205                                    .unwrap();
206                                let result = sqlx::query(
207                                    "UPDATE devices SET locked_down_at = $1 WHERE uuid = $2",
208                                )
209                                .bind(timestamp)
210                                .bind(uuid)
211                                .execute(db_pool)
212                                .await;
213                                if result.is_ok() {
214                                    // Broadcast device update to owner and invited users
215                                    if let Some(user_broadcasts) = super::USER_BROADCASTS.get() {
216                                        let update = serde_json::json!({
217                                            "type": "device_update",
218                                            "device_id": uuid_str,
219                                            "lock_state": "LOCKED",
220                                            "locked_down_at": timestamp.timestamp_millis()
221                                        })
222                                        .to_string();
223
224                                        // Get owner
225                                        let owner_row: Option<(String,)> = sqlx::query_as(
226                                            "SELECT user_id FROM devices WHERE uuid = $1",
227                                        )
228                                        .bind(uuid)
229                                        .fetch_optional(db_pool)
230                                        .await
231                                        .unwrap_or(None);
232
233                                        let mut recipients = Vec::new();
234                                        if let Some((owner_id,)) = owner_row {
235                                            recipients.push(owner_id);
236                                        }
237
238                                        // Get invited users with status = 1
239                                        let invited_rows: Vec<(String,)> = sqlx::query_as(
240                                              "SELECT receiver_id FROM invites WHERE device_id = $1 AND status = 1"
241                                          )
242                                          .bind(uuid)
243                                          .fetch_all(db_pool)
244                                          .await
245                                          .unwrap_or_default();
246
247                                        for (receiver_id,) in invited_rows {
248                                            recipients.push(receiver_id);
249                                        }
250
251                                        // Send to each recipient
252                                        let broadcasts = user_broadcasts.lock().unwrap();
253                                        for recipient in recipients {
254                                            if let Some(tx) = broadcasts.get(&recipient) {
255                                                let _ = tx.send(update.clone());
256                                            }
257                                        }
258                                    }
259                                }
260                            }
261                        } else if let Ok(lock_msg) =
262                            serde_cbor::from_slice::<LockStatusMessage>(&publish.payload)
263                        {
264                            // LOCK/UNLOCK event
265                            let event_type = if lock_msg.lock == "LOCKED" {
266                                "LOCK"
267                            } else {
268                                "UNLOCK"
269                            };
270                            let reason = &lock_msg.reason;
271                            let timestamp = Utc
272                                .timestamp_millis_opt(lock_msg.timestamp as i64 * 1000)
273                                .unwrap();
274
275                            // Check for recent command
276                            let user_id = {
277                                let commands_mutex = super::RECENT_COMMANDS.get().unwrap();
278                                let mut commands = commands_mutex.lock().unwrap();
279                                if let Some((uid, cmd_time)) = commands.get(&uuid_str.to_string()) {
280                                    let now = Utc::now().timestamp();
281                                    if now - cmd_time < 5 {
282                                        // within 5 seconds
283                                        let uid = uid.clone();
284                                        commands.remove(&uuid_str.to_string());
285                                        Some(uid)
286                                    } else {
287                                        None
288                                    }
289                                } else {
290                                    None
291                                }
292                            };
293
294                            // Insert log
295                            let _ = sqlx::query(
296                                        "INSERT INTO logs (device_id, timestamp, event_type, reason, user_id) VALUES ($1, $2, $3, $4, $5)"
297                                    )
298                                    .bind(uuid_str)
299                                    .bind(timestamp)
300                                    .bind(event_type)
301                                    .bind(reason)
302                                    .bind(&user_id)
303                                    .execute(db_pool)
304                                    .await;
305
306                            // Broadcast log update to owner only
307                            if let Some(user_broadcasts) = super::USER_BROADCASTS.get() {
308                                // Get user name if user_id is present
309                                let user_name = if let Some(ref uid) = user_id {
310                                    let row: Option<(String,)> = sqlx::query_as(
311                                        "SELECT name FROM users WHERE firebase_uid = $1",
312                                    )
313                                    .bind(uid)
314                                    .fetch_optional(db_pool)
315                                    .await
316                                    .unwrap_or(None);
317                                    row.map(|(name,)| name)
318                                } else {
319                                    None
320                                };
321
322                                let log_update = serde_json::json!({
323                                    "type": "log_update",
324                                    "device_id": uuid_str,
325                                    "timestamp": timestamp.timestamp_millis(),
326                                    "event_type": event_type,
327                                    "reason": reason,
328                                    "user_id": user_id,
329                                    "user_name": user_name
330                                })
331                                .to_string();
332
333                                // Get owner
334                                let owner_row: Option<(String,)> =
335                                    sqlx::query_as("SELECT user_id FROM devices WHERE uuid = $1")
336                                        .bind(uuid)
337                                        .fetch_optional(db_pool)
338                                        .await
339                                        .unwrap_or(None);
340
341                                if let Some((owner_id,)) = owner_row {
342                                    let broadcasts = user_broadcasts.lock().unwrap();
343                                    if let Some(tx) = broadcasts.get(&owner_id) {
344                                        let _ = tx.send(log_update);
345                                    }
346                                }
347                            }
348
349                            // Update lock_state
350                            let lock_state = if lock_msg.lock == "LOCKED" {
351                                "LOCKED"
352                            } else {
353                                "UNLOCKED"
354                            };
355                            let _ =
356                                sqlx::query("UPDATE devices SET lock_state = $1 WHERE uuid = $2")
357                                    .bind(lock_state)
358                                    .bind(uuid_str)
359                                    .execute(db_pool)
360                                    .await;
361
362                            // Get locked_down_at
363                            let locked_down_at: Option<i64> = {
364                                let row: Option<(Option<chrono::DateTime<Utc>>,)> = sqlx::query_as(
365                                    "SELECT locked_down_at FROM devices WHERE uuid = $1",
366                                )
367                                .bind(uuid)
368                                .fetch_optional(db_pool)
369                                .await
370                                .unwrap_or(None);
371                                row.and_then(|(dt,)| dt.map(|d| d.timestamp_millis()))
372                            };
373
374                            // Broadcast update to owner and invited users
375                            if let Some(user_broadcasts) = super::USER_BROADCASTS.get() {
376                                let update = serde_json::json!({
377                                    "type": "device_update",
378                                    "device_id": uuid_str,
379                                    "lock_state": lock_state,
380                                    "timestamp": timestamp.timestamp_millis(),
381                                    "locked_down_at": locked_down_at
382                                })
383                                .to_string();
384
385                                // Get owner
386                                let owner_row: Option<(String,)> =
387                                    sqlx::query_as("SELECT user_id FROM devices WHERE uuid = $1")
388                                        .bind(uuid)
389                                        .fetch_optional(db_pool)
390                                        .await
391                                        .unwrap_or(None);
392
393                                let mut recipients = Vec::new();
394                                if let Some((owner_id,)) = owner_row {
395                                    recipients.push(owner_id);
396                                }
397
398                                // Get invited users with status = 1
399                                let invited_rows: Vec<(String,)> = sqlx::query_as(
400                                     "SELECT receiver_id FROM invites WHERE device_id = $1 AND status = 1"
401                                 )
402                                 .bind(uuid)
403                                 .fetch_all(db_pool)
404                                 .await
405                                 .unwrap_or_default();
406
407                                for (receiver_id,) in invited_rows {
408                                    recipients.push(receiver_id);
409                                }
410
411                                // Send to each recipient
412                                let broadcasts = user_broadcasts.lock().unwrap();
413                                for recipient in recipients {
414                                    if let Some(tx) = broadcasts.get(&recipient) {
415                                        let _ = tx.send(update.clone());
416                                    }
417                                }
418                            }
419                        }
420                    }
421                }
422            }
423            Ok(_) => {}
424            Err(_) => {}
425        }
426    }
427}
428
429/// Publica uma mensagem de controle para um dispositivo via MQTT.
430/// Envia um comando para o UUID do dispositivo especificado.
431pub async fn publish_control_message(
432    client: &AsyncClient,
433    uuid: Uuid,
434    command: String,
435) -> Result<()> {
436    let topic = format!("lockwise/{}/control", uuid);
437    let msg = ControlMessage { command };
438    let payload = serde_cbor::to_vec(&msg)?;
439    client
440        .publish(topic, QoS::AtMostOnce, false, payload)
441        .await?;
442    Ok(())
443}