iota_graphql_rpc/test_infra/cluster.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{net::SocketAddr, path::PathBuf, sync::Arc, time::Duration};
6
7use iota_graphql_rpc_client::simple_client::SimpleClient;
8pub use iota_indexer::config::SnapshotLagConfig;
9use iota_indexer::{
10    config::PruningOptions,
11    errors::IndexerError,
12    store::{PgIndexerStore, indexer_store::IndexerStore},
13    test_utils::{IndexerTypeConfig, force_delete_database, start_test_indexer_impl},
14};
15use iota_swarm_config::genesis_config::{AccountConfig, DEFAULT_GAS_AMOUNT};
16use iota_types::{
17    storage::RestStateReader,
18    transaction::{Transaction, TransactionData},
19};
20use test_cluster::{TestCluster, TestClusterBuilder};
21use tokio::{join, task::JoinHandle};
22use tokio_util::sync::CancellationToken;
23use tracing::info;
24
25use crate::{
26    config::{ConnectionConfig, ServerConfig, ServiceConfig, Version},
27    server::graphiql_server::start_graphiql_server,
28};
29
/// Number of validators started by `start_validator_with_fullnode`.
const VALIDATOR_COUNT: usize = 7;
/// Epoch duration, in milliseconds, configured on the test cluster.
const EPOCH_DURATION_MS: u64 = 15_000;

/// Number of genesis accounts created on the test cluster.
const ACCOUNT_NUM: usize = 20;
/// Gas objects created per genesis account (each holding `DEFAULT_GAS_AMOUNT`).
const GAS_OBJECT_COUNT: usize = 3;

