iota_graphql_rpc/test_infra/
cluster.rs1use std::{net::SocketAddr, path::PathBuf, sync::Arc, time::Duration};
6
7use iota_graphql_rpc_client::simple_client::SimpleClient;
8pub use iota_indexer::handlers::objects_snapshot_handler::SnapshotLagConfig;
9use iota_indexer::{
10 errors::IndexerError,
11 store::{PgIndexerStore, indexer_store::IndexerStore},
12 test_utils::{IndexerTypeConfig, force_delete_database, start_test_indexer_impl},
13};
14use iota_swarm_config::genesis_config::{AccountConfig, DEFAULT_GAS_AMOUNT};
15use iota_types::storage::RestStateReader;
16use test_cluster::{TestCluster, TestClusterBuilder};
17use tokio::{join, task::JoinHandle};
18use tokio_util::sync::CancellationToken;
19use tracing::info;
20
21use crate::{
22 config::{ConnectionConfig, ServerConfig, ServiceConfig, Version},
23 server::graphiql_server::start_graphiql_server,
24};
25
/// Number of validators requested from the `TestClusterBuilder` in
/// `start_validator_with_fullnode`.
const VALIDATOR_COUNT: usize = 7;
/// Epoch duration, in milliseconds, passed to the `TestClusterBuilder`.
const EPOCH_DURATION_MS: u64 = 15_000;

/// Number of identical `AccountConfig` entries created for the test cluster.
const ACCOUNT_NUM: usize = 20;
/// Number of gas amounts (each `DEFAULT_GAS_AMOUNT`) configured per account.
const GAS_OBJECT_COUNT: usize = 3;

