sqlx_postgres/
any.rs

1use crate::{
2    Either, PgColumn, PgConnectOptions, PgConnection, PgQueryResult, PgRow, PgTransactionManager,
3    PgTypeInfo, Postgres,
4};
5use futures_core::future::BoxFuture;
6use futures_core::stream::BoxStream;
7use futures_util::{StreamExt, TryFutureExt, TryStreamExt};
8
9pub use sqlx_core::any::*;
10
11use crate::type_info::PgType;
12use sqlx_core::connection::Connection;
13use sqlx_core::database::Database;
14use sqlx_core::describe::Describe;
15use sqlx_core::executor::Executor;
16use sqlx_core::ext::ustr::UStr;
17use sqlx_core::transaction::TransactionManager;
18
// Registers Postgres with the `Any` driver machinery under the name `DRIVER`.
// NOTE(review): the macro body lives in `sqlx_core`; judging by its name it
// presumably wires in migration support only when that is available (likely
// behind the `migrate` feature) — confirm against the macro definition.
sqlx_core::declare_driver_with_optional_migrate!(DRIVER = Postgres);
20
21impl AnyConnectionBackend for PgConnection {
22    fn name(&self) -> &str {
23        <Postgres as Database>::NAME
24    }
25
26    fn close(self: Box<Self>) -> BoxFuture<'static, sqlx_core::Result<()>> {
27        Connection::close(*self)
28    }
29
30    fn close_hard(self: Box<Self>) -> BoxFuture<'static, sqlx_core::Result<()>> {
31        Connection::close_hard(*self)
32    }
33
34    fn ping(&mut self) -> BoxFuture<'_, sqlx_core::Result<()>> {
35        Connection::ping(self)
36    }
37
38    fn begin(&mut self) -> BoxFuture<'_, sqlx_core::Result<()>> {
39        PgTransactionManager::begin(self)
40    }
41
42    fn commit(&mut self) -> BoxFuture<'_, sqlx_core::Result<()>> {
43        PgTransactionManager::commit(self)
44    }
45
46    fn rollback(&mut self) -> BoxFuture<'_, sqlx_core::Result<()>> {
47        PgTransactionManager::rollback(self)
48    }
49
50    fn start_rollback(&mut self) {
51        PgTransactionManager::start_rollback(self)
52    }
53
54    fn shrink_buffers(&mut self) {
55        Connection::shrink_buffers(self);
56    }
57
58    fn flush(&mut self) -> BoxFuture<'_, sqlx_core::Result<()>> {
59        Connection::flush(self)
60    }
61
62    fn should_flush(&self) -> bool {
63        Connection::should_flush(self)
64    }
65
66    #[cfg(feature = "migrate")]
67    fn as_migrate(
68        &mut self,
69    ) -> sqlx_core::Result<&mut (dyn sqlx_core::migrate::Migrate + Send + 'static)> {
70        Ok(self)
71    }
72
73    fn fetch_many<'q>(
74        &'q mut self,
75        query: &'q str,
76        arguments: Option<AnyArguments<'q>>,
77    ) -> BoxStream<'q, sqlx_core::Result<Either<AnyQueryResult, AnyRow>>> {
78        let persistent = arguments.is_some();
79        let args = arguments.as_ref().map(AnyArguments::convert_to);
80
81        Box::pin(
82            self.run(query, args, 0, persistent, None)
83                .try_flatten_stream()
84                .map(
85                    move |res: sqlx_core::Result<Either<PgQueryResult, PgRow>>| match res? {
86                        Either::Left(result) => Ok(Either::Left(map_result(result))),
87                        Either::Right(row) => Ok(Either::Right(AnyRow::try_from(&row)?)),
88                    },
89                ),
90        )
91    }
92
93    fn fetch_optional<'q>(
94        &'q mut self,
95        query: &'q str,
96        arguments: Option<AnyArguments<'q>>,
97    ) -> BoxFuture<'q, sqlx_core::Result<Option<AnyRow>>> {
98        let persistent = arguments.is_some();
99        let args = arguments.as_ref().map(AnyArguments::convert_to);
100
101        Box::pin(async move {
102            let stream = self.run(query, args, 1, persistent, None).await?;
103            futures_util::pin_mut!(stream);
104
105            if let Some(Either::Right(row)) = stream.try_next().await? {
106                return Ok(Some(AnyRow::try_from(&row)?));
107            }
108
109            Ok(None)
110        })
111    }
112
113    fn prepare_with<'c, 'q: 'c>(
114        &'c mut self,
115        sql: &'q str,
116        _parameters: &[AnyTypeInfo],
117    ) -> BoxFuture<'c, sqlx_core::Result<AnyStatement<'q>>> {
118        Box::pin(async move {
119            let statement = Executor::prepare_with(self, sql, &[]).await?;
120            AnyStatement::try_from_statement(
121                sql,
122                &statement,
123                statement.metadata.column_names.clone(),
124            )
125        })
126    }
127
128    fn describe<'q>(&'q mut self, sql: &'q str) -> BoxFuture<'q, sqlx_core::Result<Describe<Any>>> {
129        Box::pin(async move {
130            let describe = Executor::describe(self, sql).await?;
131
132            let columns = describe
133                .columns
134                .iter()
135                .map(AnyColumn::try_from)
136                .collect::<Result<Vec<_>, _>>()?;
137
138            let parameters = match describe.parameters {
139                Some(Either::Left(parameters)) => Some(Either::Left(
140                    parameters
141                        .iter()
142                        .enumerate()
143                        .map(|(i, type_info)| {
144                            AnyTypeInfo::try_from(type_info).map_err(|_| {
145                                sqlx_core::Error::AnyDriverError(
146                                    format!(
147                                        "Any driver does not support type {type_info} of parameter {i}"
148                                    )
149                                    .into(),
150                                )
151                            })
152                        })
153                        .collect::<Result<Vec<_>, _>>()?,
154                )),
155                Some(Either::Right(count)) => Some(Either::Right(count)),
156                None => None,
157            };
158
159            Ok(Describe {
160                columns,
161                parameters,
162                nullable: describe.nullable,
163            })
164        })
165    }
166}
167
168impl<'a> TryFrom<&'a PgTypeInfo> for AnyTypeInfo {
169    type Error = sqlx_core::Error;
170
171    fn try_from(pg_type: &'a PgTypeInfo) -> Result<Self, Self::Error> {
172        Ok(AnyTypeInfo {
173            kind: match &pg_type.0 {
174                PgType::Void => AnyTypeInfoKind::Null,
175                PgType::Int2 => AnyTypeInfoKind::SmallInt,
176                PgType::Int4 => AnyTypeInfoKind::Integer,
177                PgType::Int8 => AnyTypeInfoKind::BigInt,
178                PgType::Float4 => AnyTypeInfoKind::Real,
179                PgType::Float8 => AnyTypeInfoKind::Double,
180                PgType::Bytea => AnyTypeInfoKind::Blob,
181                PgType::Text | PgType::Varchar => AnyTypeInfoKind::Text,
182                PgType::DeclareWithName(UStr::Static("citext")) => AnyTypeInfoKind::Text,
183                _ => {
184                    return Err(sqlx_core::Error::AnyDriverError(
185                        format!("Any driver does not support the Postgres type {pg_type:?}").into(),
186                    ))
187                }
188            },
189        })
190    }
191}
192
193impl<'a> TryFrom<&'a PgColumn> for AnyColumn {
194    type Error = sqlx_core::Error;
195
196    fn try_from(col: &'a PgColumn) -> Result<Self, Self::Error> {
197        let type_info =
198            AnyTypeInfo::try_from(&col.type_info).map_err(|e| sqlx_core::Error::ColumnDecode {
199                index: col.name.to_string(),
200                source: e.into(),
201            })?;
202
203        Ok(AnyColumn {
204            ordinal: col.ordinal,
205            name: col.name.clone(),
206            type_info,
207        })
208    }
209}
210
211impl<'a> TryFrom<&'a PgRow> for AnyRow {
212    type Error = sqlx_core::Error;
213
214    fn try_from(row: &'a PgRow) -> Result<Self, Self::Error> {
215        AnyRow::map_from(row, row.metadata.column_names.clone())
216    }
217}
218
219impl<'a> TryFrom<&'a AnyConnectOptions> for PgConnectOptions {
220    type Error = sqlx_core::Error;
221
222    fn try_from(value: &'a AnyConnectOptions) -> Result<Self, Self::Error> {
223        let mut opts = PgConnectOptions::parse_from_url(&value.database_url)?;
224        opts.log_settings = value.log_settings.clone();
225        Ok(opts)
226    }
227}
228
229fn map_result(res: PgQueryResult) -> AnyQueryResult {
230    AnyQueryResult {
231        rows_affected: res.rows_affected(),
232        last_insert_id: None,
233    }
234}