iota_graphql_rpc/test_infra/
cluster.rs1use std::{net::SocketAddr, path::PathBuf, sync::Arc, time::Duration};
6
7use iota_graphql_rpc_client::simple_client::SimpleClient;
8pub use iota_indexer::config::SnapshotLagConfig;
9use iota_indexer::{
10 config::PruningOptions,
11 errors::IndexerError,
12 store::{PgIndexerStore, indexer_store::IndexerStore},
13 test_utils::{IndexerTypeConfig, force_delete_database, start_test_indexer_impl},
14};
15use iota_swarm_config::genesis_config::{AccountConfig, DEFAULT_GAS_AMOUNT};
16use iota_types::{
17 storage::RestStateReader,
18 transaction::{Transaction, TransactionData},
19};
20use test_cluster::{TestCluster, TestClusterBuilder};
21use tokio::{join, task::JoinHandle};
22use tokio_util::sync::CancellationToken;
23use tracing::info;
24
25use crate::{
26 config::{ConnectionConfig, ServerConfig, ServiceConfig, Version},
27 server::graphiql_server::start_graphiql_server,
28};
29
/// Number of validators the test cluster is built with.
const VALIDATOR_COUNT: usize = 7;
/// Epoch duration, in milliseconds, handed to the test cluster builder.
const EPOCH_DURATION_MS: u64 = 15_000;

/// Number of accounts configured for the test cluster.
const ACCOUNT_NUM: usize = 20;
/// Number of gas objects (each holding `DEFAULT_GAS_AMOUNT`) given to every
/// configured account.
const GAS_OBJECT_COUNT: usize = 3;

