archival_ingestion/
archival_ingestion.rs1use anyhow::Result;
6use iota_data_ingestion::{ArchivalConfig, ArchivalReducer, RelayWorker};
7use iota_data_ingestion_core::{
8 DataIngestionMetrics, IndexerExecutor, ReaderOptions, ShimProgressStore, WorkerPool,
9};
10use prometheus::Registry;
11use serde::{Deserialize, Serialize};
12use tokio_util::sync::CancellationToken;
13
14#[derive(Debug, Clone, Serialize, Deserialize)]
15struct Config {
16 remote_store_url: String,
17 archive_url: String,
18 archive_remote_store_options: Vec<(String, String)>,
19 #[serde(default = "default_commit_file_size")]
20 commit_file_size: usize,
21 #[serde(default = "default_commit_duration_seconds")]
22 commit_duration_seconds: u64,
23}
24
/// Fallback for `Config::commit_file_size` when the YAML file omits it.
///
/// Evaluates to 268_435_456 (256 * 1024 * 1024). Presumably a byte count,
/// i.e. 256 MiB — confirm against `ArchivalConfig`.
fn default_commit_file_size() -> usize {
    const MEBI: usize = 1024 * 1024;
    256 * MEBI
}
28
/// Fallback for `Config::commit_duration_seconds` when the YAML file omits
/// it: 600 seconds, i.e. ten minutes.
fn default_commit_duration_seconds() -> u64 {
    const SECONDS_PER_MINUTE: u64 = 60;
    10 * SECONDS_PER_MINUTE
}
32
33#[tokio::main]
34async fn main() -> Result<()> {
35 let args: Vec<String> = std::env::args().collect();
36 assert_eq!(args.len(), 2, "configuration yaml file is required");
37 let config: Config = serde_yaml::from_str(&std::fs::read_to_string(&args[1])?)?;
38
39 let archival_config = ArchivalConfig {
40 remote_url: config.archive_url,
41 remote_store_options: config.archive_remote_store_options,
42 commit_file_size: config.commit_file_size,
43 commit_duration_seconds: config.commit_duration_seconds,
44 };
45
46 let reducer = ArchivalReducer::new(archival_config).await?;
47 let progress_store = ShimProgressStore(reducer.get_watermark().await?);
48 let mut executor = IndexerExecutor::new(
49 progress_store,
50 1,
51 DataIngestionMetrics::new(&Registry::new()),
52 CancellationToken::new(),
53 );
54 let worker_pool = WorkerPool::new_with_reducer(
55 RelayWorker,
56 "archival".to_string(),
57 1,
58 Default::default(),
59 reducer,
60 );
61 executor.register(worker_pool).await?;
62 executor
63 .run(
64 tempfile::tempdir()?.into_path(),
65 Some(config.remote_store_url),
66 vec![],
67 ReaderOptions::default(),
68 )
69 .await?;
70 Ok(())
71}