1use std::{
6 collections::{BTreeMap, HashMap},
7 time::Duration,
8};
9
10use diesel::{
11 QueryableByName, RunQueryDsl,
12 sql_types::{BigInt, VarChar},
13};
14use downcast::Any;
15use tracing::{error, info};
16
17use crate::{
18 db::ConnectionPool, errors::IndexerError, handlers::EpochToCommit,
19 models::epoch::StoredEpochInfo, store::diesel_macro::*,
20};
21
/// Raw SQL listing every partitioned parent table (`pg_class.relkind = 'p'`)
/// reachable through `pg_inherits`, one row per parent.
///
/// Result columns:
/// - `table_name`: name of the parent table;
/// - `first_partition`: smallest trailing number found in its child relation names;
/// - `last_partition`: largest trailing number found in its child relation names.
const GET_PARTITION_SQL: &str = r"
SELECT parent.relname AS table_name,
 MIN(CAST(SUBSTRING(child.relname FROM '\d+$') AS BIGINT)) AS first_partition,
 MAX(CAST(SUBSTRING(child.relname FROM '\d+$') AS BIGINT)) AS last_partition
FROM pg_inherits
 JOIN pg_class parent ON pg_inherits.inhparent = parent.oid
 JOIN pg_class child ON pg_inherits.inhrelid = child.oid
 JOIN pg_namespace nmsp_parent ON nmsp_parent.oid = parent.relnamespace
 JOIN pg_namespace nmsp_child ON nmsp_child.oid = child.relnamespace
WHERE parent.relkind = 'p'
GROUP BY table_name;
";
36
37pub struct PgPartitionManager {
38 cp: ConnectionPool,
39 partition_strategies: HashMap<&'static str, PgPartitionStrategy>,
40}
41
42impl Clone for PgPartitionManager {
43 fn clone(&self) -> PgPartitionManager {
44 Self {
45 cp: self.cp.clone(),
46 partition_strategies: self.partition_strategies.clone(),
47 }
48 }
49}
50
/// Describes which key a partitioned table is partitioned by.
///
/// `Debug`, `PartialEq` and `Eq` are derived so the strategy can be logged
/// and compared directly.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum PgPartitionStrategy {
    /// Partition bounds are expressed as checkpoint sequence numbers.
    CheckpointSequenceNumber,
    /// Partition bounds are expressed as transaction sequence numbers.
    TxSequenceNumber,
    /// Partitioned by object id; there is no per-epoch range for this strategy.
    ObjectId,
}

