iota_graphql_rpc/test_infra/
cluster.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{net::SocketAddr, path::PathBuf, sync::Arc, time::Duration};
6
7use iota_graphql_rpc_client::simple_client::SimpleClient;
8pub use iota_indexer::config::SnapshotLagConfig;
9use iota_indexer::{
10    errors::IndexerError,
11    store::{PgIndexerStore, indexer_store::IndexerStore},
12    test_utils::{IndexerTypeConfig, force_delete_database, start_test_indexer_impl},
13};
14use iota_swarm_config::genesis_config::{AccountConfig, DEFAULT_GAS_AMOUNT};
15use iota_types::storage::RestStateReader;
16use test_cluster::{TestCluster, TestClusterBuilder};
17use tokio::{join, task::JoinHandle};
18use tokio_util::sync::CancellationToken;
19use tracing::info;
20
21use crate::{
22    config::{ConnectionConfig, ServerConfig, ServiceConfig, Version},
23    server::graphiql_server::start_graphiql_server,
24};
25
/// Number of validators the test network is built with.
const VALIDATOR_COUNT: usize = 7;
/// Epoch duration, in milliseconds, configured on the test network.
const EPOCH_DURATION_MS: u64 = 15_000;

/// Number of genesis accounts created for the test network.
const ACCOUNT_NUM: usize = 20;
/// Number of gas objects (each `DEFAULT_GAS_AMOUNT`) given to every account.
const GAS_OBJECT_COUNT: usize = 3;

