Function setup_single_workflow

Source
pub async fn setup_single_workflow<W: Worker + 'static>(
    worker: W,
    remote_store_url: String,
    initial_checkpoint_number: CheckpointSequenceNumber,
    concurrency: usize,
    reader_options: Option<ReaderOptions>,
) -> IngestionResult<(impl Future<Output = IngestionResult<HashMap<String, CheckpointSequenceNumber>>>, CancellationToken)>
Expand description

Sets up a single workflow for data ingestion.

This function initializes an `IndexerExecutor` with a single worker pool, using a `ShimProgressStore` initialized with the provided `initial_checkpoint_number`. It returns a pair: a future that runs the executor, and a `CancellationToken` for graceful shutdown.

§Docs

For more information, see the custom indexer documentation.

§Example

use std::sync::Arc;

use async_trait::async_trait;
use iota_data_ingestion_core::{IngestionError, Worker, setup_single_workflow};
use iota_types::full_checkpoint_content::CheckpointData;

/// Stateless example worker; its `Worker` implementation below only prints
/// each checkpoint it receives.
struct CustomWorker;

#[async_trait]
impl Worker for CustomWorker {
    // This example produces no per-checkpoint output, hence the unit message.
    type Message = ();
    type Error = IngestionError;

    /// Handles a single checkpoint by printing its summary to stdout.
    async fn process_checkpoint(
        &self,
        checkpoint: Arc<CheckpointData>,
    ) -> Result<Self::Message, Self::Error> {
        // Replace this with custom processing logic; the example only logs.
        let summary = checkpoint.checkpoint_summary.to_string();
        println!("Processing checkpoint: {}", summary);
        Ok(())
    }
}

#[tokio::main]
async fn main() {
    // Fullnode REST API URL, passed as the remote store.
    let remote_store_url = String::from("http://127.0.0.1:9000/api/v1");

    // Build the workflow: worker, remote store, starting checkpoint,
    // concurrency and optional reader options.
    let setup = setup_single_workflow(
        CustomWorker,
        remote_store_url,
        0,    // initial checkpoint number.
        5,    // concurrency.
        None, // extra reader options.
    );

    // The second tuple element is the cancellation token; this example does
    // not use it, so it is discarded.
    let (executor, _) = setup.await.unwrap();

    // Drive the executor future to completion, panicking on an ingestion error.
    executor.await.unwrap();
}