iota_graphql_rpc/test_infra/
cluster.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{net::SocketAddr, path::PathBuf, sync::Arc, time::Duration};
6
7use iota_graphql_rpc_client::simple_client::SimpleClient;
8pub use iota_indexer::config::SnapshotLagConfig;
9use iota_indexer::{
10    config::PruningOptions,
11    errors::IndexerError,
12    store::{PgIndexerStore, indexer_store::IndexerStore},
13    test_utils::{IndexerTypeConfig, force_delete_database, start_test_indexer_impl},
14};
15use iota_node_storage::GrpcStateReader;
16use iota_swarm_config::genesis_config::{AccountConfig, DEFAULT_GAS_AMOUNT};
17use iota_types::transaction::{Transaction, TransactionData};
18use test_cluster::{TestCluster, TestClusterBuilder};
19use tokio::{join, task::JoinHandle};
20use tokio_util::sync::CancellationToken;
21use tracing::info;
22
23use crate::{
24    config::{ConnectionConfig, ServerConfig, ServiceConfig, Version},
25    server::graphiql_server::start_graphiql_server,
26};
27
/// Number of validators the test cluster is built with.
const VALIDATOR_COUNT: usize = 7;
/// Epoch duration handed to the test cluster builder, in milliseconds.
const EPOCH_DURATION_MS: u64 = 15_000;

/// Number of genesis accounts created for the test cluster.
const ACCOUNT_NUM: usize = 20;
/// Number of gas objects (each holding `DEFAULT_GAS_AMOUNT`) per genesis account.
const GAS_OBJECT_COUNT: usize = 3;

