iota_indexer_builder/
iota_datasource.rs1use std::{path::PathBuf, sync::Arc};
6
7use anyhow::Error;
8use async_trait::async_trait;
9use iota_data_ingestion_core::{
10 DataIngestionMetrics, IndexerExecutor, ProgressStore, ReaderOptions, Worker, WorkerPool,
11};
12use iota_metrics::{metered_channel, spawn_monitored_task};
13use iota_types::{
14 base_types::TransactionDigest,
15 full_checkpoint_content::{CheckpointData as IotaCheckpointData, CheckpointTransaction},
16 messages_checkpoint::CheckpointSequenceNumber,
17};
18use tokio::task::JoinHandle;
19use tokio_util::sync::CancellationToken;
20use tracing::info;
21
22use crate::indexer_builder::{DataSender, Datasource};
23
24pub struct IotaCheckpointDatasource {
25 remote_store_url: String,
26 concurrency: usize,
27 checkpoint_path: PathBuf,
28 metrics: DataIngestionMetrics,
29}
30impl IotaCheckpointDatasource {
31 pub fn new(
32 remote_store_url: String,
33 concurrency: usize,
34 checkpoint_path: PathBuf,
35 metrics: DataIngestionMetrics,
36 ) -> Self {
37 IotaCheckpointDatasource {
38 remote_store_url,
39 concurrency,
40 checkpoint_path,
41 metrics,
42 }
43 }
44}
45
46#[async_trait]
47impl Datasource<CheckpointTxnData> for IotaCheckpointDatasource {
48 async fn start_data_retrieval(
49 &self,
50 starting_checkpoint: u64,
51 target_checkpoint: u64,
52 data_sender: DataSender<CheckpointTxnData>,
53 ) -> Result<JoinHandle<Result<(), Error>>, Error> {
54 let token = CancellationToken::new();
55 let child_token = token.child_token();
56 let progress_store = PerTaskInMemProgressStore {
57 current_checkpoint: starting_checkpoint,
58 exit_checkpoint: target_checkpoint,
59 token: Some(token),
60 };
61 let mut executor =
62 IndexerExecutor::new(progress_store, 1, self.metrics.clone(), child_token);
63 let worker = IndexerWorker::new(data_sender);
64 let worker_pool = WorkerPool::new(
65 worker,
66 TransactionDigest::random().to_string(),
67 self.concurrency,
68 Default::default(),
69 );
70 executor.register(worker_pool).await?;
71 let checkpoint_path = self.checkpoint_path.clone();
72 let remote_store_url = self.remote_store_url.clone();
73 Ok(spawn_monitored_task!(async {
74 executor
75 .run(
76 checkpoint_path,
77 Some(remote_store_url),
78 vec![], ReaderOptions::default(),
80 )
81 .await?;
82 Ok(())
83 }))
84 }
85}
86
87struct PerTaskInMemProgressStore {
88 pub current_checkpoint: u64,
89 pub exit_checkpoint: u64,
90 pub token: Option<CancellationToken>,
91}
92
93#[async_trait]
94impl ProgressStore for PerTaskInMemProgressStore {
95 type Error = anyhow::Error;
96
97 async fn load(&mut self, _task_name: String) -> Result<CheckpointSequenceNumber, Self::Error> {
98 Ok(self.current_checkpoint)
99 }
100
101 async fn save(
102 &mut self,
103 _task_name: String,
104 checkpoint_number: CheckpointSequenceNumber,
105 ) -> Result<(), Self::Error> {
106 if checkpoint_number >= self.exit_checkpoint {
107 if let Some(token) = self.token.take() {
108 token.cancel();
109 }
110 }
111 self.current_checkpoint = checkpoint_number;
112 Ok(())
113 }
114}
115
116pub struct IndexerWorker<T> {
117 data_sender: metered_channel::Sender<(u64, Vec<T>)>,
118}
119
120impl<T> IndexerWorker<T> {
121 pub fn new(data_sender: metered_channel::Sender<(u64, Vec<T>)>) -> Self {
122 Self { data_sender }
123 }
124}
125
126pub type CheckpointTxnData = (CheckpointTransaction, u64, u64);
127
128#[async_trait]
129impl Worker for IndexerWorker<CheckpointTxnData> {
130 type Message = ();
131 type Error = anyhow::Error;
132
133 async fn process_checkpoint(
134 &self,
135 checkpoint: Arc<IotaCheckpointData>,
136 ) -> Result<Self::Message, Self::Error> {
137 info!(
138 "Received checkpoint [{}] {}: {}",
139 checkpoint.checkpoint_summary.epoch,
140 checkpoint.checkpoint_summary.sequence_number,
141 checkpoint.transactions.len(),
142 );
143 let checkpoint_num = checkpoint.checkpoint_summary.sequence_number;
144 let timestamp_ms = checkpoint.checkpoint_summary.timestamp_ms;
145
146 let transactions = checkpoint
147 .transactions
148 .iter()
149 .cloned()
150 .map(|tx| (tx, checkpoint_num, timestamp_ms))
151 .collect();
152 Ok(self
153 .data_sender
154 .send((checkpoint_num, transactions))
155 .await?)
156 }
157}