iota_graphql_rpc/test_infra/
cluster.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{net::SocketAddr, path::PathBuf, sync::Arc, time::Duration};
6
7use iota_graphql_rpc_client::simple_client::SimpleClient;
8pub use iota_indexer::config::SnapshotLagConfig;
9use iota_indexer::{
10    config::PruningOptions,
11    errors::IndexerError,
12    store::{PgIndexerStore, indexer_store::IndexerStore},
13    test_utils::{IndexerTypeConfig, force_delete_database, start_test_indexer_impl},
14};
15use iota_swarm_config::genesis_config::{AccountConfig, DEFAULT_GAS_AMOUNT};
16use iota_types::{
17    storage::RestStateReader,
18    transaction::{Transaction, TransactionData},
19};
20use test_cluster::{TestCluster, TestClusterBuilder};
21use tokio::{join, task::JoinHandle};
22use tokio_util::sync::CancellationToken;
23use tracing::info;
24
25use crate::{
26    config::{ConnectionConfig, ServerConfig, ServiceConfig, Version},
27    server::graphiql_server::start_graphiql_server,
28};
29
/// Number of validators in the test cluster built by `start_validator_with_fullnode`.
const VALIDATOR_COUNT: usize = 7;
/// Epoch duration, in milliseconds, configured on the test cluster.
const EPOCH_DURATION_MS: u64 = 15_000;

/// Number of genesis accounts created in the test cluster.
const ACCOUNT_NUM: usize = 20;
/// Number of gas objects (each of `DEFAULT_GAS_AMOUNT`) given to every genesis account.
const GAS_OBJECT_COUNT: usize = 3;

