iota_indexer/store/pg_partition_manager.rs

// Copyright (c) Mysten Labs, Inc.
// Modifications Copyright (c) 2024 IOTA Stiftung
// SPDX-License-Identifier: Apache-2.0

5use std::{
6    collections::{BTreeMap, HashMap},
7    time::Duration,
8};
9
10use diesel::{
11    QueryableByName, RunQueryDsl,
12    sql_types::{BigInt, VarChar},
13};
14use downcast::Any;
15use tracing::{error, info};
16
17use crate::{
18    db::ConnectionPool, errors::IndexerError, handlers::EpochToCommit,
19    models::epoch::StoredEpochInfo, store::diesel_macro::*,
20};
21
/// Lists every partitioned parent table (`pg_class.relkind = 'p'`) together
/// with the smallest and largest trailing number found in the names of its
/// child tables (parsed as `BIGINT` from the `\d+$` suffix).
///
/// NOTE(review): the two `pg_namespace` joins are never referenced in the
/// `WHERE` clause, so parents from every schema are returned and same-named
/// parents in different schemas are merged by `GROUP BY table_name` — confirm
/// whether a schema filter was intended.
const GET_PARTITION_SQL: &str = r"
SELECT parent.relname                                            AS table_name,
       MIN(CAST(SUBSTRING(child.relname FROM '\d+$') AS BIGINT)) AS first_partition,
       MAX(CAST(SUBSTRING(child.relname FROM '\d+$') AS BIGINT)) AS last_partition
FROM pg_inherits
         JOIN pg_class parent ON pg_inherits.inhparent = parent.oid
         JOIN pg_class child ON pg_inherits.inhrelid = child.oid
         JOIN pg_namespace nmsp_parent ON nmsp_parent.oid = parent.relnamespace
         JOIN pg_namespace nmsp_child ON nmsp_child.oid = child.relnamespace
WHERE parent.relkind = 'p'
GROUP BY table_name;
";
36
37pub struct PgPartitionManager {
38    cp: ConnectionPool,
39    partition_strategies: HashMap<&'static str, PgPartitionStrategy>,
40}
41
42impl Clone for PgPartitionManager {
43    fn clone(&self) -> PgPartitionManager {
44        Self {
45            cp: self.cp.clone(),
46            partition_strategies: self.partition_strategies.clone(),
47        }
48    }
49}
50
/// Which column a partitioned table is keyed on.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum PgPartitionStrategy {
    /// Keyed by checkpoint sequence number; partitions follow epoch
    /// boundaries expressed in checkpoints.
    CheckpointSequenceNumber,
    /// Keyed by transaction sequence number; partitions follow epoch
    /// boundaries expressed in transactions.
    TxSequenceNumber,
    /// Keyed by object id; has no epoch-based partition range.
    ObjectId,
}