/// Default port for the internal data source RPC.
// NOTE(review): not referenced in this file; presumably callers pass it as
// `internal_data_source_rpc_port` — confirm against call sites.
pub const DEFAULT_INTERNAL_DATA_SOURCE_PORT: u16 = 3000;
33
34pub struct ExecutorCluster {
35    pub executor_server_handle: JoinHandle<()>,
36    pub indexer_store: PgIndexerStore,
37    pub indexer_join_handle: JoinHandle<Result<(), IndexerError>>,
38    pub graphql_server_join_handle: JoinHandle<()>,
39    pub graphql_client: SimpleClient,
40    pub snapshot_config: SnapshotLagConfig,
41    pub graphql_connection_config: ConnectionConfig,
42    pub cancellation_token: CancellationToken,
43}
44
45pub struct Cluster {
46    pub validator_fullnode_handle: TestCluster,
47    pub indexer_store: PgIndexerStore,
48    pub indexer_join_handle: JoinHandle<Result<(), IndexerError>>,
49    pub graphql_server_join_handle: JoinHandle<()>,
50    pub graphql_client: SimpleClient,
51    pub cancellation_token: CancellationToken,
52}
53
54/// Starts a validator, fullnode, indexer, and graphql service for testing.
55pub async fn start_cluster(
56    graphql_connection_config: ConnectionConfig,
57    internal_data_source_rpc_port: Option<u16>,
58) -> Cluster {
59    let data_ingestion_path = tempfile::tempdir().unwrap().into_path();
60    let db_url = graphql_connection_config.db_url.clone();
61    let cancellation_token = CancellationToken::new();
62    // Starts validator+fullnode
63    let val_fn =
64        start_validator_with_fullnode(internal_data_source_rpc_port, data_ingestion_path.clone())
65            .await;
66
67    // Starts indexer
68    let (pg_store, pg_handle) = start_test_indexer_impl(
69        db_url,
70        // reset the existing db
71        true,
72        None,
73        val_fn.rpc_url().to_string(),
74        IndexerTypeConfig::writer_mode(None, None),
75        Some(data_ingestion_path),
76        cancellation_token.clone(),
77    )
78    .await;
79
80    // Starts graphql server
81    let fn_rpc_url = val_fn.rpc_url().to_string();
82    let graphql_server_handle = start_graphql_server_with_fn_rpc(
83        graphql_connection_config.clone(),
84        Some(fn_rpc_url),
85        Some(cancellation_token.clone()),
86    )
87    .await;
88
89    let server_url = format!(
90        "http://{}:{}/",
91        graphql_connection_config.host, graphql_connection_config.port
92    );
93
94    // Starts graphql client
95    let client = SimpleClient::new(server_url);
96    wait_for_graphql_server(&client).await;
97
98    Cluster {
99        validator_fullnode_handle: val_fn,
100        indexer_store: pg_store,
101        indexer_join_handle: pg_handle,
102        graphql_server_join_handle: graphql_server_handle,
103        graphql_client: client,
104        cancellation_token,
105    }
106}
107
108/// Takes in a simulated instantiation of an IOTA blockchain and builds a
109/// cluster around it. This cluster is typically used in e2e tests to emulate
110/// and test behaviors.
111pub async fn serve_executor(
112    graphql_connection_config: ConnectionConfig,
113    internal_data_source_rpc_port: u16,
114    executor: Arc<dyn RestStateReader + Send + Sync>,
115    snapshot_config: Option<SnapshotLagConfig>,
116    epochs_to_keep: Option<u64>,
117    data_ingestion_path: PathBuf,
118) -> ExecutorCluster {
119    let db_url = graphql_connection_config.db_url.clone();
120    // Creates a cancellation token and adds this to the ExecutorCluster, so that we
121    // can send a cancellation token on cleanup
122    let cancellation_token = CancellationToken::new();
123
124    let executor_server_url: SocketAddr = format!("127.0.0.1:{internal_data_source_rpc_port}")
125        .parse()
126        .unwrap();
127
128    info!("Starting executor server on {}", executor_server_url);
129
130    let executor_server_handle = tokio::spawn(async move {
131        iota_rest_api::RestService::new_without_version(executor)
132            .start_service(executor_server_url)
133            .await;
134    });
135
136    info!("spawned executor server");
137
138    let (pg_store, pg_handle) = start_test_indexer_impl(
139        db_url,
140        true,
141        None,
142        format!("http://{executor_server_url}"),
143        IndexerTypeConfig::writer_mode(snapshot_config.clone(), epochs_to_keep),
144        Some(data_ingestion_path),
145        cancellation_token.clone(),
146    )
147    .await;
148
149    // Starts graphql server
150    let graphql_server_handle = start_graphql_server(
151        graphql_connection_config.clone(),
152        cancellation_token.clone(),
153    )
154    .await;
155
156    let server_url = format!(
157        "http://{}:{}/",
158        graphql_connection_config.host, graphql_connection_config.port
159    );
160
161    // Starts graphql client
162    let client = SimpleClient::new(server_url);
163    wait_for_graphql_server(&client).await;
164
165    ExecutorCluster {
166        executor_server_handle,
167        indexer_store: pg_store,
168        indexer_join_handle: pg_handle,
169        graphql_server_join_handle: graphql_server_handle,
170        graphql_client: client,
171        snapshot_config: snapshot_config.unwrap_or_default(),
172        graphql_connection_config,
173        cancellation_token,
174    }
175}
176
177/// Ping the GraphQL server for a checkpoint until an empty response is
178/// returned, indicating that the checkpoint has been pruned.
179pub async fn wait_for_graphql_checkpoint_pruned(
180    client: &SimpleClient,
181    checkpoint: u64,
182    base_timeout: Duration,
183) {
184    info!(
185        "Waiting for checkpoint to be pruned {}, base time out is {}",
186        checkpoint,
187        base_timeout.as_secs()
188    );
189    let query = format!(
190        r#"
191        {{
192            checkpoint(id: {{ sequenceNumber: {checkpoint} }}) {{
193                sequenceNumber
194            }}
195        }}"#
196    );
197
198    let timeout = base_timeout.mul_f64(checkpoint.max(1) as f64);
199
200    tokio::time::timeout(timeout, async {
201        loop {
202            let resp = client
203                .execute_to_graphql(query.to_string(), false, vec![], vec![])
204                .await
205                .unwrap()
206                .response_body_json();
207
208            let current_checkpoint = &resp["data"]["checkpoint"];
209            if current_checkpoint.is_null() {
210                break;
211            } else {
212                tokio::time::sleep(Duration::from_secs(1)).await;
213            }
214        }
215    })
216    .await
217    .expect("Timeout waiting for checkpoint to be pruned");
218}
219
220pub async fn start_graphql_server(
221    graphql_connection_config: ConnectionConfig,
222    cancellation_token: CancellationToken,
223) -> JoinHandle<()> {
224    start_graphql_server_with_fn_rpc(graphql_connection_config, None, Some(cancellation_token))
225        .await
226}
227
228pub async fn start_graphql_server_with_fn_rpc(
229    graphql_connection_config: ConnectionConfig,
230    fn_rpc_url: Option<String>,
231    cancellation_token: Option<CancellationToken>,
232) -> JoinHandle<()> {
233    let cancellation_token = cancellation_token.unwrap_or_default();
234    let mut server_config = ServerConfig {
235        connection: graphql_connection_config,
236        service: ServiceConfig::test_defaults(),
237        ..ServerConfig::default()
238    };
239    if let Some(fn_rpc_url) = fn_rpc_url {
240        server_config.tx_exec_full_node.node_rpc_url = Some(fn_rpc_url);
241    };
242
243    // Starts graphql server
244    tokio::spawn(async move {
245        start_graphiql_server(&server_config, &Version::for_testing(), cancellation_token)
246            .await
247            .unwrap();
248    })
249}
250
251async fn start_validator_with_fullnode(
252    internal_data_source_rpc_port: Option<u16>,
253    data_ingestion_dir: PathBuf,
254) -> TestCluster {
255    let mut test_cluster_builder = TestClusterBuilder::new()
256        .with_num_validators(VALIDATOR_COUNT)
257        .with_epoch_duration_ms(EPOCH_DURATION_MS)
258        .with_data_ingestion_dir(data_ingestion_dir)
259        .with_accounts(vec![
260            AccountConfig {
261                address: None,
262                gas_amounts: vec![DEFAULT_GAS_AMOUNT; GAS_OBJECT_COUNT],
263            };
264            ACCOUNT_NUM
265        ]);
266
267    if let Some(internal_data_source_rpc_port) = internal_data_source_rpc_port {
268        test_cluster_builder =
269            test_cluster_builder.with_fullnode_rpc_port(internal_data_source_rpc_port);
270    };
271    test_cluster_builder.build().await
272}
273
274/// Repeatedly ping the GraphQL server for 10s, until it responds
275async fn wait_for_graphql_server(client: &SimpleClient) {
276    tokio::time::timeout(Duration::from_secs(10), async {
277        while client.ping().await.is_err() {
278            tokio::time::sleep(Duration::from_millis(500)).await;
279        }
280    })
281    .await
282    .expect("Timeout waiting for graphql server to start");
283}
284
285/// Ping the GraphQL server until its background task has updated the checkpoint
286/// watermark to the desired checkpoint.
287async fn wait_for_graphql_checkpoint_catchup(
288    client: &SimpleClient,
289    checkpoint: u64,
290    base_timeout: Duration,
291) {
292    info!(
293        "Waiting for graphql to catchup to checkpoint {}, base time out is {}",
294        checkpoint,
295        base_timeout.as_secs()
296    );
297    let query = r#"
298    {
299        availableRange {
300            last {
301                sequenceNumber
302            }
303        }
304    }"#;
305
306    let timeout = base_timeout.mul_f64(checkpoint.max(1) as f64);
307
308    tokio::time::timeout(timeout, async {
309        loop {
310            let resp = client
311                .execute_to_graphql(query.to_string(), false, vec![], vec![])
312                .await
313                .unwrap()
314                .response_body_json();
315
316            let current_checkpoint = resp["data"]["availableRange"]["last"].get("sequenceNumber");
317            info!("Current checkpoint: {:?}", current_checkpoint);
318            // Indexer has not picked up any checkpoints yet
319            let Some(current_checkpoint) = current_checkpoint else {
320                tokio::time::sleep(Duration::from_secs(1)).await;
321                continue;
322            };
323
324            // Indexer has picked up a checkpoint, but it's not the one we're waiting for
325            let current_checkpoint = current_checkpoint.as_u64().unwrap();
326            if current_checkpoint < checkpoint {
327                tokio::time::sleep(Duration::from_secs(1)).await;
328            } else {
329                break;
330            }
331        }
332    })
333    .await
334    .expect("Timeout waiting for graphql to catchup to checkpoint");
335}
336
337impl Cluster {
338    /// Waits for the indexer to index up to the given checkpoint, then waits
339    /// for the graphql service's background task to update the checkpoint
340    /// watermark to the given checkpoint.
341    pub async fn wait_for_checkpoint_catchup(&self, checkpoint: u64, base_timeout: Duration) {
342        wait_for_graphql_checkpoint_catchup(&self.graphql_client, checkpoint, base_timeout).await
343    }
344
345    /// Waits for the indexer to prune a given checkpoint.
346    pub async fn wait_for_checkpoint_pruned(&self, checkpoint: u64, base_timeout: Duration) {
347        wait_for_graphql_checkpoint_pruned(&self.graphql_client, checkpoint, base_timeout).await
348    }
349
350    /// Sends a cancellation signal to the graphql and indexer services and
351    /// waits for them to shutdown.
352    pub async fn cleanup_resources(self) {
353        self.cancellation_token.cancel();
354        let _ = join!(self.graphql_server_join_handle, self.indexer_join_handle);
355    }
356}
357
358impl ExecutorCluster {
359    /// Waits for the indexer to index up to the given checkpoint, then waits
360    /// for the graphql service's background task to update the checkpoint
361    /// watermark to the given checkpoint.
362    pub async fn wait_for_checkpoint_catchup(&self, checkpoint: u64, base_timeout: Duration) {
363        wait_for_graphql_checkpoint_catchup(&self.graphql_client, checkpoint, base_timeout).await
364    }
365
366    /// Waits for the indexer to prune a given checkpoint.
367    pub async fn wait_for_checkpoint_pruned(&self, checkpoint: u64, base_timeout: Duration) {
368        wait_for_graphql_checkpoint_pruned(&self.graphql_client, checkpoint, base_timeout).await
369    }
370
371    /// The ObjectsSnapshotProcessor is a long-running task that periodically
372    /// takes a snapshot of the objects table. This leads to flakiness in
373    /// tests, so we wait until the objects_snapshot has reached the
374    /// expected state.
375    pub async fn wait_for_objects_snapshot_catchup(&self, base_timeout: Duration) {
376        let mut latest_snapshot_cp = 0;
377
378        let latest_cp = self
379            .indexer_store
380            .get_latest_checkpoint_sequence_number()
381            .await
382            .unwrap()
383            .unwrap();
384
385        tokio::time::timeout(base_timeout, async {
386            while latest_cp > latest_snapshot_cp + self.snapshot_config.snapshot_min_lag as u64 {
387                tokio::time::sleep(Duration::from_secs(1)).await;
388                latest_snapshot_cp = self
389                    .indexer_store
390                    .get_latest_object_snapshot_checkpoint_sequence_number()
391                    .await
392                    .unwrap()
393                    .unwrap_or_default();
394            }
395        })
396        .await
397        .unwrap_or_else(|_| panic!("Timeout waiting for indexer to update objects snapshot - latest_cp: {latest_cp}, latest_snapshot_cp: {latest_snapshot_cp}"));
398    }
399
400    /// Sends a cancellation signal to the graphql and indexer services, waits
401    /// for them to complete, and then deletes the database created for the
402    /// test.
403    pub async fn cleanup_resources(self) {
404        self.cancellation_token.cancel();
405        let _ = join!(self.graphql_server_join_handle, self.indexer_join_handle);
406        let db_url = self.graphql_connection_config.db_url.clone();
407        force_delete_database(db_url).await;
408    }
409}