use std::{
    cmp::{max, min},
    sync::Arc,
};

use anyhow::Error;
use async_trait::async_trait;
use iota_metrics::{metered_channel, spawn_monitored_task};
use tokio::task::JoinHandle;

use crate::{Task, Tasks};

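/// A `(checkpoint_number, items)` batch produced by a datasource for a single checkpoint.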
type CheckpointData<T> = (u64, Vec<T>);
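/// Sender half of the metered channel over which a [`Datasource`] pushes
/// `(checkpoint_number, items)` batches to the ingestion loop.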
pub type DataSender<T> = metered_channel::Sender<CheckpointData<T>>;

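/// Builder that wires a [`Datasource`] and a [`DataMapper`] into an [`Indexer`],
/// with optional configuration of the backfill strategy and the live task.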
pub struct IndexerBuilder<D, M> {
    name: String,
    datasource: D,
    data_mapper: M,
    backfill_strategy: BackfillStrategy,
    disable_live_task: bool,
}

impl<D, M> IndexerBuilder<D, M> {
    pub fn new(name: &str, datasource: D, data_mapper: M) -> IndexerBuilder<D, M> {
        IndexerBuilder {
            name: name.into(),
            datasource,
            data_mapper,
            backfill_strategy: BackfillStrategy::Simple,
            disable_live_task: false,
        }
    }
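    /// Consume the builder and create an [`Indexer`] that ingests from
    /// `start_from_checkpoint`, uses `persistent` for storage and progress
    /// tracking, and treats `genesis_checkpoint` as the lower bound for backfill.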
    pub fn build<R, P>(
        self,
        start_from_checkpoint: u64,
        genesis_checkpoint: u64,
        persistent: P,
    ) -> Indexer<P, D, M>
    where
        P: Persistent<R>,
    {
        Indexer {
            name: self.name,
            storage: persistent,
            datasource: self.datasource.into(),
            backfill_strategy: self.backfill_strategy,
            disable_live_task: self.disable_live_task,
            start_from_checkpoint,
            data_mapper: self.data_mapper,
            genesis_checkpoint,
        }
    }

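    /// Override the default [`BackfillStrategy::Simple`].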
    pub fn with_backfill_strategy(mut self, backfill: BackfillStrategy) -> Self {
        self.backfill_strategy = backfill;
        self
    }

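    /// Skip the live ingestion task entirely; only backfill tasks will run.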
    pub fn disable_live_task(mut self) -> Self {
        self.disable_live_task = true;
        self
    }
}

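/// An indexer that pulls checkpoint data from a [`Datasource`], maps it with a
/// [`DataMapper`], and writes the result to a [`Persistent`] store, tracking
/// progress per task so ingestion can resume where it left off.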
pub struct Indexer<P, D, M> {
    name: String,
    storage: P,
    datasource: Arc<D>,
    data_mapper: M,
    backfill_strategy: BackfillStrategy,
    disable_live_task: bool,
    start_from_checkpoint: u64,
    genesis_checkpoint: u64,
}

impl<P, D, M> Indexer<P, D, M> {
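    /// Run the indexer: reconcile the stored tasks with the requested start
    /// checkpoint, then drive the live task (unless disabled) and all pending
    /// backfill tasks to completion.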
    pub async fn start<T, R>(mut self) -> Result<(), Error>
    where
        D: Datasource<T> + 'static,
        M: DataMapper<T, R> + 'static,
        P: Persistent<R> + 'static,
        T: Send,
    {
        self.update_tasks().await?;
        let updated_tasks = self.storage.tasks(&self.name).await?;
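        // Build the live ingestion future from the stored live task, unless live
        // ingestion has been disabled.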
        let live_task_future = match updated_tasks.live_task() {
            Some(live_task) if !self.disable_live_task => {
                let live_task_future = self.datasource.start_ingestion_task(
                    live_task.task_name.clone(),
                    live_task.checkpoint,
                    live_task.target_checkpoint,
                    self.storage.clone(),
                    self.data_mapper.clone(),
                );
                Some(live_task_future)
            }
            _ => None,
        };

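        // Run the pending backfill tasks sequentially on a separate monitored task,
        // skipping any task that has already reached its target checkpoint.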
        let backfill_tasks = updated_tasks.backfill_tasks();
        let storage_clone = self.storage.clone();
        let data_mapper_clone = self.data_mapper.clone();
        let datasource_clone = self.datasource.clone();

        let handle = spawn_monitored_task!(async {
            for backfill_task in backfill_tasks {
                if backfill_task.checkpoint < backfill_task.target_checkpoint {
                    datasource_clone
                        .start_ingestion_task(
                            backfill_task.task_name.clone(),
                            backfill_task.checkpoint,
                            backfill_task.target_checkpoint,
                            storage_clone.clone(),
                            data_mapper_clone.clone(),
                        )
                        .await
                        .expect("Backfill task failed");
                }
            }
        });

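        // Drive the live task (if any) to completion, then wait for the backfill task.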
        if let Some(live_task_future) = live_task_future {
            live_task_future.await?;
        }

        tokio::try_join!(handle)?;

        Ok(())
    }

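    /// Bring the stored task list in line with `start_from_checkpoint`: create or
    /// advance the live task and register backfill tasks for any checkpoint range
    /// between the genesis checkpoint and the requested start that is not yet covered.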
    async fn update_tasks<R>(&mut self) -> Result<(), Error>
    where
        P: Persistent<R>,
    {
        let tasks = self.storage.tasks(&self.name).await?;
        let backfill_tasks = tasks.backfill_tasks();
        let latest_task = backfill_tasks.first();

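        // Create the live task if it does not exist yet, starting after the newest
        // backfill task or at the requested start checkpoint, whichever is later;
        // otherwise fast-forward it if the requested start is ahead of its progress.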
        if !self.disable_live_task {
            let from_checkpoint = max(
                self.start_from_checkpoint,
                latest_task
                    .map(|t| t.target_checkpoint + 1)
                    .unwrap_or_default(),
            );

            match tasks.live_task() {
                None => {
                    self.storage
                        .register_task(
                            format!("{} - Live", self.name),
                            from_checkpoint,
                            i64::MAX as u64,
                        )
                        .await?;
                }
                Some(mut live_task) => {
                    if self.start_from_checkpoint > live_task.checkpoint {
                        live_task.checkpoint = self.start_from_checkpoint;
                        self.storage.update_task(live_task).await?;
                    }
                }
            }
        }

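        // Register backfill tasks for the checkpoint range not yet covered by
        // existing backfill tasks.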
        match latest_task {
            None => {
                if self.start_from_checkpoint != self.genesis_checkpoint {
                    self.create_backfill_tasks(
                        self.genesis_checkpoint,
                        self.start_from_checkpoint - 1,
                    )
                    .await?
                }
            }
            Some(latest_task) => {
                if latest_task.target_checkpoint + 1 < self.start_from_checkpoint {
                    self.create_backfill_tasks(
                        latest_task.target_checkpoint + 1,
                        self.start_from_checkpoint - 1,
                    )
                    .await?;
                }
            }
        }
        Ok(())
    }

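    /// Register backfill task(s) covering the checkpoint range `[from_cp, to_cp]`
    /// according to the configured [`BackfillStrategy`]: a single task for `Simple`,
    /// fixed-size chunks for `Partitioned`, and nothing for `Disabled`.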
    async fn create_backfill_tasks<R>(&mut self, mut from_cp: u64, to_cp: u64) -> Result<(), Error>
    where
        P: Persistent<R>,
    {
        match self.backfill_strategy {
            BackfillStrategy::Simple => {
                self.storage
                    .register_task(
                        format!("{} - backfill - {from_cp}:{to_cp}", self.name),
                        from_cp,
                        to_cp,
                    )
                    .await
            }
            BackfillStrategy::Partitioned { task_size } => {
                while from_cp < self.start_from_checkpoint {
                    let target_cp = min(from_cp + task_size - 1, to_cp);
                    self.storage
                        .register_task(
                            format!("{} - backfill - {from_cp}:{target_cp}", self.name),
                            from_cp,
                            target_cp,
                        )
                        .await?;
                    from_cp = target_cp + 1;
                }
                Ok(())
            }
            BackfillStrategy::Disabled => Ok(()),
        }
    }
}

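/// A storage backend that can persist mapped records and, via
/// [`IndexerProgressStore`], track per-task ingestion progress.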
#[async_trait]
pub trait Persistent<T>: IndexerProgressStore + Sync + Send + Clone {
    async fn write(&self, data: Vec<T>) -> Result<(), Error>;
}

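/// Bookkeeping for indexer tasks: per-task checkpoint progress plus registration
/// and updates of live and backfill tasks.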
#[async_trait]
pub trait IndexerProgressStore: Send {
    async fn load_progress(&self, task_name: String) -> anyhow::Result<u64>;
    async fn save_progress(
        &mut self,
        task_name: String,
        checkpoint_number: u64,
    ) -> anyhow::Result<()>;

    async fn tasks(&self, task_prefix: &str) -> Result<Vec<Task>, Error>;

    async fn register_task(
        &mut self,
        task_name: String,
        checkpoint: u64,
        target_checkpoint: u64,
    ) -> Result<(), anyhow::Error>;

    async fn update_task(&mut self, task: Task) -> Result<(), Error>;
}

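/// A source of checkpoint data. Implementors provide `start_data_retrieval`; the
/// provided `start_ingestion_task` drains the resulting channel, maps each batch
/// with the [`DataMapper`], writes it to storage, and records progress per checkpoint.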
#[async_trait]
pub trait Datasource<T: Send>: Sync + Send {
    async fn start_ingestion_task<M, P, R>(
        &self,
        task_name: String,
        starting_checkpoint: u64,
        target_checkpoint: u64,
        mut storage: P,
        data_mapper: M,
    ) -> Result<(), Error>
    where
        M: DataMapper<T, R>,
        P: Persistent<R>,
    {
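        // Bounded, metered channel between the retrieval task and this ingestion loop.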
        let (data_sender, mut data_channel) = metered_channel::channel(
            1000,
            &iota_metrics::get_metrics()
                .unwrap()
                .channel_inflight
                .with_label_values(&[&task_name]),
        );
        let join_handle = self
            .start_data_retrieval(starting_checkpoint, target_checkpoint, data_sender)
            .await?;

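        // Consume batches in order: map, persist, and record progress for every
        // checkpoint until the target checkpoint is passed or the sender closes.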
        while let Some((block_number, data)) = data_channel.recv().await {
            if block_number > target_checkpoint {
                break;
            }
            if !data.is_empty() {
                let processed_data = data.into_iter().try_fold(vec![], |mut result, d| {
                    result.append(&mut data_mapper.map(d)?);
                    Ok::<Vec<_>, Error>(result)
                })?;
                storage.write(processed_data).await?;
            }
            storage
                .save_progress(task_name.clone(), block_number)
                .await?;
        }
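        // Ingestion finished (target reached or channel closed); stop the retrieval task.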
        join_handle.abort();
        join_handle.await?
    }

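    /// Spawn the datasource-specific retrieval loop that pushes `(checkpoint, data)`
    /// batches for `[starting_checkpoint, target_checkpoint]` into `data_sender`,
    /// returning a handle to the spawned task.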
    async fn start_data_retrieval(
        &self,
        starting_checkpoint: u64,
        target_checkpoint: u64,
        data_sender: DataSender<T>,
    ) -> Result<JoinHandle<Result<(), Error>>, Error>;
}

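/// How checkpoints before `start_from_checkpoint` are backfilled: as a single task
/// (`Simple`), as fixed-size chunks of `task_size` checkpoints (`Partitioned`), or
/// not at all (`Disabled`).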
pub enum BackfillStrategy {
    Simple,
    Partitioned { task_size: u64 },
    Disabled,
}

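/// Maps one raw datasource item into zero or more records of the storage schema.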
pub trait DataMapper<T, R>: Sync + Send + Clone {
    fn map(&self, data: T) -> Result<Vec<R>, anyhow::Error>;
}