Skip to main content

iota_graphql_rpc/test_infra/
cluster.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{net::SocketAddr, path::PathBuf, sync::Arc, time::Duration};
6
7use iota_graphql_rpc_client::simple_client::SimpleClient;
8use iota_indexer::{
9    config::PruningOptions,
10    errors::IndexerError,
11    store::PgIndexerStore,
12    test_utils::{IndexerTypeConfig, force_delete_database, start_test_indexer_impl},
13};
14use iota_node_storage::GrpcStateReader;
15use iota_swarm_config::genesis_config::{AccountConfig, DEFAULT_GAS_AMOUNT};
16use iota_types::transaction::{Transaction, TransactionData};
17use test_cluster::{TestCluster, TestClusterBuilder};
18use tokio::{join, task::JoinHandle};
19use tokio_util::sync::CancellationToken;
20use tracing::info;
21
22use crate::{
23    config::{ConnectionConfig, ServerConfig, ServiceConfig, Version},
24    server::graphiql_server::start_graphiql_server,
25};
26
27const VALIDATOR_COUNT: usize = 7;
28const EPOCH_DURATION_MS: u64 = 15000;
29
30const ACCOUNT_NUM: usize = 20;
31const GAS_OBJECT_COUNT: usize = 3;
32
33pub const DEFAULT_INTERNAL_DATA_SOURCE_PORT: u16 = 3000;
34
35pub struct ExecutorCluster {
36    pub indexer_store: PgIndexerStore,
37    pub indexer_join_handle: JoinHandle<Result<(), IndexerError>>,
38    pub graphql_server_join_handle: JoinHandle<()>,
39    pub graphql_client: SimpleClient,
40    pub graphql_connection_config: ConnectionConfig,
41    pub cancellation_token: CancellationToken,
42}
43
44pub struct Cluster {
45    pub validator_fullnode_handle: TestCluster,
46    pub indexer_store: PgIndexerStore,
47    pub indexer_join_handle: JoinHandle<Result<(), IndexerError>>,
48    pub graphql_server_join_handle: JoinHandle<()>,
49    pub graphql_client: SimpleClient,
50    pub cancellation_token: CancellationToken,
51}
52
53/// Starts a validator, fullnode, indexer, and graphql service for testing.
54pub async fn start_cluster(
55    graphql_connection_config: ConnectionConfig,
56    internal_data_source_rpc_port: Option<u16>,
57    service_config: ServiceConfig,
58) -> Cluster {
59    let data_ingestion_path = iota_common::tempdir().keep();
60    let db_url = graphql_connection_config.db_url.clone();
61    let cancellation_token = CancellationToken::new();
62    // Starts validator+fullnode
63    let test_cluster =
64        start_validator_with_fullnode(internal_data_source_rpc_port, data_ingestion_path.clone())
65            .await;
66
67    let grpc_url = test_cluster.grpc_url();
68    // Starts indexer
69    let (pg_store, pg_handle) = start_test_indexer_impl(
70        db_url,
71        // reset the existing db
72        true,
73        None,
74        grpc_url.clone(),
75        IndexerTypeConfig::writer_mode(None),
76        Some(data_ingestion_path),
77        cancellation_token.clone(),
78    )
79    .await;
80
81    // Starts graphql server
82    let graphql_server_handle = start_graphql_server_with_fn_rpc(
83        graphql_connection_config.clone(),
84        Some(grpc_url),
85        Some(cancellation_token.clone()),
86        Some(service_config),
87    )
88    .await;
89
90    let server_url = format!(
91        "http://{}:{}/",
92        graphql_connection_config.host, graphql_connection_config.port
93    );
94
95    // Starts graphql client
96    let client = SimpleClient::new(server_url);
97    wait_for_graphql_server(&client).await;
98
99    Cluster {
100        validator_fullnode_handle: test_cluster,
101        indexer_store: pg_store,
102        indexer_join_handle: pg_handle,
103        graphql_server_join_handle: graphql_server_handle,
104        graphql_client: client,
105        cancellation_token,
106    }
107}
108
109/// Takes in a simulated instantiation of an IOTA blockchain and builds a
110/// cluster around it.
111///
112/// This cluster is typically used in e2e tests to emulate
113/// and test behaviors. It should be noted however that queries
114/// that rely on the fullnode Write API are not supported yet.
115pub async fn serve_executor(
116    graphql_connection_config: ConnectionConfig,
117    internal_data_source_rpc_port: u16,
118    _executor: Arc<dyn GrpcStateReader + Send + Sync>,
119    epochs_to_keep: Option<u64>,
120    data_ingestion_path: PathBuf,
121) -> ExecutorCluster {
122    let db_url = graphql_connection_config.db_url.clone();
123    // Creates a cancellation token and adds this to the ExecutorCluster, so that we
124    // can send a cancellation token on cleanup
125    let cancellation_token = CancellationToken::new();
126
127    // a dummy address to satisfy the indexer and graphql, the latter needs the url
128    // for the Write API, if not provided the server will return an error.
129    let executor_server_url: SocketAddr = format!("127.0.0.1:{internal_data_source_rpc_port}")
130        .parse()
131        .unwrap();
132
133    // in writer mode the indexer will read checkpoint data from the data ingestion
134    // path and ignore the rpc_url.
135    let (pg_store, pg_handle) = start_test_indexer_impl(
136        db_url,
137        true,
138        None,
139        format!("http://{executor_server_url}"),
140        IndexerTypeConfig::writer_mode(Some(PruningOptions {
141            epochs_to_keep,
142            ..Default::default()
143        })),
144        Some(data_ingestion_path),
145        cancellation_token.clone(),
146    )
147    .await;
148
149    // Starts graphql server
150    let graphql_server_handle = start_graphql_server_with_fn_rpc(
151        graphql_connection_config.clone(),
152        // this does not provide access to the node write api
153        Some(format!("http://{executor_server_url}")),
154        Some(cancellation_token.clone()),
155        None,
156    )
157    .await;
158
159    let server_url = format!(
160        "http://{}:{}/",
161        graphql_connection_config.host, graphql_connection_config.port
162    );
163
164    // Starts graphql client
165    let client = SimpleClient::new(server_url);
166    wait_for_graphql_server(&client).await;
167
168    ExecutorCluster {
169        indexer_store: pg_store,
170        indexer_join_handle: pg_handle,
171        graphql_server_join_handle: graphql_server_handle,
172        graphql_client: client,
173        graphql_connection_config,
174        cancellation_token,
175    }
176}
177
178/// Ping the GraphQL server for a checkpoint until an empty response is
179/// returned, indicating that the checkpoint has been pruned.
180pub async fn wait_for_graphql_checkpoint_pruned(
181    client: &SimpleClient,
182    checkpoint: u64,
183    base_timeout: Duration,
184) {
185    info!(
186        "Waiting for checkpoint to be pruned {}, base time out is {}",
187        checkpoint,
188        base_timeout.as_secs()
189    );
190    let query = format!(
191        r#"
192        {{
193            checkpoint(id: {{ sequenceNumber: {checkpoint} }}) {{
194                sequenceNumber
195            }}
196        }}"#
197    );
198
199    let timeout = base_timeout.mul_f64(checkpoint.max(1) as f64);
200
201    tokio::time::timeout(timeout, async {
202        loop {
203            let resp = client
204                .execute_to_graphql(query.to_string(), false, vec![], vec![])
205                .await
206                .unwrap()
207                .response_body_json();
208
209            let current_checkpoint = &resp["data"]["checkpoint"];
210            if current_checkpoint.is_null() {
211                break;
212            } else {
213                tokio::time::sleep(Duration::from_secs(1)).await;
214            }
215        }
216    })
217    .await
218    .expect("timeout waiting for checkpoint to be pruned");
219}
220
221pub async fn start_graphql_server_with_fn_rpc(
222    graphql_connection_config: ConnectionConfig,
223    fn_rpc_url: Option<String>,
224    cancellation_token: Option<CancellationToken>,
225    service_config: Option<ServiceConfig>,
226) -> JoinHandle<()> {
227    let cancellation_token = cancellation_token.unwrap_or_default();
228    let mut server_config = ServerConfig {
229        connection: graphql_connection_config,
230        service: service_config.unwrap_or_else(ServiceConfig::test_defaults),
231        ..ServerConfig::default()
232    };
233    if let Some(fn_rpc_url) = fn_rpc_url {
234        server_config.tx_exec_full_node.node_rpc_url = Some(fn_rpc_url);
235    };
236
237    // Starts graphql server
238    tokio::spawn(async move {
239        start_graphiql_server(&server_config, &Version::for_testing(), cancellation_token)
240            .await
241            .unwrap();
242    })
243}
244
245async fn start_validator_with_fullnode(
246    internal_data_source_rpc_port: Option<u16>,
247    data_ingestion_dir: PathBuf,
248) -> TestCluster {
249    let mut test_cluster_builder = TestClusterBuilder::new()
250        .with_num_validators(VALIDATOR_COUNT)
251        .with_epoch_duration_ms(EPOCH_DURATION_MS)
252        .with_data_ingestion_dir(data_ingestion_dir)
253        .with_accounts(vec![
254            AccountConfig {
255                address: None,
256                gas_amounts: vec![DEFAULT_GAS_AMOUNT; GAS_OBJECT_COUNT],
257            };
258            ACCOUNT_NUM
259        ])
260        .with_fullnode_enable_grpc_api(true);
261
262    if let Some(internal_data_source_rpc_port) = internal_data_source_rpc_port {
263        test_cluster_builder =
264            test_cluster_builder.with_fullnode_rpc_port(internal_data_source_rpc_port);
265    };
266    test_cluster_builder.build().await
267}
268
269/// Repeatedly ping the GraphQL server for 10s, until it responds
270async fn wait_for_graphql_server(client: &SimpleClient) {
271    tokio::time::timeout(Duration::from_secs(10), async {
272        while client.ping().await.is_err() {
273            tokio::time::sleep(Duration::from_millis(500)).await;
274        }
275    })
276    .await
277    .expect("timeout waiting for graphql server to start");
278}
279
280/// Ping the GraphQL server until its background task has updated the checkpoint
281/// watermark to the desired checkpoint.
282async fn wait_for_graphql_checkpoint_catchup(
283    client: &SimpleClient,
284    checkpoint: u64,
285    base_timeout: Duration,
286) {
287    info!(
288        "Waiting for graphql to catchup to checkpoint {}, base time out is {}",
289        checkpoint,
290        base_timeout.as_secs()
291    );
292    let query = r#"
293    {
294        availableRange {
295            last {
296                sequenceNumber
297            }
298        }
299    }"#;
300
301    let timeout = base_timeout.mul_f64(checkpoint.max(1) as f64);
302
303    tokio::time::timeout(timeout, async {
304        loop {
305            let resp = client
306                .execute_to_graphql(query.to_string(), false, vec![], vec![])
307                .await
308                .unwrap()
309                .response_body_json();
310
311            let current_checkpoint = resp["data"]["availableRange"]["last"].get("sequenceNumber");
312            info!("Current checkpoint: {:?}", current_checkpoint);
313            // Indexer has not picked up any checkpoints yet
314            let Some(current_checkpoint) = current_checkpoint else {
315                tokio::time::sleep(Duration::from_secs(1)).await;
316                continue;
317            };
318
319            // Indexer has picked up a checkpoint, but it's not the one we're waiting for
320            let current_checkpoint = current_checkpoint.as_u64().unwrap();
321            if current_checkpoint < checkpoint {
322                tokio::time::sleep(Duration::from_secs(1)).await;
323            } else {
324                break;
325            }
326        }
327    })
328    .await
329    .expect("timeout waiting for graphql to catchup to checkpoint");
330}
331
332impl Cluster {
333    /// Waits for the indexer to index up to the given checkpoint, then waits
334    /// for the graphql service's background task to update the checkpoint
335    /// watermark to the given checkpoint.
336    pub async fn wait_for_checkpoint_catchup(&self, checkpoint: u64, base_timeout: Duration) {
337        wait_for_graphql_checkpoint_catchup(&self.graphql_client, checkpoint, base_timeout).await
338    }
339
340    /// Waits for the indexer to prune a given checkpoint.
341    pub async fn wait_for_checkpoint_pruned(&self, checkpoint: u64, base_timeout: Duration) {
342        wait_for_graphql_checkpoint_pruned(&self.graphql_client, checkpoint, base_timeout).await
343    }
344
345    /// Builds a transaction that transfers IOTA for testing.
346    pub async fn build_transfer_iota_for_test(&self) -> TransactionData {
347        let addresses = self.validator_fullnode_handle.wallet.get_addresses();
348
349        let recipient = addresses[1];
350        self.validator_fullnode_handle
351            .test_transaction_builder()
352            .await
353            .transfer_iota(Some(1_000), recipient)
354            .build()
355    }
356
357    /// Signs a transaction.
358    pub fn sign_transaction(&self, transaction: &TransactionData) -> Transaction {
359        self.validator_fullnode_handle
360            .wallet
361            .sign_transaction(transaction)
362    }
363}
364
365impl ExecutorCluster {
366    /// Waits for the indexer to index up to the given checkpoint, then waits
367    /// for the graphql service's background task to update the checkpoint
368    /// watermark to the given checkpoint.
369    pub async fn wait_for_checkpoint_catchup(&self, checkpoint: u64, base_timeout: Duration) {
370        wait_for_graphql_checkpoint_catchup(&self.graphql_client, checkpoint, base_timeout).await
371    }
372
373    /// Waits for the indexer to prune a given checkpoint.
374    pub async fn wait_for_checkpoint_pruned(&self, checkpoint: u64, base_timeout: Duration) {
375        wait_for_graphql_checkpoint_pruned(&self.graphql_client, checkpoint, base_timeout).await
376    }
377
378    /// Sends a cancellation signal to the graphql and indexer services, waits
379    /// for them to complete, and then deletes the database created for the
380    /// test.
381    pub async fn cleanup_resources(self) {
382        self.cancellation_token.cancel();
383        let _ = join!(self.graphql_server_join_handle, self.indexer_join_handle);
384        let db_url = self.graphql_connection_config.db_url.clone();
385        force_delete_database(db_url).await;
386    }
387}