/// Default port for the internal data source RPC.
///
/// NOTE(review): not referenced elsewhere in this file; presumably callers pass
/// it as the `internal_data_source_rpc_port` argument — confirm at call sites.
pub const DEFAULT_INTERNAL_DATA_SOURCE_PORT: u16 = 3000;
33
34pub struct ExecutorCluster {
35 pub executor_server_handle: JoinHandle<()>,
36 pub indexer_store: PgIndexerStore,
37 pub indexer_join_handle: JoinHandle<Result<(), IndexerError>>,
38 pub graphql_server_join_handle: JoinHandle<()>,
39 pub graphql_client: SimpleClient,
40 pub snapshot_config: SnapshotLagConfig,
41 pub graphql_connection_config: ConnectionConfig,
42 pub cancellation_token: CancellationToken,
43}
44
45pub struct Cluster {
46 pub validator_fullnode_handle: TestCluster,
47 pub indexer_store: PgIndexerStore,
48 pub indexer_join_handle: JoinHandle<Result<(), IndexerError>>,
49 pub graphql_server_join_handle: JoinHandle<()>,
50 pub graphql_client: SimpleClient,
51 pub cancellation_token: CancellationToken,
52}
53
54pub async fn start_cluster(
56 graphql_connection_config: ConnectionConfig,
57 internal_data_source_rpc_port: Option<u16>,
58) -> Cluster {
59 let data_ingestion_path = tempfile::tempdir().unwrap().into_path();
60 let db_url = graphql_connection_config.db_url.clone();
61 let cancellation_token = CancellationToken::new();
62 let val_fn =
64 start_validator_with_fullnode(internal_data_source_rpc_port, data_ingestion_path.clone())
65 .await;
66
67 let (pg_store, pg_handle) = start_test_indexer_impl(
69 db_url,
70 true,
72 None,
73 val_fn.rpc_url().to_string(),
74 IndexerTypeConfig::writer_mode(None, None),
75 Some(data_ingestion_path),
76 cancellation_token.clone(),
77 )
78 .await;
79
80 let fn_rpc_url = val_fn.rpc_url().to_string();
82 let graphql_server_handle = start_graphql_server_with_fn_rpc(
83 graphql_connection_config.clone(),
84 Some(fn_rpc_url),
85 Some(cancellation_token.clone()),
86 )
87 .await;
88
89 let server_url = format!(
90 "http://{}:{}/",
91 graphql_connection_config.host, graphql_connection_config.port
92 );
93
94 let client = SimpleClient::new(server_url);
96 wait_for_graphql_server(&client).await;
97
98 Cluster {
99 validator_fullnode_handle: val_fn,
100 indexer_store: pg_store,
101 indexer_join_handle: pg_handle,
102 graphql_server_join_handle: graphql_server_handle,
103 graphql_client: client,
104 cancellation_token,
105 }
106}
107
108pub async fn serve_executor(
112 graphql_connection_config: ConnectionConfig,
113 internal_data_source_rpc_port: u16,
114 executor: Arc<dyn RestStateReader + Send + Sync>,
115 snapshot_config: Option<SnapshotLagConfig>,
116 epochs_to_keep: Option<u64>,
117 data_ingestion_path: PathBuf,
118) -> ExecutorCluster {
119 let db_url = graphql_connection_config.db_url.clone();
120 let cancellation_token = CancellationToken::new();
123
124 let executor_server_url: SocketAddr = format!("127.0.0.1:{}", internal_data_source_rpc_port)
125 .parse()
126 .unwrap();
127
128 info!("Starting executor server on {}", executor_server_url);
129
130 let executor_server_handle = tokio::spawn(async move {
131 iota_rest_api::RestService::new_without_version(executor)
132 .start_service(executor_server_url)
133 .await;
134 });
135
136 info!("spawned executor server");
137
138 let (pg_store, pg_handle) = start_test_indexer_impl(
139 db_url,
140 true,
141 None,
142 format!("http://{}", executor_server_url),
143 IndexerTypeConfig::writer_mode(snapshot_config.clone(), epochs_to_keep),
144 Some(data_ingestion_path),
145 cancellation_token.clone(),
146 )
147 .await;
148
149 let graphql_server_handle = start_graphql_server(
151 graphql_connection_config.clone(),
152 cancellation_token.clone(),
153 )
154 .await;
155
156 let server_url = format!(
157 "http://{}:{}/",
158 graphql_connection_config.host, graphql_connection_config.port
159 );
160
161 let client = SimpleClient::new(server_url);
163 wait_for_graphql_server(&client).await;
164
165 ExecutorCluster {
166 executor_server_handle,
167 indexer_store: pg_store,
168 indexer_join_handle: pg_handle,
169 graphql_server_join_handle: graphql_server_handle,
170 graphql_client: client,
171 snapshot_config: snapshot_config.unwrap_or_default(),
172 graphql_connection_config,
173 cancellation_token,
174 }
175}
176
177pub async fn wait_for_graphql_checkpoint_pruned(
180 client: &SimpleClient,
181 checkpoint: u64,
182 base_timeout: Duration,
183) {
184 info!(
185 "Waiting for checkpoint to be pruned {}, base time out is {}",
186 checkpoint,
187 base_timeout.as_secs()
188 );
189 let query = format!(
190 r#"
191 {{
192 checkpoint(id: {{ sequenceNumber: {} }}) {{
193 sequenceNumber
194 }}
195 }}"#,
196 checkpoint
197 );
198
199 let timeout = base_timeout.mul_f64(checkpoint.max(1) as f64);
200
201 tokio::time::timeout(timeout, async {
202 loop {
203 let resp = client
204 .execute_to_graphql(query.to_string(), false, vec![], vec![])
205 .await
206 .unwrap()
207 .response_body_json();
208
209 let current_checkpoint = &resp["data"]["checkpoint"];
210 if current_checkpoint.is_null() {
211 break;
212 } else {
213 tokio::time::sleep(Duration::from_secs(1)).await;
214 }
215 }
216 })
217 .await
218 .expect("Timeout waiting for checkpoint to be pruned");
219}
220
221pub async fn start_graphql_server(
222 graphql_connection_config: ConnectionConfig,
223 cancellation_token: CancellationToken,
224) -> JoinHandle<()> {
225 start_graphql_server_with_fn_rpc(graphql_connection_config, None, Some(cancellation_token))
226 .await
227}
228
229pub async fn start_graphql_server_with_fn_rpc(
230 graphql_connection_config: ConnectionConfig,
231 fn_rpc_url: Option<String>,
232 cancellation_token: Option<CancellationToken>,
233) -> JoinHandle<()> {
234 let cancellation_token = cancellation_token.unwrap_or_default();
235 let mut server_config = ServerConfig {
236 connection: graphql_connection_config,
237 service: ServiceConfig::test_defaults(),
238 ..ServerConfig::default()
239 };
240 if let Some(fn_rpc_url) = fn_rpc_url {
241 server_config.tx_exec_full_node.node_rpc_url = Some(fn_rpc_url);
242 };
243
244 tokio::spawn(async move {
246 start_graphiql_server(&server_config, &Version::for_testing(), cancellation_token)
247 .await
248 .unwrap();
249 })
250}
251
252async fn start_validator_with_fullnode(
253 internal_data_source_rpc_port: Option<u16>,
254 data_ingestion_dir: PathBuf,
255) -> TestCluster {
256 let mut test_cluster_builder = TestClusterBuilder::new()
257 .with_num_validators(VALIDATOR_COUNT)
258 .with_epoch_duration_ms(EPOCH_DURATION_MS)
259 .with_data_ingestion_dir(data_ingestion_dir)
260 .with_accounts(vec![
261 AccountConfig {
262 address: None,
263 gas_amounts: vec![DEFAULT_GAS_AMOUNT; GAS_OBJECT_COUNT],
264 };
265 ACCOUNT_NUM
266 ]);
267
268 if let Some(internal_data_source_rpc_port) = internal_data_source_rpc_port {
269 test_cluster_builder =
270 test_cluster_builder.with_fullnode_rpc_port(internal_data_source_rpc_port);
271 };
272 test_cluster_builder.build().await
273}
274
275async fn wait_for_graphql_server(client: &SimpleClient) {
277 tokio::time::timeout(Duration::from_secs(10), async {
278 while client.ping().await.is_err() {
279 tokio::time::sleep(Duration::from_millis(500)).await;
280 }
281 })
282 .await
283 .expect("Timeout waiting for graphql server to start");
284}
285
286async fn wait_for_graphql_checkpoint_catchup(
289 client: &SimpleClient,
290 checkpoint: u64,
291 base_timeout: Duration,
292) {
293 info!(
294 "Waiting for graphql to catchup to checkpoint {}, base time out is {}",
295 checkpoint,
296 base_timeout.as_secs()
297 );
298 let query = r#"
299 {
300 availableRange {
301 last {
302 sequenceNumber
303 }
304 }
305 }"#;
306
307 let timeout = base_timeout.mul_f64(checkpoint.max(1) as f64);
308
309 tokio::time::timeout(timeout, async {
310 loop {
311 let resp = client
312 .execute_to_graphql(query.to_string(), false, vec![], vec![])
313 .await
314 .unwrap()
315 .response_body_json();
316
317 let current_checkpoint = resp["data"]["availableRange"]["last"].get("sequenceNumber");
318 info!("Current checkpoint: {:?}", current_checkpoint);
319 let Some(current_checkpoint) = current_checkpoint else {
321 tokio::time::sleep(Duration::from_secs(1)).await;
322 continue;
323 };
324
325 let current_checkpoint = current_checkpoint.as_u64().unwrap();
327 if current_checkpoint < checkpoint {
328 tokio::time::sleep(Duration::from_secs(1)).await;
329 } else {
330 break;
331 }
332 }
333 })
334 .await
335 .expect("Timeout waiting for graphql to catchup to checkpoint");
336}
337
338impl Cluster {
339 pub async fn wait_for_checkpoint_catchup(&self, checkpoint: u64, base_timeout: Duration) {
343 wait_for_graphql_checkpoint_catchup(&self.graphql_client, checkpoint, base_timeout).await
344 }
345
346 pub async fn wait_for_checkpoint_pruned(&self, checkpoint: u64, base_timeout: Duration) {
348 wait_for_graphql_checkpoint_pruned(&self.graphql_client, checkpoint, base_timeout).await
349 }
350
351 pub async fn cleanup_resources(self) {
354 self.cancellation_token.cancel();
355 let _ = join!(self.graphql_server_join_handle, self.indexer_join_handle);
356 }
357}
358
359impl ExecutorCluster {
360 pub async fn wait_for_checkpoint_catchup(&self, checkpoint: u64, base_timeout: Duration) {
364 wait_for_graphql_checkpoint_catchup(&self.graphql_client, checkpoint, base_timeout).await
365 }
366
367 pub async fn wait_for_checkpoint_pruned(&self, checkpoint: u64, base_timeout: Duration) {
369 wait_for_graphql_checkpoint_pruned(&self.graphql_client, checkpoint, base_timeout).await
370 }
371
372 pub async fn wait_for_objects_snapshot_catchup(&self, base_timeout: Duration) {
377 let mut latest_snapshot_cp = 0;
378
379 let latest_cp = self
380 .indexer_store
381 .get_latest_checkpoint_sequence_number()
382 .await
383 .unwrap()
384 .unwrap();
385
386 tokio::time::timeout(base_timeout, async {
387 while latest_cp > latest_snapshot_cp + self.snapshot_config.snapshot_min_lag as u64 {
388 tokio::time::sleep(Duration::from_secs(1)).await;
389 latest_snapshot_cp = self
390 .indexer_store
391 .get_latest_object_snapshot_checkpoint_sequence_number()
392 .await
393 .unwrap()
394 .unwrap_or_default();
395 }
396 })
397 .await
398 .unwrap_or_else(|_| panic!("Timeout waiting for indexer to update objects snapshot - latest_cp: {}, latest_snapshot_cp: {}",
399 latest_cp, latest_snapshot_cp));
400 }
401
402 pub async fn cleanup_resources(self) {
406 self.cancellation_token.cancel();
407 let _ = join!(self.graphql_server_join_handle, self.indexer_join_handle);
408 let db_url = self.graphql_connection_config.db_url.clone();
409 force_delete_database(db_url).await;
410 }
411}