iota_graphql_rpc/test_infra/
cluster.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{net::SocketAddr, path::PathBuf, sync::Arc, time::Duration};
6
7use iota_graphql_rpc_client::simple_client::SimpleClient;
8pub use iota_indexer::handlers::objects_snapshot_handler::SnapshotLagConfig;
9use iota_indexer::{
10    errors::IndexerError,
11    store::{PgIndexerStore, indexer_store::IndexerStore},
12    test_utils::{IndexerTypeConfig, force_delete_database, start_test_indexer_impl},
13};
14use iota_swarm_config::genesis_config::{AccountConfig, DEFAULT_GAS_AMOUNT};
15use iota_types::storage::RestStateReader;
16use test_cluster::{TestCluster, TestClusterBuilder};
17use tokio::{join, task::JoinHandle};
18use tokio_util::sync::CancellationToken;
19use tracing::info;
20
21use crate::{
22    config::{ConnectionConfig, ServerConfig, ServiceConfig, Version},
23    server::graphiql_server::start_graphiql_server,
24};
25
/// Number of validators in the network started by `start_validator_with_fullnode`.
const VALIDATOR_COUNT: usize = 7;
/// Epoch duration of that network, in milliseconds.
const EPOCH_DURATION_MS: u64 = 15_000;

/// Number of genesis accounts configured for that network.
const ACCOUNT_NUM: usize = 20;
/// Number of gas amounts (gas objects) configured per genesis account.
const GAS_OBJECT_COUNT: usize = 3;

