iota_graphql_rpc/test_infra/
cluster.rs1use std::{net::SocketAddr, path::PathBuf, sync::Arc, time::Duration};
6
7use iota_graphql_rpc_client::simple_client::SimpleClient;
8use iota_indexer::{
9 config::PruningOptions,
10 errors::IndexerError,
11 store::PgIndexerStore,
12 test_utils::{IndexerTypeConfig, force_delete_database, start_test_indexer_impl},
13};
14use iota_node_storage::GrpcStateReader;
15use iota_swarm_config::genesis_config::{AccountConfig, DEFAULT_GAS_AMOUNT};
16use iota_types::transaction::{Transaction, TransactionData};
17use test_cluster::{TestCluster, TestClusterBuilder};
18use tokio::{join, task::JoinHandle};
19use tokio_util::sync::CancellationToken;
20use tracing::info;
21
22use crate::{
23 config::{ConnectionConfig, ServerConfig, ServiceConfig, Version},
24 server::graphiql_server::start_graphiql_server,
25};
26
27const VALIDATOR_COUNT: usize = 7;
28const EPOCH_DURATION_MS: u64 = 15000;
29
30const ACCOUNT_NUM: usize = 20;
31const GAS_OBJECT_COUNT: usize = 3;
32
33pub const DEFAULT_INTERNAL_DATA_SOURCE_PORT: u16 = 3000;
34
35pub struct ExecutorCluster {
36 pub indexer_store: PgIndexerStore,
37 pub indexer_join_handle: JoinHandle<Result<(), IndexerError>>,
38 pub graphql_server_join_handle: JoinHandle<()>,
39 pub graphql_client: SimpleClient,
40 pub graphql_connection_config: ConnectionConfig,
41 pub cancellation_token: CancellationToken,
42}
43
44pub struct Cluster {
45 pub validator_fullnode_handle: TestCluster,
46 pub indexer_store: PgIndexerStore,
47 pub indexer_join_handle: JoinHandle<Result<(), IndexerError>>,
48 pub graphql_server_join_handle: JoinHandle<()>,
49 pub graphql_client: SimpleClient,
50 pub cancellation_token: CancellationToken,
51}
52
53pub async fn start_cluster(
55 graphql_connection_config: ConnectionConfig,
56 internal_data_source_rpc_port: Option<u16>,
57 service_config: ServiceConfig,
58) -> Cluster {
59 let data_ingestion_path = iota_common::tempdir().keep();
60 let db_url = graphql_connection_config.db_url.clone();
61 let cancellation_token = CancellationToken::new();
62 let test_cluster =
64 start_validator_with_fullnode(internal_data_source_rpc_port, data_ingestion_path.clone())
65 .await;
66
67 let grpc_url = test_cluster.grpc_url();
68 let (pg_store, pg_handle) = start_test_indexer_impl(
70 db_url,
71 true,
73 None,
74 grpc_url.clone(),
75 IndexerTypeConfig::writer_mode(None),
76 Some(data_ingestion_path),
77 cancellation_token.clone(),
78 )
79 .await;
80
81 let graphql_server_handle = start_graphql_server_with_fn_rpc(
83 graphql_connection_config.clone(),
84 Some(grpc_url),
85 Some(cancellation_token.clone()),
86 Some(service_config),
87 )
88 .await;
89
90 let server_url = format!(
91 "http://{}:{}/",
92 graphql_connection_config.host, graphql_connection_config.port
93 );
94
95 let client = SimpleClient::new(server_url);
97 wait_for_graphql_server(&client).await;
98
99 Cluster {
100 validator_fullnode_handle: test_cluster,
101 indexer_store: pg_store,
102 indexer_join_handle: pg_handle,
103 graphql_server_join_handle: graphql_server_handle,
104 graphql_client: client,
105 cancellation_token,
106 }
107}
108
109pub async fn serve_executor(
116 graphql_connection_config: ConnectionConfig,
117 internal_data_source_rpc_port: u16,
118 _executor: Arc<dyn GrpcStateReader + Send + Sync>,
119 epochs_to_keep: Option<u64>,
120 data_ingestion_path: PathBuf,
121) -> ExecutorCluster {
122 let db_url = graphql_connection_config.db_url.clone();
123 let cancellation_token = CancellationToken::new();
126
127 let executor_server_url: SocketAddr = format!("127.0.0.1:{internal_data_source_rpc_port}")
130 .parse()
131 .unwrap();
132
133 let (pg_store, pg_handle) = start_test_indexer_impl(
136 db_url,
137 true,
138 None,
139 format!("http://{executor_server_url}"),
140 IndexerTypeConfig::writer_mode(Some(PruningOptions {
141 epochs_to_keep,
142 ..Default::default()
143 })),
144 Some(data_ingestion_path),
145 cancellation_token.clone(),
146 )
147 .await;
148
149 let graphql_server_handle = start_graphql_server_with_fn_rpc(
151 graphql_connection_config.clone(),
152 Some(format!("http://{executor_server_url}")),
154 Some(cancellation_token.clone()),
155 None,
156 )
157 .await;
158
159 let server_url = format!(
160 "http://{}:{}/",
161 graphql_connection_config.host, graphql_connection_config.port
162 );
163
164 let client = SimpleClient::new(server_url);
166 wait_for_graphql_server(&client).await;
167
168 ExecutorCluster {
169 indexer_store: pg_store,
170 indexer_join_handle: pg_handle,
171 graphql_server_join_handle: graphql_server_handle,
172 graphql_client: client,
173 graphql_connection_config,
174 cancellation_token,
175 }
176}
177
178pub async fn wait_for_graphql_checkpoint_pruned(
181 client: &SimpleClient,
182 checkpoint: u64,
183 base_timeout: Duration,
184) {
185 info!(
186 "Waiting for checkpoint to be pruned {}, base time out is {}",
187 checkpoint,
188 base_timeout.as_secs()
189 );
190 let query = format!(
191 r#"
192 {{
193 checkpoint(id: {{ sequenceNumber: {checkpoint} }}) {{
194 sequenceNumber
195 }}
196 }}"#
197 );
198
199 let timeout = base_timeout.mul_f64(checkpoint.max(1) as f64);
200
201 tokio::time::timeout(timeout, async {
202 loop {
203 let resp = client
204 .execute_to_graphql(query.to_string(), false, vec![], vec![])
205 .await
206 .unwrap()
207 .response_body_json();
208
209 let current_checkpoint = &resp["data"]["checkpoint"];
210 if current_checkpoint.is_null() {
211 break;
212 } else {
213 tokio::time::sleep(Duration::from_secs(1)).await;
214 }
215 }
216 })
217 .await
218 .expect("timeout waiting for checkpoint to be pruned");
219}
220
221pub async fn start_graphql_server_with_fn_rpc(
222 graphql_connection_config: ConnectionConfig,
223 fn_rpc_url: Option<String>,
224 cancellation_token: Option<CancellationToken>,
225 service_config: Option<ServiceConfig>,
226) -> JoinHandle<()> {
227 let cancellation_token = cancellation_token.unwrap_or_default();
228 let mut server_config = ServerConfig {
229 connection: graphql_connection_config,
230 service: service_config.unwrap_or_else(ServiceConfig::test_defaults),
231 ..ServerConfig::default()
232 };
233 if let Some(fn_rpc_url) = fn_rpc_url {
234 server_config.tx_exec_full_node.node_rpc_url = Some(fn_rpc_url);
235 };
236
237 tokio::spawn(async move {
239 start_graphiql_server(&server_config, &Version::for_testing(), cancellation_token)
240 .await
241 .unwrap();
242 })
243}
244
245async fn start_validator_with_fullnode(
246 internal_data_source_rpc_port: Option<u16>,
247 data_ingestion_dir: PathBuf,
248) -> TestCluster {
249 let mut test_cluster_builder = TestClusterBuilder::new()
250 .with_num_validators(VALIDATOR_COUNT)
251 .with_epoch_duration_ms(EPOCH_DURATION_MS)
252 .with_data_ingestion_dir(data_ingestion_dir)
253 .with_accounts(vec![
254 AccountConfig {
255 address: None,
256 gas_amounts: vec![DEFAULT_GAS_AMOUNT; GAS_OBJECT_COUNT],
257 };
258 ACCOUNT_NUM
259 ])
260 .with_fullnode_enable_grpc_api(true);
261
262 if let Some(internal_data_source_rpc_port) = internal_data_source_rpc_port {
263 test_cluster_builder =
264 test_cluster_builder.with_fullnode_rpc_port(internal_data_source_rpc_port);
265 };
266 test_cluster_builder.build().await
267}
268
269async fn wait_for_graphql_server(client: &SimpleClient) {
271 tokio::time::timeout(Duration::from_secs(10), async {
272 while client.ping().await.is_err() {
273 tokio::time::sleep(Duration::from_millis(500)).await;
274 }
275 })
276 .await
277 .expect("timeout waiting for graphql server to start");
278}
279
280async fn wait_for_graphql_checkpoint_catchup(
283 client: &SimpleClient,
284 checkpoint: u64,
285 base_timeout: Duration,
286) {
287 info!(
288 "Waiting for graphql to catchup to checkpoint {}, base time out is {}",
289 checkpoint,
290 base_timeout.as_secs()
291 );
292 let query = r#"
293 {
294 availableRange {
295 last {
296 sequenceNumber
297 }
298 }
299 }"#;
300
301 let timeout = base_timeout.mul_f64(checkpoint.max(1) as f64);
302
303 tokio::time::timeout(timeout, async {
304 loop {
305 let resp = client
306 .execute_to_graphql(query.to_string(), false, vec![], vec![])
307 .await
308 .unwrap()
309 .response_body_json();
310
311 let current_checkpoint = resp["data"]["availableRange"]["last"].get("sequenceNumber");
312 info!("Current checkpoint: {:?}", current_checkpoint);
313 let Some(current_checkpoint) = current_checkpoint else {
315 tokio::time::sleep(Duration::from_secs(1)).await;
316 continue;
317 };
318
319 let current_checkpoint = current_checkpoint.as_u64().unwrap();
321 if current_checkpoint < checkpoint {
322 tokio::time::sleep(Duration::from_secs(1)).await;
323 } else {
324 break;
325 }
326 }
327 })
328 .await
329 .expect("timeout waiting for graphql to catchup to checkpoint");
330}
331
332impl Cluster {
333 pub async fn wait_for_checkpoint_catchup(&self, checkpoint: u64, base_timeout: Duration) {
337 wait_for_graphql_checkpoint_catchup(&self.graphql_client, checkpoint, base_timeout).await
338 }
339
340 pub async fn wait_for_checkpoint_pruned(&self, checkpoint: u64, base_timeout: Duration) {
342 wait_for_graphql_checkpoint_pruned(&self.graphql_client, checkpoint, base_timeout).await
343 }
344
345 pub async fn build_transfer_iota_for_test(&self) -> TransactionData {
347 let addresses = self.validator_fullnode_handle.wallet.get_addresses();
348
349 let recipient = addresses[1];
350 self.validator_fullnode_handle
351 .test_transaction_builder()
352 .await
353 .transfer_iota(Some(1_000), recipient)
354 .build()
355 }
356
357 pub fn sign_transaction(&self, transaction: &TransactionData) -> Transaction {
359 self.validator_fullnode_handle
360 .wallet
361 .sign_transaction(transaction)
362 }
363}
364
365impl ExecutorCluster {
366 pub async fn wait_for_checkpoint_catchup(&self, checkpoint: u64, base_timeout: Duration) {
370 wait_for_graphql_checkpoint_catchup(&self.graphql_client, checkpoint, base_timeout).await
371 }
372
373 pub async fn wait_for_checkpoint_pruned(&self, checkpoint: u64, base_timeout: Duration) {
375 wait_for_graphql_checkpoint_pruned(&self.graphql_client, checkpoint, base_timeout).await
376 }
377
378 pub async fn cleanup_resources(self) {
382 self.cancellation_token.cancel();
383 let _ = join!(self.graphql_server_join_handle, self.indexer_join_handle);
384 let db_url = self.graphql_connection_config.db_url.clone();
385 force_delete_database(db_url).await;
386 }
387}