impl PgPartitionStrategy {
    /// Returns `true` for the strategies whose partitions advance with epochs
    /// (`CheckpointSequenceNumber` and `TxSequenceNumber`), `false` for
    /// `ObjectId`.
    pub fn is_epoch_partitioned(&self) -> bool {
        // Exhaustive on purpose: adding a variant must force an explicit
        // decision here instead of silently defaulting to `false`.
        match self {
            Self::CheckpointSequenceNumber | Self::TxSequenceNumber => true,
            Self::ObjectId => false,
        }
    }
}
66
/// Epoch boundary values needed to advance the epoch-based partitions.
///
/// Instances are built by `EpochPartitionData::compose_data` from the epoch
/// being committed and the last epoch stored in the DB.
#[derive(Clone, Debug)]
pub struct EpochPartitionData {
    /// Epoch number of the last stored epoch (`last_db_epoch.epoch`).
    last_epoch: u64,
    /// Epoch number of the epoch being committed (`epoch.new_epoch.epoch`).
    next_epoch: u64,
    /// First checkpoint id of the last stored epoch.
    last_epoch_start_cp: u64,
    /// First checkpoint id of the epoch being committed.
    next_epoch_start_cp: u64,
    /// First transaction sequence number of the last stored epoch.
    last_epoch_start_tx: u64,
    /// First transaction sequence number of the epoch being committed.
    next_epoch_start_tx: u64,
}
76
77impl EpochPartitionData {
78 pub fn compose_data(epoch: EpochToCommit, last_db_epoch: StoredEpochInfo) -> Self {
79 let last_epoch = last_db_epoch.epoch as u64;
80 let last_epoch_start_cp = last_db_epoch.first_checkpoint_id as u64;
81 let next_epoch = epoch.new_epoch.epoch as u64;
82 let next_epoch_start_cp = epoch.new_epoch.first_checkpoint_id as u64;
83
84 let next_epoch_start_tx = epoch.new_epoch.first_tx_sequence_number as u64;
85 let last_epoch_start_tx = last_db_epoch.first_tx_sequence_number as u64;
86
87 Self {
88 last_epoch,
89 next_epoch,
90 last_epoch_start_cp,
91 next_epoch_start_cp,
92 last_epoch_start_tx,
93 next_epoch_start_tx,
94 }
95 }
96}
97
98impl PgPartitionManager {
99 pub fn new(cp: ConnectionPool) -> Result<Self, IndexerError> {
100 let mut partition_strategies = HashMap::new();
101 partition_strategies.insert("events", PgPartitionStrategy::TxSequenceNumber);
102 partition_strategies.insert("transactions", PgPartitionStrategy::TxSequenceNumber);
103 partition_strategies.insert("objects_version", PgPartitionStrategy::ObjectId);
104 let manager = Self {
105 cp,
106 partition_strategies,
107 };
108 let tables = manager.get_table_partitions()?;
109 info!(
110 "Found {} tables with partitions : [{:?}]",
111 tables.len(),
112 tables
113 );
114 Ok(manager)
115 }
116
117 pub fn get_table_partitions(&self) -> Result<BTreeMap<String, (u64, u64)>, IndexerError> {
118 #[derive(QueryableByName, Debug, Clone)]
119 struct PartitionedTable {
120 #[diesel(sql_type = VarChar)]
121 table_name: String,
122 #[diesel(sql_type = BigInt)]
123 first_partition: i64,
124 #[diesel(sql_type = BigInt)]
125 last_partition: i64,
126 }
127
128 Ok(
129 read_only_blocking!(&self.cp, |conn| diesel::RunQueryDsl::load(
130 diesel::sql_query(GET_PARTITION_SQL),
131 conn
132 ))?
133 .into_iter()
134 .map(|table: PartitionedTable| {
135 (
136 table.table_name,
137 (table.first_partition as u64, table.last_partition as u64),
138 )
139 })
140 .collect(),
141 )
142 }
143
144 pub fn get_strategy(&self, table_name: &str) -> PgPartitionStrategy {
148 self.partition_strategies
149 .get(table_name)
150 .copied()
151 .unwrap_or(PgPartitionStrategy::CheckpointSequenceNumber)
152 }
153
154 pub fn determine_epoch_partition_range(
155 &self,
156 table_name: &str,
157 data: &EpochPartitionData,
158 ) -> Option<(u64, u64)> {
159 match self.get_strategy(table_name) {
160 PgPartitionStrategy::CheckpointSequenceNumber => {
161 Some((data.last_epoch_start_cp, data.next_epoch_start_cp))
162 }
163 PgPartitionStrategy::TxSequenceNumber => {
164 Some((data.last_epoch_start_tx, data.next_epoch_start_tx))
165 }
166 PgPartitionStrategy::ObjectId => None,
167 }
168 }
169
170 pub fn advance_epoch(
171 &self,
172 table: String,
173 last_partition: u64,
174 data: &EpochPartitionData,
175 ) -> Result<(), IndexerError> {
176 let Some(partition_range) = self.determine_epoch_partition_range(&table, data) else {
177 return Ok(());
178 };
179 if data.next_epoch == 0 {
180 tracing::info!("Epoch 0 partition has been created in the initial setup.");
181 return Ok(());
182 }
183 if last_partition == data.last_epoch {
184 transactional_blocking_with_retry!(
185 &self.cp,
186 |conn| {
187 RunQueryDsl::execute(
188 diesel::sql_query("CALL advance_partition($1, $2, $3, $4, $5)")
189 .bind::<diesel::sql_types::Text, _>(table.clone())
190 .bind::<diesel::sql_types::BigInt, _>(data.last_epoch as i64)
191 .bind::<diesel::sql_types::BigInt, _>(data.next_epoch as i64)
192 .bind::<diesel::sql_types::BigInt, _>(partition_range.0 as i64)
193 .bind::<diesel::sql_types::BigInt, _>(partition_range.1 as i64),
194 conn,
195 )
196 },
197 Duration::from_secs(10)
198 )?;
199
200 info!(
201 "Advanced epoch partition for table {} from {} to {}, prev partition upper bound {}",
202 table, last_partition, data.next_epoch, partition_range.0
203 );
204 } else if last_partition != data.next_epoch {
205 error!(
208 "Epoch partition for table {} is not in sync with the last epoch {}.",
209 table, data.last_epoch
210 );
211 } else {
212 info!(
213 "Epoch has been advanced to {} already, skipping.",
214 data.next_epoch
215 );
216 }
217 Ok(())
218 }
219
220 pub fn drop_table_partition(&self, table: String, partition: u64) -> Result<(), IndexerError> {
221 transactional_blocking_with_retry!(
222 &self.cp,
223 |conn| {
224 RunQueryDsl::execute(
225 diesel::sql_query("CALL drop_partition($1, $2)")
226 .bind::<diesel::sql_types::Text, _>(table.clone())
227 .bind::<diesel::sql_types::BigInt, _>(partition as i64),
228 conn,
229 )
230 },
231 Duration::from_secs(10)
232 )?;
233 Ok(())
234 }
235}