lockwise_backend/mqtt.rs
1//! Módulo para comunicação MQTT.
2//!
3//! Este módulo gerencia a conexão MQTT com dispositivos, incluindo publicação de comandos,
4//! recebimento de mensagens de status e processamento de heartbeats.
5use anyhow::Result;
6use chrono::{TimeZone, Utc};
7use rumqttc::{AsyncClient, Event, Incoming, QoS};
8use serde::{Deserialize, Serialize};
9use sqlx::PgPool;
10use uuid::Uuid;
11
12use super::device::LockStatusMessage;
13
14/// Estrutura de mensagem para relatórios de heartbeat de dispositivos via MQTT
15#[derive(Deserialize)]
16struct HeartbeatMessage {
17 /// Tipo de mensagem de heartbeat
18 heartbeat: String,
19 /// Tempo de atividade do dispositivo em milissegundos
20 uptime_ms: u64,
21 /// Timestamp da mensagem
22 #[allow(dead_code)]
23 timestamp: u64,
24 /// SSID WiFi ao qual o dispositivo está conectado
25 wifi_ssid: String,
26 /// URL do back-end configurada no dispositivo
27 backend_url: String,
28 /// URL do broker MQTT configurada no dispositivo
29 mqtt_broker_url: String,
30 /// Se o heartbeat MQTT está habilitado
31 mqtt_heartbeat_enable: bool,
32 /// Intervalo de heartbeat em segundos
33 mqtt_heartbeat_interval_sec: i32,
34 /// Tempo limite de gravação de áudio em segundos
35 audio_record_timeout_sec: i32,
36 /// Tempo limite de bloqueio em milissegundos
37 lock_timeout_ms: i32,
38 /// Tempo limite de pareamento em segundos
39 pairing_timeout_sec: i32,
40 /// ID do usuário associado ao dispositivo
41 user_id: String,
42 /// Estado atual de bloqueio
43 lock_state: Option<String>,
44 /// Se a detecção de voz está habilitada
45 voice_detection_enable: bool,
46 /// Limiar RMS para detecção de atividade de voz
47 vad_rms_threshold: i32,
48}
49
50/// Estrutura de mensagem para relatórios de eventos de dispositivos via MQTT
51#[derive(Deserialize)]
52struct EventMessage {
53 /// Tipo de evento (ex.: PONG, CONFIG_UPDATED, LOCKING_DOWN)
54 event: String,
55 /// Tempo de atividade do dispositivo em milissegundos
56 #[allow(dead_code)]
57 uptime_ms: u64,
58 /// Timestamp do evento
59 timestamp: u64,
60}
61
62/// Estrutura de mensagem para enviar comandos de controle aos dispositivos via MQTT
63#[derive(Serialize)]
64struct ControlMessage {
65 /// O comando a enviar (ex.: LOCK, UNLOCK, PING)
66 command: String,
67}
68
69/// Manipula eventos MQTT recebidos dos dispositivos.
70/// Processa mensagens de heartbeat, eventos (PONG, CONFIG_UPDATED, LOCKING_DOWN)
71/// e atualizações de status de bloqueio, atualizando o banco de dados conforme necessário.
72pub async fn handle_mqtt_events(db_pool: &PgPool, eventloop: &mut rumqttc::EventLoop) {
73 loop {
74 match eventloop.poll().await {
75 Ok(Event::Incoming(Incoming::Publish(publish))) => {
76 let topic = publish.topic;
77 if topic.starts_with("lockwise/") && topic.ends_with("/status") {
78 let uuid_str = &topic[9..topic.len() - 7]; // extract UUID
79 if let Ok(uuid) = Uuid::parse_str(uuid_str) {
80 // Try to parse as HeartbeatMessage first (has heartbeat field)
81 if let Ok(heartbeat_msg) =
82 serde_cbor::from_slice::<HeartbeatMessage>(&publish.payload)
83 {
84 if heartbeat_msg.heartbeat == "HEARTBEAT" {
85 // Handle HEARTBEAT
86 let now = Utc::now();
87 let lock_state =
88 heartbeat_msg.lock_state.as_deref().unwrap_or("UNKNOWN");
89
90 // Check if device is in lockdown and heartbeat is at least 10 seconds after lockdown
91 let should_clear_lockdown = {
92 let row: Option<(Option<chrono::DateTime<chrono::Utc>>,)> =
93 sqlx::query_as(
94 "SELECT locked_down_at FROM devices WHERE uuid = $1",
95 )
96 .bind(uuid)
97 .fetch_optional(db_pool)
98 .await
99 .unwrap_or(None);
100
101 if let Some((Some(locked_down_at),)) = row {
102 let duration_since_lockdown = now - locked_down_at;
103 duration_since_lockdown.num_seconds() >= 10
104 } else {
105 false
106 }
107 };
108
109 let update_query = if should_clear_lockdown {
110 "INSERT INTO devices (uuid, user_id, last_heard, uptime_ms, wifi_ssid, backend_url, mqtt_broker_url, mqtt_heartbeat_enable, mqtt_heartbeat_interval_sec, audio_record_timeout_sec, lock_timeout_ms, pairing_timeout_sec, lock_state, voice_detection_enable, vad_rms_threshold, hashed_passphrase, locked_down_at) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, NULL, NULL)
111 ON CONFLICT (uuid) DO UPDATE SET user_id = $2, last_heard = $3, uptime_ms = $4, wifi_ssid = $5, backend_url = $6, mqtt_broker_url = $7, mqtt_heartbeat_enable = $8, mqtt_heartbeat_interval_sec = $9, audio_record_timeout_sec = $10, lock_timeout_ms = $11, pairing_timeout_sec = $12, lock_state = $13, voice_detection_enable = $14, vad_rms_threshold = $15, locked_down_at = NULL"
112 } else {
113 "INSERT INTO devices (uuid, user_id, last_heard, uptime_ms, wifi_ssid, backend_url, mqtt_broker_url, mqtt_heartbeat_enable, mqtt_heartbeat_interval_sec, audio_record_timeout_sec, lock_timeout_ms, pairing_timeout_sec, lock_state, voice_detection_enable, vad_rms_threshold, hashed_passphrase) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, NULL)
114 ON CONFLICT (uuid) DO UPDATE SET user_id = $2, last_heard = $3, uptime_ms = $4, wifi_ssid = $5, backend_url = $6, mqtt_broker_url = $7, mqtt_heartbeat_enable = $8, mqtt_heartbeat_interval_sec = $9, audio_record_timeout_sec = $10, lock_timeout_ms = $11, pairing_timeout_sec = $12, lock_state = $13, voice_detection_enable = $14, vad_rms_threshold = $15"
115 };
116
117 let _ = sqlx::query(update_query)
118 .bind(uuid)
119 .bind(&heartbeat_msg.user_id)
120 .bind(now)
121 .bind(heartbeat_msg.uptime_ms as i64)
122 .bind(&heartbeat_msg.wifi_ssid)
123 .bind(&heartbeat_msg.backend_url)
124 .bind(&heartbeat_msg.mqtt_broker_url)
125 .bind(heartbeat_msg.mqtt_heartbeat_enable)
126 .bind(heartbeat_msg.mqtt_heartbeat_interval_sec)
127 .bind(heartbeat_msg.audio_record_timeout_sec)
128 .bind(heartbeat_msg.lock_timeout_ms)
129 .bind(heartbeat_msg.pairing_timeout_sec)
130 .bind(lock_state)
131 .bind(heartbeat_msg.voice_detection_enable)
132 .bind(heartbeat_msg.vad_rms_threshold)
133 .execute(db_pool)
134 .await;
135 }
136 } else if let Ok(event_msg) =
137 serde_cbor::from_slice::<EventMessage>(&publish.payload)
138 {
139 if event_msg.event == "PONG" {
140 // Handle PONG
141 let pings_mutex = super::PENDING_PINGS.get().unwrap();
142 let mut pings = pings_mutex.lock().unwrap();
143 if let Some((_, tx)) = pings.remove(&uuid_str.to_string()) {
144 tx.send(()).ok();
145 }
146 } else if event_msg.event == "CONFIG_UPDATED" {
147 // Handle CONFIG_UPDATED
148 let updates_mutex = super::PENDING_CONFIG_UPDATES.get().unwrap();
149 let mut updates = updates_mutex.lock().unwrap();
150 if let Some(tx) = updates.remove(&uuid_str.to_string()) {
151 tx.send(()).ok();
152 }
153 } else if event_msg.event == "LOCKING_DOWN" {
154 // LOCKING_DOWN event - set locked_down_at
155 let timestamp = Utc
156 .timestamp_millis_opt(event_msg.timestamp as i64 * 1000)
157 .unwrap();
158 let result = sqlx::query(
159 "UPDATE devices SET locked_down_at = $1 WHERE uuid = $2",
160 )
161 .bind(timestamp)
162 .bind(uuid)
163 .execute(db_pool)
164 .await;
165 if result.is_ok() {}
166 }
167 } else if let Ok(lock_msg) =
168 serde_cbor::from_slice::<LockStatusMessage>(&publish.payload)
169 {
170 // LOCK/UNLOCK event
171 let event_type = if lock_msg.lock == "LOCKED" {
172 "LOCK"
173 } else {
174 "UNLOCK"
175 };
176 let reason = &lock_msg.reason;
177 let timestamp = Utc
178 .timestamp_millis_opt(lock_msg.timestamp as i64 * 1000)
179 .unwrap();
180
181 // Check for recent command
182 let user_id = {
183 let commands_mutex = super::RECENT_COMMANDS.get().unwrap();
184 let mut commands = commands_mutex.lock().unwrap();
185 if let Some((uid, cmd_time)) = commands.get(&uuid_str.to_string()) {
186 let now = Utc::now().timestamp();
187 if now - cmd_time < 5 {
188 // within 5 seconds
189 let uid = uid.clone();
190 commands.remove(&uuid_str.to_string());
191 Some(uid)
192 } else {
193 None
194 }
195 } else {
196 None
197 }
198 };
199
200 // Insert log
201 let _ = sqlx::query(
202 "INSERT INTO logs (device_id, timestamp, event_type, reason, user_id) VALUES ($1, $2, $3, $4, $5)"
203 )
204 .bind(uuid_str)
205 .bind(timestamp)
206 .bind(event_type)
207 .bind(reason)
208 .bind(&user_id)
209 .execute(db_pool)
210 .await;
211
212 // Update lock_state
213 let lock_state = if lock_msg.lock == "LOCKED" {
214 "LOCKED"
215 } else {
216 "UNLOCKED"
217 };
218 let _ =
219 sqlx::query("UPDATE devices SET lock_state = $1 WHERE uuid = $2")
220 .bind(lock_state)
221 .bind(uuid_str)
222 .execute(db_pool)
223 .await;
224 }
225 }
226 }
227 }
228 Ok(_) => {}
229 Err(_) => {}
230 }
231 }
232}
233
234/// Publica uma mensagem de controle para um dispositivo via MQTT.
235/// Envia um comando para o UUID do dispositivo especificado.
236pub async fn publish_control_message(
237 client: &AsyncClient,
238 uuid: Uuid,
239 command: String,
240) -> Result<()> {
241 let topic = format!("lockwise/{}/control", uuid);
242 let msg = ControlMessage { command };
243 let payload = serde_cbor::to_vec(&msg)?;
244 client
245 .publish(topic, QoS::AtMostOnce, false, payload)
246 .await?;
247 Ok(())
248}