iota_graphql_rpc/test_infra/
cluster.rs1use std::{net::SocketAddr, path::PathBuf, sync::Arc, time::Duration};
6
7use iota_graphql_rpc_client::simple_client::SimpleClient;
8pub use iota_indexer::config::SnapshotLagConfig;
9use iota_indexer::{
10 config::PruningOptions,
11 errors::IndexerError,
12 store::{PgIndexerStore, indexer_store::IndexerStore},
13 test_utils::{IndexerTypeConfig, force_delete_database, start_test_indexer_impl},
14};
15use iota_swarm_config::genesis_config::{AccountConfig, DEFAULT_GAS_AMOUNT};
16use iota_types::{
17 storage::RestStateReader,
18 transaction::{Transaction, TransactionData},
19};
20use test_cluster::{TestCluster, TestClusterBuilder};
21use tokio::{join, task::JoinHandle};
22use tokio_util::sync::CancellationToken;
23use tracing::info;
24
25use crate::{
26 config::{ConnectionConfig, ServerConfig, ServiceConfig, Version},
27 server::graphiql_server::start_graphiql_server,
28};
29
/// Number of validators the test cluster is built with.
const VALIDATOR_COUNT: usize = 7;
/// Epoch duration, in milliseconds, configured on the test cluster.
const EPOCH_DURATION_MS: u64 = 15_000;

/// Number of genesis accounts created for the test cluster.
const ACCOUNT_NUM: usize = 20;
/// Number of gas objects, each holding `DEFAULT_GAS_AMOUNT`, given to every
/// genesis account.
const GAS_OBJECT_COUNT: usize = 3;

