iota_graphql_rpc/test_infra/
cluster.rs1use std::{net::SocketAddr, path::PathBuf, sync::Arc, time::Duration};
6
7use iota_graphql_rpc_client::simple_client::SimpleClient;
8pub use iota_indexer::config::SnapshotLagConfig;
9use iota_indexer::{
10 errors::IndexerError,
11 store::{PgIndexerStore, indexer_store::IndexerStore},
12 test_utils::{IndexerTypeConfig, force_delete_database, start_test_indexer_impl},
13};
14use iota_swarm_config::genesis_config::{AccountConfig, DEFAULT_GAS_AMOUNT};
15use iota_types::storage::RestStateReader;
16use test_cluster::{TestCluster, TestClusterBuilder};
17use tokio::{join, task::JoinHandle};
18use tokio_util::sync::CancellationToken;
19use tracing::info;
20
21use crate::{
22 config::{ConnectionConfig, ServerConfig, ServiceConfig, Version},
23 server::graphiql_server::start_graphiql_server,
24};
25
26const VALIDATOR_COUNT: usize = 7;
27const EPOCH_DURATION_MS: u64 = 15000;
28
29const ACCOUNT_NUM: usize = 20;
30const GAS_OBJECT_COUNT: usize = 3;
31
32pub const DEFAULT_INTERNAL_DATA_SOURCE_PORT: u16 = 3000;
33
34pub struct ExecutorCluster {
35 pub executor_server_handle: JoinHandle<()>,
36 pub indexer_store: PgIndexerStore,
37 pub indexer_join_handle: JoinHandle<Result<(), IndexerError>>,
38 pub graphql_server_join_handle: JoinHandle<()>,
39 pub graphql_client: SimpleClient,
40 pub snapshot_config: SnapshotLagConfig,
41 pub graphql_connection_config: ConnectionConfig,
42 pub cancellation_token: CancellationToken,
43}
44
45pub struct Cluster {
46 pub validator_fullnode_handle: TestCluster,
47 pub indexer_store: PgIndexerStore,
48 pub indexer_join_handle: JoinHandle<Result<(), IndexerError>>,
49 pub graphql_server_join_handle: JoinHandle<()>,
50 pub graphql_client: SimpleClient,
51 pub cancellation_token: CancellationToken,
52}
53
54pub async fn start_cluster(
56 graphql_connection_config: ConnectionConfig,
57 internal_data_source_rpc_port: Option<u16>,
58) -> Cluster {
59 let data_ingestion_path = tempfile::tempdir().unwrap().into_path();
60 let db_url = graphql_connection_config.db_url.clone();
61 let cancellation_token = CancellationToken::new();
62 let val_fn =
64 start_validator_with_fullnode(internal_data_source_rpc_port, data_ingestion_path.clone())
65 .await;
66
67 let (pg_store, pg_handle) = start_test_indexer_impl(
69 db_url,
70 true,
72 None,
73 val_fn.rpc_url().to_string(),
74 IndexerTypeConfig::writer_mode(None, None),
75 Some(data_ingestion_path),
76 cancellation_token.clone(),
77 )
78 .await;
79
80 let fn_rpc_url = val_fn.rpc_url().to_string();
82 let graphql_server_handle = start_graphql_server_with_fn_rpc(
83 graphql_connection_config.clone(),
84 Some(fn_rpc_url),
85 Some(cancellation_token.clone()),
86 )
87 .await;
88
89 let server_url = format!(
90 "http://{}:{}/",
91 graphql_connection_config.host, graphql_connection_config.port
92 );
93
94 let client = SimpleClient::new(server_url);
96 wait_for_graphql_server(&client).await;
97
98 Cluster {
99 validator_fullnode_handle: val_fn,
100 indexer_store: pg_store,
101 indexer_join_handle: pg_handle,
102 graphql_server_join_handle: graphql_server_handle,
103 graphql_client: client,
104 cancellation_token,
105 }
106}
107
108pub async fn serve_executor(
112 graphql_connection_config: ConnectionConfig,
113 internal_data_source_rpc_port: u16,
114 executor: Arc<dyn RestStateReader + Send + Sync>,
115 snapshot_config: Option<SnapshotLagConfig>,
116 epochs_to_keep: Option<u64>,
117 data_ingestion_path: PathBuf,
118) -> ExecutorCluster {
119 let db_url = graphql_connection_config.db_url.clone();
120 let cancellation_token = CancellationToken::new();
123
124 let executor_server_url: SocketAddr = format!("127.0.0.1:{internal_data_source_rpc_port}")
125 .parse()
126 .unwrap();
127
128 info!("Starting executor server on {}", executor_server_url);
129
130 let executor_server_handle = tokio::spawn(async move {
131 iota_rest_api::RestService::new_without_version(executor)
132 .start_service(executor_server_url)
133 .await;
134 });
135
136 info!("spawned executor server");
137
138 let (pg_store, pg_handle) = start_test_indexer_impl(
139 db_url,
140 true,
141 None,
142 format!("http://{executor_server_url}"),
143 IndexerTypeConfig::writer_mode(snapshot_config.clone(), epochs_to_keep),
144 Some(data_ingestion_path),
145 cancellation_token.clone(),
146 )
147 .await;
148
149 let graphql_server_handle = start_graphql_server(
151 graphql_connection_config.clone(),
152 cancellation_token.clone(),
153 )
154 .await;
155
156 let server_url = format!(
157 "http://{}:{}/",
158 graphql_connection_config.host, graphql_connection_config.port
159 );
160
161 let client = SimpleClient::new(server_url);
163 wait_for_graphql_server(&client).await;
164
165 ExecutorCluster {
166 executor_server_handle,
167 indexer_store: pg_store,
168 indexer_join_handle: pg_handle,
169 graphql_server_join_handle: graphql_server_handle,
170 graphql_client: client,
171 snapshot_config: snapshot_config.unwrap_or_default(),
172 graphql_connection_config,
173 cancellation_token,
174 }
175}
176
177pub async fn wait_for_graphql_checkpoint_pruned(
180 client: &SimpleClient,
181 checkpoint: u64,
182 base_timeout: Duration,
183) {
184 info!(
185 "Waiting for checkpoint to be pruned {}, base time out is {}",
186 checkpoint,
187 base_timeout.as_secs()
188 );
189 let query = format!(
190 r#"
191 {{
192 checkpoint(id: {{ sequenceNumber: {checkpoint} }}) {{
193 sequenceNumber
194 }}
195 }}"#
196 );
197
198 let timeout = base_timeout.mul_f64(checkpoint.max(1) as f64);
199
200 tokio::time::timeout(timeout, async {
201 loop {
202 let resp = client
203 .execute_to_graphql(query.to_string(), false, vec![], vec![])
204 .await
205 .unwrap()
206 .response_body_json();
207
208 let current_checkpoint = &resp["data"]["checkpoint"];
209 if current_checkpoint.is_null() {
210 break;
211 } else {
212 tokio::time::sleep(Duration::from_secs(1)).await;
213 }
214 }
215 })
216 .await
217 .expect("Timeout waiting for checkpoint to be pruned");
218}
219
220pub async fn start_graphql_server(
221 graphql_connection_config: ConnectionConfig,
222 cancellation_token: CancellationToken,
223) -> JoinHandle<()> {
224 start_graphql_server_with_fn_rpc(graphql_connection_config, None, Some(cancellation_token))
225 .await
226}
227
228pub async fn start_graphql_server_with_fn_rpc(
229 graphql_connection_config: ConnectionConfig,
230 fn_rpc_url: Option<String>,
231 cancellation_token: Option<CancellationToken>,
232) -> JoinHandle<()> {
233 let cancellation_token = cancellation_token.unwrap_or_default();
234 let mut server_config = ServerConfig {
235 connection: graphql_connection_config,
236 service: ServiceConfig::test_defaults(),
237 ..ServerConfig::default()
238 };
239 if let Some(fn_rpc_url) = fn_rpc_url {
240 server_config.tx_exec_full_node.node_rpc_url = Some(fn_rpc_url);
241 };
242
243 tokio::spawn(async move {
245 start_graphiql_server(&server_config, &Version::for_testing(), cancellation_token)
246 .await
247 .unwrap();
248 })
249}
250
251async fn start_validator_with_fullnode(
252 internal_data_source_rpc_port: Option<u16>,
253 data_ingestion_dir: PathBuf,
254) -> TestCluster {
255 let mut test_cluster_builder = TestClusterBuilder::new()
256 .with_num_validators(VALIDATOR_COUNT)
257 .with_epoch_duration_ms(EPOCH_DURATION_MS)
258 .with_data_ingestion_dir(data_ingestion_dir)
259 .with_accounts(vec![
260 AccountConfig {
261 address: None,
262 gas_amounts: vec![DEFAULT_GAS_AMOUNT; GAS_OBJECT_COUNT],
263 };
264 ACCOUNT_NUM
265 ]);
266
267 if let Some(internal_data_source_rpc_port) = internal_data_source_rpc_port {
268 test_cluster_builder =
269 test_cluster_builder.with_fullnode_rpc_port(internal_data_source_rpc_port);
270 };
271 test_cluster_builder.build().await
272}
273
274async fn wait_for_graphql_server(client: &SimpleClient) {
276 tokio::time::timeout(Duration::from_secs(10), async {
277 while client.ping().await.is_err() {
278 tokio::time::sleep(Duration::from_millis(500)).await;
279 }
280 })
281 .await
282 .expect("Timeout waiting for graphql server to start");
283}
284
285async fn wait_for_graphql_checkpoint_catchup(
288 client: &SimpleClient,
289 checkpoint: u64,
290 base_timeout: Duration,
291) {
292 info!(
293 "Waiting for graphql to catchup to checkpoint {}, base time out is {}",
294 checkpoint,
295 base_timeout.as_secs()
296 );
297 let query = r#"
298 {
299 availableRange {
300 last {
301 sequenceNumber
302 }
303 }
304 }"#;
305
306 let timeout = base_timeout.mul_f64(checkpoint.max(1) as f64);
307
308 tokio::time::timeout(timeout, async {
309 loop {
310 let resp = client
311 .execute_to_graphql(query.to_string(), false, vec![], vec![])
312 .await
313 .unwrap()
314 .response_body_json();
315
316 let current_checkpoint = resp["data"]["availableRange"]["last"].get("sequenceNumber");
317 info!("Current checkpoint: {:?}", current_checkpoint);
318 let Some(current_checkpoint) = current_checkpoint else {
320 tokio::time::sleep(Duration::from_secs(1)).await;
321 continue;
322 };
323
324 let current_checkpoint = current_checkpoint.as_u64().unwrap();
326 if current_checkpoint < checkpoint {
327 tokio::time::sleep(Duration::from_secs(1)).await;
328 } else {
329 break;
330 }
331 }
332 })
333 .await
334 .expect("Timeout waiting for graphql to catchup to checkpoint");
335}
336
337impl Cluster {
338 pub async fn wait_for_checkpoint_catchup(&self, checkpoint: u64, base_timeout: Duration) {
342 wait_for_graphql_checkpoint_catchup(&self.graphql_client, checkpoint, base_timeout).await
343 }
344
345 pub async fn wait_for_checkpoint_pruned(&self, checkpoint: u64, base_timeout: Duration) {
347 wait_for_graphql_checkpoint_pruned(&self.graphql_client, checkpoint, base_timeout).await
348 }
349
350 pub async fn cleanup_resources(self) {
353 self.cancellation_token.cancel();
354 let _ = join!(self.graphql_server_join_handle, self.indexer_join_handle);
355 }
356}
357
358impl ExecutorCluster {
359 pub async fn wait_for_checkpoint_catchup(&self, checkpoint: u64, base_timeout: Duration) {
363 wait_for_graphql_checkpoint_catchup(&self.graphql_client, checkpoint, base_timeout).await
364 }
365
366 pub async fn wait_for_checkpoint_pruned(&self, checkpoint: u64, base_timeout: Duration) {
368 wait_for_graphql_checkpoint_pruned(&self.graphql_client, checkpoint, base_timeout).await
369 }
370
371 pub async fn wait_for_objects_snapshot_catchup(&self, base_timeout: Duration) {
376 let mut latest_snapshot_cp = 0;
377
378 let latest_cp = self
379 .indexer_store
380 .get_latest_checkpoint_sequence_number()
381 .await
382 .unwrap()
383 .unwrap();
384
385 tokio::time::timeout(base_timeout, async {
386 while latest_cp > latest_snapshot_cp + self.snapshot_config.snapshot_min_lag as u64 {
387 tokio::time::sleep(Duration::from_secs(1)).await;
388 latest_snapshot_cp = self
389 .indexer_store
390 .get_latest_object_snapshot_checkpoint_sequence_number()
391 .await
392 .unwrap()
393 .unwrap_or_default();
394 }
395 })
396 .await
397 .unwrap_or_else(|_| panic!("Timeout waiting for indexer to update objects snapshot - latest_cp: {latest_cp}, latest_snapshot_cp: {latest_snapshot_cp}"));
398 }
399
400 pub async fn cleanup_resources(self) {
404 self.cancellation_token.cancel();
405 let _ = join!(self.graphql_server_join_handle, self.indexer_join_handle);
406 let db_url = self.graphql_connection_config.db_url.clone();
407 force_delete_database(db_url).await;
408 }
409}