/// Default value for an internal data source RPC port. It is not referenced in
/// this file; presumably callers pass it as `internal_data_source_rpc_port`.
pub const DEFAULT_INTERNAL_DATA_SOURCE_PORT: u16 = 3000;
33
/// Handles and clients for a test cluster built around an executor by
/// `serve_executor`.
pub struct ExecutorCluster {
    /// Spawned task running the REST service that serves the executor's state.
    pub executor_server_handle: JoinHandle<()>,
    /// Store of the indexer; queried by `wait_for_objects_snapshot_catchup`.
    pub indexer_store: PgIndexerStore,
    /// Spawned indexer task.
    pub indexer_join_handle: JoinHandle<Result<(), IndexerError>>,
    /// Spawned GraphQL server task.
    pub graphql_server_join_handle: JoinHandle<()>,
    /// Client pointed at the GraphQL server's host and port.
    pub graphql_client: SimpleClient,
    /// Snapshot lag config given to `serve_executor`, or the default if none
    /// was given.
    pub snapshot_config: SnapshotLagConfig,
    /// Connection config of the GraphQL server; its `db_url` is deleted by
    /// `cleanup_resources`.
    pub graphql_connection_config: ConnectionConfig,
    /// Token shared with the indexer and GraphQL server, cancelled by
    /// `cleanup_resources`.
    pub cancellation_token: CancellationToken,
}
44
/// Handles and clients for the validator/fullnode + indexer + GraphQL stack
/// started by `start_cluster`.
pub struct Cluster {
    /// The running validator network with its fullnode.
    pub validator_fullnode_handle: TestCluster,
    /// Store of the indexer.
    pub indexer_store: PgIndexerStore,
    /// Spawned indexer task.
    pub indexer_join_handle: JoinHandle<Result<(), IndexerError>>,
    /// Spawned GraphQL server task.
    pub graphql_server_join_handle: JoinHandle<()>,
    /// Client pointed at the GraphQL server's host and port.
    pub graphql_client: SimpleClient,
    /// Token shared with the indexer and GraphQL server, cancelled by
    /// `cleanup_resources`.
    pub cancellation_token: CancellationToken,
}
53
54/// Starts a validator, fullnode, indexer, and graphql service for testing.
55pub async fn start_cluster(
56    graphql_connection_config: ConnectionConfig,
57    internal_data_source_rpc_port: Option<u16>,
58) -> Cluster {
59    let data_ingestion_path = tempfile::tempdir().unwrap().into_path();
60    let db_url = graphql_connection_config.db_url.clone();
61    let cancellation_token = CancellationToken::new();
62    // Starts validator+fullnode
63    let val_fn =
64        start_validator_with_fullnode(internal_data_source_rpc_port, data_ingestion_path.clone())
65            .await;
66
67    // Starts indexer
68    let (pg_store, pg_handle) = start_test_indexer_impl(
69        db_url,
70        // reset the existing db
71        true,
72        None,
73        val_fn.rpc_url().to_string(),
74        IndexerTypeConfig::writer_mode(None, None),
75        Some(data_ingestion_path),
76        cancellation_token.clone(),
77    )
78    .await;
79
80    // Starts graphql server
81    let fn_rpc_url = val_fn.rpc_url().to_string();
82    let graphql_server_handle = start_graphql_server_with_fn_rpc(
83        graphql_connection_config.clone(),
84        Some(fn_rpc_url),
85        Some(cancellation_token.clone()),
86    )
87    .await;
88
89    let server_url = format!(
90        "http://{}:{}/",
91        graphql_connection_config.host, graphql_connection_config.port
92    );
93
94    // Starts graphql client
95    let client = SimpleClient::new(server_url);
96    wait_for_graphql_server(&client).await;
97
98    Cluster {
99        validator_fullnode_handle: val_fn,
100        indexer_store: pg_store,
101        indexer_join_handle: pg_handle,
102        graphql_server_join_handle: graphql_server_handle,
103        graphql_client: client,
104        cancellation_token,
105    }
106}
107
108/// Takes in a simulated instantiation of an IOTA blockchain and builds a
109/// cluster around it. This cluster is typically used in e2e tests to emulate
110/// and test behaviors.
111pub async fn serve_executor(
112    graphql_connection_config: ConnectionConfig,
113    internal_data_source_rpc_port: u16,
114    executor: Arc<dyn RestStateReader + Send + Sync>,
115    snapshot_config: Option<SnapshotLagConfig>,
116    epochs_to_keep: Option<u64>,
117    data_ingestion_path: PathBuf,
118) -> ExecutorCluster {
119    let db_url = graphql_connection_config.db_url.clone();
120    // Creates a cancellation token and adds this to the ExecutorCluster, so that we
121    // can send a cancellation token on cleanup
122    let cancellation_token = CancellationToken::new();
123
124    let executor_server_url: SocketAddr = format!("127.0.0.1:{}", internal_data_source_rpc_port)
125        .parse()
126        .unwrap();
127
128    info!("Starting executor server on {}", executor_server_url);
129
130    let executor_server_handle = tokio::spawn(async move {
131        iota_rest_api::RestService::new_without_version(executor)
132            .start_service(executor_server_url)
133            .await;
134    });
135
136    info!("spawned executor server");
137
138    let (pg_store, pg_handle) = start_test_indexer_impl(
139        db_url,
140        true,
141        None,
142        format!("http://{}", executor_server_url),
143        IndexerTypeConfig::writer_mode(snapshot_config.clone(), epochs_to_keep),
144        Some(data_ingestion_path),
145        cancellation_token.clone(),
146    )
147    .await;
148
149    // Starts graphql server
150    let graphql_server_handle = start_graphql_server(
151        graphql_connection_config.clone(),
152        cancellation_token.clone(),
153    )
154    .await;
155
156    let server_url = format!(
157        "http://{}:{}/",
158        graphql_connection_config.host, graphql_connection_config.port
159    );
160
161    // Starts graphql client
162    let client = SimpleClient::new(server_url);
163    wait_for_graphql_server(&client).await;
164
165    ExecutorCluster {
166        executor_server_handle,
167        indexer_store: pg_store,
168        indexer_join_handle: pg_handle,
169        graphql_server_join_handle: graphql_server_handle,
170        graphql_client: client,
171        snapshot_config: snapshot_config.unwrap_or_default(),
172        graphql_connection_config,
173        cancellation_token,
174    }
175}
176
177/// Ping the GraphQL server for a checkpoint until an empty response is
178/// returned, indicating that the checkpoint has been pruned.
179pub async fn wait_for_graphql_checkpoint_pruned(
180    client: &SimpleClient,
181    checkpoint: u64,
182    base_timeout: Duration,
183) {
184    info!(
185        "Waiting for checkpoint to be pruned {}, base time out is {}",
186        checkpoint,
187        base_timeout.as_secs()
188    );
189    let query = format!(
190        r#"
191        {{
192            checkpoint(id: {{ sequenceNumber: {} }}) {{
193                sequenceNumber
194            }}
195        }}"#,
196        checkpoint
197    );
198
199    let timeout = base_timeout.mul_f64(checkpoint.max(1) as f64);
200
201    tokio::time::timeout(timeout, async {
202        loop {
203            let resp = client
204                .execute_to_graphql(query.to_string(), false, vec![], vec![])
205                .await
206                .unwrap()
207                .response_body_json();
208
209            let current_checkpoint = &resp["data"]["checkpoint"];
210            if current_checkpoint.is_null() {
211                break;
212            } else {
213                tokio::time::sleep(Duration::from_secs(1)).await;
214            }
215        }
216    })
217    .await
218    .expect("Timeout waiting for checkpoint to be pruned");
219}
220
221pub async fn start_graphql_server(
222    graphql_connection_config: ConnectionConfig,
223    cancellation_token: CancellationToken,
224) -> JoinHandle<()> {
225    start_graphql_server_with_fn_rpc(graphql_connection_config, None, Some(cancellation_token))
226        .await
227}
228
229pub async fn start_graphql_server_with_fn_rpc(
230    graphql_connection_config: ConnectionConfig,
231    fn_rpc_url: Option<String>,
232    cancellation_token: Option<CancellationToken>,
233) -> JoinHandle<()> {
234    let cancellation_token = cancellation_token.unwrap_or_default();
235    let mut server_config = ServerConfig {
236        connection: graphql_connection_config,
237        service: ServiceConfig::test_defaults(),
238        ..ServerConfig::default()
239    };
240    if let Some(fn_rpc_url) = fn_rpc_url {
241        server_config.tx_exec_full_node.node_rpc_url = Some(fn_rpc_url);
242    };
243
244    // Starts graphql server
245    tokio::spawn(async move {
246        start_graphiql_server(&server_config, &Version::for_testing(), cancellation_token)
247            .await
248            .unwrap();
249    })
250}
251
252async fn start_validator_with_fullnode(
253    internal_data_source_rpc_port: Option<u16>,
254    data_ingestion_dir: PathBuf,
255) -> TestCluster {
256    let mut test_cluster_builder = TestClusterBuilder::new()
257        .with_num_validators(VALIDATOR_COUNT)
258        .with_epoch_duration_ms(EPOCH_DURATION_MS)
259        .with_data_ingestion_dir(data_ingestion_dir)
260        .with_accounts(vec![
261            AccountConfig {
262                address: None,
263                gas_amounts: vec![DEFAULT_GAS_AMOUNT; GAS_OBJECT_COUNT],
264            };
265            ACCOUNT_NUM
266        ]);
267
268    if let Some(internal_data_source_rpc_port) = internal_data_source_rpc_port {
269        test_cluster_builder =
270            test_cluster_builder.with_fullnode_rpc_port(internal_data_source_rpc_port);
271    };
272    test_cluster_builder.build().await
273}
274
275/// Repeatedly ping the GraphQL server for 10s, until it responds
276async fn wait_for_graphql_server(client: &SimpleClient) {
277    tokio::time::timeout(Duration::from_secs(10), async {
278        while client.ping().await.is_err() {
279            tokio::time::sleep(Duration::from_millis(500)).await;
280        }
281    })
282    .await
283    .expect("Timeout waiting for graphql server to start");
284}
285
286/// Ping the GraphQL server until its background task has updated the checkpoint
287/// watermark to the desired checkpoint.
288async fn wait_for_graphql_checkpoint_catchup(
289    client: &SimpleClient,
290    checkpoint: u64,
291    base_timeout: Duration,
292) {
293    info!(
294        "Waiting for graphql to catchup to checkpoint {}, base time out is {}",
295        checkpoint,
296        base_timeout.as_secs()
297    );
298    let query = r#"
299    {
300        availableRange {
301            last {
302                sequenceNumber
303            }
304        }
305    }"#;
306
307    let timeout = base_timeout.mul_f64(checkpoint.max(1) as f64);
308
309    tokio::time::timeout(timeout, async {
310        loop {
311            let resp = client
312                .execute_to_graphql(query.to_string(), false, vec![], vec![])
313                .await
314                .unwrap()
315                .response_body_json();
316
317            let current_checkpoint = resp["data"]["availableRange"]["last"].get("sequenceNumber");
318            info!("Current checkpoint: {:?}", current_checkpoint);
319            // Indexer has not picked up any checkpoints yet
320            let Some(current_checkpoint) = current_checkpoint else {
321                tokio::time::sleep(Duration::from_secs(1)).await;
322                continue;
323            };
324
325            // Indexer has picked up a checkpoint, but it's not the one we're waiting for
326            let current_checkpoint = current_checkpoint.as_u64().unwrap();
327            if current_checkpoint < checkpoint {
328                tokio::time::sleep(Duration::from_secs(1)).await;
329            } else {
330                break;
331            }
332        }
333    })
334    .await
335    .expect("Timeout waiting for graphql to catchup to checkpoint");
336}
337
338impl Cluster {
339    /// Waits for the indexer to index up to the given checkpoint, then waits
340    /// for the graphql service's background task to update the checkpoint
341    /// watermark to the given checkpoint.
342    pub async fn wait_for_checkpoint_catchup(&self, checkpoint: u64, base_timeout: Duration) {
343        wait_for_graphql_checkpoint_catchup(&self.graphql_client, checkpoint, base_timeout).await
344    }
345
346    /// Waits for the indexer to prune a given checkpoint.
347    pub async fn wait_for_checkpoint_pruned(&self, checkpoint: u64, base_timeout: Duration) {
348        wait_for_graphql_checkpoint_pruned(&self.graphql_client, checkpoint, base_timeout).await
349    }
350
351    /// Sends a cancellation signal to the graphql and indexer services and
352    /// waits for them to shutdown.
353    pub async fn cleanup_resources(self) {
354        self.cancellation_token.cancel();
355        let _ = join!(self.graphql_server_join_handle, self.indexer_join_handle);
356    }
357}
358
359impl ExecutorCluster {
360    /// Waits for the indexer to index up to the given checkpoint, then waits
361    /// for the graphql service's background task to update the checkpoint
362    /// watermark to the given checkpoint.
363    pub async fn wait_for_checkpoint_catchup(&self, checkpoint: u64, base_timeout: Duration) {
364        wait_for_graphql_checkpoint_catchup(&self.graphql_client, checkpoint, base_timeout).await
365    }
366
367    /// Waits for the indexer to prune a given checkpoint.
368    pub async fn wait_for_checkpoint_pruned(&self, checkpoint: u64, base_timeout: Duration) {
369        wait_for_graphql_checkpoint_pruned(&self.graphql_client, checkpoint, base_timeout).await
370    }
371
372    /// The ObjectsSnapshotProcessor is a long-running task that periodically
373    /// takes a snapshot of the objects table. This leads to flakiness in
374    /// tests, so we wait until the objects_snapshot has reached the
375    /// expected state.
376    pub async fn wait_for_objects_snapshot_catchup(&self, base_timeout: Duration) {
377        let mut latest_snapshot_cp = 0;
378
379        let latest_cp = self
380            .indexer_store
381            .get_latest_checkpoint_sequence_number()
382            .await
383            .unwrap()
384            .unwrap();
385
386        tokio::time::timeout(base_timeout, async {
387            while latest_cp > latest_snapshot_cp + self.snapshot_config.snapshot_min_lag as u64 {
388                tokio::time::sleep(Duration::from_secs(1)).await;
389                latest_snapshot_cp = self
390                    .indexer_store
391                    .get_latest_object_snapshot_checkpoint_sequence_number()
392                    .await
393                    .unwrap()
394                    .unwrap_or_default();
395            }
396        })
397        .await
398        .unwrap_or_else(|_| panic!("Timeout waiting for indexer to update objects snapshot - latest_cp: {}, latest_snapshot_cp: {}",
399        latest_cp, latest_snapshot_cp));
400    }
401
402    /// Sends a cancellation signal to the graphql and indexer services, waits
403    /// for them to complete, and then deletes the database created for the
404    /// test.
405    pub async fn cleanup_resources(self) {
406        self.cancellation_token.cancel();
407        let _ = join!(self.graphql_server_join_handle, self.indexer_join_handle);
408        let db_url = self.graphql_connection_config.db_url.clone();
409        force_delete_database(db_url).await;
410    }
411}