// iota_indexer_builder/indexer_builder.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{
6    cmp::{max, min},
7    sync::Arc,
8};
9
10use anyhow::Error;
11use async_trait::async_trait;
12use iota_metrics::{metered_channel, spawn_monitored_task};
13use tokio::task::JoinHandle;
14
15use crate::{Task, Tasks};
16
/// One checkpoint's payload: the checkpoint (block) number plus the raw items
/// fetched for it from the datasource.
type CheckpointData<T> = (u64, Vec<T>);
/// Metered channel sender used by data-retrieval tasks to push checkpoint data
/// into the ingestion loop.
pub type DataSender<T> = metered_channel::Sender<CheckpointData<T>>;
19
/// Builder for configuring and constructing an `Indexer`.
pub struct IndexerBuilder<D, M> {
    // Human-readable indexer name; also used as the task-name prefix in storage.
    name: String,
    // Source of checkpoint data (see `Datasource`).
    datasource: D,
    // Maps raw datasource items into rows persisted by the storage layer.
    data_mapper: M,
    // How the historical range is split into backfill tasks; defaults to `Simple`.
    backfill_strategy: BackfillStrategy,
    // When true, no live (ongoing) ingestion task is created or run.
    disable_live_task: bool,
}
27
28impl<D, M> IndexerBuilder<D, M> {
29    pub fn new(name: &str, datasource: D, data_mapper: M) -> IndexerBuilder<D, M> {
30        IndexerBuilder {
31            name: name.into(),
32            datasource,
33            data_mapper,
34            backfill_strategy: BackfillStrategy::Simple,
35            disable_live_task: false,
36        }
37    }
38    pub fn build<R, P>(
39        self,
40        start_from_checkpoint: u64,
41        genesis_checkpoint: u64,
42        persistent: P,
43    ) -> Indexer<P, D, M>
44    where
45        P: Persistent<R>,
46    {
47        Indexer {
48            name: self.name,
49            storage: persistent,
50            datasource: self.datasource.into(),
51            backfill_strategy: self.backfill_strategy,
52            disable_live_task: self.disable_live_task,
53            start_from_checkpoint,
54            data_mapper: self.data_mapper,
55            genesis_checkpoint,
56        }
57    }
58
59    pub fn with_backfill_strategy(mut self, backfill: BackfillStrategy) -> Self {
60        self.backfill_strategy = backfill;
61        self
62    }
63
64    pub fn disable_live_task(mut self) -> Self {
65        self.disable_live_task = true;
66        self
67    }
68}
69
/// A configured indexer that orchestrates one live ingestion task and a set
/// of sequential backfill tasks, persisting mapped data through `storage`.
pub struct Indexer<P, D, M> {
    // Task-name prefix used when registering and querying tasks in storage.
    name: String,
    // Persistent storage for mapped rows and per-task progress.
    storage: P,
    // Shared datasource driving all ingestion tasks.
    datasource: Arc<D>,
    // Maps raw datasource items to persistable rows.
    data_mapper: M,
    // Partitioning scheme for backfill tasks.
    backfill_strategy: BackfillStrategy,
    // When true, only backfill tasks are run.
    disable_live_task: bool,
    // Checkpoint from which live ingestion should proceed.
    start_from_checkpoint: u64,
    // Earliest checkpoint that exists; lower bound for backfilling.
    genesis_checkpoint: u64,
}
80
81impl<P, D, M> Indexer<P, D, M> {
82    pub async fn start<T, R>(mut self) -> Result<(), Error>
83    where
84        D: Datasource<T> + 'static,
85        M: DataMapper<T, R> + 'static,
86        P: Persistent<R> + 'static,
87        T: Send,
88    {
89        // Update tasks first
90        self.update_tasks().await?;
91        // get updated tasks from storage and start workers
92        let updated_tasks = self.storage.tasks(&self.name).await?;
93        // Start latest checkpoint worker
94        // Tasks are ordered in checkpoint descending order, realtime update task always
95        // come first tasks won't be empty here, ok to unwrap.
96        let live_task_future = match updated_tasks.live_task() {
97            Some(live_task) if !self.disable_live_task => {
98                let live_task_future = self.datasource.start_ingestion_task(
99                    live_task.task_name.clone(),
100                    live_task.checkpoint,
101                    live_task.target_checkpoint,
102                    self.storage.clone(),
103                    self.data_mapper.clone(),
104                );
105                Some(live_task_future)
106            }
107            _ => None,
108        };
109
110        let backfill_tasks = updated_tasks.backfill_tasks();
111        let storage_clone = self.storage.clone();
112        let data_mapper_clone = self.data_mapper.clone();
113        let datasource_clone = self.datasource.clone();
114
115        let handle = spawn_monitored_task!(async {
116            // Execute task one by one
117            for backfill_task in backfill_tasks {
118                if backfill_task.checkpoint < backfill_task.target_checkpoint {
119                    datasource_clone
120                        .start_ingestion_task(
121                            backfill_task.task_name.clone(),
122                            backfill_task.checkpoint,
123                            backfill_task.target_checkpoint,
124                            storage_clone.clone(),
125                            data_mapper_clone.clone(),
126                        )
127                        .await
128                        .expect("Backfill task failed");
129                }
130            }
131        });
132
133        if let Some(live_task_future) = live_task_future {
134            live_task_future.await?;
135        }
136
137        tokio::try_join!(handle)?;
138
139        Ok(())
140    }
141
142    async fn update_tasks<R>(&mut self) -> Result<(), Error>
143    where
144        P: Persistent<R>,
145    {
146        let tasks = self.storage.tasks(&self.name).await?;
147        let backfill_tasks = tasks.backfill_tasks();
148        let latest_task = backfill_tasks.first();
149
150        // 1, create and update live task if needed
151        if !self.disable_live_task {
152            let from_checkpoint = max(
153                self.start_from_checkpoint,
154                latest_task
155                    .map(|t| t.target_checkpoint + 1)
156                    .unwrap_or_default(),
157            );
158
159            match tasks.live_task() {
160                None => {
161                    self.storage
162                        .register_task(
163                            format!("{} - Live", self.name),
164                            from_checkpoint,
165                            i64::MAX as u64,
166                        )
167                        .await?;
168                }
169                Some(mut live_task) => {
170                    if self.start_from_checkpoint > live_task.checkpoint {
171                        live_task.checkpoint = self.start_from_checkpoint;
172                        self.storage.update_task(live_task).await?;
173                    }
174                }
175            }
176        }
177
178        // 2, create backfill tasks base on task config and existing tasks in the db
179        match latest_task {
180            None => {
181                // No task in database, create backfill tasks from genesis to
182                // `start_from_checkpoint`
183                if self.start_from_checkpoint != self.genesis_checkpoint {
184                    self.create_backfill_tasks(
185                        self.genesis_checkpoint,
186                        self.start_from_checkpoint - 1,
187                    )
188                    .await?
189                }
190            }
191            Some(latest_task) => {
192                if latest_task.target_checkpoint + 1 < self.start_from_checkpoint {
193                    self.create_backfill_tasks(
194                        latest_task.target_checkpoint + 1,
195                        self.start_from_checkpoint - 1,
196                    )
197                    .await?;
198                }
199            }
200        }
201        Ok(())
202    }
203
204    // Create backfill tasks according to backfill strategy
205    async fn create_backfill_tasks<R>(&mut self, mut from_cp: u64, to_cp: u64) -> Result<(), Error>
206    where
207        P: Persistent<R>,
208    {
209        match self.backfill_strategy {
210            BackfillStrategy::Simple => {
211                self.storage
212                    .register_task(
213                        format!("{} - backfill - {from_cp}:{to_cp}", self.name),
214                        from_cp,
215                        to_cp,
216                    )
217                    .await
218            }
219            BackfillStrategy::Partitioned { task_size } => {
220                while from_cp < self.start_from_checkpoint {
221                    let target_cp = min(from_cp + task_size - 1, to_cp);
222                    self.storage
223                        .register_task(
224                            format!("{} - backfill - {from_cp}:{target_cp}", self.name),
225                            from_cp,
226                            target_cp,
227                        )
228                        .await?;
229                    from_cp = target_cp + 1;
230                }
231                Ok(())
232            }
233            BackfillStrategy::Disabled => Ok(()),
234        }
235    }
236}
237
/// Storage backend that can persist mapped rows of type `T` and track
/// per-task ingestion progress (via the [`IndexerProgressStore`] supertrait).
#[async_trait]
pub trait Persistent<T>: IndexerProgressStore + Sync + Send + Clone {
    /// Writes a batch of mapped rows to the backing store.
    async fn write(&self, data: Vec<T>) -> Result<(), Error>;
}
242
/// Checkpoint-progress bookkeeping for indexer tasks.
#[async_trait]
pub trait IndexerProgressStore: Send {
    /// Returns the last saved checkpoint for `task_name`.
    async fn load_progress(&self, task_name: String) -> anyhow::Result<u64>;
    /// Records that `task_name` has processed up to `checkpoint_number`.
    async fn save_progress(
        &mut self,
        task_name: String,
        checkpoint_number: u64,
    ) -> anyhow::Result<()>;

