1use std::time::Duration;
6
7use anyhow::anyhow;
8use diesel::{
9 PgConnection,
10 connection::BoxableConnection,
11 query_dsl::RunQueryDsl,
12 r2d2::{ConnectionManager, Pool, PooledConnection, R2D2Connection},
13};
14
15use crate::errors::IndexerError;
16
17pub type ConnectionPool = Pool<ConnectionManager<PgConnection>>;
18pub type PoolConnection = PooledConnection<ConnectionManager<PgConnection>>;
19
20#[derive(Debug, Clone, Copy)]
21pub struct ConnectionPoolConfig {
22 pub pool_size: u32,
23 pub connection_timeout: Duration,
24 pub statement_timeout: Duration,
25}
26
27impl ConnectionPoolConfig {
28 const DEFAULT_POOL_SIZE: u32 = 100;
29 const DEFAULT_CONNECTION_TIMEOUT: u64 = 3600;
30 const DEFAULT_STATEMENT_TIMEOUT: u64 = 3600;
31
32 fn connection_config(&self) -> ConnectionConfig {
33 ConnectionConfig {
34 statement_timeout: self.statement_timeout,
35 read_only: false,
36 }
37 }
38
39 pub fn set_pool_size(&mut self, size: u32) {
40 self.pool_size = size;
41 }
42
43 pub fn set_connection_timeout(&mut self, timeout: Duration) {
44 self.connection_timeout = timeout;
45 }
46
47 pub fn set_statement_timeout(&mut self, timeout: Duration) {
48 self.statement_timeout = timeout;
49 }
50}
51
52impl Default for ConnectionPoolConfig {
53 fn default() -> Self {
54 let db_pool_size = std::env::var("DB_POOL_SIZE")
55 .ok()
56 .and_then(|s| s.parse::<u32>().ok())
57 .unwrap_or(Self::DEFAULT_POOL_SIZE);
58 let conn_timeout_secs = std::env::var("DB_CONNECTION_TIMEOUT")
59 .ok()
60 .and_then(|s| s.parse::<u64>().ok())
61 .unwrap_or(Self::DEFAULT_CONNECTION_TIMEOUT);
62 let statement_timeout_secs = std::env::var("DB_STATEMENT_TIMEOUT")
63 .ok()
64 .and_then(|s| s.parse::<u64>().ok())
65 .unwrap_or(Self::DEFAULT_STATEMENT_TIMEOUT);
66
67 Self {
68 pool_size: db_pool_size,
69 connection_timeout: Duration::from_secs(conn_timeout_secs),
70 statement_timeout: Duration::from_secs(statement_timeout_secs),
71 }
72 }
73}
74
75#[derive(Debug, Clone, Copy)]
76pub struct ConnectionConfig {
77 pub statement_timeout: Duration,
78 pub read_only: bool,
79}
80
81impl<T: R2D2Connection + 'static> diesel::r2d2::CustomizeConnection<T, diesel::r2d2::Error>
82 for ConnectionConfig
83{
84 fn on_acquire(&self, _conn: &mut T) -> std::result::Result<(), diesel::r2d2::Error> {
85 _conn
86 .as_any_mut()
87 .downcast_mut::<diesel::PgConnection>()
88 .map_or_else(
89 || {
90 Err(diesel::r2d2::Error::QueryError(
91 diesel::result::Error::DeserializationError(
92 "Failed to downcast connection to PgConnection"
93 .to_string()
94 .into(),
95 ),
96 ))
97 },
98 |pg_conn| {
99 diesel::sql_query(format!(
100 "SET statement_timeout = {}",
101 self.statement_timeout.as_millis(),
102 ))
103 .execute(pg_conn)
104 .map_err(diesel::r2d2::Error::QueryError)?;
105
106 if self.read_only {
107 diesel::sql_query("SET default_transaction_read_only = 't'")
108 .execute(pg_conn)
109 .map_err(diesel::r2d2::Error::QueryError)?;
110 }
111 Ok(())
112 },
113 )?;
114 Ok(())
115 }
116}
117
118pub fn new_connection_pool(
119 db_url: &str,
120 pool_size: Option<u32>,
121) -> Result<ConnectionPool, IndexerError> {
122 let pool_config = ConnectionPoolConfig::default();
123 new_connection_pool_with_config(db_url, pool_size, pool_config)
124}
125
126pub fn new_connection_pool_with_config(
127 db_url: &str,
128 pool_size: Option<u32>,
129 pool_config: ConnectionPoolConfig,
130) -> Result<ConnectionPool, IndexerError> {
131 let manager = ConnectionManager::<PgConnection>::new(db_url);
132
133 let pool_size = pool_size.unwrap_or(pool_config.pool_size);
134 Pool::builder()
135 .max_size(pool_size)
136 .connection_timeout(pool_config.connection_timeout)
137 .connection_customizer(Box::new(pool_config.connection_config()))
138 .build(manager)
139 .map_err(|e| {
140 IndexerError::PgConnectionPoolInit(format!(
141 "Failed to initialize connection pool for {db_url} with error: {e:?}"
142 ))
143 })
144}
145
146pub fn get_pool_connection(pool: &ConnectionPool) -> Result<PoolConnection, IndexerError> {
147 pool.get().map_err(|e| {
148 IndexerError::PgPoolConnection(format!(
149 "Failed to get connection from PG connection pool with error: {:?}",
150 e
151 ))
152 })
153}
154
155pub fn reset_database(conn: &mut PoolConnection) -> Result<(), anyhow::Error> {
156 {
157 conn.as_any_mut()
158 .downcast_mut::<PoolConnection>()
159 .map_or_else(
160 || Err(anyhow!("Failed to downcast connection to PgConnection")),
161 |pg_conn| {
162 setup_postgres::reset_database(pg_conn)?;
163 Ok(())
164 },
165 )?;
166 }
167 Ok(())
168}
169
170pub mod setup_postgres {
171 use anyhow::anyhow;
172 use diesel::{RunQueryDsl, migration::MigrationSource};
173 use diesel_migrations::{EmbeddedMigrations, MigrationHarness, embed_migrations};
174 use prometheus::Registry;
175 use secrecy::ExposeSecret;
176 use tracing::{error, info};
177
178 use crate::{
179 IndexerConfig,
180 db::{PoolConnection, get_pool_connection, new_connection_pool},
181 errors::IndexerError,
182 indexer::Indexer,
183 metrics::IndexerMetrics,
184 store::{PgIndexerAnalyticalStore, PgIndexerStore},
185 };
186
187 const MIGRATIONS: EmbeddedMigrations = embed_migrations!("migrations/pg");
188
189 pub fn reset_database(conn: &mut PoolConnection) -> Result<(), anyhow::Error> {
190 info!("Resetting PG database ...");
191
192 let drop_all_tables = "
193 DO $$ DECLARE
194 r RECORD;
195 BEGIN
196 FOR r IN (SELECT tablename FROM pg_tables WHERE schemaname = 'public')
197 LOOP
198 EXECUTE 'DROP TABLE IF EXISTS ' || quote_ident(r.tablename) || ' CASCADE';
199 END LOOP;
200 END $$;";
201 diesel::sql_query(drop_all_tables).execute(conn)?;
202 info!("Dropped all tables.");
203
204 let drop_all_procedures = "
205 DO $$ DECLARE
206 r RECORD;
207 BEGIN
208 FOR r IN (SELECT proname, oidvectortypes(proargtypes) as argtypes
209 FROM pg_proc INNER JOIN pg_namespace ns ON (pg_proc.pronamespace = ns.oid)
210 WHERE ns.nspname = 'public' AND prokind = 'p')
211 LOOP
212 EXECUTE 'DROP PROCEDURE IF EXISTS ' || quote_ident(r.proname) || '(' || r.argtypes || ') CASCADE';
213 END LOOP;
214 END $$;";
215 diesel::sql_query(drop_all_procedures).execute(conn)?;
216 info!("Dropped all procedures.");
217
218 let drop_all_functions = "
219 DO $$ DECLARE
220 r RECORD;
221 BEGIN
222 FOR r IN (SELECT proname, oidvectortypes(proargtypes) as argtypes
223 FROM pg_proc INNER JOIN pg_namespace ON (pg_proc.pronamespace = pg_namespace.oid)
224 WHERE pg_namespace.nspname = 'public' AND prokind = 'f')
225 LOOP
226 EXECUTE 'DROP FUNCTION IF EXISTS ' || quote_ident(r.proname) || '(' || r.argtypes || ') CASCADE';
227 END LOOP;
228 END $$;";
229 diesel::sql_query(drop_all_functions).execute(conn)?;
230 info!("Dropped all functions.");
231
232 diesel::sql_query(
233 "
234 CREATE TABLE IF NOT EXISTS __diesel_schema_migrations (
235 version VARCHAR(50) PRIMARY KEY,
236 run_on TIMESTAMP NOT NULL DEFAULT NOW()
237 )",
238 )
239 .execute(conn)?;
240 info!("Created __diesel_schema_migrations table.");
241
242 conn.run_migrations(&MIGRATIONS.migrations().unwrap())
243 .map_err(|e| anyhow!("Failed to run migrations {e}"))?;
244 info!("Reset database complete.");
245 Ok(())
246 }
247
248 pub async fn setup(
249 indexer_config: IndexerConfig,
250 registry: Registry,
251 ) -> Result<(), IndexerError> {
252 let db_url_secret = indexer_config.get_db_url().map_err(|e| {
253 IndexerError::PgPoolConnection(format!(
254 "Failed parsing database url with error {:?}",
255 e
256 ))
257 })?;
258 let db_url = db_url_secret.expose_secret();
259 let blocking_cp = new_connection_pool(db_url, None).map_err(|e| {
260 error!(
261 "Failed creating Postgres connection pool with error {:?}",
262 e
263 );
264 e
265 })?;
266 info!("Postgres database connection pool is created at {}", db_url);
267 let mut conn = get_pool_connection(&blocking_cp).map_err(|e| {
268 error!(
269 "Failed getting Postgres connection from connection pool with error {:?}",
270 e
271 );
272 e
273 })?;
274 if indexer_config.reset_db {
275 reset_database(&mut conn).map_err(|e| {
276 let db_err_msg = format!(
277 "Failed resetting database with url: {:?} and error: {:?}",
278 db_url, e
279 );
280 error!("{}", db_err_msg);
281 IndexerError::PostgresReset(db_err_msg)
282 })?;
283 info!("Reset Postgres database complete.");
284 } else {
285 conn.run_pending_migrations(MIGRATIONS)
286 .map_err(|e| anyhow!("Failed to run pending migrations {e}"))?;
287 info!("Database migrations are up to date.");
288 }
289 let indexer_metrics = IndexerMetrics::new(®istry);
290 iota_metrics::init_metrics(®istry);
291
292 let report_cp = blocking_cp.clone();
293 let report_metrics = indexer_metrics.clone();
294 tokio::spawn(async move {
295 loop {
296 let cp_state = report_cp.state();
297 info!(
298 "DB connection pool size: {}, with idle conn: {}.",
299 cp_state.connections, cp_state.idle_connections
300 );
301 report_metrics
302 .db_conn_pool_size
303 .set(cp_state.connections as i64);
304 report_metrics
305 .idle_db_conn
306 .set(cp_state.idle_connections as i64);
307 tokio::time::sleep(tokio::time::Duration::from_secs(60)).await;
308 }
309 });
310 if indexer_config.fullnode_sync_worker {
311 let store = PgIndexerStore::new(blocking_cp, indexer_metrics.clone());
312 return Indexer::start_writer(&indexer_config, store, indexer_metrics).await;
313 } else if indexer_config.rpc_server_worker {
314 return Indexer::start_reader(&indexer_config, ®istry, db_url.to_string()).await;
315 } else if indexer_config.analytical_worker {
316 let store = PgIndexerAnalyticalStore::new(blocking_cp);
317 return Indexer::start_analytical_worker(store, indexer_metrics.clone()).await;
318 }
319 Ok(())
320 }
321}