/// Port exposed for callers that need a value for `internal_data_source_rpc_port`.
/// NOTE(review): not referenced in this chunk; presumably the conventional default — confirm at call sites.
pub const DEFAULT_INTERNAL_DATA_SOURCE_PORT: u16 = 3000;
37
38pub struct ExecutorCluster {
39    pub executor_server_handle: JoinHandle<()>,
40    pub indexer_store: PgIndexerStore,
41    pub indexer_join_handle: JoinHandle<Result<(), IndexerError>>,
42    pub graphql_server_join_handle: JoinHandle<()>,
43    pub graphql_client: SimpleClient,
44    pub snapshot_config: SnapshotLagConfig,
45    pub graphql_connection_config: ConnectionConfig,
46    pub cancellation_token: CancellationToken,
47}
48
49pub struct Cluster {
50    pub validator_fullnode_handle: TestCluster,
51    pub indexer_store: PgIndexerStore,
52    pub indexer_join_handle: JoinHandle<Result<(), IndexerError>>,
53    pub graphql_server_join_handle: JoinHandle<()>,
54    pub graphql_client: SimpleClient,
55    pub cancellation_token: CancellationToken,
56}
57
58/// Starts a validator, fullnode, indexer, and graphql service for testing.
59pub async fn start_cluster(
60    graphql_connection_config: ConnectionConfig,
61    internal_data_source_rpc_port: Option<u16>,
62    service_config: ServiceConfig,
63) -> Cluster {
64    let data_ingestion_path = tempfile::tempdir().unwrap().keep();
65    let db_url = graphql_connection_config.db_url.clone();
66    let cancellation_token = CancellationToken::new();
67    // Starts validator+fullnode
68    let val_fn =
69        start_validator_with_fullnode(internal_data_source_rpc_port, data_ingestion_path.clone())
70            .await;
71
72    // Starts indexer
73    let (pg_store, pg_handle) = start_test_indexer_impl(
74        db_url,
75        // reset the existing db
76        true,
77        None,
78        val_fn.rpc_url().to_string(),
79        IndexerTypeConfig::writer_mode(None, None),
80        Some(data_ingestion_path),
81        cancellation_token.clone(),
82    )
83    .await;
84
85    // Starts graphql server
86    let fn_rpc_url = val_fn.rpc_url().to_string();
87    let graphql_server_handle = start_graphql_server_with_fn_rpc(
88        graphql_connection_config.clone(),
89        Some(fn_rpc_url),
90        Some(cancellation_token.clone()),
91        Some(service_config),
92    )
93    .await;
94
95    let server_url = format!(
96        "http://{}:{}/",
97        graphql_connection_config.host, graphql_connection_config.port
98    );
99
100    // Starts graphql client
101    let client = SimpleClient::new(server_url);
102    wait_for_graphql_server(&client).await;
103
104    Cluster {
105        validator_fullnode_handle: val_fn,
106        indexer_store: pg_store,
107        indexer_join_handle: pg_handle,
108        graphql_server_join_handle: graphql_server_handle,
109        graphql_client: client,
110        cancellation_token,
111    }
112}
113
114/// Takes in a simulated instantiation of an IOTA blockchain and builds a
115/// cluster around it.
116///
117/// This cluster is typically used in e2e tests to emulate
118/// and test behaviors. It should be noted however that queries
119/// that rely on the fullnode Write API are not supported yet.
120pub async fn serve_executor(
121    graphql_connection_config: ConnectionConfig,
122    internal_data_source_rpc_port: u16,
123    executor: Arc<dyn RestStateReader + Send + Sync>,
124    snapshot_config: Option<SnapshotLagConfig>,
125    epochs_to_keep: Option<u64>,
126    data_ingestion_path: PathBuf,
127) -> ExecutorCluster {
128    let db_url = graphql_connection_config.db_url.clone();
129    // Creates a cancellation token and adds this to the ExecutorCluster, so that we
130    // can send a cancellation token on cleanup
131    let cancellation_token = CancellationToken::new();
132
133    let executor_server_url: SocketAddr = format!("127.0.0.1:{internal_data_source_rpc_port}")
134        .parse()
135        .unwrap();
136
137    info!("Starting executor server on {}", executor_server_url);
138
139    let executor_server_handle = tokio::spawn(async move {
140        iota_rest_api::RestService::new_without_version(executor)
141            .start_service(executor_server_url)
142            .await;
143    });
144
145    info!("spawned executor server");
146
147    let (pg_store, pg_handle) = start_test_indexer_impl(
148        db_url,
149        true,
150        None,
151        format!("http://{executor_server_url}"),
152        IndexerTypeConfig::writer_mode(
153            snapshot_config.clone(),
154            Some(PruningOptions {
155                epochs_to_keep,
156                ..Default::default()
157            }),
158        ),
159        Some(data_ingestion_path),
160        cancellation_token.clone(),
161    )
162    .await;
163
164    // Starts graphql server
165    let graphql_server_handle = start_graphql_server_with_fn_rpc(
166        graphql_connection_config.clone(),
167        // this does not provide access to the node write api
168        Some(format!("http://{executor_server_url}")),
169        Some(cancellation_token.clone()),
170        None,
171    )
172    .await;
173
174    let server_url = format!(
175        "http://{}:{}/",
176        graphql_connection_config.host, graphql_connection_config.port
177    );
178
179    // Starts graphql client
180    let client = SimpleClient::new(server_url);
181    wait_for_graphql_server(&client).await;
182
183    ExecutorCluster {
184        executor_server_handle,
185        indexer_store: pg_store,
186        indexer_join_handle: pg_handle,
187        graphql_server_join_handle: graphql_server_handle,
188        graphql_client: client,
189        snapshot_config: snapshot_config.unwrap_or_default(),
190        graphql_connection_config,
191        cancellation_token,
192    }
193}
194
195/// Ping the GraphQL server for a checkpoint until an empty response is
196/// returned, indicating that the checkpoint has been pruned.
197pub async fn wait_for_graphql_checkpoint_pruned(
198    client: &SimpleClient,
199    checkpoint: u64,
200    base_timeout: Duration,
201) {
202    info!(
203        "Waiting for checkpoint to be pruned {}, base time out is {}",
204        checkpoint,
205        base_timeout.as_secs()
206    );
207    let query = format!(
208        r#"
209        {{
210            checkpoint(id: {{ sequenceNumber: {checkpoint} }}) {{
211                sequenceNumber
212            }}
213        }}"#
214    );
215
216    let timeout = base_timeout.mul_f64(checkpoint.max(1) as f64);
217
218    tokio::time::timeout(timeout, async {
219        loop {
220            let resp = client
221                .execute_to_graphql(query.to_string(), false, vec![], vec![])
222                .await
223                .unwrap()
224                .response_body_json();
225
226            let current_checkpoint = &resp["data"]["checkpoint"];
227            if current_checkpoint.is_null() {
228                break;
229            } else {
230                tokio::time::sleep(Duration::from_secs(1)).await;
231            }
232        }
233    })
234    .await
235    .expect("timeout waiting for checkpoint to be pruned");
236}
237
238pub async fn start_graphql_server_with_fn_rpc(
239    graphql_connection_config: ConnectionConfig,
240    fn_rpc_url: Option<String>,
241    cancellation_token: Option<CancellationToken>,
242    service_config: Option<ServiceConfig>,
243) -> JoinHandle<()> {
244    let cancellation_token = cancellation_token.unwrap_or_default();
245    let mut server_config = ServerConfig {
246        connection: graphql_connection_config,
247        service: service_config.unwrap_or_else(ServiceConfig::test_defaults),
248        ..ServerConfig::default()
249    };
250    if let Some(fn_rpc_url) = fn_rpc_url {
251        server_config.tx_exec_full_node.node_rpc_url = Some(fn_rpc_url);
252    };
253
254    // Starts graphql server
255    tokio::spawn(async move {
256        start_graphiql_server(&server_config, &Version::for_testing(), cancellation_token)
257            .await
258            .unwrap();
259    })
260}
261
262async fn start_validator_with_fullnode(
263    internal_data_source_rpc_port: Option<u16>,
264    data_ingestion_dir: PathBuf,
265) -> TestCluster {
266    let mut test_cluster_builder = TestClusterBuilder::new()
267        .with_num_validators(VALIDATOR_COUNT)
268        .with_epoch_duration_ms(EPOCH_DURATION_MS)
269        .with_data_ingestion_dir(data_ingestion_dir)
270        .with_accounts(vec![
271            AccountConfig {
272                address: None,
273                gas_amounts: vec![DEFAULT_GAS_AMOUNT; GAS_OBJECT_COUNT],
274            };
275            ACCOUNT_NUM
276        ]);
277
278    if let Some(internal_data_source_rpc_port) = internal_data_source_rpc_port {
279        test_cluster_builder =
280            test_cluster_builder.with_fullnode_rpc_port(internal_data_source_rpc_port);
281    };
282    test_cluster_builder.build().await
283}
284
285/// Repeatedly ping the GraphQL server for 10s, until it responds
286async fn wait_for_graphql_server(client: &SimpleClient) {
287    tokio::time::timeout(Duration::from_secs(10), async {
288        while client.ping().await.is_err() {
289            tokio::time::sleep(Duration::from_millis(500)).await;
290        }
291    })
292    .await
293    .expect("timeout waiting for graphql server to start");
294}
295
296/// Ping the GraphQL server until its background task has updated the checkpoint
297/// watermark to the desired checkpoint.
298async fn wait_for_graphql_checkpoint_catchup(
299    client: &SimpleClient,
300    checkpoint: u64,
301    base_timeout: Duration,
302) {
303    info!(
304        "Waiting for graphql to catchup to checkpoint {}, base time out is {}",
305        checkpoint,
306        base_timeout.as_secs()
307    );
308    let query = r#"
309    {
310        availableRange {
311            last {
312                sequenceNumber
313            }
314        }
315    }"#;
316
317    let timeout = base_timeout.mul_f64(checkpoint.max(1) as f64);
318
319    tokio::time::timeout(timeout, async {
320        loop {
321            let resp = client
322                .execute_to_graphql(query.to_string(), false, vec![], vec![])
323                .await
324                .unwrap()
325                .response_body_json();
326
327            let current_checkpoint = resp["data"]["availableRange"]["last"].get("sequenceNumber");
328            info!("Current checkpoint: {:?}", current_checkpoint);
329            // Indexer has not picked up any checkpoints yet
330            let Some(current_checkpoint) = current_checkpoint else {
331                tokio::time::sleep(Duration::from_secs(1)).await;
332                continue;
333            };
334
335            // Indexer has picked up a checkpoint, but it's not the one we're waiting for
336            let current_checkpoint = current_checkpoint.as_u64().unwrap();
337            if current_checkpoint < checkpoint {
338                tokio::time::sleep(Duration::from_secs(1)).await;
339            } else {
340                break;
341            }
342        }
343    })
344    .await
345    .expect("timeout waiting for graphql to catchup to checkpoint");
346}
347
348impl Cluster {
349    /// Waits for the indexer to index up to the given checkpoint, then waits
350    /// for the graphql service's background task to update the checkpoint
351    /// watermark to the given checkpoint.
352    pub async fn wait_for_checkpoint_catchup(&self, checkpoint: u64, base_timeout: Duration) {
353        wait_for_graphql_checkpoint_catchup(&self.graphql_client, checkpoint, base_timeout).await
354    }
355
356    /// Waits for the indexer to prune a given checkpoint.
357    pub async fn wait_for_checkpoint_pruned(&self, checkpoint: u64, base_timeout: Duration) {
358        wait_for_graphql_checkpoint_pruned(&self.graphql_client, checkpoint, base_timeout).await
359    }
360
361    /// Builds a transaction that transfers IOTA for testing.
362    pub async fn build_transfer_iota_for_test(&self) -> TransactionData {
363        let addresses = self.validator_fullnode_handle.wallet.get_addresses();
364
365        let recipient = addresses[1];
366        self.validator_fullnode_handle
367            .test_transaction_builder()
368            .await
369            .transfer_iota(Some(1_000), recipient)
370            .build()
371    }
372
373    /// Signs a transaction.
374    pub fn sign_transaction(&self, transaction: &TransactionData) -> Transaction {
375        self.validator_fullnode_handle
376            .wallet
377            .sign_transaction(transaction)
378    }
379}
380
381impl ExecutorCluster {
382    /// Waits for the indexer to index up to the given checkpoint, then waits
383    /// for the graphql service's background task to update the checkpoint
384    /// watermark to the given checkpoint.
385    pub async fn wait_for_checkpoint_catchup(&self, checkpoint: u64, base_timeout: Duration) {
386        wait_for_graphql_checkpoint_catchup(&self.graphql_client, checkpoint, base_timeout).await
387    }
388
389    /// Waits for the indexer to prune a given checkpoint.
390    pub async fn wait_for_checkpoint_pruned(&self, checkpoint: u64, base_timeout: Duration) {
391        wait_for_graphql_checkpoint_pruned(&self.graphql_client, checkpoint, base_timeout).await
392    }
393
394    /// The ObjectsSnapshotProcessor is a long-running task that periodically
395    /// takes a snapshot of the objects table. This leads to flakiness in
396    /// tests, so we wait until the objects_snapshot has reached the
397    /// expected state.
398    pub async fn wait_for_objects_snapshot_catchup(&self, base_timeout: Duration) {
399        let mut latest_snapshot_cp = 0;
400
401        let latest_cp = self
402            .indexer_store
403            .get_latest_checkpoint_sequence_number()
404            .await
405            .unwrap()
406            .unwrap();
407
408        tokio::time::timeout(base_timeout, async {
409            while latest_cp > latest_snapshot_cp + self.snapshot_config.snapshot_min_lag as u64 {
410                tokio::time::sleep(Duration::from_secs(1)).await;
411                latest_snapshot_cp = self
412                    .indexer_store
413                    .get_latest_object_snapshot_watermark()
414                    .await
415                    .unwrap()
416                    .map(|watermark| watermark.checkpoint_hi_inclusive)
417                    .unwrap_or_default();
418            }
419        })
420        .await
421        .unwrap_or_else(|_| panic!("timeout waiting for indexer to update objects snapshot - latest_cp: {latest_cp}, latest_snapshot_cp: {latest_snapshot_cp}"));
422    }
423
424    /// Sends a cancellation signal to the graphql and indexer services, waits
425    /// for them to complete, and then deletes the database created for the
426    /// test.
427    pub async fn cleanup_resources(self) {
428        self.cancellation_token.cancel();
429        let _ = join!(self.graphql_server_join_handle, self.indexer_join_handle);
430        let db_url = self.graphql_connection_config.db_url.clone();
431        force_delete_database(db_url).await;
432    }
433}