iota_graphql_rpc/test_infra/
cluster.rs1use std::{net::SocketAddr, path::PathBuf, sync::Arc, time::Duration};
6
7use iota_graphql_rpc_client::simple_client::SimpleClient;
8pub use iota_indexer::config::SnapshotLagConfig;
9use iota_indexer::{
10 config::PruningOptions,
11 errors::IndexerError,
12 store::{PgIndexerStore, indexer_store::IndexerStore},
13 test_utils::{IndexerTypeConfig, force_delete_database, start_test_indexer_impl},
14};
15use iota_node_storage::GrpcStateReader;
16use iota_swarm_config::genesis_config::{AccountConfig, DEFAULT_GAS_AMOUNT};
17use iota_types::transaction::{Transaction, TransactionData};
18use test_cluster::{TestCluster, TestClusterBuilder};
19use tokio::{join, task::JoinHandle};
20use tokio_util::sync::CancellationToken;
21use tracing::info;
22
23use crate::{
24 config::{ConnectionConfig, ServerConfig, ServiceConfig, Version},
25 server::graphiql_server::start_graphiql_server,
26};
27
28const VALIDATOR_COUNT: usize = 7;
29const EPOCH_DURATION_MS: u64 = 15000;
30
31const ACCOUNT_NUM: usize = 20;
32const GAS_OBJECT_COUNT: usize = 3;
33
34pub const DEFAULT_INTERNAL_DATA_SOURCE_PORT: u16 = 3000;
35
36pub struct ExecutorCluster {
37 pub indexer_store: PgIndexerStore,
38 pub indexer_join_handle: JoinHandle<Result<(), IndexerError>>,
39 pub graphql_server_join_handle: JoinHandle<()>,
40 pub graphql_client: SimpleClient,
41 pub snapshot_config: SnapshotLagConfig,
42 pub graphql_connection_config: ConnectionConfig,
43 pub cancellation_token: CancellationToken,
44}
45
46pub struct Cluster {
47 pub validator_fullnode_handle: TestCluster,
48 pub indexer_store: PgIndexerStore,
49 pub indexer_join_handle: JoinHandle<Result<(), IndexerError>>,
50 pub graphql_server_join_handle: JoinHandle<()>,
51 pub graphql_client: SimpleClient,
52 pub cancellation_token: CancellationToken,
53}
54
55pub async fn start_cluster(
57 graphql_connection_config: ConnectionConfig,
58 internal_data_source_rpc_port: Option<u16>,
59 service_config: ServiceConfig,
60) -> Cluster {
61 let data_ingestion_path = iota_common::tempdir().keep();
62 let db_url = graphql_connection_config.db_url.clone();
63 let cancellation_token = CancellationToken::new();
64 let test_cluster =
66 start_validator_with_fullnode(internal_data_source_rpc_port, data_ingestion_path.clone())
67 .await;
68
69 let grpc_url = test_cluster.grpc_url();
70 let (pg_store, pg_handle) = start_test_indexer_impl(
72 db_url,
73 true,
75 None,
76 grpc_url.clone(),
77 IndexerTypeConfig::writer_mode(None, None),
78 Some(data_ingestion_path),
79 cancellation_token.clone(),
80 )
81 .await;
82
83 let graphql_server_handle = start_graphql_server_with_fn_rpc(
85 graphql_connection_config.clone(),
86 Some(grpc_url),
87 Some(cancellation_token.clone()),
88 Some(service_config),
89 )
90 .await;
91
92 let server_url = format!(
93 "http://{}:{}/",
94 graphql_connection_config.host, graphql_connection_config.port
95 );
96
97 let client = SimpleClient::new(server_url);
99 wait_for_graphql_server(&client).await;
100
101 Cluster {
102 validator_fullnode_handle: test_cluster,
103 indexer_store: pg_store,
104 indexer_join_handle: pg_handle,
105 graphql_server_join_handle: graphql_server_handle,
106 graphql_client: client,
107 cancellation_token,
108 }
109}
110
111pub async fn serve_executor(
118 graphql_connection_config: ConnectionConfig,
119 internal_data_source_rpc_port: u16,
120 _executor: Arc<dyn GrpcStateReader + Send + Sync>,
121 snapshot_config: Option<SnapshotLagConfig>,
122 epochs_to_keep: Option<u64>,
123 data_ingestion_path: PathBuf,
124) -> ExecutorCluster {
125 let db_url = graphql_connection_config.db_url.clone();
126 let cancellation_token = CancellationToken::new();
129
130 let executor_server_url: SocketAddr = format!("127.0.0.1:{internal_data_source_rpc_port}")
133 .parse()
134 .unwrap();
135
136 let (pg_store, pg_handle) = start_test_indexer_impl(
139 db_url,
140 true,
141 None,
142 format!("http://{executor_server_url}"),
143 IndexerTypeConfig::writer_mode(
144 snapshot_config.clone(),
145 Some(PruningOptions {
146 epochs_to_keep,
147 ..Default::default()
148 }),
149 ),
150 Some(data_ingestion_path),
151 cancellation_token.clone(),
152 )
153 .await;
154
155 let graphql_server_handle = start_graphql_server_with_fn_rpc(
157 graphql_connection_config.clone(),
158 Some(format!("http://{executor_server_url}")),
160 Some(cancellation_token.clone()),
161 None,
162 )
163 .await;
164
165 let server_url = format!(
166 "http://{}:{}/",
167 graphql_connection_config.host, graphql_connection_config.port
168 );
169
170 let client = SimpleClient::new(server_url);
172 wait_for_graphql_server(&client).await;
173
174 ExecutorCluster {
175 indexer_store: pg_store,
176 indexer_join_handle: pg_handle,
177 graphql_server_join_handle: graphql_server_handle,
178 graphql_client: client,
179 snapshot_config: snapshot_config.unwrap_or_default(),
180 graphql_connection_config,
181 cancellation_token,
182 }
183}
184
185pub async fn wait_for_graphql_checkpoint_pruned(
188 client: &SimpleClient,
189 checkpoint: u64,
190 base_timeout: Duration,
191) {
192 info!(
193 "Waiting for checkpoint to be pruned {}, base time out is {}",
194 checkpoint,
195 base_timeout.as_secs()
196 );
197 let query = format!(
198 r#"
199 {{
200 checkpoint(id: {{ sequenceNumber: {checkpoint} }}) {{
201 sequenceNumber
202 }}
203 }}"#
204 );
205
206 let timeout = base_timeout.mul_f64(checkpoint.max(1) as f64);
207
208 tokio::time::timeout(timeout, async {
209 loop {
210 let resp = client
211 .execute_to_graphql(query.to_string(), false, vec![], vec![])
212 .await
213 .unwrap()
214 .response_body_json();
215
216 let current_checkpoint = &resp["data"]["checkpoint"];
217 if current_checkpoint.is_null() {
218 break;
219 } else {
220 tokio::time::sleep(Duration::from_secs(1)).await;
221 }
222 }
223 })
224 .await
225 .expect("timeout waiting for checkpoint to be pruned");
226}
227
228pub async fn start_graphql_server_with_fn_rpc(
229 graphql_connection_config: ConnectionConfig,
230 fn_rpc_url: Option<String>,
231 cancellation_token: Option<CancellationToken>,
232 service_config: Option<ServiceConfig>,
233) -> JoinHandle<()> {
234 let cancellation_token = cancellation_token.unwrap_or_default();
235 let mut server_config = ServerConfig {
236 connection: graphql_connection_config,
237 service: service_config.unwrap_or_else(ServiceConfig::test_defaults),
238 ..ServerConfig::default()
239 };
240 if let Some(fn_rpc_url) = fn_rpc_url {
241 server_config.tx_exec_full_node.node_rpc_url = Some(fn_rpc_url);
242 };
243
244 tokio::spawn(async move {
246 start_graphiql_server(&server_config, &Version::for_testing(), cancellation_token)
247 .await
248 .unwrap();
249 })
250}
251
252async fn start_validator_with_fullnode(
253 internal_data_source_rpc_port: Option<u16>,
254 data_ingestion_dir: PathBuf,
255) -> TestCluster {
256 let mut test_cluster_builder = TestClusterBuilder::new()
257 .with_num_validators(VALIDATOR_COUNT)
258 .with_epoch_duration_ms(EPOCH_DURATION_MS)
259 .with_data_ingestion_dir(data_ingestion_dir)
260 .with_accounts(vec![
261 AccountConfig {
262 address: None,
263 gas_amounts: vec![DEFAULT_GAS_AMOUNT; GAS_OBJECT_COUNT],
264 };
265 ACCOUNT_NUM
266 ])
267 .with_fullnode_enable_grpc_api(true);
268
269 if let Some(internal_data_source_rpc_port) = internal_data_source_rpc_port {
270 test_cluster_builder =
271 test_cluster_builder.with_fullnode_rpc_port(internal_data_source_rpc_port);
272 };
273 test_cluster_builder.build().await
274}
275
276async fn wait_for_graphql_server(client: &SimpleClient) {
278 tokio::time::timeout(Duration::from_secs(10), async {
279 while client.ping().await.is_err() {
280 tokio::time::sleep(Duration::from_millis(500)).await;
281 }
282 })
283 .await
284 .expect("timeout waiting for graphql server to start");
285}
286
287async fn wait_for_graphql_checkpoint_catchup(
290 client: &SimpleClient,
291 checkpoint: u64,
292 base_timeout: Duration,
293) {
294 info!(
295 "Waiting for graphql to catchup to checkpoint {}, base time out is {}",
296 checkpoint,
297 base_timeout.as_secs()
298 );
299 let query = r#"
300 {
301 availableRange {
302 last {
303 sequenceNumber
304 }
305 }
306 }"#;
307
308 let timeout = base_timeout.mul_f64(checkpoint.max(1) as f64);
309
310 tokio::time::timeout(timeout, async {
311 loop {
312 let resp = client
313 .execute_to_graphql(query.to_string(), false, vec![], vec![])
314 .await
315 .unwrap()
316 .response_body_json();
317
318 let current_checkpoint = resp["data"]["availableRange"]["last"].get("sequenceNumber");
319 info!("Current checkpoint: {:?}", current_checkpoint);
320 let Some(current_checkpoint) = current_checkpoint else {
322 tokio::time::sleep(Duration::from_secs(1)).await;
323 continue;
324 };
325
326 let current_checkpoint = current_checkpoint.as_u64().unwrap();
328 if current_checkpoint < checkpoint {
329 tokio::time::sleep(Duration::from_secs(1)).await;
330 } else {
331 break;
332 }
333 }
334 })
335 .await
336 .expect("timeout waiting for graphql to catchup to checkpoint");
337}
338
339impl Cluster {
340 pub async fn wait_for_checkpoint_catchup(&self, checkpoint: u64, base_timeout: Duration) {
344 wait_for_graphql_checkpoint_catchup(&self.graphql_client, checkpoint, base_timeout).await
345 }
346
347 pub async fn wait_for_checkpoint_pruned(&self, checkpoint: u64, base_timeout: Duration) {
349 wait_for_graphql_checkpoint_pruned(&self.graphql_client, checkpoint, base_timeout).await
350 }
351
352 pub async fn build_transfer_iota_for_test(&self) -> TransactionData {
354 let addresses = self.validator_fullnode_handle.wallet.get_addresses();
355
356 let recipient = addresses[1];
357 self.validator_fullnode_handle
358 .test_transaction_builder()
359 .await
360 .transfer_iota(Some(1_000), recipient)
361 .build()
362 }
363
364 pub fn sign_transaction(&self, transaction: &TransactionData) -> Transaction {
366 self.validator_fullnode_handle
367 .wallet
368 .sign_transaction(transaction)
369 }
370}
371
372impl ExecutorCluster {
373 pub async fn wait_for_checkpoint_catchup(&self, checkpoint: u64, base_timeout: Duration) {
377 wait_for_graphql_checkpoint_catchup(&self.graphql_client, checkpoint, base_timeout).await
378 }
379
380 pub async fn wait_for_checkpoint_pruned(&self, checkpoint: u64, base_timeout: Duration) {
382 wait_for_graphql_checkpoint_pruned(&self.graphql_client, checkpoint, base_timeout).await
383 }
384
385 pub async fn wait_for_objects_snapshot_catchup(&self, base_timeout: Duration) {
390 let mut latest_snapshot_cp = 0;
391
392 let latest_cp = self
393 .indexer_store
394 .get_latest_checkpoint_sequence_number()
395 .await
396 .unwrap()
397 .unwrap();
398
399 tokio::time::timeout(base_timeout, async {
400 while latest_cp > latest_snapshot_cp + self.snapshot_config.snapshot_min_lag as u64 {
401 tokio::time::sleep(Duration::from_secs(1)).await;
402 latest_snapshot_cp = self
403 .indexer_store
404 .get_latest_object_snapshot_watermark()
405 .await
406 .unwrap()
407 .map(|watermark| watermark.max_committed_cp)
408 .unwrap_or_default();
409 }
410 })
411 .await
412 .unwrap_or_else(|_| panic!("timeout waiting for indexer to update objects snapshot - latest_cp: {latest_cp}, latest_snapshot_cp: {latest_snapshot_cp}"));
413 }
414
415 pub async fn cleanup_resources(self) {
419 self.cancellation_token.cancel();
420 let _ = join!(self.graphql_server_join_handle, self.indexer_join_handle);
421 let db_url = self.graphql_connection_config.db_url.clone();
422 force_delete_database(db_url).await;
423 }
424}