/// Exported port constant; not referenced in this section — presumably the
/// default for the `internal_data_source_rpc_port` arguments below (confirm
/// against callers).
pub const DEFAULT_INTERNAL_DATA_SOURCE_PORT: u16 = 3000;
35
/// Handles to the indexer and GraphQL server started by [`serve_executor`].
///
/// Call [`ExecutorCluster::cleanup_resources`] to cancel both services and
/// delete the test database.
pub struct ExecutorCluster {
    /// Store returned by the test indexer, used to query indexing progress.
    pub indexer_store: PgIndexerStore,
    /// Handle of the indexer task.
    pub indexer_join_handle: JoinHandle<Result<(), IndexerError>>,
    /// Handle of the spawned GraphQL server task.
    pub graphql_server_join_handle: JoinHandle<()>,
    /// Client pointed at the GraphQL server's `http://{host}:{port}/` URL.
    pub graphql_client: SimpleClient,
    /// Objects-snapshot lag settings (the default if none were supplied).
    pub snapshot_config: SnapshotLagConfig,
    /// Connection config the GraphQL server was started with; its `db_url`
    /// names the database deleted on cleanup.
    pub graphql_connection_config: ConnectionConfig,
    /// Token shared by the indexer and GraphQL server; cancelling it signals
    /// both services to stop.
    pub cancellation_token: CancellationToken,
}
45
/// Handles to the full test stack started by [`start_cluster`]: validators
/// with a fullnode, an indexer, and a GraphQL server.
pub struct Cluster {
    /// The running test cluster (validators plus fullnode) and its wallet.
    pub validator_fullnode_handle: TestCluster,
    /// Store returned by the test indexer.
    pub indexer_store: PgIndexerStore,
    /// Handle of the indexer task.
    pub indexer_join_handle: JoinHandle<Result<(), IndexerError>>,
    /// Handle of the spawned GraphQL server task.
    pub graphql_server_join_handle: JoinHandle<()>,
    /// Client pointed at the GraphQL server's `http://{host}:{port}/` URL.
    pub graphql_client: SimpleClient,
    /// Token shared by the indexer and GraphQL server; cancelling it signals
    /// both services to stop.
    pub cancellation_token: CancellationToken,
}
54
55/// Starts a validator, fullnode, indexer, and graphql service for testing.
56pub async fn start_cluster(
57    graphql_connection_config: ConnectionConfig,
58    internal_data_source_rpc_port: Option<u16>,
59    service_config: ServiceConfig,
60) -> Cluster {
61    let data_ingestion_path = iota_common::tempdir().keep();
62    let db_url = graphql_connection_config.db_url.clone();
63    let cancellation_token = CancellationToken::new();
64    // Starts validator+fullnode
65    let test_cluster =
66        start_validator_with_fullnode(internal_data_source_rpc_port, data_ingestion_path.clone())
67            .await;
68
69    let grpc_url = test_cluster.grpc_url();
70    // Starts indexer
71    let (pg_store, pg_handle) = start_test_indexer_impl(
72        db_url,
73        // reset the existing db
74        true,
75        None,
76        grpc_url.clone(),
77        IndexerTypeConfig::writer_mode(None, None),
78        Some(data_ingestion_path),
79        cancellation_token.clone(),
80    )
81    .await;
82
83    // Starts graphql server
84    let graphql_server_handle = start_graphql_server_with_fn_rpc(
85        graphql_connection_config.clone(),
86        Some(grpc_url),
87        Some(cancellation_token.clone()),
88        Some(service_config),
89    )
90    .await;
91
92    let server_url = format!(
93        "http://{}:{}/",
94        graphql_connection_config.host, graphql_connection_config.port
95    );
96
97    // Starts graphql client
98    let client = SimpleClient::new(server_url);
99    wait_for_graphql_server(&client).await;
100
101    Cluster {
102        validator_fullnode_handle: test_cluster,
103        indexer_store: pg_store,
104        indexer_join_handle: pg_handle,
105        graphql_server_join_handle: graphql_server_handle,
106        graphql_client: client,
107        cancellation_token,
108    }
109}
110
111/// Takes in a simulated instantiation of an IOTA blockchain and builds a
112/// cluster around it.
113///
114/// This cluster is typically used in e2e tests to emulate
115/// and test behaviors. It should be noted however that queries
116/// that rely on the fullnode Write API are not supported yet.
117pub async fn serve_executor(
118    graphql_connection_config: ConnectionConfig,
119    internal_data_source_rpc_port: u16,
120    _executor: Arc<dyn GrpcStateReader + Send + Sync>,
121    snapshot_config: Option<SnapshotLagConfig>,
122    epochs_to_keep: Option<u64>,
123    data_ingestion_path: PathBuf,
124) -> ExecutorCluster {
125    let db_url = graphql_connection_config.db_url.clone();
126    // Creates a cancellation token and adds this to the ExecutorCluster, so that we
127    // can send a cancellation token on cleanup
128    let cancellation_token = CancellationToken::new();
129
130    // a dummy address to satisfy the indexer and graphql, the latter needs the url
131    // for the Write API, if not provided the server will return an error.
132    let executor_server_url: SocketAddr = format!("127.0.0.1:{internal_data_source_rpc_port}")
133        .parse()
134        .unwrap();
135
136    // in writer mode the indexer will read checkpoint data from the data ingestion
137    // path and ignore the rpc_url.
138    let (pg_store, pg_handle) = start_test_indexer_impl(
139        db_url,
140        true,
141        None,
142        format!("http://{executor_server_url}"),
143        IndexerTypeConfig::writer_mode(
144            snapshot_config.clone(),
145            Some(PruningOptions {
146                epochs_to_keep,
147                ..Default::default()
148            }),
149        ),
150        Some(data_ingestion_path),
151        cancellation_token.clone(),
152    )
153    .await;
154
155    // Starts graphql server
156    let graphql_server_handle = start_graphql_server_with_fn_rpc(
157        graphql_connection_config.clone(),
158        // this does not provide access to the node write api
159        Some(format!("http://{executor_server_url}")),
160        Some(cancellation_token.clone()),
161        None,
162    )
163    .await;
164
165    let server_url = format!(
166        "http://{}:{}/",
167        graphql_connection_config.host, graphql_connection_config.port
168    );
169
170    // Starts graphql client
171    let client = SimpleClient::new(server_url);
172    wait_for_graphql_server(&client).await;
173
174    ExecutorCluster {
175        indexer_store: pg_store,
176        indexer_join_handle: pg_handle,
177        graphql_server_join_handle: graphql_server_handle,
178        graphql_client: client,
179        snapshot_config: snapshot_config.unwrap_or_default(),
180        graphql_connection_config,
181        cancellation_token,
182    }
183}
184
185/// Ping the GraphQL server for a checkpoint until an empty response is
186/// returned, indicating that the checkpoint has been pruned.
187pub async fn wait_for_graphql_checkpoint_pruned(
188    client: &SimpleClient,
189    checkpoint: u64,
190    base_timeout: Duration,
191) {
192    info!(
193        "Waiting for checkpoint to be pruned {}, base time out is {}",
194        checkpoint,
195        base_timeout.as_secs()
196    );
197    let query = format!(
198        r#"
199        {{
200            checkpoint(id: {{ sequenceNumber: {checkpoint} }}) {{
201                sequenceNumber
202            }}
203        }}"#
204    );
205
206    let timeout = base_timeout.mul_f64(checkpoint.max(1) as f64);
207
208    tokio::time::timeout(timeout, async {
209        loop {
210            let resp = client
211                .execute_to_graphql(query.to_string(), false, vec![], vec![])
212                .await
213                .unwrap()
214                .response_body_json();
215
216            let current_checkpoint = &resp["data"]["checkpoint"];
217            if current_checkpoint.is_null() {
218                break;
219            } else {
220                tokio::time::sleep(Duration::from_secs(1)).await;
221            }
222        }
223    })
224    .await
225    .expect("timeout waiting for checkpoint to be pruned");
226}
227
228pub async fn start_graphql_server_with_fn_rpc(
229    graphql_connection_config: ConnectionConfig,
230    fn_rpc_url: Option<String>,
231    cancellation_token: Option<CancellationToken>,
232    service_config: Option<ServiceConfig>,
233) -> JoinHandle<()> {
234    let cancellation_token = cancellation_token.unwrap_or_default();
235    let mut server_config = ServerConfig {
236        connection: graphql_connection_config,
237        service: service_config.unwrap_or_else(ServiceConfig::test_defaults),
238        ..ServerConfig::default()
239    };
240    if let Some(fn_rpc_url) = fn_rpc_url {
241        server_config.tx_exec_full_node.node_rpc_url = Some(fn_rpc_url);
242    };
243
244    // Starts graphql server
245    tokio::spawn(async move {
246        start_graphiql_server(&server_config, &Version::for_testing(), cancellation_token)
247            .await
248            .unwrap();
249    })
250}
251
252async fn start_validator_with_fullnode(
253    internal_data_source_rpc_port: Option<u16>,
254    data_ingestion_dir: PathBuf,
255) -> TestCluster {
256    let mut test_cluster_builder = TestClusterBuilder::new()
257        .with_num_validators(VALIDATOR_COUNT)
258        .with_epoch_duration_ms(EPOCH_DURATION_MS)
259        .with_data_ingestion_dir(data_ingestion_dir)
260        .with_accounts(vec![
261            AccountConfig {
262                address: None,
263                gas_amounts: vec![DEFAULT_GAS_AMOUNT; GAS_OBJECT_COUNT],
264            };
265            ACCOUNT_NUM
266        ])
267        .with_fullnode_enable_grpc_api(true);
268
269    if let Some(internal_data_source_rpc_port) = internal_data_source_rpc_port {
270        test_cluster_builder =
271            test_cluster_builder.with_fullnode_rpc_port(internal_data_source_rpc_port);
272    };
273    test_cluster_builder.build().await
274}
275
276/// Repeatedly ping the GraphQL server for 10s, until it responds
277async fn wait_for_graphql_server(client: &SimpleClient) {
278    tokio::time::timeout(Duration::from_secs(10), async {
279        while client.ping().await.is_err() {
280            tokio::time::sleep(Duration::from_millis(500)).await;
281        }
282    })
283    .await
284    .expect("timeout waiting for graphql server to start");
285}
286
287/// Ping the GraphQL server until its background task has updated the checkpoint
288/// watermark to the desired checkpoint.
289async fn wait_for_graphql_checkpoint_catchup(
290    client: &SimpleClient,
291    checkpoint: u64,
292    base_timeout: Duration,
293) {
294    info!(
295        "Waiting for graphql to catchup to checkpoint {}, base time out is {}",
296        checkpoint,
297        base_timeout.as_secs()
298    );
299    let query = r#"
300    {
301        availableRange {
302            last {
303                sequenceNumber
304            }
305        }
306    }"#;
307
308    let timeout = base_timeout.mul_f64(checkpoint.max(1) as f64);
309
310    tokio::time::timeout(timeout, async {
311        loop {
312            let resp = client
313                .execute_to_graphql(query.to_string(), false, vec![], vec![])
314                .await
315                .unwrap()
316                .response_body_json();
317
318            let current_checkpoint = resp["data"]["availableRange"]["last"].get("sequenceNumber");
319            info!("Current checkpoint: {:?}", current_checkpoint);
320            // Indexer has not picked up any checkpoints yet
321            let Some(current_checkpoint) = current_checkpoint else {
322                tokio::time::sleep(Duration::from_secs(1)).await;
323                continue;
324            };
325
326            // Indexer has picked up a checkpoint, but it's not the one we're waiting for
327            let current_checkpoint = current_checkpoint.as_u64().unwrap();
328            if current_checkpoint < checkpoint {
329                tokio::time::sleep(Duration::from_secs(1)).await;
330            } else {
331                break;
332            }
333        }
334    })
335    .await
336    .expect("timeout waiting for graphql to catchup to checkpoint");
337}
338
339impl Cluster {
340    /// Waits for the indexer to index up to the given checkpoint, then waits
341    /// for the graphql service's background task to update the checkpoint
342    /// watermark to the given checkpoint.
343    pub async fn wait_for_checkpoint_catchup(&self, checkpoint: u64, base_timeout: Duration) {
344        wait_for_graphql_checkpoint_catchup(&self.graphql_client, checkpoint, base_timeout).await
345    }
346
347    /// Waits for the indexer to prune a given checkpoint.
348    pub async fn wait_for_checkpoint_pruned(&self, checkpoint: u64, base_timeout: Duration) {
349        wait_for_graphql_checkpoint_pruned(&self.graphql_client, checkpoint, base_timeout).await
350    }
351
352    /// Builds a transaction that transfers IOTA for testing.
353    pub async fn build_transfer_iota_for_test(&self) -> TransactionData {
354        let addresses = self.validator_fullnode_handle.wallet.get_addresses();
355
356        let recipient = addresses[1];
357        self.validator_fullnode_handle
358            .test_transaction_builder()
359            .await
360            .transfer_iota(Some(1_000), recipient)
361            .build()
362    }
363
364    /// Signs a transaction.
365    pub fn sign_transaction(&self, transaction: &TransactionData) -> Transaction {
366        self.validator_fullnode_handle
367            .wallet
368            .sign_transaction(transaction)
369    }
370}
371
372impl ExecutorCluster {
373    /// Waits for the indexer to index up to the given checkpoint, then waits
374    /// for the graphql service's background task to update the checkpoint
375    /// watermark to the given checkpoint.
376    pub async fn wait_for_checkpoint_catchup(&self, checkpoint: u64, base_timeout: Duration) {
377        wait_for_graphql_checkpoint_catchup(&self.graphql_client, checkpoint, base_timeout).await
378    }
379
380    /// Waits for the indexer to prune a given checkpoint.
381    pub async fn wait_for_checkpoint_pruned(&self, checkpoint: u64, base_timeout: Duration) {
382        wait_for_graphql_checkpoint_pruned(&self.graphql_client, checkpoint, base_timeout).await
383    }
384
385    /// The ObjectsSnapshotProcessor is a long-running task that periodically
386    /// takes a snapshot of the objects table. This leads to flakiness in
387    /// tests, so we wait until the objects_snapshot has reached the
388    /// expected state.
389    pub async fn wait_for_objects_snapshot_catchup(&self, base_timeout: Duration) {
390        let mut latest_snapshot_cp = 0;
391
392        let latest_cp = self
393            .indexer_store
394            .get_latest_checkpoint_sequence_number()
395            .await
396            .unwrap()
397            .unwrap();
398
399        tokio::time::timeout(base_timeout, async {
400            while latest_cp > latest_snapshot_cp + self.snapshot_config.snapshot_min_lag as u64 {
401                tokio::time::sleep(Duration::from_secs(1)).await;
402                latest_snapshot_cp = self
403                    .indexer_store
404                    .get_latest_object_snapshot_watermark()
405                    .await
406                    .unwrap()
407                    .map(|watermark| watermark.max_committed_cp)
408                    .unwrap_or_default();
409            }
410        })
411        .await
412        .unwrap_or_else(|_| panic!("timeout waiting for indexer to update objects snapshot - latest_cp: {latest_cp}, latest_snapshot_cp: {latest_snapshot_cp}"));
413    }
414
415    /// Sends a cancellation signal to the graphql and indexer services, waits
416    /// for them to complete, and then deletes the database created for the
417    /// test.
418    pub async fn cleanup_resources(self) {
419        self.cancellation_token.cancel();
420        let _ = join!(self.graphql_server_join_handle, self.indexer_join_handle);
421        let db_url = self.graphql_connection_config.db_url.clone();
422        force_delete_database(db_url).await;
423    }
424}