iota_indexer/store/
pg_partition_manager.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{
6    collections::{BTreeMap, HashMap},
7    time::Duration,
8};
9
10use diesel::{
11    QueryableByName, RunQueryDsl,
12    sql_types::{BigInt, VarChar},
13};
14use downcast::Any;
15use tracing::{error, info};
16
17use crate::{
18    db::ConnectionPool, errors::IndexerError, handlers::EpochToCommit,
19    models::epoch::StoredEpochInfo, store::diesel_macro::*,
20};
21
/// Query listing every partitioned parent table (`relkind = 'p'`) together
/// with the smallest and largest trailing numeric suffix found among the names
/// of its child relations.
///
/// The result columns are `table_name`, `first_partition` and
/// `last_partition`. `pg_namespace` is joined for both parent and child, but
/// the query does not filter on schema.
const GET_PARTITION_SQL: &str = r"
SELECT parent.relname                                            AS table_name,
       MIN(CAST(SUBSTRING(child.relname FROM '\d+$') AS BIGINT)) AS first_partition,
       MAX(CAST(SUBSTRING(child.relname FROM '\d+$') AS BIGINT)) AS last_partition
FROM pg_inherits
         JOIN pg_class parent ON pg_inherits.inhparent = parent.oid
         JOIN pg_class child ON pg_inherits.inhrelid = child.oid
         JOIN pg_namespace nmsp_parent ON nmsp_parent.oid = parent.relnamespace
         JOIN pg_namespace nmsp_child ON nmsp_child.oid = child.relnamespace