    /// Returns all tasks whose name starts with `task_prefix`.
    /// NOTE(review): callers appear to rely on these being ordered by
    /// checkpoint descending with the live task first — confirm against the
    /// storage implementations.
    async fn tasks(&self, task_prefix: &str) -> Result<Vec<Task>, Error>;

    /// Creates a new task record covering `checkpoint..=target_checkpoint`.
    async fn register_task(
        &mut self,
        task_name: String,
        checkpoint: u64,
        target_checkpoint: u64,
    ) -> Result<(), anyhow::Error>;

    /// Overwrites the stored record for `task`.
    async fn update_task(&mut self, task: Task) -> Result<(), Error>;
}
263
264#[async_trait]
265pub trait Datasource<T: Send>: Sync + Send {
266    async fn start_ingestion_task<M, P, R>(
267        &self,
268        task_name: String,
269        starting_checkpoint: u64,
270        target_checkpoint: u64,
271        mut storage: P,
272        data_mapper: M,
273    ) -> Result<(), Error>
274    where
275        M: DataMapper<T, R>,
276        P: Persistent<R>,
277    {
278        // todo: add metrics for number of tasks
279        let (data_sender, mut data_channel) = metered_channel::channel(
280            1000,
281            &iota_metrics::get_metrics()
282                .unwrap()
283                .channel_inflight
284                .with_label_values(&[&task_name]),
285        );
286        let join_handle = self
287            .start_data_retrieval(starting_checkpoint, target_checkpoint, data_sender)
288            .await?;
289
290        while let Some((block_number, data)) = data_channel.recv().await {
291            if block_number > target_checkpoint {
292                break;
293            }
294            if !data.is_empty() {
295                let processed_data = data.into_iter().try_fold(vec![], |mut result, d| {
296                    result.append(&mut data_mapper.map(d)?);
297                    Ok::<Vec<_>, Error>(result)
298                })?;
299                // TODO: we might be able to write data and progress in a single transaction.
300                storage.write(processed_data).await?;
301            }
302            storage
303                .save_progress(task_name.clone(), block_number)
304                .await?;
305        }
306        join_handle.abort();
307        join_handle.await?
308    }
309
310    async fn start_data_retrieval(
311        &self,
312        starting_checkpoint: u64,
313        target_checkpoint: u64,
314        data_sender: DataSender<T>,
315    ) -> Result<JoinHandle<Result<(), Error>>, Error>;
316}
317
/// Controls how the historical checkpoint range is split into backfill tasks
/// when the indexer starts behind the requested checkpoint.
pub enum BackfillStrategy {
    /// One single task covering the whole backfill range.
    Simple,
    /// Multiple tasks of at most `task_size` checkpoints each.
    Partitioned { task_size: u64 },
    /// Do not create any backfill tasks.
    Disabled,
}
323
/// Converts one raw datasource item into zero or more rows to persist.
pub trait DataMapper<T, R>: Sync + Send + Clone {
    /// Maps a single datasource item to the rows it produces.
    fn map(&self, data: T) -> Result<Vec<R>, anyhow::Error>;
}