// iota_indexer/db.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::time::Duration;
6
7use anyhow::anyhow;
8use diesel::{
9    PgConnection,
10    connection::BoxableConnection,
11    query_dsl::RunQueryDsl,
12    r2d2::{ConnectionManager, Pool, PooledConnection, R2D2Connection},
13};
14
15use crate::errors::IndexerError;
16
/// Shared r2d2 pool of Diesel Postgres connections.
pub type ConnectionPool = Pool<ConnectionManager<PgConnection>>;
/// A single connection checked out of a [`ConnectionPool`].
pub type PoolConnection = PooledConnection<ConnectionManager<PgConnection>>;
19
/// Tunables used when building a [`ConnectionPool`].
#[derive(Debug, Clone, Copy)]
pub struct ConnectionPoolConfig {
    // Maximum number of connections the pool may hold (`Pool::builder().max_size`).
    pub pool_size: u32,
    // Timeout handed to the pool builder's `connection_timeout`.
    pub connection_timeout: Duration,
    // Postgres `statement_timeout` applied per session when a connection is acquired.
    pub statement_timeout: Duration,
}
26
27impl ConnectionPoolConfig {
28    const DEFAULT_POOL_SIZE: u32 = 100;
29    const DEFAULT_CONNECTION_TIMEOUT: u64 = 3600;
30    const DEFAULT_STATEMENT_TIMEOUT: u64 = 3600;
31
32    fn connection_config(&self) -> ConnectionConfig {
33        ConnectionConfig {
34            statement_timeout: self.statement_timeout,
35            read_only: false,
36        }
37    }
38
39    pub fn set_pool_size(&mut self, size: u32) {
40        self.pool_size = size;
41    }
42
43    pub fn set_connection_timeout(&mut self, timeout: Duration) {
44        self.connection_timeout = timeout;
45    }
46
47    pub fn set_statement_timeout(&mut self, timeout: Duration) {
48        self.statement_timeout = timeout;
49    }
50}
51
52impl Default for ConnectionPoolConfig {
53    fn default() -> Self {
54        let db_pool_size = std::env::var("DB_POOL_SIZE")
55            .ok()
56            .and_then(|s| s.parse::<u32>().ok())
57            .unwrap_or(Self::DEFAULT_POOL_SIZE);
58        let conn_timeout_secs = std::env::var("DB_CONNECTION_TIMEOUT")
59            .ok()
60            .and_then(|s| s.parse::<u64>().ok())
61            .unwrap_or(Self::DEFAULT_CONNECTION_TIMEOUT);
62        let statement_timeout_secs = std::env::var("DB_STATEMENT_TIMEOUT")
63            .ok()
64            .and_then(|s| s.parse::<u64>().ok())
65            .unwrap_or(Self::DEFAULT_STATEMENT_TIMEOUT);
66
67        Self {
68            pool_size: db_pool_size,
69            connection_timeout: Duration::from_secs(conn_timeout_secs),
70            statement_timeout: Duration::from_secs(statement_timeout_secs),
71        }
72    }
73}
74
/// Session-level settings applied to each new pooled connection by the
/// `CustomizeConnection` impl for this type.
#[derive(Debug, Clone, Copy)]
pub struct ConnectionConfig {
    // Postgres `statement_timeout` set on acquire (sent in milliseconds).
    pub statement_timeout: Duration,
    // When true, `default_transaction_read_only` is enabled for the session.
    pub read_only: bool,
}
80
81impl<T: R2D2Connection + 'static> diesel::r2d2::CustomizeConnection<T, diesel::r2d2::Error>
82    for ConnectionConfig
83{
84    fn on_acquire(&self, _conn: &mut T) -> std::result::Result<(), diesel::r2d2::Error> {
85        _conn
86            .as_any_mut()
87            .downcast_mut::<diesel::PgConnection>()
88            .map_or_else(
89                || {
90                    Err(diesel::r2d2::Error::QueryError(
91                        diesel::result::Error::DeserializationError(
92                            "Failed to downcast connection to PgConnection"
93                                .to_string()
94                                .into(),
95                        ),
96                    ))
97                },
98                |pg_conn| {
99                    diesel::sql_query(format!(
100                        "SET statement_timeout = {}",
101                        self.statement_timeout.as_millis(),
102                    ))
103                    .execute(pg_conn)
104                    .map_err(diesel::r2d2::Error::QueryError)?;
105
106                    if self.read_only {
107                        diesel::sql_query("SET default_transaction_read_only = 't'")
108                            .execute(pg_conn)
109                            .map_err(diesel::r2d2::Error::QueryError)?;
110                    }
111                    Ok(())
112                },
113            )?;
114        Ok(())
115    }
116}
117
118pub fn new_connection_pool(
119    db_url: &str,
120    pool_size: Option<u32>,
121) -> Result<ConnectionPool, IndexerError> {
122    let pool_config = ConnectionPoolConfig::default();
123    new_connection_pool_with_config(db_url, pool_size, pool_config)
124}
125
126pub fn new_connection_pool_with_config(
127    db_url: &str,
128    pool_size: Option<u32>,
129    pool_config: ConnectionPoolConfig,
130) -> Result<ConnectionPool, IndexerError> {
131    let manager = ConnectionManager::<PgConnection>::new(db_url);
132
133    let pool_size = pool_size.unwrap_or(pool_config.pool_size);
134    Pool::builder()
135        .max_size(pool_size)
136        .connection_timeout(pool_config.connection_timeout)
137        .connection_customizer(Box::new(pool_config.connection_config()))
138        .build(manager)
139        .map_err(|e| {
140            IndexerError::PgConnectionPoolInit(format!(
141                "Failed to initialize connection pool for {db_url} with error: {e:?}"
142            ))
143        })
144}
145
146pub fn get_pool_connection(pool: &ConnectionPool) -> Result<PoolConnection, IndexerError> {
147    pool.get().map_err(|e| {
148        IndexerError::PgPoolConnection(format!(
149            "Failed to get connection from PG connection pool with error: {:?}",
150            e
151        ))
152    })
153}
154
155pub fn reset_database(conn: &mut PoolConnection) -> Result<(), anyhow::Error> {
156    {
157        conn.as_any_mut()
158            .downcast_mut::<PoolConnection>()
159            .map_or_else(
160                || Err(anyhow!("Failed to downcast connection to PgConnection")),
161                |pg_conn| {
162                    setup_postgres::reset_database(pg_conn)?;
163                    Ok(())
164                },
165            )?;
166    }
167    Ok(())
168}
169
170pub mod setup_postgres {
171    use anyhow::anyhow;
172    use diesel::{RunQueryDsl, migration::MigrationSource};
173    use diesel_migrations::{EmbeddedMigrations, MigrationHarness, embed_migrations};
174    use prometheus::Registry;
175    use secrecy::ExposeSecret;
176    use tracing::{error, info};
177
178    use crate::{
179        IndexerConfig,
180        db::{PoolConnection, get_pool_connection, new_connection_pool},
181        errors::IndexerError,
182        indexer::Indexer,
183        metrics::IndexerMetrics,
184        store::{PgIndexerAnalyticalStore, PgIndexerStore},
185    };
186
187    const MIGRATIONS: EmbeddedMigrations = embed_migrations!("migrations/pg");
188
189    pub fn reset_database(conn: &mut PoolConnection) -> Result<(), anyhow::Error> {
190        info!("Resetting PG database ...");
191
192        let drop_all_tables = "
193        DO $$ DECLARE
194            r RECORD;
195        BEGIN
196        FOR r IN (SELECT tablename FROM pg_tables WHERE schemaname = 'public')
197            LOOP
198                EXECUTE 'DROP TABLE IF EXISTS ' || quote_ident(r.tablename) || ' CASCADE';
199            END LOOP;
200        END $$;";
201        diesel::sql_query(drop_all_tables).execute(conn)?;
202        info!("Dropped all tables.");
203
204        let drop_all_procedures = "
205        DO $$ DECLARE
206            r RECORD;
207        BEGIN
208            FOR r IN (SELECT proname, oidvectortypes(proargtypes) as argtypes
209                      FROM pg_proc INNER JOIN pg_namespace ns ON (pg_proc.pronamespace = ns.oid)
210                      WHERE ns.nspname = 'public' AND prokind = 'p')
211            LOOP
212                EXECUTE 'DROP PROCEDURE IF EXISTS ' || quote_ident(r.proname) || '(' || r.argtypes || ') CASCADE';
213            END LOOP;
214        END $$;";
215        diesel::sql_query(drop_all_procedures).execute(conn)?;
216        info!("Dropped all procedures.");
217
218        let drop_all_functions = "
219        DO $$ DECLARE
220            r RECORD;
221        BEGIN
222            FOR r IN (SELECT proname, oidvectortypes(proargtypes) as argtypes
223                      FROM pg_proc INNER JOIN pg_namespace ON (pg_proc.pronamespace = pg_namespace.oid)
224                      WHERE pg_namespace.nspname = 'public' AND prokind = 'f')
225            LOOP
226                EXECUTE 'DROP FUNCTION IF EXISTS ' || quote_ident(r.proname) || '(' || r.argtypes || ') CASCADE';
227            END LOOP;
228        END $$;";
229        diesel::sql_query(drop_all_functions).execute(conn)?;
230        info!("Dropped all functions.");
231
232        diesel::sql_query(
233            "
234        CREATE TABLE IF NOT EXISTS __diesel_schema_migrations (
235            version VARCHAR(50) PRIMARY KEY,
236            run_on TIMESTAMP NOT NULL DEFAULT NOW()
237        )",
238        )
239        .execute(conn)?;
240        info!("Created __diesel_schema_migrations table.");
241
242        conn.run_migrations(&MIGRATIONS.migrations().unwrap())
243            .map_err(|e| anyhow!("Failed to run migrations {e}"))?;
244        info!("Reset database complete.");
245        Ok(())
246    }
247
    /// One-shot indexer bootstrap: builds the Postgres connection pool,
    /// resets the database or applies pending migrations (depending on
    /// `indexer_config.reset_db`), starts a background task reporting pool
    /// statistics, and finally hands control to exactly one worker
    /// (writer, reader, or analytical) based on the config flags.
    ///
    /// Returns an error if the database URL cannot be parsed, the pool or
    /// a connection cannot be created, or the reset/migration step fails;
    /// otherwise it returns whatever the selected worker returns (or
    /// `Ok(())` if no worker flag is set).
    pub async fn setup(
        indexer_config: IndexerConfig,
        registry: Registry,
    ) -> Result<(), IndexerError> {
        let db_url_secret = indexer_config.get_db_url().map_err(|e| {
            IndexerError::PgPoolConnection(format!(
                "Failed parsing database url with error {:?}",
                e
            ))
        })?;
        // NOTE(review): the secret is exposed here and then interpolated
        // into info!/error! messages below — if the URL embeds credentials
        // they end up in the logs; consider redacting. TODO confirm.
        let db_url = db_url_secret.expose_secret();
        let blocking_cp = new_connection_pool(db_url, None).map_err(|e| {
            error!(
                "Failed creating Postgres connection pool with error {:?}",
                e
            );
            e
        })?;
        info!("Postgres database connection pool is created at {}", db_url);
        let mut conn = get_pool_connection(&blocking_cp).map_err(|e| {
            error!(
                "Failed getting Postgres connection from connection pool with error {:?}",
                e
            );
            e
        })?;
        if indexer_config.reset_db {
            // Destructive path: drop everything and re-run all migrations.
            reset_database(&mut conn).map_err(|e| {
                let db_err_msg = format!(
                    "Failed resetting database with url: {:?} and error: {:?}",
                    db_url, e
                );
                error!("{}", db_err_msg);
                IndexerError::PostgresReset(db_err_msg)
            })?;
            info!("Reset Postgres database complete.");
        } else {
            // Non-destructive path: bring the schema up to date only.
            conn.run_pending_migrations(MIGRATIONS)
                .map_err(|e| anyhow!("Failed to run pending migrations {e}"))?;
            info!("Database migrations are up to date.");
        }
        let indexer_metrics = IndexerMetrics::new(&registry);
        iota_metrics::init_metrics(&registry);

        // Background task: report pool size / idle-connection gauges every
        // 60 s for the lifetime of the process (the task is never aborted).
        let report_cp = blocking_cp.clone();
        let report_metrics = indexer_metrics.clone();
        tokio::spawn(async move {
            loop {
                let cp_state = report_cp.state();
                info!(
                    "DB connection pool size: {}, with idle conn: {}.",
                    cp_state.connections, cp_state.idle_connections
                );
                report_metrics
                    .db_conn_pool_size
                    .set(cp_state.connections as i64);
                report_metrics
                    .idle_db_conn
                    .set(cp_state.idle_connections as i64);
                tokio::time::sleep(tokio::time::Duration::from_secs(60)).await;
            }
        });
        // Worker dispatch: the first matching flag wins; each worker call
        // runs until it finishes and its result becomes the return value.
        if indexer_config.fullnode_sync_worker {
            let store = PgIndexerStore::new(blocking_cp, indexer_metrics.clone());
            return Indexer::start_writer(&indexer_config, store, indexer_metrics).await;
        } else if indexer_config.rpc_server_worker {
            return Indexer::start_reader(&indexer_config, &registry, db_url.to_string()).await;
        } else if indexer_config.analytical_worker {
            let store = PgIndexerAnalyticalStore::new(blocking_cp);
            return Indexer::start_analytical_worker(store, indexer_metrics.clone()).await;
        }
        Ok(())
    }
321}