lockwise_backend/mqtt.rs
1//! Módulo para comunicação MQTT.
2//!
3//! Este módulo gerencia a conexão MQTT com dispositivos, incluindo publicação de comandos,
4//! recebimento de mensagens de status e processamento de heartbeats.
5use anyhow::Result;
6use chrono::{TimeZone, Utc};
7use rumqttc::{AsyncClient, Event, Incoming, QoS};
8use serde::{Deserialize, Serialize};
9use sqlx::PgPool;
10use uuid::Uuid;
11
12use super::device::LockStatusMessage;
13
14/// Estrutura de mensagem para relatórios de heartbeat de dispositivos via MQTT
15#[derive(Deserialize)]
16struct HeartbeatMessage {
17 /// Tipo de mensagem de heartbeat
18 heartbeat: String,
19 /// Tempo de atividade do dispositivo em milissegundos
20 uptime_ms: u64,
21 /// Timestamp da mensagem
22 #[allow(dead_code)]
23 timestamp: u64,
24 /// SSID WiFi ao qual o dispositivo está conectado
25 wifi_ssid: String,
26 /// URL do back-end configurada no dispositivo
27 backend_url: String,
28 /// URL do broker MQTT configurada no dispositivo
29 mqtt_broker_url: String,
30 /// Se o heartbeat MQTT está habilitado
31 mqtt_heartbeat_enable: bool,
32 /// Intervalo de heartbeat em segundos
33 mqtt_heartbeat_interval_sec: i32,
34 /// Tempo limite de gravação de áudio em segundos
35 audio_record_timeout_sec: i32,
36 /// Tempo limite de bloqueio em milissegundos
37 lock_timeout_ms: i32,
38 /// Tempo limite de pareamento em segundos
39 pairing_timeout_sec: i32,
40 /// ID do usuário associado ao dispositivo
41 user_id: String,
42 /// Estado atual de bloqueio
43 lock_state: Option<String>,
44 /// Se a detecção de voz está habilitada
45 voice_detection_enable: bool,
46 /// Limiar RMS para detecção de atividade de voz
47 vad_rms_threshold: i32,
48}
49
50/// Estrutura de mensagem para relatórios de eventos de dispositivos via MQTT
51#[derive(Deserialize)]
52struct EventMessage {
53 /// Tipo de evento (ex.: PONG, CONFIG_UPDATED, LOCKING_DOWN)
54 event: String,
55 /// Tempo de atividade do dispositivo em milissegundos
56 #[allow(dead_code)]
57 uptime_ms: u64,
58 /// Timestamp do evento
59 timestamp: u64,
60}
61
62/// Estrutura de mensagem para enviar comandos de controle aos dispositivos via MQTT
63#[derive(Serialize)]
64struct ControlMessage {
65 /// O comando a enviar (ex.: LOCK, UNLOCK, PING)
66 command: String,
67}
68
69/// Manipula eventos MQTT recebidos dos dispositivos.
70/// Processa mensagens de heartbeat, eventos (PONG, CONFIG_UPDATED, LOCKING_DOWN)
71/// e atualizações de status de bloqueio, atualizando o banco de dados conforme necessário.
72/// Também envia atualizações em tempo real via WebSocket para usuários conectados.
73pub async fn handle_mqtt_events(db_pool: &PgPool, eventloop: &mut rumqttc::EventLoop) {
74 loop {
75 match eventloop.poll().await {
76 Ok(Event::Incoming(Incoming::Publish(publish))) => {
77 let topic = publish.topic;
78 if topic.starts_with("lockwise/") && topic.ends_with("/status") {
79 let uuid_str = &topic[9..topic.len() - 7]; // extract UUID
80 if let Ok(uuid) = Uuid::parse_str(uuid_str) {
81 // Try to parse as HeartbeatMessage first (has heartbeat field)
82 if let Ok(heartbeat_msg) =
83 serde_cbor::from_slice::<HeartbeatMessage>(&publish.payload)
84 {
85 if heartbeat_msg.heartbeat == "HEARTBEAT" {
86 // Handle HEARTBEAT
87 let now = Utc::now();
88 let lock_state =
89 heartbeat_msg.lock_state.as_deref().unwrap_or("UNKNOWN");
90
91 // Check if device is in lockdown and heartbeat is at least 10 seconds after lockdown
92 let should_clear_lockdown = {
93 let row: Option<(Option<chrono::DateTime<chrono::Utc>>,)> =
94 sqlx::query_as(
95 "SELECT locked_down_at FROM devices WHERE uuid = $1",
96 )
97 .bind(uuid)
98 .fetch_optional(db_pool)
99 .await
100 .unwrap_or(None);
101
102 if let Some((Some(locked_down_at),)) = row {
103 let duration_since_lockdown = now - locked_down_at;
104 duration_since_lockdown.num_seconds() >= 10
105 } else {
106 false
107 }
108 };
109
110 let update_query = if should_clear_lockdown {
111 "INSERT INTO devices (uuid, user_id, last_heard, uptime_ms, wifi_ssid, backend_url, mqtt_broker_url, mqtt_heartbeat_enable, mqtt_heartbeat_interval_sec, audio_record_timeout_sec, lock_timeout_ms, pairing_timeout_sec, lock_state, voice_detection_enable, vad_rms_threshold, hashed_passphrase, locked_down_at) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, NULL, NULL)
112 ON CONFLICT (uuid) DO UPDATE SET user_id = $2, last_heard = $3, uptime_ms = $4, wifi_ssid = $5, backend_url = $6, mqtt_broker_url = $7, mqtt_heartbeat_enable = $8, mqtt_heartbeat_interval_sec = $9, audio_record_timeout_sec = $10, lock_timeout_ms = $11, pairing_timeout_sec = $12, lock_state = $13, voice_detection_enable = $14, vad_rms_threshold = $15, locked_down_at = NULL"
113 } else {
114 "INSERT INTO devices (uuid, user_id, last_heard, uptime_ms, wifi_ssid, backend_url, mqtt_broker_url, mqtt_heartbeat_enable, mqtt_heartbeat_interval_sec, audio_record_timeout_sec, lock_timeout_ms, pairing_timeout_sec, lock_state, voice_detection_enable, vad_rms_threshold, hashed_passphrase) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, NULL)
115 ON CONFLICT (uuid) DO UPDATE SET user_id = $2, last_heard = $3, uptime_ms = $4, wifi_ssid = $5, backend_url = $6, mqtt_broker_url = $7, mqtt_heartbeat_enable = $8, mqtt_heartbeat_interval_sec = $9, audio_record_timeout_sec = $10, lock_timeout_ms = $11, pairing_timeout_sec = $12, lock_state = $13, voice_detection_enable = $14, vad_rms_threshold = $15"
116 };
117
118 let _ = sqlx::query(update_query)
119 .bind(uuid)
120 .bind(&heartbeat_msg.user_id)
121 .bind(now)
122 .bind(heartbeat_msg.uptime_ms as i64)
123 .bind(&heartbeat_msg.wifi_ssid)
124 .bind(&heartbeat_msg.backend_url)
125 .bind(&heartbeat_msg.mqtt_broker_url)
126 .bind(heartbeat_msg.mqtt_heartbeat_enable)
127 .bind(heartbeat_msg.mqtt_heartbeat_interval_sec)
128 .bind(heartbeat_msg.audio_record_timeout_sec)
129 .bind(heartbeat_msg.lock_timeout_ms)
130 .bind(heartbeat_msg.pairing_timeout_sec)
131 .bind(lock_state)
132 .bind(heartbeat_msg.voice_detection_enable)
133 .bind(heartbeat_msg.vad_rms_threshold)
134 .execute(db_pool)
135 .await;
136
137 // Broadcast device online update to owner and invited users
138 if let Some(user_broadcasts) = super::USER_BROADCASTS.get() {
139 let update = serde_json::json!({
140 "type": "device_online",
141 "device_id": uuid_str,
142 "last_heard": now.timestamp_millis(),
143 "lock_state": lock_state,
144 "locked_down_at": null
145 })
146 .to_string();
147
148 // Get owner
149 let owner_row: Option<(String,)> = sqlx::query_as(
150 "SELECT user_id FROM devices WHERE uuid = $1",
151 )
152 .bind(uuid)
153 .fetch_optional(db_pool)
154 .await
155 .unwrap_or(None);
156
157 let mut recipients = Vec::new();
158 if let Some((owner_id,)) = owner_row {
159 recipients.push(owner_id);
160 }
161
162 // Get invited users with status = 1
163 let invited_rows: Vec<(String,)> = sqlx::query_as(
164 "SELECT receiver_id FROM invites WHERE device_id = $1 AND status = 1"
165 )
166 .bind(uuid)
167 .fetch_all(db_pool)
168 .await
169 .unwrap_or_default();
170
171 for (receiver_id,) in invited_rows {
172 recipients.push(receiver_id);
173 }
174
175 // Send to each recipient
176 let broadcasts = user_broadcasts.lock().unwrap();
177 for recipient in recipients {
178 if let Some(tx) = broadcasts.get(&recipient) {
179 let _ = tx.send(update.clone());
180 }
181 }
182 }
183 }
184 } else if let Ok(event_msg) =
185 serde_cbor::from_slice::<EventMessage>(&publish.payload)
186 {
187 if event_msg.event == "PONG" {
188 // Handle PONG
189 let pings_mutex = super::PENDING_PINGS.get().unwrap();
190 let mut pings = pings_mutex.lock().unwrap();
191 if let Some((_, tx)) = pings.remove(&uuid_str.to_string()) {
192 tx.send(()).ok();
193 }
194 } else if event_msg.event == "CONFIG_UPDATED" {
195 // Handle CONFIG_UPDATED
196 let updates_mutex = super::PENDING_CONFIG_UPDATES.get().unwrap();
197 let mut updates = updates_mutex.lock().unwrap();
198 if let Some(tx) = updates.remove(&uuid_str.to_string()) {
199 tx.send(()).ok();
200 }
201 } else if event_msg.event == "LOCKING_DOWN" {
202 // LOCKING_DOWN event - set locked_down_at
203 let timestamp = Utc
204 .timestamp_millis_opt(event_msg.timestamp as i64 * 1000)
205 .unwrap();
206 let result = sqlx::query(
207 "UPDATE devices SET locked_down_at = $1 WHERE uuid = $2",
208 )
209 .bind(timestamp)
210 .bind(uuid)
211 .execute(db_pool)
212 .await;
213 if result.is_ok() {
214 // Broadcast device update to owner and invited users
215 if let Some(user_broadcasts) = super::USER_BROADCASTS.get() {
216 let update = serde_json::json!({
217 "type": "device_update",
218 "device_id": uuid_str,
219 "lock_state": "LOCKED",
220 "locked_down_at": timestamp.timestamp_millis()
221 })
222 .to_string();
223
224 // Get owner
225 let owner_row: Option<(String,)> = sqlx::query_as(
226 "SELECT user_id FROM devices WHERE uuid = $1",
227 )
228 .bind(uuid)
229 .fetch_optional(db_pool)
230 .await
231 .unwrap_or(None);
232
233 let mut recipients = Vec::new();
234 if let Some((owner_id,)) = owner_row {
235 recipients.push(owner_id);
236 }
237
238 // Get invited users with status = 1
239 let invited_rows: Vec<(String,)> = sqlx::query_as(
240 "SELECT receiver_id FROM invites WHERE device_id = $1 AND status = 1"
241 )
242 .bind(uuid)
243 .fetch_all(db_pool)
244 .await
245 .unwrap_or_default();
246
247 for (receiver_id,) in invited_rows {
248 recipients.push(receiver_id);
249 }
250
251 // Send to each recipient
252 let broadcasts = user_broadcasts.lock().unwrap();
253 for recipient in recipients {
254 if let Some(tx) = broadcasts.get(&recipient) {
255 let _ = tx.send(update.clone());
256 }
257 }
258 }
259 }
260 }
261 } else if let Ok(lock_msg) =
262 serde_cbor::from_slice::<LockStatusMessage>(&publish.payload)
263 {
264 // LOCK/UNLOCK event
265 let event_type = if lock_msg.lock == "LOCKED" {
266 "LOCK"
267 } else {
268 "UNLOCK"
269 };
270 let reason = &lock_msg.reason;
271 let timestamp = Utc
272 .timestamp_millis_opt(lock_msg.timestamp as i64 * 1000)
273 .unwrap();
274
275 // Check for recent command
276 let user_id = {
277 let commands_mutex = super::RECENT_COMMANDS.get().unwrap();
278 let mut commands = commands_mutex.lock().unwrap();
279 if let Some((uid, cmd_time)) = commands.get(&uuid_str.to_string()) {
280 let now = Utc::now().timestamp();
281 if now - cmd_time < 5 {
282 // within 5 seconds
283 let uid = uid.clone();
284 commands.remove(&uuid_str.to_string());
285 Some(uid)
286 } else {
287 None
288 }
289 } else {
290 None
291 }
292 };
293
294 // Insert log
295 let _ = sqlx::query(
296 "INSERT INTO logs (device_id, timestamp, event_type, reason, user_id) VALUES ($1, $2, $3, $4, $5)"
297 )
298 .bind(uuid_str)
299 .bind(timestamp)
300 .bind(event_type)
301 .bind(reason)
302 .bind(&user_id)
303 .execute(db_pool)
304 .await;
305
306 // Broadcast log update to owner only
307 if let Some(user_broadcasts) = super::USER_BROADCASTS.get() {
308 // Get user name if user_id is present
309 let user_name = if let Some(ref uid) = user_id {
310 let row: Option<(String,)> = sqlx::query_as(
311 "SELECT name FROM users WHERE firebase_uid = $1",
312 )
313 .bind(uid)
314 .fetch_optional(db_pool)
315 .await
316 .unwrap_or(None);
317 row.map(|(name,)| name)
318 } else {
319 None
320 };
321
322 let log_update = serde_json::json!({
323 "type": "log_update",
324 "device_id": uuid_str,
325 "timestamp": timestamp.timestamp_millis(),
326 "event_type": event_type,
327 "reason": reason,
328 "user_id": user_id,
329 "user_name": user_name
330 })
331 .to_string();
332
333 // Get owner
334 let owner_row: Option<(String,)> =
335 sqlx::query_as("SELECT user_id FROM devices WHERE uuid = $1")
336 .bind(uuid)
337 .fetch_optional(db_pool)
338 .await
339 .unwrap_or(None);
340
341 if let Some((owner_id,)) = owner_row {
342 let broadcasts = user_broadcasts.lock().unwrap();
343 if let Some(tx) = broadcasts.get(&owner_id) {
344 let _ = tx.send(log_update);
345 }
346 }
347 }
348
349 // Update lock_state
350 let lock_state = if lock_msg.lock == "LOCKED" {
351 "LOCKED"
352 } else {
353 "UNLOCKED"
354 };
355 let _ =
356 sqlx::query("UPDATE devices SET lock_state = $1 WHERE uuid = $2")
357 .bind(lock_state)
358 .bind(uuid_str)
359 .execute(db_pool)
360 .await;
361
362 // Get locked_down_at
363 let locked_down_at: Option<i64> = {
364 let row: Option<(Option<chrono::DateTime<Utc>>,)> = sqlx::query_as(
365 "SELECT locked_down_at FROM devices WHERE uuid = $1",
366 )
367 .bind(uuid)
368 .fetch_optional(db_pool)
369 .await
370 .unwrap_or(None);
371 row.and_then(|(dt,)| dt.map(|d| d.timestamp_millis()))
372 };
373
374 // Broadcast update to owner and invited users
375 if let Some(user_broadcasts) = super::USER_BROADCASTS.get() {
376 let update = serde_json::json!({
377 "type": "device_update",
378 "device_id": uuid_str,
379 "lock_state": lock_state,
380 "timestamp": timestamp.timestamp_millis(),
381 "locked_down_at": locked_down_at
382 })
383 .to_string();
384
385 // Get owner
386 let owner_row: Option<(String,)> =
387 sqlx::query_as("SELECT user_id FROM devices WHERE uuid = $1")
388 .bind(uuid)
389 .fetch_optional(db_pool)
390 .await
391 .unwrap_or(None);
392
393 let mut recipients = Vec::new();
394 if let Some((owner_id,)) = owner_row {
395 recipients.push(owner_id);
396 }
397
398 // Get invited users with status = 1
399 let invited_rows: Vec<(String,)> = sqlx::query_as(
400 "SELECT receiver_id FROM invites WHERE device_id = $1 AND status = 1"
401 )
402 .bind(uuid)
403 .fetch_all(db_pool)
404 .await
405 .unwrap_or_default();
406
407 for (receiver_id,) in invited_rows {
408 recipients.push(receiver_id);
409 }
410
411 // Send to each recipient
412 let broadcasts = user_broadcasts.lock().unwrap();
413 for recipient in recipients {
414 if let Some(tx) = broadcasts.get(&recipient) {
415 let _ = tx.send(update.clone());
416 }
417 }
418 }
419 }
420 }
421 }
422 }
423 Ok(_) => {}
424 Err(_) => {}
425 }
426 }
427}
428
429/// Publica uma mensagem de controle para um dispositivo via MQTT.
430/// Envia um comando para o UUID do dispositivo especificado.
431pub async fn publish_control_message(
432 client: &AsyncClient,
433 uuid: Uuid,
434 command: String,
435) -> Result<()> {
436 let topic = format!("lockwise/{}/control", uuid);
437 let msg = ControlMessage { command };
438 let payload = serde_cbor::to_vec(&msg)?;
439 client
440 .publish(topic, QoS::AtMostOnce, false, payload)
441 .await?;
442 Ok(())
443}