iota_bridge_indexer/
postgres_manager.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use diesel::{
6    BoolExpressionMethods, Connection, ExpressionMethods, OptionalExtension, QueryDsl, RunQueryDsl,
7    SelectableHelper,
8    pg::PgConnection,
9    r2d2::{ConnectionManager, Pool},
10    result::Error,
11};
12use iota_types::digests::TransactionDigest;
13
14use crate::{
15    ProcessedTxnData,
16    models::{IotaProgressStore, TokenTransfer as DBTokenTransfer},
17    schema,
18    schema::{
19        iota_error_transactions, iota_progress_store::txn_digest, token_transfer,
20        token_transfer_data,
21    },
22};
23
24pub(crate) type PgPool = Pool<ConnectionManager<PgConnection>>;
25
26const IOTA_PROGRESS_STORE_DUMMY_KEY: i32 = 1;
27
28pub fn get_connection_pool(database_url: String) -> PgPool {
29    let manager = ConnectionManager::<PgConnection>::new(database_url);
30    Pool::builder()
31        .test_on_check_out(true)
32        .build(manager)
33        .expect("Could not build Postgres DB connection pool")
34}
35
36// TODO: add retry logic
37pub fn write(pool: &PgPool, token_txns: Vec<ProcessedTxnData>) -> Result<(), anyhow::Error> {
38    if token_txns.is_empty() {
39        return Ok(());
40    }
41    let (transfers, data, errors) = token_txns.iter().fold(
42        (vec![], vec![], vec![]),
43        |(mut transfers, mut data, mut errors), d| {
44            match d {
45                ProcessedTxnData::TokenTransfer(t) => {
46                    transfers.push(t.to_db());
47                    if let Some(d) = t.to_data_maybe() {
48                        data.push(d)
49                    }
50                }
51                ProcessedTxnData::Error(e) => errors.push(e.to_db()),
52            }
53            (transfers, data, errors)
54        },
55    );
56
57    let connection = &mut pool.get()?;
58    connection.transaction(|conn| {
59        diesel::insert_into(token_transfer_data::table)
60            .values(&data)
61            .on_conflict_do_nothing()
62            .execute(conn)?;
63        diesel::insert_into(token_transfer::table)
64            .values(&transfers)
65            .on_conflict_do_nothing()
66            .execute(conn)?;
67        diesel::insert_into(iota_error_transactions::table)
68            .values(&errors)
69            .on_conflict_do_nothing()
70            .execute(conn)
71    })?;
72    Ok(())
73}
74
75pub fn update_iota_progress_store(
76    pool: &PgPool,
77    tx_digest: TransactionDigest,
78) -> Result<(), anyhow::Error> {
79    let mut conn = pool.get()?;
80    diesel::insert_into(schema::iota_progress_store::table)
81        .values(&IotaProgressStore {
82            id: IOTA_PROGRESS_STORE_DUMMY_KEY,
83            txn_digest: tx_digest.inner().to_vec(),
84        })
85        .on_conflict(schema::iota_progress_store::dsl::id)
86        .do_update()
87        .set(txn_digest.eq(tx_digest.inner().to_vec()))
88        .execute(&mut conn)?;
89    Ok(())
90}
91
92pub fn read_iota_progress_store(pool: &PgPool) -> anyhow::Result<Option<TransactionDigest>> {
93    let mut conn = pool.get()?;
94    let val: Option<IotaProgressStore> =
95        crate::schema::iota_progress_store::dsl::iota_progress_store
96            .select(IotaProgressStore::as_select())
97            .first(&mut conn)
98            .optional()?;
99    match val {
100        Some(val) => Ok(Some(TransactionDigest::try_from(
101            val.txn_digest.as_slice(),
102        )?)),
103        None => Ok(None),
104    }
105}
106
107pub fn get_latest_eth_token_transfer(
108    pool: &PgPool,
109    finalized: bool,
110) -> Result<Option<DBTokenTransfer>, Error> {
111    use crate::schema::token_transfer::dsl::*;
112
113    let connection = &mut pool.get().unwrap();
114
115    if finalized {
116        token_transfer
117            .filter(data_source.eq("ETH").and(status.eq("Deposited")))
118            .order(block_height.desc())
119            .first::<DBTokenTransfer>(connection)
120            .optional()
121    } else {
122        token_transfer
123            .filter(status.eq("DepositedUnfinalized"))
124            .order(block_height.desc())
125            .first::<DBTokenTransfer>(connection)
126            .optional()
127    }
128}