archival_ingestion/
archival_ingestion.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use anyhow::Result;
6use iota_data_ingestion::{ArchivalConfig, ArchivalReducer, RelayWorker};
7use iota_data_ingestion_core::{
8    DataIngestionMetrics, IndexerExecutor, ReaderOptions, ShimProgressStore, WorkerPool,
9};
10use prometheus::Registry;
11use serde::{Deserialize, Serialize};
12use tokio_util::sync::CancellationToken;
13
14#[derive(Debug, Clone, Serialize, Deserialize)]
15struct Config {
16    remote_store_url: String,
17    archive_url: String,
18    archive_remote_store_options: Vec<(String, String)>,
19    #[serde(default = "default_commit_file_size")]
20    commit_file_size: usize,
21    #[serde(default = "default_commit_duration_seconds")]
22    commit_duration_seconds: u64,
23}
24
/// Serde fallback for `Config::commit_file_size` when the YAML omits the key.
///
/// Evaluates to 268_435_456 (256 * 1024 * 1024). Presumably a byte count,
/// i.e. 256 MiB -- TODO confirm the unit against `ArchivalConfig`.
fn default_commit_file_size() -> usize {
    const DEFAULT_COMMIT_FILE_SIZE: usize = 256 * 1024 * 1024;
    DEFAULT_COMMIT_FILE_SIZE
}
28
/// Serde fallback for `Config::commit_duration_seconds` when the YAML omits
/// the key: 600 seconds, i.e. ten minutes.
fn default_commit_duration_seconds() -> u64 {
    const DEFAULT_COMMIT_DURATION_SECONDS: u64 = 10 * 60;
    DEFAULT_COMMIT_DURATION_SECONDS
}
32
33#[tokio::main]
34async fn main() -> Result<()> {
35    let args: Vec<String> = std::env::args().collect();
36    assert_eq!(args.len(), 2, "configuration yaml file is required");
37    let config: Config = serde_yaml::from_str(&std::fs::read_to_string(&args[1])?)?;
38
39    let archival_config = ArchivalConfig {
40        remote_url: config.archive_url,
41        remote_store_options: config.archive_remote_store_options,
42        commit_file_size: config.commit_file_size,
43        commit_duration_seconds: config.commit_duration_seconds,
44    };
45
46    let reducer = ArchivalReducer::new(archival_config).await?;
47    let progress_store = ShimProgressStore(reducer.get_watermark().await?);
48    let mut executor = IndexerExecutor::new(
49        progress_store,
50        1,
51        DataIngestionMetrics::new(&Registry::new()),
52        CancellationToken::new(),
53    );
54    let worker_pool = WorkerPool::new_with_reducer(
55        RelayWorker,
56        "archival".to_string(),
57        1,
58        Default::default(),
59        reducer,
60    );
61    executor.register(worker_pool).await?;
62    executor
63        .run(
64            tempfile::tempdir()?.into_path(),
65            Some(config.remote_store_url),
66            vec![],
67            ReaderOptions::default(),
68        )
69        .await?;
70    Ok(())
71}