iota_bridge_indexer/
postgres_manager.rs1use diesel::{
6 BoolExpressionMethods, Connection, ExpressionMethods, OptionalExtension, QueryDsl, RunQueryDsl,
7 SelectableHelper,
8 pg::PgConnection,
9 r2d2::{ConnectionManager, Pool},
10 result::Error,
11};
12use iota_types::digests::TransactionDigest;
13
14use crate::{
15 ProcessedTxnData,
16 models::{IotaProgressStore, TokenTransfer as DBTokenTransfer},
17 schema,
18 schema::{
19 iota_error_transactions, iota_progress_store::txn_digest, token_transfer,
20 token_transfer_data,
21 },
22};
23
24pub(crate) type PgPool = Pool<ConnectionManager<PgConnection>>;
25
26const IOTA_PROGRESS_STORE_DUMMY_KEY: i32 = 1;
27
28pub fn get_connection_pool(database_url: String) -> PgPool {
29 let manager = ConnectionManager::<PgConnection>::new(database_url);
30 Pool::builder()
31 .test_on_check_out(true)
32 .build(manager)
33 .expect("Could not build Postgres DB connection pool")
34}
35
36pub fn write(pool: &PgPool, token_txns: Vec<ProcessedTxnData>) -> Result<(), anyhow::Error> {
38 if token_txns.is_empty() {
39 return Ok(());
40 }
41 let (transfers, data, errors) = token_txns.iter().fold(
42 (vec![], vec![], vec![]),
43 |(mut transfers, mut data, mut errors), d| {
44 match d {
45 ProcessedTxnData::TokenTransfer(t) => {
46 transfers.push(t.to_db());
47 if let Some(d) = t.to_data_maybe() {
48 data.push(d)
49 }
50 }
51 ProcessedTxnData::Error(e) => errors.push(e.to_db()),
52 }
53 (transfers, data, errors)
54 },
55 );
56
57 let connection = &mut pool.get()?;
58 connection.transaction(|conn| {
59 diesel::insert_into(token_transfer_data::table)
60 .values(&data)
61 .on_conflict_do_nothing()
62 .execute(conn)?;
63 diesel::insert_into(token_transfer::table)
64 .values(&transfers)
65 .on_conflict_do_nothing()
66 .execute(conn)?;
67 diesel::insert_into(iota_error_transactions::table)
68 .values(&errors)
69 .on_conflict_do_nothing()
70 .execute(conn)
71 })?;
72 Ok(())
73}
74
75pub fn update_iota_progress_store(
76 pool: &PgPool,
77 tx_digest: TransactionDigest,
78) -> Result<(), anyhow::Error> {
79 let mut conn = pool.get()?;
80 diesel::insert_into(schema::iota_progress_store::table)
81 .values(&IotaProgressStore {
82 id: IOTA_PROGRESS_STORE_DUMMY_KEY,
83 txn_digest: tx_digest.inner().to_vec(),
84 })
85 .on_conflict(schema::iota_progress_store::dsl::id)
86 .do_update()
87 .set(txn_digest.eq(tx_digest.inner().to_vec()))
88 .execute(&mut conn)?;
89 Ok(())
90}
91
92pub fn read_iota_progress_store(pool: &PgPool) -> anyhow::Result<Option<TransactionDigest>> {
93 let mut conn = pool.get()?;
94 let val: Option<IotaProgressStore> =
95 crate::schema::iota_progress_store::dsl::iota_progress_store
96 .select(IotaProgressStore::as_select())
97 .first(&mut conn)
98 .optional()?;
99 match val {
100 Some(val) => Ok(Some(TransactionDigest::try_from(
101 val.txn_digest.as_slice(),
102 )?)),
103 None => Ok(None),
104 }
105}
106
107pub fn get_latest_eth_token_transfer(
108 pool: &PgPool,
109 finalized: bool,
110) -> Result<Option<DBTokenTransfer>, Error> {
111 use crate::schema::token_transfer::dsl::*;
112
113 let connection = &mut pool.get().unwrap();
114
115 if finalized {
116 token_transfer
117 .filter(data_source.eq("ETH").and(status.eq("Deposited")))
118 .order(block_height.desc())
119 .first::<DBTokenTransfer>(connection)
120 .optional()
121 } else {
122 token_transfer
123 .filter(status.eq("DepositedUnfinalized"))
124 .order(block_height.desc())
125 .first::<DBTokenTransfer>(connection)
126 .optional()
127 }
128}