pub async fn setup_single_workflow<W: Worker + 'static>(
worker: W,
remote_store_url: String,
initial_checkpoint_number: CheckpointSequenceNumber,
concurrency: usize,
reader_options: Option<ReaderOptions>,
) -> IngestionResult<(impl Future<Output = IngestionResult<HashMap<String, CheckpointSequenceNumber>>>, CancellationToken)>
Expand description
Sets up a single workflow for data ingestion.
This function initializes an IndexerExecutor
with a single worker pool,
using a ShimProgressStore
initialized with the provided
initial_checkpoint_number
. It then returns a future that runs the executor
and a [CancellationToken
] for graceful shutdown.
§Docs
For more info please check the custom indexer docs.
§Example
use std::sync::Arc;
use async_trait::async_trait;
use iota_data_ingestion_core::{IngestionError, Worker, setup_single_workflow};
use iota_types::full_checkpoint_content::CheckpointData;
struct CustomWorker;
#[async_trait]
impl Worker for CustomWorker {
type Message = ();
type Error = IngestionError;
async fn process_checkpoint(
&self,
checkpoint: Arc<CheckpointData>,
) -> Result<Self::Message, Self::Error> {
// custom processing logic.
println!(
"Processing checkpoint: {}",
checkpoint.checkpoint_summary.to_string()
);
Ok(())
}
}
#[tokio::main]
async fn main() {
let (executor, _) = setup_single_workflow(
CustomWorker,
"http://127.0.0.1:9000/api/v1".to_string(), // fullnode REST API
0, // initial checkpoint number.
5, // concurrency.
None, // extra reader options.
)
.await
.unwrap();
executor.await.unwrap();
}