WHERE parent.relkind = 'p'
GROUP BY table_name;
";
36
37pub struct PgPartitionManager {
38    cp: ConnectionPool,
39    partition_strategies: HashMap<&'static str, PgPartitionStrategy>,
40}
41
42impl Clone for PgPartitionManager {
43    fn clone(&self) -> PgPartitionManager {
44        Self {
45            cp: self.cp.clone(),
46            partition_strategies: self.partition_strategies.clone(),
47        }
48    }
49}
50
/// The column family a partitioned table uses for its partition bounds.
///
/// `Debug`, `PartialEq` and `Eq` are derived so a strategy can be logged and
/// compared directly.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum PgPartitionStrategy {
    /// Partition bounds are the first checkpoint ids of consecutive epochs.
    CheckpointSequenceNumber,
    /// Partition bounds are the first transaction sequence numbers of
    /// consecutive epochs.
    TxSequenceNumber,
    /// Not tied to epochs; no epoch partition range is computed for it.
    ObjectId,
}
57
58impl PgPartitionStrategy {
59    pub fn is_epoch_partitioned(&self) -> bool {
60        matches!(
61            self,
62            Self::CheckpointSequenceNumber | Self::TxSequenceNumber
63        )
64    }
65}
66
/// Epoch boundary values needed to advance epoch-based partitions: the epoch
/// numbers plus the first checkpoint id and first transaction sequence number
/// of both the last stored epoch and the next epoch.
#[derive(Clone, Debug)]
pub struct EpochPartitionData {
    /// Epoch number of the last epoch stored in the database.
    last_epoch: u64,
    /// Epoch number of the new epoch being committed.
    next_epoch: u64,
    /// First checkpoint id of the last stored epoch.
    last_epoch_start_cp: u64,
    /// First checkpoint id of the new epoch.
    next_epoch_start_cp: u64,
    /// First transaction sequence number of the last stored epoch.
    last_epoch_start_tx: u64,
    /// First transaction sequence number of the new epoch.
    next_epoch_start_tx: u64,
}
76
77impl EpochPartitionData {
78    pub fn compose_data(epoch: EpochToCommit, last_db_epoch: StoredEpochInfo) -> Self {
79        let last_epoch = last_db_epoch.epoch as u64;
80        let last_epoch_start_cp = last_db_epoch.first_checkpoint_id as u64;
81        let next_epoch = epoch.new_epoch.epoch as u64;
82        let next_epoch_start_cp = epoch.new_epoch.first_checkpoint_id as u64;
83
84        let next_epoch_start_tx = epoch.new_epoch.first_tx_sequence_number as u64;
85        let last_epoch_start_tx = last_db_epoch.first_tx_sequence_number as u64;
86
87        Self {
88            last_epoch,
89            next_epoch,
90            last_epoch_start_cp,
91            next_epoch_start_cp,
92            last_epoch_start_tx,
93            next_epoch_start_tx,
94        }
95    }
96}
97
98impl PgPartitionManager {
99    pub fn new(cp: ConnectionPool) -> Result<Self, IndexerError> {
100        let mut partition_strategies = HashMap::new();
101        partition_strategies.insert("events", PgPartitionStrategy::TxSequenceNumber);
102        partition_strategies.insert("transactions", PgPartitionStrategy::TxSequenceNumber);
103        partition_strategies.insert("objects_version", PgPartitionStrategy::ObjectId);
104        let manager = Self {
105            cp,
106            partition_strategies,
107        };
108        let tables = manager.get_table_partitions()?;
109        info!(
110            "Found {} tables with partitions : [{:?}]",
111            tables.len(),
112            tables
113        );
114        Ok(manager)
115    }
116
117    pub fn get_table_partitions(&self) -> Result<BTreeMap<String, (u64, u64)>, IndexerError> {
118        #[derive(QueryableByName, Debug, Clone)]
119        struct PartitionedTable {
120            #[diesel(sql_type = VarChar)]
121            table_name: String,
122            #[diesel(sql_type = BigInt)]
123            first_partition: i64,
124            #[diesel(sql_type = BigInt)]
125            last_partition: i64,
126        }
127
128        Ok(
129            read_only_blocking!(&self.cp, |conn| diesel::RunQueryDsl::load(
130                diesel::sql_query(GET_PARTITION_SQL),
131                conn
132            ))?
133            .into_iter()
134            .map(|table: PartitionedTable| {
135                (
136                    table.table_name,
137                    (table.first_partition as u64, table.last_partition as u64),
138                )
139            })
140            .collect(),
141        )
142    }
143
144    /// Tries to fetch the partitioning strategy for the given partitioned
145    /// table. Defaults to `CheckpointSequenceNumber` as the majority of our
146    /// tables are partitioned on an epoch's checkpoints today.
147    pub fn get_strategy(&self, table_name: &str) -> PgPartitionStrategy {
148        self.partition_strategies
149            .get(table_name)
150            .copied()
151            .unwrap_or(PgPartitionStrategy::CheckpointSequenceNumber)
152    }
153
154    pub fn determine_epoch_partition_range(
155        &self,
156        table_name: &str,
157        data: &EpochPartitionData,
158    ) -> Option<(u64, u64)> {
159        match self.get_strategy(table_name) {
160            PgPartitionStrategy::CheckpointSequenceNumber => {
161                Some((data.last_epoch_start_cp, data.next_epoch_start_cp))
162            }
163            PgPartitionStrategy::TxSequenceNumber => {
164                Some((data.last_epoch_start_tx, data.next_epoch_start_tx))
165            }
166            PgPartitionStrategy::ObjectId => None,
167        }
168    }
169
170    pub fn advance_epoch(
171        &self,
172        table: String,
173        last_partition: u64,
174        data: &EpochPartitionData,
175    ) -> Result<(), IndexerError> {
176        let Some(partition_range) = self.determine_epoch_partition_range(&table, data) else {
177            return Ok(());
178        };
179        if data.next_epoch == 0 {
180            tracing::info!("Epoch 0 partition has been created in the initial setup.");
181            return Ok(());
182        }
183        if last_partition == data.last_epoch {
184            transactional_blocking_with_retry!(
185                &self.cp,
186                |conn| {
187                    RunQueryDsl::execute(
188                        diesel::sql_query("CALL advance_partition($1, $2, $3, $4, $5)")
189                            .bind::<diesel::sql_types::Text, _>(table.clone())
190                            .bind::<diesel::sql_types::BigInt, _>(data.last_epoch as i64)
191                            .bind::<diesel::sql_types::BigInt, _>(data.next_epoch as i64)
192                            .bind::<diesel::sql_types::BigInt, _>(partition_range.0 as i64)
193                            .bind::<diesel::sql_types::BigInt, _>(partition_range.1 as i64),
194                        conn,
195                    )
196                },
197                Duration::from_secs(10)
198            )?;
199
200            info!(
201                "Advanced epoch partition for table {} from {} to {}, prev partition upper bound {}",
202                table, last_partition, data.next_epoch, partition_range.0
203            );
204        } else if last_partition != data.next_epoch {
205            // skip when the partition is already advanced once, which is possible when
206            // indexer crashes and restarts; error otherwise.
207            error!(
208                "Epoch partition for table {} is not in sync with the last epoch {}.",
209                table, data.last_epoch
210            );
211        } else {
212            info!(
213                "Epoch has been advanced to {} already, skipping.",
214                data.next_epoch
215            );
216        }
217        Ok(())
218    }
219
220    pub fn drop_table_partition(&self, table: String, partition: u64) -> Result<(), IndexerError> {
221        transactional_blocking_with_retry!(
222            &self.cp,
223            |conn| {
224                RunQueryDsl::execute(
225                    diesel::sql_query("CALL drop_partition($1, $2)")
226                        .bind::<diesel::sql_types::Text, _>(table.clone())
227                        .bind::<diesel::sql_types::BigInt, _>(partition as i64),
228                    conn,
229                )
230            },
231            Duration::from_secs(10)
232        )?;
233        Ok(())
234    }
235}