impl PgPartitionStrategy {
    /// Returns `true` when partitions of this kind are advanced per epoch,
    /// i.e. for every strategy except [`PgPartitionStrategy::ObjectId`].
    pub fn is_epoch_partitioned(&self) -> bool {
        matches!(
            self,
            Self::CheckpointSequenceNumber | Self::TxSequenceNumber
        )
    }
}
66
67#[derive(Clone, Debug)]
68pub struct EpochPartitionData {
69    last_epoch: u64,
70    next_epoch: u64,
71    last_epoch_start_cp: u64,
72    next_epoch_start_cp: u64,
73    last_epoch_start_tx: u64,
74    next_epoch_start_tx: u64,
75}
76
77impl EpochPartitionData {
78    pub fn compose_data(epoch: EpochToCommit, last_db_epoch: StoredEpochInfo) -> Self {
79        let last_epoch = last_db_epoch.epoch as u64;
80        let last_epoch_start_cp = last_db_epoch.first_checkpoint_id as u64;
81        let next_epoch = epoch.new_epoch.epoch;
82        let next_epoch_start_cp = epoch.new_epoch.first_checkpoint_id;
83
84        // Determining the tx_sequence_number range for the epoch partition differs from
85        // the checkpoint_sequence_number range, because the former is a sum of
86        // total transactions - this sum already addresses the off-by-one.
87        let next_epoch_start_tx = epoch.network_total_transactions;
88        let last_epoch_start_tx =
89            next_epoch_start_tx - last_db_epoch.epoch_total_transactions.unwrap() as u64;
90
91        Self {
92            last_epoch,
93            next_epoch,
94            last_epoch_start_cp,
95            next_epoch_start_cp,
96            last_epoch_start_tx,
97            next_epoch_start_tx,
98        }
99    }
100}
101
102impl PgPartitionManager {
103    pub fn new(cp: ConnectionPool) -> Result<Self, IndexerError> {
104        let mut partition_strategies = HashMap::new();
105        partition_strategies.insert("events", PgPartitionStrategy::TxSequenceNumber);
106        partition_strategies.insert("transactions", PgPartitionStrategy::TxSequenceNumber);
107        partition_strategies.insert("objects_version", PgPartitionStrategy::ObjectId);
108        let manager = Self {
109            cp,
110            partition_strategies,
111        };
112        let tables = manager.get_table_partitions()?;
113        info!(
114            "Found {} tables with partitions : [{:?}]",
115            tables.len(),
116            tables
117        );
118        Ok(manager)
119    }
120
121    pub fn get_table_partitions(&self) -> Result<BTreeMap<String, (u64, u64)>, IndexerError> {
122        #[derive(QueryableByName, Debug, Clone)]
123        struct PartitionedTable {
124            #[diesel(sql_type = VarChar)]
125            table_name: String,
126            #[diesel(sql_type = BigInt)]
127            first_partition: i64,
128            #[diesel(sql_type = BigInt)]
129            last_partition: i64,
130        }
131
132        Ok(
133            read_only_blocking!(&self.cp, |conn| diesel::RunQueryDsl::load(
134                diesel::sql_query(GET_PARTITION_SQL),
135                conn
136            ))?
137            .into_iter()
138            .map(|table: PartitionedTable| {
139                (
140                    table.table_name,
141                    (table.first_partition as u64, table.last_partition as u64),
142                )
143            })
144            .collect(),
145        )
146    }
147
148    /// Tries to fetch the partitioning strategy for the given partitioned
149    /// table. Defaults to `CheckpointSequenceNumber` as the majority of our
150    /// tables are partitioned on an epoch's checkpoints today.
151    pub fn get_strategy(&self, table_name: &str) -> PgPartitionStrategy {
152        self.partition_strategies
153            .get(table_name)
154            .copied()
155            .unwrap_or(PgPartitionStrategy::CheckpointSequenceNumber)
156    }
157
158    pub fn determine_epoch_partition_range(
159        &self,
160        table_name: &str,
161        data: &EpochPartitionData,
162    ) -> Option<(u64, u64)> {
163        match self.get_strategy(table_name) {
164            PgPartitionStrategy::CheckpointSequenceNumber => {
165                Some((data.last_epoch_start_cp, data.next_epoch_start_cp))
166            }
167            PgPartitionStrategy::TxSequenceNumber => {
168                Some((data.last_epoch_start_tx, data.next_epoch_start_tx))
169            }
170            PgPartitionStrategy::ObjectId => None,
171        }
172    }
173
174    pub fn advance_epoch(
175        &self,
176        table: String,
177        last_partition: u64,
178        data: &EpochPartitionData,
179    ) -> Result<(), IndexerError> {
180        let Some(partition_range) = self.determine_epoch_partition_range(&table, data) else {
181            return Ok(());
182        };
183        if data.next_epoch == 0 {
184            tracing::info!("Epoch 0 partition has been created in the initial setup.");
185            return Ok(());
186        }
187        if last_partition == data.last_epoch {
188            transactional_blocking_with_retry!(
189                &self.cp,
190                |conn| {
191                    RunQueryDsl::execute(
192                        diesel::sql_query("CALL advance_partition($1, $2, $3, $4, $5)")
193                            .bind::<diesel::sql_types::Text, _>(table.clone())
194                            .bind::<diesel::sql_types::BigInt, _>(data.last_epoch as i64)
195                            .bind::<diesel::sql_types::BigInt, _>(data.next_epoch as i64)
196                            .bind::<diesel::sql_types::BigInt, _>(partition_range.0 as i64)
197                            .bind::<diesel::sql_types::BigInt, _>(partition_range.1 as i64),
198                        conn,
199                    )
200                },
201                Duration::from_secs(10)
202            )?;
203
204            info!(
205                "Advanced epoch partition for table {} from {} to {}, prev partition upper bound {}",
206                table, last_partition, data.next_epoch, partition_range.0
207            );
208        } else if last_partition != data.next_epoch {
209            // skip when the partition is already advanced once, which is possible when
210            // indexer crashes and restarts; error otherwise.
211            error!(
212                "Epoch partition for table {} is not in sync with the last epoch {}.",
213                table, data.last_epoch
214            );
215        } else {
216            info!(
217                "Epoch has been advanced to {} already, skipping.",
218                data.next_epoch
219            );
220        }
221        Ok(())
222    }
223
224    pub fn drop_table_partition(&self, table: String, partition: u64) -> Result<(), IndexerError> {
225        transactional_blocking_with_retry!(
226            &self.cp,
227            |conn| {
228                RunQueryDsl::execute(
229                    diesel::sql_query("CALL drop_partition($1, $2)")
230                        .bind::<diesel::sql_types::Text, _>(table.clone())
231                        .bind::<diesel::sql_types::BigInt, _>(partition as i64),
232                    conn,
233                )
234            },
235            Duration::from_secs(10)
236        )?;
237        Ok(())
238    }
239}