iota_indexer_builder/
iota_datasource.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{path::PathBuf, sync::Arc};
6
7use anyhow::Error;
8use async_trait::async_trait;
9use iota_data_ingestion_core::{
10    DataIngestionMetrics, IndexerExecutor, ProgressStore, ReaderOptions, Worker, WorkerPool,
11};
12use iota_metrics::{metered_channel, spawn_monitored_task};
13use iota_types::{
14    base_types::TransactionDigest,
15    full_checkpoint_content::{CheckpointData as IotaCheckpointData, CheckpointTransaction},
16    messages_checkpoint::CheckpointSequenceNumber,
17};
18use tokio::task::JoinHandle;
19use tokio_util::sync::CancellationToken;
20use tracing::info;
21
22use crate::indexer_builder::{DataSender, Datasource};
23
24pub struct IotaCheckpointDatasource {
25    remote_store_url: String,
26    concurrency: usize,
27    checkpoint_path: PathBuf,
28    metrics: DataIngestionMetrics,
29}
30impl IotaCheckpointDatasource {
31    pub fn new(
32        remote_store_url: String,
33        concurrency: usize,
34        checkpoint_path: PathBuf,
35        metrics: DataIngestionMetrics,
36    ) -> Self {
37        IotaCheckpointDatasource {
38            remote_store_url,
39            concurrency,
40            checkpoint_path,
41            metrics,
42        }
43    }
44}
45
46#[async_trait]
47impl Datasource<CheckpointTxnData> for IotaCheckpointDatasource {
48    async fn start_data_retrieval(
49        &self,
50        starting_checkpoint: u64,
51        target_checkpoint: u64,
52        data_sender: DataSender<CheckpointTxnData>,
53    ) -> Result<JoinHandle<Result<(), Error>>, Error> {
54        let token = CancellationToken::new();
55        let child_token = token.child_token();
56        let progress_store = PerTaskInMemProgressStore {
57            current_checkpoint: starting_checkpoint,
58            exit_checkpoint: target_checkpoint,
59            token: Some(token),
60        };
61        let mut executor =
62            IndexerExecutor::new(progress_store, 1, self.metrics.clone(), child_token);
63        let worker = IndexerWorker::new(data_sender);
64        let worker_pool = WorkerPool::new(
65            worker,
66            TransactionDigest::random().to_string(),
67            self.concurrency,
68            Default::default(),
69        );
70        executor.register(worker_pool).await?;
71        let checkpoint_path = self.checkpoint_path.clone();
72        let remote_store_url = self.remote_store_url.clone();
73        Ok(spawn_monitored_task!(async {
74            executor
75                .run(
76                    checkpoint_path,
77                    Some(remote_store_url),
78                    vec![], // optional remote store access options
79                    ReaderOptions::default(),
80                )
81                .await?;
82            Ok(())
83        }))
84    }
85}
86
87struct PerTaskInMemProgressStore {
88    pub current_checkpoint: u64,
89    pub exit_checkpoint: u64,
90    pub token: Option<CancellationToken>,
91}
92
93#[async_trait]
94impl ProgressStore for PerTaskInMemProgressStore {
95    type Error = anyhow::Error;
96
97    async fn load(&mut self, _task_name: String) -> Result<CheckpointSequenceNumber, Self::Error> {
98        Ok(self.current_checkpoint)
99    }
100
101    async fn save(
102        &mut self,
103        _task_name: String,
104        checkpoint_number: CheckpointSequenceNumber,
105    ) -> Result<(), Self::Error> {
106        if checkpoint_number >= self.exit_checkpoint {
107            if let Some(token) = self.token.take() {
108                token.cancel();
109            }
110        }
111        self.current_checkpoint = checkpoint_number;
112        Ok(())
113    }
114}
115
116pub struct IndexerWorker<T> {
117    data_sender: metered_channel::Sender<(u64, Vec<T>)>,
118}
119
120impl<T> IndexerWorker<T> {
121    pub fn new(data_sender: metered_channel::Sender<(u64, Vec<T>)>) -> Self {
122        Self { data_sender }
123    }
124}
125
126pub type CheckpointTxnData = (CheckpointTransaction, u64, u64);
127
128#[async_trait]
129impl Worker for IndexerWorker<CheckpointTxnData> {
130    type Message = ();
131    type Error = anyhow::Error;
132
133    async fn process_checkpoint(
134        &self,
135        checkpoint: Arc<IotaCheckpointData>,
136    ) -> Result<Self::Message, Self::Error> {
137        info!(
138            "Received checkpoint [{}] {}: {}",
139            checkpoint.checkpoint_summary.epoch,
140            checkpoint.checkpoint_summary.sequence_number,
141            checkpoint.transactions.len(),
142        );
143        let checkpoint_num = checkpoint.checkpoint_summary.sequence_number;
144        let timestamp_ms = checkpoint.checkpoint_summary.timestamp_ms;
145
146        let transactions = checkpoint
147            .transactions
148            .iter()
149            .cloned()
150            .map(|tx| (tx, checkpoint_num, timestamp_ms))
151            .collect();
152        Ok(self
153            .data_sender
154            .send((checkpoint_num, transactions))
155            .await?)
156    }
157}