1use crate::{
2 Either, PgColumn, PgConnectOptions, PgConnection, PgQueryResult, PgRow, PgTransactionManager,
3 PgTypeInfo, Postgres,
4};
5use futures_core::future::BoxFuture;
6use futures_core::stream::BoxStream;
7use futures_util::{StreamExt, TryFutureExt, TryStreamExt};
8
9pub use sqlx_core::any::*;
10
11use crate::type_info::PgType;
12use sqlx_core::connection::Connection;
13use sqlx_core::database::Database;
14use sqlx_core::describe::Describe;
15use sqlx_core::executor::Executor;
16use sqlx_core::ext::ustr::UStr;
17use sqlx_core::transaction::TransactionManager;
18
19sqlx_core::declare_driver_with_optional_migrate!(DRIVER = Postgres);
20
21impl AnyConnectionBackend for PgConnection {
22 fn name(&self) -> &str {
23 <Postgres as Database>::NAME
24 }
25
26 fn close(self: Box<Self>) -> BoxFuture<'static, sqlx_core::Result<()>> {
27 Connection::close(*self)
28 }
29
30 fn close_hard(self: Box<Self>) -> BoxFuture<'static, sqlx_core::Result<()>> {
31 Connection::close_hard(*self)
32 }
33
34 fn ping(&mut self) -> BoxFuture<'_, sqlx_core::Result<()>> {
35 Connection::ping(self)
36 }
37
38 fn begin(&mut self) -> BoxFuture<'_, sqlx_core::Result<()>> {
39 PgTransactionManager::begin(self)
40 }
41
42 fn commit(&mut self) -> BoxFuture<'_, sqlx_core::Result<()>> {
43 PgTransactionManager::commit(self)
44 }
45
46 fn rollback(&mut self) -> BoxFuture<'_, sqlx_core::Result<()>> {
47 PgTransactionManager::rollback(self)
48 }
49
50 fn start_rollback(&mut self) {
51 PgTransactionManager::start_rollback(self)
52 }
53
54 fn shrink_buffers(&mut self) {
55 Connection::shrink_buffers(self);
56 }
57
58 fn flush(&mut self) -> BoxFuture<'_, sqlx_core::Result<()>> {
59 Connection::flush(self)
60 }
61
62 fn should_flush(&self) -> bool {
63 Connection::should_flush(self)
64 }
65
66 #[cfg(feature = "migrate")]
67 fn as_migrate(
68 &mut self,
69 ) -> sqlx_core::Result<&mut (dyn sqlx_core::migrate::Migrate + Send + 'static)> {
70 Ok(self)
71 }
72
73 fn fetch_many<'q>(
74 &'q mut self,
75 query: &'q str,
76 arguments: Option<AnyArguments<'q>>,
77 ) -> BoxStream<'q, sqlx_core::Result<Either<AnyQueryResult, AnyRow>>> {
78 let persistent = arguments.is_some();
79 let args = arguments.as_ref().map(AnyArguments::convert_to);
80
81 Box::pin(
82 self.run(query, args, 0, persistent, None)
83 .try_flatten_stream()
84 .map(
85 move |res: sqlx_core::Result<Either<PgQueryResult, PgRow>>| match res? {
86 Either::Left(result) => Ok(Either::Left(map_result(result))),
87 Either::Right(row) => Ok(Either::Right(AnyRow::try_from(&row)?)),
88 },
89 ),
90 )
91 }
92
93 fn fetch_optional<'q>(
94 &'q mut self,
95 query: &'q str,
96 arguments: Option<AnyArguments<'q>>,
97 ) -> BoxFuture<'q, sqlx_core::Result<Option<AnyRow>>> {
98 let persistent = arguments.is_some();
99 let args = arguments.as_ref().map(AnyArguments::convert_to);
100
101 Box::pin(async move {
102 let stream = self.run(query, args, 1, persistent, None).await?;
103 futures_util::pin_mut!(stream);
104
105 if let Some(Either::Right(row)) = stream.try_next().await? {
106 return Ok(Some(AnyRow::try_from(&row)?));
107 }
108
109 Ok(None)
110 })
111 }
112
113 fn prepare_with<'c, 'q: 'c>(
114 &'c mut self,
115 sql: &'q str,
116 _parameters: &[AnyTypeInfo],
117 ) -> BoxFuture<'c, sqlx_core::Result<AnyStatement<'q>>> {
118 Box::pin(async move {
119 let statement = Executor::prepare_with(self, sql, &[]).await?;
120 AnyStatement::try_from_statement(
121 sql,
122 &statement,
123 statement.metadata.column_names.clone(),
124 )
125 })
126 }
127
128 fn describe<'q>(&'q mut self, sql: &'q str) -> BoxFuture<'q, sqlx_core::Result<Describe<Any>>> {
129 Box::pin(async move {
130 let describe = Executor::describe(self, sql).await?;
131
132 let columns = describe
133 .columns
134 .iter()
135 .map(AnyColumn::try_from)
136 .collect::<Result<Vec<_>, _>>()?;
137
138 let parameters = match describe.parameters {
139 Some(Either::Left(parameters)) => Some(Either::Left(
140 parameters
141 .iter()
142 .enumerate()
143 .map(|(i, type_info)| {
144 AnyTypeInfo::try_from(type_info).map_err(|_| {
145 sqlx_core::Error::AnyDriverError(
146 format!(
147 "Any driver does not support type {type_info} of parameter {i}"
148 )
149 .into(),
150 )
151 })
152 })
153 .collect::<Result<Vec<_>, _>>()?,
154 )),
155 Some(Either::Right(count)) => Some(Either::Right(count)),
156 None => None,
157 };
158
159 Ok(Describe {
160 columns,
161 parameters,
162 nullable: describe.nullable,
163 })
164 })
165 }
166}
167
168impl<'a> TryFrom<&'a PgTypeInfo> for AnyTypeInfo {
169 type Error = sqlx_core::Error;
170
171 fn try_from(pg_type: &'a PgTypeInfo) -> Result<Self, Self::Error> {
172 Ok(AnyTypeInfo {
173 kind: match &pg_type.0 {
174 PgType::Void => AnyTypeInfoKind::Null,
175 PgType::Int2 => AnyTypeInfoKind::SmallInt,
176 PgType::Int4 => AnyTypeInfoKind::Integer,
177 PgType::Int8 => AnyTypeInfoKind::BigInt,
178 PgType::Float4 => AnyTypeInfoKind::Real,
179 PgType::Float8 => AnyTypeInfoKind::Double,
180 PgType::Bytea => AnyTypeInfoKind::Blob,
181 PgType::Text | PgType::Varchar => AnyTypeInfoKind::Text,
182 PgType::DeclareWithName(UStr::Static("citext")) => AnyTypeInfoKind::Text,
183 _ => {
184 return Err(sqlx_core::Error::AnyDriverError(
185 format!("Any driver does not support the Postgres type {pg_type:?}").into(),
186 ))
187 }
188 },
189 })
190 }
191}
192
193impl<'a> TryFrom<&'a PgColumn> for AnyColumn {
194 type Error = sqlx_core::Error;
195
196 fn try_from(col: &'a PgColumn) -> Result<Self, Self::Error> {
197 let type_info =
198 AnyTypeInfo::try_from(&col.type_info).map_err(|e| sqlx_core::Error::ColumnDecode {
199 index: col.name.to_string(),
200 source: e.into(),
201 })?;
202
203 Ok(AnyColumn {
204 ordinal: col.ordinal,
205 name: col.name.clone(),
206 type_info,
207 })
208 }
209}
210
211impl<'a> TryFrom<&'a PgRow> for AnyRow {
212 type Error = sqlx_core::Error;
213
214 fn try_from(row: &'a PgRow) -> Result<Self, Self::Error> {
215 AnyRow::map_from(row, row.metadata.column_names.clone())
216 }
217}
218
219impl<'a> TryFrom<&'a AnyConnectOptions> for PgConnectOptions {
220 type Error = sqlx_core::Error;
221
222 fn try_from(value: &'a AnyConnectOptions) -> Result<Self, Self::Error> {
223 let mut opts = PgConnectOptions::parse_from_url(&value.database_url)?;
224 opts.log_settings = value.log_settings.clone();
225 Ok(opts)
226 }
227}
228
229fn map_result(res: PgQueryResult) -> AnyQueryResult {
230 AnyQueryResult {
231 rows_affected: res.rows_affected(),
232 last_insert_id: None,
233 }
234}