sqlx_postgres/
migrate.rs

1use std::str::FromStr;
2use std::time::Duration;
3use std::time::Instant;
4
5use futures_core::future::BoxFuture;
6
7pub(crate) use sqlx_core::migrate::MigrateError;
8pub(crate) use sqlx_core::migrate::{AppliedMigration, Migration};
9pub(crate) use sqlx_core::migrate::{Migrate, MigrateDatabase};
10
11use crate::connection::{ConnectOptions, Connection};
12use crate::error::Error;
13use crate::executor::Executor;
14use crate::query::query;
15use crate::query_as::query_as;
16use crate::query_scalar::query_scalar;
17use crate::{PgConnectOptions, PgConnection, Postgres};
18
19fn parse_for_maintenance(url: &str) -> Result<(PgConnectOptions, String), Error> {
20    let mut options = PgConnectOptions::from_str(url)?;
21
22    // pull out the name of the database to create
23    let database = options
24        .database
25        .as_deref()
26        .unwrap_or(&options.username)
27        .to_owned();
28
29    // switch us to the maintenance database
30    // use `postgres` _unless_ the database is postgres, in which case, use `template1`
31    // this matches the behavior of the `createdb` util
32    options.database = if database == "postgres" {
33        Some("template1".into())
34    } else {
35        Some("postgres".into())
36    };
37
38    Ok((options, database))
39}
40
41impl MigrateDatabase for Postgres {
42    fn create_database(url: &str) -> BoxFuture<'_, Result<(), Error>> {
43        Box::pin(async move {
44            let (options, database) = parse_for_maintenance(url)?;
45            let mut conn = options.connect().await?;
46
47            let _ = conn
48                .execute(&*format!(
49                    "CREATE DATABASE \"{}\"",
50                    database.replace('"', "\"\"")
51                ))
52                .await?;
53
54            Ok(())
55        })
56    }
57
58    fn database_exists(url: &str) -> BoxFuture<'_, Result<bool, Error>> {
59        Box::pin(async move {
60            let (options, database) = parse_for_maintenance(url)?;
61            let mut conn = options.connect().await?;
62
63            let exists: bool =
64                query_scalar("select exists(SELECT 1 from pg_database WHERE datname = $1)")
65                    .bind(database)
66                    .fetch_one(&mut conn)
67                    .await?;
68
69            Ok(exists)
70        })
71    }
72
73    fn drop_database(url: &str) -> BoxFuture<'_, Result<(), Error>> {
74        Box::pin(async move {
75            let (options, database) = parse_for_maintenance(url)?;
76            let mut conn = options.connect().await?;
77
78            let _ = conn
79                .execute(&*format!(
80                    "DROP DATABASE IF EXISTS \"{}\"",
81                    database.replace('"', "\"\"")
82                ))
83                .await?;
84
85            Ok(())
86        })
87    }
88
89    fn force_drop_database(url: &str) -> BoxFuture<'_, Result<(), Error>> {
90        Box::pin(async move {
91            let (options, database) = parse_for_maintenance(url)?;
92            let mut conn = options.connect().await?;
93
94            let row: (String,) = query_as("SELECT current_setting('server_version_num')")
95                .fetch_one(&mut conn)
96                .await?;
97
98            let version = row.0.parse::<i32>().unwrap();
99
100            let pid_type = if version >= 90200 { "pid" } else { "procpid" };
101
102            conn.execute(&*format!(
103                "SELECT pg_terminate_backend(pg_stat_activity.{pid_type}) FROM pg_stat_activity \
104                 WHERE pg_stat_activity.datname = '{database}' AND {pid_type} <> pg_backend_pid()"
105            ))
106            .await?;
107
108            Self::drop_database(url).await
109        })
110    }
111}
112
113impl Migrate for PgConnection {
114    fn ensure_migrations_table(&mut self) -> BoxFuture<'_, Result<(), MigrateError>> {
115        Box::pin(async move {
116            // language=SQL
117            self.execute(
118                r#"
119CREATE TABLE IF NOT EXISTS _sqlx_migrations (
120    version BIGINT PRIMARY KEY,
121    description TEXT NOT NULL,
122    installed_on TIMESTAMPTZ NOT NULL DEFAULT now(),
123    success BOOLEAN NOT NULL,
124    checksum BYTEA NOT NULL,
125    execution_time BIGINT NOT NULL
126);
127                "#,
128            )
129            .await?;
130
131            Ok(())
132        })
133    }
134
135    fn dirty_version(&mut self) -> BoxFuture<'_, Result<Option<i64>, MigrateError>> {
136        Box::pin(async move {
137            // language=SQL
138            let row: Option<(i64,)> = query_as(
139                "SELECT version FROM _sqlx_migrations WHERE success = false ORDER BY version LIMIT 1",
140            )
141            .fetch_optional(self)
142            .await?;
143
144            Ok(row.map(|r| r.0))
145        })
146    }
147
148    fn list_applied_migrations(
149        &mut self,
150    ) -> BoxFuture<'_, Result<Vec<AppliedMigration>, MigrateError>> {
151        Box::pin(async move {
152            // language=SQL
153            let rows: Vec<(i64, Vec<u8>)> =
154                query_as("SELECT version, checksum FROM _sqlx_migrations ORDER BY version")
155                    .fetch_all(self)
156                    .await?;
157
158            let migrations = rows
159                .into_iter()
160                .map(|(version, checksum)| AppliedMigration {
161                    version,
162                    checksum: checksum.into(),
163                })
164                .collect();
165
166            Ok(migrations)
167        })
168    }
169
170    fn lock(&mut self) -> BoxFuture<'_, Result<(), MigrateError>> {
171        Box::pin(async move {
172            let database_name = current_database(self).await?;
173            let lock_id = generate_lock_id(&database_name);
174
175            // create an application lock over the database
176            // this function will not return until the lock is acquired
177
178            // https://www.postgresql.org/docs/current/explicit-locking.html#ADVISORY-LOCKS
179            // https://www.postgresql.org/docs/current/functions-admin.html#FUNCTIONS-ADVISORY-LOCKS-TABLE
180
181            // language=SQL
182            let _ = query("SELECT pg_advisory_lock($1)")
183                .bind(lock_id)
184                .execute(self)
185                .await?;
186
187            Ok(())
188        })
189    }
190
191    fn unlock(&mut self) -> BoxFuture<'_, Result<(), MigrateError>> {
192        Box::pin(async move {
193            let database_name = current_database(self).await?;
194            let lock_id = generate_lock_id(&database_name);
195
196            // language=SQL
197            let _ = query("SELECT pg_advisory_unlock($1)")
198                .bind(lock_id)
199                .execute(self)
200                .await?;
201
202            Ok(())
203        })
204    }
205
206    fn apply<'e: 'm, 'm>(
207        &'e mut self,
208        migration: &'m Migration,
209    ) -> BoxFuture<'m, Result<Duration, MigrateError>> {
210        Box::pin(async move {
211            let mut tx = self.begin().await?;
212            let start = Instant::now();
213
214            // Use a single transaction for the actual migration script and the essential bookeeping so we never
215            // execute migrations twice. See https://github.com/launchbadge/sqlx/issues/1966.
216            // The `execution_time` however can only be measured for the whole transaction. This value _only_ exists for
217            // data lineage and debugging reasons, so it is not super important if it is lost. So we initialize it to -1
218            // and update it once the actual transaction completed.
219            let _ = tx.execute(&*migration.sql).await?;
220
221            // language=SQL
222            let _ = query(
223                r#"
224    INSERT INTO _sqlx_migrations ( version, description, success, checksum, execution_time )
225    VALUES ( $1, $2, TRUE, $3, -1 )
226                "#,
227            )
228            .bind(migration.version)
229            .bind(&*migration.description)
230            .bind(&*migration.checksum)
231            .execute(&mut *tx)
232            .await?;
233
234            tx.commit().await?;
235
236            // Update `elapsed_time`.
237            // NOTE: The process may disconnect/die at this point, so the elapsed time value might be lost. We accept
238            //       this small risk since this value is not super important.
239
240            let elapsed = start.elapsed();
241
242            // language=SQL
243            let _ = query(
244                r#"
245    UPDATE _sqlx_migrations
246    SET execution_time = $1
247    WHERE version = $2
248                "#,
249            )
250            .bind(elapsed.as_nanos() as i64)
251            .bind(migration.version)
252            .execute(self)
253            .await?;
254
255            Ok(elapsed)
256        })
257    }
258
259    fn revert<'e: 'm, 'm>(
260        &'e mut self,
261        migration: &'m Migration,
262    ) -> BoxFuture<'m, Result<Duration, MigrateError>> {
263        Box::pin(async move {
264            // Use a single transaction for the actual migration script and the essential bookeeping so we never
265            // execute migrations twice. See https://github.com/launchbadge/sqlx/issues/1966.
266            let mut tx = self.begin().await?;
267            let start = Instant::now();
268
269            let _ = tx.execute(&*migration.sql).await?;
270
271            // language=SQL
272            let _ = query(r#"DELETE FROM _sqlx_migrations WHERE version = $1"#)
273                .bind(migration.version)
274                .execute(&mut *tx)
275                .await?;
276
277            tx.commit().await?;
278
279            let elapsed = start.elapsed();
280
281            Ok(elapsed)
282        })
283    }
284}
285
286async fn current_database(conn: &mut PgConnection) -> Result<String, MigrateError> {
287    // language=SQL
288    Ok(query_scalar("SELECT current_database()")
289        .fetch_one(conn)
290        .await?)
291}
292
293// inspired from rails: https://github.com/rails/rails/blob/6e49cc77ab3d16c06e12f93158eaf3e507d4120e/activerecord/lib/active_record/migration.rb#L1308
294fn generate_lock_id(database_name: &str) -> i64 {
295    const CRC_IEEE: crc::Crc<u32> = crc::Crc::<u32>::new(&crc::CRC_32_ISO_HDLC);
296    // 0x3d32ad9e chosen by fair dice roll
297    0x3d32ad9e * (CRC_IEEE.checksum(database_name.as_bytes()) as i64)
298}