use std::{
    collections::{BTreeMap, HashMap},
    time::Duration,
};

use diesel::{
    QueryableByName, RunQueryDsl,
    sql_types::{BigInt, VarChar},
};
use downcast::Any;
use tracing::{error, info};

use crate::{
    db::ConnectionPool, errors::IndexerError, handlers::EpochToCommit,
    models::epoch::StoredEpochInfo, store::diesel_macro::*,
};

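/// Lists every partitioned (parent) table together with the numeric suffixes
/// of its first and last child partitions.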
const GET_PARTITION_SQL: &str = r"
SELECT parent.relname AS table_name,
       MIN(CAST(SUBSTRING(child.relname FROM '\d+$') AS BIGINT)) AS first_partition,
       MAX(CAST(SUBSTRING(child.relname FROM '\d+$') AS BIGINT)) AS last_partition
FROM pg_inherits
    JOIN pg_class parent ON pg_inherits.inhparent = parent.oid
    JOIN pg_class child ON pg_inherits.inhrelid = child.oid
    JOIN pg_namespace nmsp_parent ON nmsp_parent.oid = parent.relnamespace
    JOIN pg_namespace nmsp_child ON nmsp_child.oid = child.relnamespace
WHERE parent.relkind = 'p'
GROUP BY table_name;
";

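/// Manages epoch-based table partitions in the Postgres store: it tracks the
/// partitioning strategy of each table and advances or drops partitions as
/// epochs are committed.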
#[derive(Clone)]
pub struct PgPartitionManager {
    cp: ConnectionPool,
    partition_strategies: HashMap<&'static str, PgPartitionStrategy>,
}

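/// How a table's partition key is derived. Tables keyed by checkpoint or
/// transaction sequence number are re-partitioned every epoch; tables keyed
/// by object ID are not.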
#[derive(Clone, Copy)]
pub enum PgPartitionStrategy {
    CheckpointSequenceNumber,
    TxSequenceNumber,
    ObjectId,
}

impl PgPartitionStrategy {
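    /// Returns true if the table is partitioned along epoch boundaries and
    /// therefore needs a new partition whenever the epoch advances.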
    pub fn is_epoch_partitioned(&self) -> bool {
        matches!(
            self,
            Self::CheckpointSequenceNumber | Self::TxSequenceNumber
        )
    }
}

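/// Checkpoint and transaction sequence-number boundaries of the last epoch in
/// the DB and the epoch about to be committed, used to compute partition
/// ranges.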
#[derive(Clone, Debug)]
pub struct EpochPartitionData {
    last_epoch: u64,
    next_epoch: u64,
    last_epoch_start_cp: u64,
    next_epoch_start_cp: u64,
    last_epoch_start_tx: u64,
    next_epoch_start_tx: u64,
}

impl EpochPartitionData {
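    /// Builds the partition boundary data from the epoch being committed and
    /// the last epoch already stored in the DB.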
    pub fn compose_data(epoch: EpochToCommit, last_db_epoch: StoredEpochInfo) -> Self {
        let last_epoch = last_db_epoch.epoch as u64;
        let last_epoch_start_cp = last_db_epoch.first_checkpoint_id as u64;
        let next_epoch = epoch.new_epoch.epoch;
        let next_epoch_start_cp = epoch.new_epoch.first_checkpoint_id;

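        // The running network total of transactions marks where the next
        // epoch starts; subtracting the last epoch's own transaction count
        // gives where that epoch started. `epoch_total_transactions` is
        // expected to be present for an epoch already stored in the DB.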
        let next_epoch_start_tx = epoch.network_total_transactions;
        let last_epoch_start_tx =
            next_epoch_start_tx - last_db_epoch.epoch_total_transactions.unwrap() as u64;

        Self {
            last_epoch,
            next_epoch,
            last_epoch_start_cp,
            next_epoch_start_cp,
            last_epoch_start_tx,
            next_epoch_start_tx,
        }
    }
}

impl PgPartitionManager {
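    /// Creates the manager with the built-in table-to-strategy mapping and
    /// logs the partitioned tables currently present in the DB.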
    pub fn new(cp: ConnectionPool) -> Result<Self, IndexerError> {
        let mut partition_strategies = HashMap::new();
        partition_strategies.insert("events", PgPartitionStrategy::TxSequenceNumber);
        partition_strategies.insert("transactions", PgPartitionStrategy::TxSequenceNumber);
        partition_strategies.insert("objects_version", PgPartitionStrategy::ObjectId);
        let manager = Self {
            cp,
            partition_strategies,
        };
        let tables = manager.get_table_partitions()?;
        info!(
            "Found {} tables with partitions: [{:?}]",
            tables.len(),
            tables
        );
        Ok(manager)
    }

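    /// Returns, for each partitioned table, the numeric suffixes of its first
    /// and last child partitions.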
    pub fn get_table_partitions(&self) -> Result<BTreeMap<String, (u64, u64)>, IndexerError> {
        #[derive(QueryableByName, Debug, Clone)]
        struct PartitionedTable {
            #[diesel(sql_type = VarChar)]
            table_name: String,
            #[diesel(sql_type = BigInt)]
            first_partition: i64,
            #[diesel(sql_type = BigInt)]
            last_partition: i64,
        }

        Ok(
            read_only_blocking!(&self.cp, |conn| diesel::RunQueryDsl::load(
                diesel::sql_query(GET_PARTITION_SQL),
                conn
            ))?
            .into_iter()
            .map(|table: PartitionedTable| {
                (
                    table.table_name,
                    (table.first_partition as u64, table.last_partition as u64),
                )
            })
            .collect(),
        )
    }

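    /// Returns the partition strategy registered for `table_name`, defaulting
    /// to checkpoint-sequence-number partitioning for unknown tables.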
    pub fn get_strategy(&self, table_name: &str) -> PgPartitionStrategy {
        self.partition_strategies
            .get(table_name)
            .copied()
            .unwrap_or(PgPartitionStrategy::CheckpointSequenceNumber)
    }

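    /// Computes the `[start, end)` range of the last epoch's partition for the
    /// given table, or `None` if the table is not partitioned by epoch.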
    pub fn determine_epoch_partition_range(
        &self,
        table_name: &str,
        data: &EpochPartitionData,
    ) -> Option<(u64, u64)> {
        match self.get_strategy(table_name) {
            PgPartitionStrategy::CheckpointSequenceNumber => {
                Some((data.last_epoch_start_cp, data.next_epoch_start_cp))
            }
            PgPartitionStrategy::TxSequenceNumber => {
                Some((data.last_epoch_start_tx, data.next_epoch_start_tx))
            }
            PgPartitionStrategy::ObjectId => None,
        }
    }

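    /// Calls the `advance_partition` stored procedure to move `table` from the
    /// last epoch's partition to the next epoch's, using the range computed by
    /// [`Self::determine_epoch_partition_range`]. No-ops for epoch 0, for
    /// tables that are not epoch-partitioned, and when the partition has
    /// already been advanced (e.g. after an indexer restart).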
    pub fn advance_epoch(
        &self,
        table: String,
        last_partition: u64,
        data: &EpochPartitionData,
    ) -> Result<(), IndexerError> {
        let Some(partition_range) = self.determine_epoch_partition_range(&table, data) else {
            return Ok(());
        };
        if data.next_epoch == 0 {
            tracing::info!("Epoch 0 partition has been created in the initial setup.");
            return Ok(());
        }
        if last_partition == data.last_epoch {
            transactional_blocking_with_retry!(
                &self.cp,
                |conn| {
                    RunQueryDsl::execute(
                        diesel::sql_query("CALL advance_partition($1, $2, $3, $4, $5)")
                            .bind::<diesel::sql_types::Text, _>(table.clone())
                            .bind::<diesel::sql_types::BigInt, _>(data.last_epoch as i64)
                            .bind::<diesel::sql_types::BigInt, _>(data.next_epoch as i64)
                            .bind::<diesel::sql_types::BigInt, _>(partition_range.0 as i64)
                            .bind::<diesel::sql_types::BigInt, _>(partition_range.1 as i64),
                        conn,
                    )
                },
                Duration::from_secs(10)
            )?;

            info!(
                "Advanced epoch partition for table {} from {} to {}; last epoch's range is [{}, {})",
                table, last_partition, data.next_epoch, partition_range.0, partition_range.1
            );
        } else if last_partition != data.next_epoch {
            error!(
                "Epoch partition for table {} is at {}, which matches neither the last epoch {} nor the next epoch {}.",
                table, last_partition, data.last_epoch, data.next_epoch
            );
        } else {
            info!(
                "Epoch has been advanced to {} already, skipping.",
                data.next_epoch
            );
        }
        Ok(())
    }

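    /// Calls the `drop_partition` stored procedure to drop the partition of
    /// `table` identified by `partition`.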
    pub fn drop_table_partition(&self, table: String, partition: u64) -> Result<(), IndexerError> {
        transactional_blocking_with_retry!(
            &self.cp,
            |conn| {
                RunQueryDsl::execute(
                    diesel::sql_query("CALL drop_partition($1, $2)")
                        .bind::<diesel::sql_types::Text, _>(table.clone())
                        .bind::<diesel::sql_types::BigInt, _>(partition as i64),
                    conn,
                )
            },
            Duration::from_secs(10)
        )?;
        Ok(())
    }
}