/// Port number exported for callers; not referenced elsewhere in this file.
// NOTE(review): presumably the value callers pass as
// `internal_data_source_rpc_port` -- confirm against call sites.
pub const DEFAULT_INTERNAL_DATA_SOURCE_PORT: u16 = 3000;
37
38pub struct ExecutorCluster {
39 pub executor_server_handle: JoinHandle<()>,
40 pub indexer_store: PgIndexerStore,
41 pub indexer_join_handle: JoinHandle<Result<(), IndexerError>>,
42 pub graphql_server_join_handle: JoinHandle<()>,
43 pub graphql_client: SimpleClient,
44 pub snapshot_config: SnapshotLagConfig,
45 pub graphql_connection_config: ConnectionConfig,
46 pub cancellation_token: CancellationToken,
47}
48
49pub struct Cluster {
50 pub validator_fullnode_handle: TestCluster,
51 pub indexer_store: PgIndexerStore,
52 pub indexer_join_handle: JoinHandle<Result<(), IndexerError>>,
53 pub graphql_server_join_handle: JoinHandle<()>,
54 pub graphql_client: SimpleClient,
55 pub cancellation_token: CancellationToken,
56}
57
58pub async fn start_cluster(
60 graphql_connection_config: ConnectionConfig,
61 internal_data_source_rpc_port: Option<u16>,
62) -> Cluster {
63 let data_ingestion_path = tempfile::tempdir().unwrap().keep();
64 let db_url = graphql_connection_config.db_url.clone();
65 let cancellation_token = CancellationToken::new();
66 let val_fn =
68 start_validator_with_fullnode(internal_data_source_rpc_port, data_ingestion_path.clone())
69 .await;
70
71 let (pg_store, pg_handle) = start_test_indexer_impl(
73 db_url,
74 true,
76 None,
77 val_fn.rpc_url().to_string(),
78 IndexerTypeConfig::writer_mode(None, None),
79 Some(data_ingestion_path),
80 cancellation_token.clone(),
81 )
82 .await;
83
84 let fn_rpc_url = val_fn.rpc_url().to_string();
86 let graphql_server_handle = start_graphql_server_with_fn_rpc(
87 graphql_connection_config.clone(),
88 Some(fn_rpc_url),
89 Some(cancellation_token.clone()),
90 )
91 .await;
92
93 let server_url = format!(
94 "http://{}:{}/",
95 graphql_connection_config.host, graphql_connection_config.port
96 );
97
98 let client = SimpleClient::new(server_url);
100 wait_for_graphql_server(&client).await;
101
102 Cluster {
103 validator_fullnode_handle: val_fn,
104 indexer_store: pg_store,
105 indexer_join_handle: pg_handle,
106 graphql_server_join_handle: graphql_server_handle,
107 graphql_client: client,
108 cancellation_token,
109 }
110}
111
112pub async fn serve_executor(
119 graphql_connection_config: ConnectionConfig,
120 internal_data_source_rpc_port: u16,
121 executor: Arc<dyn RestStateReader + Send + Sync>,
122 snapshot_config: Option<SnapshotLagConfig>,
123 epochs_to_keep: Option<u64>,
124 data_ingestion_path: PathBuf,
125) -> ExecutorCluster {
126 let db_url = graphql_connection_config.db_url.clone();
127 let cancellation_token = CancellationToken::new();
130
131 let executor_server_url: SocketAddr = format!("127.0.0.1:{internal_data_source_rpc_port}")
132 .parse()
133 .unwrap();
134
135 info!("Starting executor server on {}", executor_server_url);
136
137 let executor_server_handle = tokio::spawn(async move {
138 iota_rest_api::RestService::new_without_version(executor)
139 .start_service(executor_server_url)
140 .await;
141 });
142
143 info!("spawned executor server");
144
145 let (pg_store, pg_handle) = start_test_indexer_impl(
146 db_url,
147 true,
148 None,
149 format!("http://{executor_server_url}"),
150 IndexerTypeConfig::writer_mode(
151 snapshot_config.clone(),
152 Some(PruningOptions {
153 epochs_to_keep,
154 ..Default::default()
155 }),
156 ),
157 Some(data_ingestion_path),
158 cancellation_token.clone(),
159 )
160 .await;
161
162 let graphql_server_handle = start_graphql_server_with_fn_rpc(
164 graphql_connection_config.clone(),
165 Some(format!("http://{executor_server_url}")),
167 Some(cancellation_token.clone()),
168 )
169 .await;
170
171 let server_url = format!(
172 "http://{}:{}/",
173 graphql_connection_config.host, graphql_connection_config.port
174 );
175
176 let client = SimpleClient::new(server_url);
178 wait_for_graphql_server(&client).await;
179
180 ExecutorCluster {
181 executor_server_handle,
182 indexer_store: pg_store,
183 indexer_join_handle: pg_handle,
184 graphql_server_join_handle: graphql_server_handle,
185 graphql_client: client,
186 snapshot_config: snapshot_config.unwrap_or_default(),
187 graphql_connection_config,
188 cancellation_token,
189 }
190}
191
192pub async fn wait_for_graphql_checkpoint_pruned(
195 client: &SimpleClient,
196 checkpoint: u64,
197 base_timeout: Duration,
198) {
199 info!(
200 "Waiting for checkpoint to be pruned {}, base time out is {}",
201 checkpoint,
202 base_timeout.as_secs()
203 );
204 let query = format!(
205 r#"
206 {{
207 checkpoint(id: {{ sequenceNumber: {checkpoint} }}) {{
208 sequenceNumber
209 }}
210 }}"#
211 );
212
213 let timeout = base_timeout.mul_f64(checkpoint.max(1) as f64);
214
215 tokio::time::timeout(timeout, async {
216 loop {
217 let resp = client
218 .execute_to_graphql(query.to_string(), false, vec![], vec![])
219 .await
220 .unwrap()
221 .response_body_json();
222
223 let current_checkpoint = &resp["data"]["checkpoint"];
224 if current_checkpoint.is_null() {
225 break;
226 } else {
227 tokio::time::sleep(Duration::from_secs(1)).await;
228 }
229 }
230 })
231 .await
232 .expect("timeout waiting for checkpoint to be pruned");
233}
234
235pub async fn start_graphql_server_with_fn_rpc(
236 graphql_connection_config: ConnectionConfig,
237 fn_rpc_url: Option<String>,
238 cancellation_token: Option<CancellationToken>,
239) -> JoinHandle<()> {
240 let cancellation_token = cancellation_token.unwrap_or_default();
241 let mut server_config = ServerConfig {
242 connection: graphql_connection_config,
243 service: ServiceConfig::test_defaults(),
244 ..ServerConfig::default()
245 };
246 if let Some(fn_rpc_url) = fn_rpc_url {
247 server_config.tx_exec_full_node.node_rpc_url = Some(fn_rpc_url);
248 };
249
250 tokio::spawn(async move {
252 start_graphiql_server(&server_config, &Version::for_testing(), cancellation_token)
253 .await
254 .unwrap();
255 })
256}
257
258async fn start_validator_with_fullnode(
259 internal_data_source_rpc_port: Option<u16>,
260 data_ingestion_dir: PathBuf,
261) -> TestCluster {
262 let mut test_cluster_builder = TestClusterBuilder::new()
263 .with_num_validators(VALIDATOR_COUNT)
264 .with_epoch_duration_ms(EPOCH_DURATION_MS)
265 .with_data_ingestion_dir(data_ingestion_dir)
266 .with_accounts(vec![
267 AccountConfig {
268 address: None,
269 gas_amounts: vec![DEFAULT_GAS_AMOUNT; GAS_OBJECT_COUNT],
270 };
271 ACCOUNT_NUM
272 ]);
273
274 if let Some(internal_data_source_rpc_port) = internal_data_source_rpc_port {
275 test_cluster_builder =
276 test_cluster_builder.with_fullnode_rpc_port(internal_data_source_rpc_port);
277 };
278 test_cluster_builder.build().await
279}
280
281async fn wait_for_graphql_server(client: &SimpleClient) {
283 tokio::time::timeout(Duration::from_secs(10), async {
284 while client.ping().await.is_err() {
285 tokio::time::sleep(Duration::from_millis(500)).await;
286 }
287 })
288 .await
289 .expect("timeout waiting for graphql server to start");
290}
291
292async fn wait_for_graphql_checkpoint_catchup(
295 client: &SimpleClient,
296 checkpoint: u64,
297 base_timeout: Duration,
298) {
299 info!(
300 "Waiting for graphql to catchup to checkpoint {}, base time out is {}",
301 checkpoint,
302 base_timeout.as_secs()
303 );
304 let query = r#"
305 {
306 availableRange {
307 last {
308 sequenceNumber
309 }
310 }
311 }"#;
312
313 let timeout = base_timeout.mul_f64(checkpoint.max(1) as f64);
314
315 tokio::time::timeout(timeout, async {
316 loop {
317 let resp = client
318 .execute_to_graphql(query.to_string(), false, vec![], vec![])
319 .await
320 .unwrap()
321 .response_body_json();
322
323 let current_checkpoint = resp["data"]["availableRange"]["last"].get("sequenceNumber");
324 info!("Current checkpoint: {:?}", current_checkpoint);
325 let Some(current_checkpoint) = current_checkpoint else {
327 tokio::time::sleep(Duration::from_secs(1)).await;
328 continue;
329 };
330
331 let current_checkpoint = current_checkpoint.as_u64().unwrap();
333 if current_checkpoint < checkpoint {
334 tokio::time::sleep(Duration::from_secs(1)).await;
335 } else {
336 break;
337 }
338 }
339 })
340 .await
341 .expect("timeout waiting for graphql to catchup to checkpoint");
342}
343
344impl Cluster {
345 pub async fn wait_for_checkpoint_catchup(&self, checkpoint: u64, base_timeout: Duration) {
349 wait_for_graphql_checkpoint_catchup(&self.graphql_client, checkpoint, base_timeout).await
350 }
351
352 pub async fn wait_for_checkpoint_pruned(&self, checkpoint: u64, base_timeout: Duration) {
354 wait_for_graphql_checkpoint_pruned(&self.graphql_client, checkpoint, base_timeout).await
355 }
356
357 pub async fn cleanup_resources(self) {
360 self.cancellation_token.cancel();
361 let _ = join!(self.graphql_server_join_handle, self.indexer_join_handle);
362 }
363
364 pub async fn build_transfer_iota_for_test(&self) -> TransactionData {
366 let addresses = self.validator_fullnode_handle.wallet.get_addresses();
367
368 let recipient = addresses[1];
369 self.validator_fullnode_handle
370 .test_transaction_builder()
371 .await
372 .transfer_iota(Some(1_000), recipient)
373 .build()
374 }
375
376 pub fn sign_transaction(&self, transaction: &TransactionData) -> Transaction {
378 self.validator_fullnode_handle
379 .wallet
380 .sign_transaction(transaction)
381 }
382}
383
384impl ExecutorCluster {
385 pub async fn wait_for_checkpoint_catchup(&self, checkpoint: u64, base_timeout: Duration) {
389 wait_for_graphql_checkpoint_catchup(&self.graphql_client, checkpoint, base_timeout).await
390 }
391
392 pub async fn wait_for_checkpoint_pruned(&self, checkpoint: u64, base_timeout: Duration) {
394 wait_for_graphql_checkpoint_pruned(&self.graphql_client, checkpoint, base_timeout).await
395 }
396
397 pub async fn wait_for_objects_snapshot_catchup(&self, base_timeout: Duration) {
402 let mut latest_snapshot_cp = 0;
403
404 let latest_cp = self
405 .indexer_store
406 .get_latest_checkpoint_sequence_number()
407 .await
408 .unwrap()
409 .unwrap();
410
411 tokio::time::timeout(base_timeout, async {
412 while latest_cp > latest_snapshot_cp + self.snapshot_config.snapshot_min_lag as u64 {
413 tokio::time::sleep(Duration::from_secs(1)).await;
414 latest_snapshot_cp = self
415 .indexer_store
416 .get_latest_object_snapshot_checkpoint_sequence_number()
417 .await
418 .unwrap()
419 .unwrap_or_default();
420 }
421 })
422 .await
423 .unwrap_or_else(|_| panic!("timeout waiting for indexer to update objects snapshot - latest_cp: {latest_cp}, latest_snapshot_cp: {latest_snapshot_cp}"));
424 }
425
426 pub async fn cleanup_resources(self) {
430 self.cancellation_token.cancel();
431 let _ = join!(self.graphql_server_join_handle, self.indexer_join_handle);
432 let db_url = self.graphql_connection_config.db_url.clone();
433 force_delete_database(db_url).await;
434 }
435}