/// Default port for the internal data source RPC.
/// NOTE(review): not referenced in this file; presumably used by callers.
pub const DEFAULT_INTERNAL_DATA_SOURCE_PORT: u16 = 3000;
37
38pub struct ExecutorCluster {
39    pub executor_server_handle: JoinHandle<()>,
40    pub indexer_store: PgIndexerStore,
41    pub indexer_join_handle: JoinHandle<Result<(), IndexerError>>,
42    pub graphql_server_join_handle: JoinHandle<()>,
43    pub graphql_client: SimpleClient,
44    pub snapshot_config: SnapshotLagConfig,
45    pub graphql_connection_config: ConnectionConfig,
46    pub cancellation_token: CancellationToken,
47}
48
49pub struct Cluster {
50    pub validator_fullnode_handle: TestCluster,
51    pub indexer_store: PgIndexerStore,
52    pub indexer_join_handle: JoinHandle<Result<(), IndexerError>>,
53    pub graphql_server_join_handle: JoinHandle<()>,
54    pub graphql_client: SimpleClient,
55    pub cancellation_token: CancellationToken,
56}
57
58/// Starts a validator, fullnode, indexer, and graphql service for testing.
59pub async fn start_cluster(
60    graphql_connection_config: ConnectionConfig,
61    internal_data_source_rpc_port: Option<u16>,
62) -> Cluster {
63    let data_ingestion_path = tempfile::tempdir().unwrap().keep();
64    let db_url = graphql_connection_config.db_url.clone();
65    let cancellation_token = CancellationToken::new();
66    // Starts validator+fullnode
67    let val_fn =
68        start_validator_with_fullnode(internal_data_source_rpc_port, data_ingestion_path.clone())
69            .await;
70
71    // Starts indexer
72    let (pg_store, pg_handle) = start_test_indexer_impl(
73        db_url,
74        // reset the existing db
75        true,
76        None,
77        val_fn.rpc_url().to_string(),
78        IndexerTypeConfig::writer_mode(None, None),
79        Some(data_ingestion_path),
80        cancellation_token.clone(),
81    )
82    .await;
83
84    // Starts graphql server
85    let fn_rpc_url = val_fn.rpc_url().to_string();
86    let graphql_server_handle = start_graphql_server_with_fn_rpc(
87        graphql_connection_config.clone(),
88        Some(fn_rpc_url),
89        Some(cancellation_token.clone()),
90    )
91    .await;
92
93    let server_url = format!(
94        "http://{}:{}/",
95        graphql_connection_config.host, graphql_connection_config.port
96    );
97
98    // Starts graphql client
99    let client = SimpleClient::new(server_url);
100    wait_for_graphql_server(&client).await;
101
102    Cluster {
103        validator_fullnode_handle: val_fn,
104        indexer_store: pg_store,
105        indexer_join_handle: pg_handle,
106        graphql_server_join_handle: graphql_server_handle,
107        graphql_client: client,
108        cancellation_token,
109    }
110}
111
112/// Takes in a simulated instantiation of an IOTA blockchain and builds a
113/// cluster around it.
114///
115/// This cluster is typically used in e2e tests to emulate
116/// and test behaviors. It should be noted however that queries
117/// that rely on the fullnode Write API are not supported yet.
118pub async fn serve_executor(
119    graphql_connection_config: ConnectionConfig,
120    internal_data_source_rpc_port: u16,
121    executor: Arc<dyn RestStateReader + Send + Sync>,
122    snapshot_config: Option<SnapshotLagConfig>,
123    epochs_to_keep: Option<u64>,
124    data_ingestion_path: PathBuf,
125) -> ExecutorCluster {
126    let db_url = graphql_connection_config.db_url.clone();
127    // Creates a cancellation token and adds this to the ExecutorCluster, so that we
128    // can send a cancellation token on cleanup
129    let cancellation_token = CancellationToken::new();
130
131    let executor_server_url: SocketAddr = format!("127.0.0.1:{internal_data_source_rpc_port}")
132        .parse()
133        .unwrap();
134
135    info!("Starting executor server on {}", executor_server_url);
136
137    let executor_server_handle = tokio::spawn(async move {
138        iota_rest_api::RestService::new_without_version(executor)
139            .start_service(executor_server_url)
140            .await;
141    });
142
143    info!("spawned executor server");
144
145    let (pg_store, pg_handle) = start_test_indexer_impl(
146        db_url,
147        true,
148        None,
149        format!("http://{executor_server_url}"),
150        IndexerTypeConfig::writer_mode(
151            snapshot_config.clone(),
152            Some(PruningOptions {
153                epochs_to_keep,
154                ..Default::default()
155            }),
156        ),
157        Some(data_ingestion_path),
158        cancellation_token.clone(),
159    )
160    .await;
161
162    // Starts graphql server
163    let graphql_server_handle = start_graphql_server_with_fn_rpc(
164        graphql_connection_config.clone(),
165        // this does not provide access to the node write api
166        Some(format!("http://{executor_server_url}")),
167        Some(cancellation_token.clone()),
168    )
169    .await;
170
171    let server_url = format!(
172        "http://{}:{}/",
173        graphql_connection_config.host, graphql_connection_config.port
174    );
175
176    // Starts graphql client
177    let client = SimpleClient::new(server_url);
178    wait_for_graphql_server(&client).await;
179
180    ExecutorCluster {
181        executor_server_handle,
182        indexer_store: pg_store,
183        indexer_join_handle: pg_handle,
184        graphql_server_join_handle: graphql_server_handle,
185        graphql_client: client,
186        snapshot_config: snapshot_config.unwrap_or_default(),
187        graphql_connection_config,
188        cancellation_token,
189    }
190}
191
192/// Ping the GraphQL server for a checkpoint until an empty response is
193/// returned, indicating that the checkpoint has been pruned.
194pub async fn wait_for_graphql_checkpoint_pruned(
195    client: &SimpleClient,
196    checkpoint: u64,
197    base_timeout: Duration,
198) {
199    info!(
200        "Waiting for checkpoint to be pruned {}, base time out is {}",
201        checkpoint,
202        base_timeout.as_secs()
203    );
204    let query = format!(
205        r#"
206        {{
207            checkpoint(id: {{ sequenceNumber: {checkpoint} }}) {{
208                sequenceNumber
209            }}
210        }}"#
211    );
212
213    let timeout = base_timeout.mul_f64(checkpoint.max(1) as f64);
214
215    tokio::time::timeout(timeout, async {
216        loop {
217            let resp = client
218                .execute_to_graphql(query.to_string(), false, vec![], vec![])
219                .await
220                .unwrap()
221                .response_body_json();
222
223            let current_checkpoint = &resp["data"]["checkpoint"];
224            if current_checkpoint.is_null() {
225                break;
226            } else {
227                tokio::time::sleep(Duration::from_secs(1)).await;
228            }
229        }
230    })
231    .await
232    .expect("timeout waiting for checkpoint to be pruned");
233}
234
235pub async fn start_graphql_server_with_fn_rpc(
236    graphql_connection_config: ConnectionConfig,
237    fn_rpc_url: Option<String>,
238    cancellation_token: Option<CancellationToken>,
239) -> JoinHandle<()> {
240    let cancellation_token = cancellation_token.unwrap_or_default();
241    let mut server_config = ServerConfig {
242        connection: graphql_connection_config,
243        service: ServiceConfig::test_defaults(),
244        ..ServerConfig::default()
245    };
246    if let Some(fn_rpc_url) = fn_rpc_url {
247        server_config.tx_exec_full_node.node_rpc_url = Some(fn_rpc_url);
248    };
249
250    // Starts graphql server
251    tokio::spawn(async move {
252        start_graphiql_server(&server_config, &Version::for_testing(), cancellation_token)
253            .await
254            .unwrap();
255    })
256}
257
258async fn start_validator_with_fullnode(
259    internal_data_source_rpc_port: Option<u16>,
260    data_ingestion_dir: PathBuf,
261) -> TestCluster {
262    let mut test_cluster_builder = TestClusterBuilder::new()
263        .with_num_validators(VALIDATOR_COUNT)
264        .with_epoch_duration_ms(EPOCH_DURATION_MS)
265        .with_data_ingestion_dir(data_ingestion_dir)
266        .with_accounts(vec![
267            AccountConfig {
268                address: None,
269                gas_amounts: vec![DEFAULT_GAS_AMOUNT; GAS_OBJECT_COUNT],
270            };
271            ACCOUNT_NUM
272        ]);
273
274    if let Some(internal_data_source_rpc_port) = internal_data_source_rpc_port {
275        test_cluster_builder =
276            test_cluster_builder.with_fullnode_rpc_port(internal_data_source_rpc_port);
277    };
278    test_cluster_builder.build().await
279}
280
281/// Repeatedly ping the GraphQL server for 10s, until it responds
282async fn wait_for_graphql_server(client: &SimpleClient) {
283    tokio::time::timeout(Duration::from_secs(10), async {
284        while client.ping().await.is_err() {
285            tokio::time::sleep(Duration::from_millis(500)).await;
286        }
287    })
288    .await
289    .expect("timeout waiting for graphql server to start");
290}
291
292/// Ping the GraphQL server until its background task has updated the checkpoint
293/// watermark to the desired checkpoint.
294async fn wait_for_graphql_checkpoint_catchup(
295    client: &SimpleClient,
296    checkpoint: u64,
297    base_timeout: Duration,
298) {
299    info!(
300        "Waiting for graphql to catchup to checkpoint {}, base time out is {}",
301        checkpoint,
302        base_timeout.as_secs()
303    );
304    let query = r#"
305    {
306        availableRange {
307            last {
308                sequenceNumber
309            }
310        }
311    }"#;
312
313    let timeout = base_timeout.mul_f64(checkpoint.max(1) as f64);
314
315    tokio::time::timeout(timeout, async {
316        loop {
317            let resp = client
318                .execute_to_graphql(query.to_string(), false, vec![], vec![])
319                .await
320                .unwrap()
321                .response_body_json();
322
323            let current_checkpoint = resp["data"]["availableRange"]["last"].get("sequenceNumber");
324            info!("Current checkpoint: {:?}", current_checkpoint);
325            // Indexer has not picked up any checkpoints yet
326            let Some(current_checkpoint) = current_checkpoint else {
327                tokio::time::sleep(Duration::from_secs(1)).await;
328                continue;
329            };
330
331            // Indexer has picked up a checkpoint, but it's not the one we're waiting for
332            let current_checkpoint = current_checkpoint.as_u64().unwrap();
333            if current_checkpoint < checkpoint {
334                tokio::time::sleep(Duration::from_secs(1)).await;
335            } else {
336                break;
337            }
338        }
339    })
340    .await
341    .expect("timeout waiting for graphql to catchup to checkpoint");
342}
343
344impl Cluster {
345    /// Waits for the indexer to index up to the given checkpoint, then waits
346    /// for the graphql service's background task to update the checkpoint
347    /// watermark to the given checkpoint.
348    pub async fn wait_for_checkpoint_catchup(&self, checkpoint: u64, base_timeout: Duration) {
349        wait_for_graphql_checkpoint_catchup(&self.graphql_client, checkpoint, base_timeout).await
350    }
351
352    /// Waits for the indexer to prune a given checkpoint.
353    pub async fn wait_for_checkpoint_pruned(&self, checkpoint: u64, base_timeout: Duration) {
354        wait_for_graphql_checkpoint_pruned(&self.graphql_client, checkpoint, base_timeout).await
355    }
356
357    /// Sends a cancellation signal to the graphql and indexer services and
358    /// waits for them to shutdown.
359    pub async fn cleanup_resources(self) {
360        self.cancellation_token.cancel();
361        let _ = join!(self.graphql_server_join_handle, self.indexer_join_handle);
362    }
363
364    /// Builds a transaction that transfers IOTA for testing.
365    pub async fn build_transfer_iota_for_test(&self) -> TransactionData {
366        let addresses = self.validator_fullnode_handle.wallet.get_addresses();
367
368        let recipient = addresses[1];
369        self.validator_fullnode_handle
370            .test_transaction_builder()
371            .await
372            .transfer_iota(Some(1_000), recipient)
373            .build()
374    }
375
376    /// Signs a transaction.
377    pub fn sign_transaction(&self, transaction: &TransactionData) -> Transaction {
378        self.validator_fullnode_handle
379            .wallet
380            .sign_transaction(transaction)
381    }
382}
383
384impl ExecutorCluster {
385    /// Waits for the indexer to index up to the given checkpoint, then waits
386    /// for the graphql service's background task to update the checkpoint
387    /// watermark to the given checkpoint.
388    pub async fn wait_for_checkpoint_catchup(&self, checkpoint: u64, base_timeout: Duration) {
389        wait_for_graphql_checkpoint_catchup(&self.graphql_client, checkpoint, base_timeout).await
390    }
391
392    /// Waits for the indexer to prune a given checkpoint.
393    pub async fn wait_for_checkpoint_pruned(&self, checkpoint: u64, base_timeout: Duration) {
394        wait_for_graphql_checkpoint_pruned(&self.graphql_client, checkpoint, base_timeout).await
395    }
396
397    /// The ObjectsSnapshotProcessor is a long-running task that periodically
398    /// takes a snapshot of the objects table. This leads to flakiness in
399    /// tests, so we wait until the objects_snapshot has reached the
400    /// expected state.
401    pub async fn wait_for_objects_snapshot_catchup(&self, base_timeout: Duration) {
402        let mut latest_snapshot_cp = 0;
403
404        let latest_cp = self
405            .indexer_store
406            .get_latest_checkpoint_sequence_number()
407            .await
408            .unwrap()
409            .unwrap();
410
411        tokio::time::timeout(base_timeout, async {
412            while latest_cp > latest_snapshot_cp + self.snapshot_config.snapshot_min_lag as u64 {
413                tokio::time::sleep(Duration::from_secs(1)).await;
414                latest_snapshot_cp = self
415                    .indexer_store
416                    .get_latest_object_snapshot_checkpoint_sequence_number()
417                    .await
418                    .unwrap()
419                    .unwrap_or_default();
420            }
421        })
422        .await
423        .unwrap_or_else(|_| panic!("timeout waiting for indexer to update objects snapshot - latest_cp: {latest_cp}, latest_snapshot_cp: {latest_snapshot_cp}"));
424    }
425
426    /// Sends a cancellation signal to the graphql and indexer services, waits
427    /// for them to complete, and then deletes the database created for the
428    /// test.
429    pub async fn cleanup_resources(self) {
430        self.cancellation_token.cancel();
431        let _ = join!(self.graphql_server_join_handle, self.indexer_join_handle);
432        let db_url = self.graphql_connection_config.db_url.clone();
433        force_delete_database(db_url).await;
434    }
435}