/// Default port value for the internal data source.
///
/// NOTE(review): not referenced by the code visible in this file; presumably
/// callers pass it as `internal_data_source_rpc_port` — confirm.
pub const DEFAULT_INTERNAL_DATA_SOURCE_PORT: u16 = 3000;
37
/// Handles and clients for a test setup in which an in-process executor is
/// exposed through a REST service and consumed by an indexer and a GraphQL
/// server. Built by `serve_executor`.
pub struct ExecutorCluster {
    /// Handle of the spawned task that runs the REST service in front of the
    /// executor.
    pub executor_server_handle: JoinHandle<()>,
    /// Indexer store returned when the test indexer was started.
    pub indexer_store: PgIndexerStore,
    /// Join handle of the indexer task.
    pub indexer_join_handle: JoinHandle<Result<(), IndexerError>>,
    /// Join handle of the task running the GraphQL server.
    pub graphql_server_join_handle: JoinHandle<()>,
    /// Client pointed at the GraphQL server's `http://host:port/` URL.
    pub graphql_client: SimpleClient,
    /// Snapshot lag configuration given to `serve_executor`, or its default
    /// when none was supplied.
    pub snapshot_config: SnapshotLagConfig,
    /// Connection settings of the GraphQL server; `db_url` is also used to
    /// delete the test database during cleanup.
    pub graphql_connection_config: ConnectionConfig,
    /// Token whose clones were handed to the indexer and the GraphQL server;
    /// cancelled in `cleanup_resources`.
    pub cancellation_token: CancellationToken,
}
48
/// Handles and clients for a test setup backed by a validator/fullnode
/// `TestCluster`, an indexer and a GraphQL server. Built by `start_cluster`.
pub struct Cluster {
    /// The underlying validator + fullnode test cluster.
    pub validator_fullnode_handle: TestCluster,
    /// Indexer store returned when the test indexer was started.
    pub indexer_store: PgIndexerStore,
    /// Join handle of the indexer task.
    pub indexer_join_handle: JoinHandle<Result<(), IndexerError>>,
    /// Join handle of the task running the GraphQL server.
    pub graphql_server_join_handle: JoinHandle<()>,
    /// Client pointed at the GraphQL server's `http://host:port/` URL.
    pub graphql_client: SimpleClient,
    /// Token whose clones were handed to the indexer and the GraphQL server.
    pub cancellation_token: CancellationToken,
}
57
58pub async fn start_cluster(
60 graphql_connection_config: ConnectionConfig,
61 internal_data_source_rpc_port: Option<u16>,
62 service_config: ServiceConfig,
63) -> Cluster {
64 let data_ingestion_path = tempfile::tempdir().unwrap().keep();
65 let db_url = graphql_connection_config.db_url.clone();
66 let cancellation_token = CancellationToken::new();
67 let val_fn =
69 start_validator_with_fullnode(internal_data_source_rpc_port, data_ingestion_path.clone())
70 .await;
71
72 let (pg_store, pg_handle) = start_test_indexer_impl(
74 db_url,
75 true,
77 None,
78 val_fn.rpc_url().to_string(),
79 IndexerTypeConfig::writer_mode(None, None),
80 Some(data_ingestion_path),
81 cancellation_token.clone(),
82 )
83 .await;
84
85 let fn_rpc_url = val_fn.rpc_url().to_string();
87 let graphql_server_handle = start_graphql_server_with_fn_rpc(
88 graphql_connection_config.clone(),
89 Some(fn_rpc_url),
90 Some(cancellation_token.clone()),
91 Some(service_config),
92 )
93 .await;
94
95 let server_url = format!(
96 "http://{}:{}/",
97 graphql_connection_config.host, graphql_connection_config.port
98 );
99
100 let client = SimpleClient::new(server_url);
102 wait_for_graphql_server(&client).await;
103
104 Cluster {
105 validator_fullnode_handle: val_fn,
106 indexer_store: pg_store,
107 indexer_join_handle: pg_handle,
108 graphql_server_join_handle: graphql_server_handle,
109 graphql_client: client,
110 cancellation_token,
111 }
112}
113
114pub async fn serve_executor(
121 graphql_connection_config: ConnectionConfig,
122 internal_data_source_rpc_port: u16,
123 executor: Arc<dyn RestStateReader + Send + Sync>,
124 snapshot_config: Option<SnapshotLagConfig>,
125 epochs_to_keep: Option<u64>,
126 data_ingestion_path: PathBuf,
127) -> ExecutorCluster {
128 let db_url = graphql_connection_config.db_url.clone();
129 let cancellation_token = CancellationToken::new();
132
133 let executor_server_url: SocketAddr = format!("127.0.0.1:{internal_data_source_rpc_port}")
134 .parse()
135 .unwrap();
136
137 info!("Starting executor server on {}", executor_server_url);
138
139 let executor_server_handle = tokio::spawn(async move {
140 iota_rest_api::RestService::new_without_version(executor)
141 .start_service(executor_server_url)
142 .await;
143 });
144
145 info!("spawned executor server");
146
147 let (pg_store, pg_handle) = start_test_indexer_impl(
148 db_url,
149 true,
150 None,
151 format!("http://{executor_server_url}"),
152 IndexerTypeConfig::writer_mode(
153 snapshot_config.clone(),
154 Some(PruningOptions {
155 epochs_to_keep,
156 ..Default::default()
157 }),
158 ),
159 Some(data_ingestion_path),
160 cancellation_token.clone(),
161 )
162 .await;
163
164 let graphql_server_handle = start_graphql_server_with_fn_rpc(
166 graphql_connection_config.clone(),
167 Some(format!("http://{executor_server_url}")),
169 Some(cancellation_token.clone()),
170 None,
171 )
172 .await;
173
174 let server_url = format!(
175 "http://{}:{}/",
176 graphql_connection_config.host, graphql_connection_config.port
177 );
178
179 let client = SimpleClient::new(server_url);
181 wait_for_graphql_server(&client).await;
182
183 ExecutorCluster {
184 executor_server_handle,
185 indexer_store: pg_store,
186 indexer_join_handle: pg_handle,
187 graphql_server_join_handle: graphql_server_handle,
188 graphql_client: client,
189 snapshot_config: snapshot_config.unwrap_or_default(),
190 graphql_connection_config,
191 cancellation_token,
192 }
193}
194
195pub async fn wait_for_graphql_checkpoint_pruned(
198 client: &SimpleClient,
199 checkpoint: u64,
200 base_timeout: Duration,
201) {
202 info!(
203 "Waiting for checkpoint to be pruned {}, base time out is {}",
204 checkpoint,
205 base_timeout.as_secs()
206 );
207 let query = format!(
208 r#"
209 {{
210 checkpoint(id: {{ sequenceNumber: {checkpoint} }}) {{
211 sequenceNumber
212 }}
213 }}"#
214 );
215
216 let timeout = base_timeout.mul_f64(checkpoint.max(1) as f64);
217
218 tokio::time::timeout(timeout, async {
219 loop {
220 let resp = client
221 .execute_to_graphql(query.to_string(), false, vec![], vec![])
222 .await
223 .unwrap()
224 .response_body_json();
225
226 let current_checkpoint = &resp["data"]["checkpoint"];
227 if current_checkpoint.is_null() {
228 break;
229 } else {
230 tokio::time::sleep(Duration::from_secs(1)).await;
231 }
232 }
233 })
234 .await
235 .expect("timeout waiting for checkpoint to be pruned");
236}
237
238pub async fn start_graphql_server_with_fn_rpc(
239 graphql_connection_config: ConnectionConfig,
240 fn_rpc_url: Option<String>,
241 cancellation_token: Option<CancellationToken>,
242 service_config: Option<ServiceConfig>,
243) -> JoinHandle<()> {
244 let cancellation_token = cancellation_token.unwrap_or_default();
245 let mut server_config = ServerConfig {
246 connection: graphql_connection_config,
247 service: service_config.unwrap_or_else(ServiceConfig::test_defaults),
248 ..ServerConfig::default()
249 };
250 if let Some(fn_rpc_url) = fn_rpc_url {
251 server_config.tx_exec_full_node.node_rpc_url = Some(fn_rpc_url);
252 };
253
254 tokio::spawn(async move {
256 start_graphiql_server(&server_config, &Version::for_testing(), cancellation_token)
257 .await
258 .unwrap();
259 })
260}
261
262async fn start_validator_with_fullnode(
263 internal_data_source_rpc_port: Option<u16>,
264 data_ingestion_dir: PathBuf,
265) -> TestCluster {
266 let mut test_cluster_builder = TestClusterBuilder::new()
267 .with_num_validators(VALIDATOR_COUNT)
268 .with_epoch_duration_ms(EPOCH_DURATION_MS)
269 .with_data_ingestion_dir(data_ingestion_dir)
270 .with_accounts(vec![
271 AccountConfig {
272 address: None,
273 gas_amounts: vec![DEFAULT_GAS_AMOUNT; GAS_OBJECT_COUNT],
274 };
275 ACCOUNT_NUM
276 ]);
277
278 if let Some(internal_data_source_rpc_port) = internal_data_source_rpc_port {
279 test_cluster_builder =
280 test_cluster_builder.with_fullnode_rpc_port(internal_data_source_rpc_port);
281 };
282 test_cluster_builder.build().await
283}
284
285async fn wait_for_graphql_server(client: &SimpleClient) {
287 tokio::time::timeout(Duration::from_secs(10), async {
288 while client.ping().await.is_err() {
289 tokio::time::sleep(Duration::from_millis(500)).await;
290 }
291 })
292 .await
293 .expect("timeout waiting for graphql server to start");
294}
295
296async fn wait_for_graphql_checkpoint_catchup(
299 client: &SimpleClient,
300 checkpoint: u64,
301 base_timeout: Duration,
302) {
303 info!(
304 "Waiting for graphql to catchup to checkpoint {}, base time out is {}",
305 checkpoint,
306 base_timeout.as_secs()
307 );
308 let query = r#"
309 {
310 availableRange {
311 last {
312 sequenceNumber
313 }
314 }
315 }"#;
316
317 let timeout = base_timeout.mul_f64(checkpoint.max(1) as f64);
318
319 tokio::time::timeout(timeout, async {
320 loop {
321 let resp = client
322 .execute_to_graphql(query.to_string(), false, vec![], vec![])
323 .await
324 .unwrap()
325 .response_body_json();
326
327 let current_checkpoint = resp["data"]["availableRange"]["last"].get("sequenceNumber");
328 info!("Current checkpoint: {:?}", current_checkpoint);
329 let Some(current_checkpoint) = current_checkpoint else {
331 tokio::time::sleep(Duration::from_secs(1)).await;
332 continue;
333 };
334
335 let current_checkpoint = current_checkpoint.as_u64().unwrap();
337 if current_checkpoint < checkpoint {
338 tokio::time::sleep(Duration::from_secs(1)).await;
339 } else {
340 break;
341 }
342 }
343 })
344 .await
345 .expect("timeout waiting for graphql to catchup to checkpoint");
346}
347
348impl Cluster {
349 pub async fn wait_for_checkpoint_catchup(&self, checkpoint: u64, base_timeout: Duration) {
353 wait_for_graphql_checkpoint_catchup(&self.graphql_client, checkpoint, base_timeout).await
354 }
355
356 pub async fn wait_for_checkpoint_pruned(&self, checkpoint: u64, base_timeout: Duration) {
358 wait_for_graphql_checkpoint_pruned(&self.graphql_client, checkpoint, base_timeout).await
359 }
360
361 pub async fn build_transfer_iota_for_test(&self) -> TransactionData {
363 let addresses = self.validator_fullnode_handle.wallet.get_addresses();
364
365 let recipient = addresses[1];
366 self.validator_fullnode_handle
367 .test_transaction_builder()
368 .await
369 .transfer_iota(Some(1_000), recipient)
370 .build()
371 }
372
373 pub fn sign_transaction(&self, transaction: &TransactionData) -> Transaction {
375 self.validator_fullnode_handle
376 .wallet
377 .sign_transaction(transaction)
378 }
379}
380
381impl ExecutorCluster {
382 pub async fn wait_for_checkpoint_catchup(&self, checkpoint: u64, base_timeout: Duration) {
386 wait_for_graphql_checkpoint_catchup(&self.graphql_client, checkpoint, base_timeout).await
387 }
388
389 pub async fn wait_for_checkpoint_pruned(&self, checkpoint: u64, base_timeout: Duration) {
391 wait_for_graphql_checkpoint_pruned(&self.graphql_client, checkpoint, base_timeout).await
392 }
393
394 pub async fn wait_for_objects_snapshot_catchup(&self, base_timeout: Duration) {
399 let mut latest_snapshot_cp = 0;
400
401 let latest_cp = self
402 .indexer_store
403 .get_latest_checkpoint_sequence_number()
404 .await
405 .unwrap()
406 .unwrap();
407
408 tokio::time::timeout(base_timeout, async {
409 while latest_cp > latest_snapshot_cp + self.snapshot_config.snapshot_min_lag as u64 {
410 tokio::time::sleep(Duration::from_secs(1)).await;
411 latest_snapshot_cp = self
412 .indexer_store
413 .get_latest_object_snapshot_watermark()
414 .await
415 .unwrap()
416 .map(|watermark| watermark.checkpoint_hi_inclusive)
417 .unwrap_or_default();
418 }
419 })
420 .await
421 .unwrap_or_else(|_| panic!("timeout waiting for indexer to update objects snapshot - latest_cp: {latest_cp}, latest_snapshot_cp: {latest_snapshot_cp}"));
422 }
423
424 pub async fn cleanup_resources(self) {
428 self.cancellation_token.cancel();
429 let _ = join!(self.graphql_server_join_handle, self.indexer_join_handle);
430 let db_url = self.graphql_connection_config.db_url.clone();
431 force_delete_database(db_url).await;
432 }
433}