iota_light_client/
utils.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4use std::{
5    collections::HashMap,
6    fs,
7    io::{Read, Write},
8    path::PathBuf,
9    sync::{Arc, Mutex},
10};
11
12use anyhow::anyhow;
13use async_trait::async_trait;
14use clap::Subcommand;
15use iota_config::genesis::Genesis;
16use iota_json_rpc_types::{IotaObjectDataOptions, IotaTransactionBlockResponseOptions};
17use iota_package_resolver::{Package, PackageStore, Result as ResolverResult};
18use iota_rest_api::{CheckpointData, Client};
19use iota_sdk::IotaClientBuilder;
20use iota_types::{
21    base_types::ObjectID,
22    committee::Committee,
23    crypto::AuthorityQuorumSignInfo,
24    digests::TransactionDigest,
25    effects::{TransactionEffects, TransactionEffectsAPI, TransactionEvents},
26    message_envelope::Envelope,
27    messages_checkpoint::{CertifiedCheckpointSummary, CheckpointSummary, EndOfEpochData},
28    object::Object,
29};
30use log::info;
31use move_core_types::account_address::AccountAddress;
32use object_store::{parse_url, path::Path};
33use serde_json::{Value, json};
34use url::Url;
35
36pub struct RemotePackageStore {
37    config: Config,
38    cache: Mutex<HashMap<AccountAddress, Arc<Package>>>,
39}
40
41impl RemotePackageStore {
42    pub fn new(config: Config) -> Self {
43        Self {
44            config,
45            cache: Mutex::new(HashMap::new()),
46        }
47    }
48}
49
50#[async_trait]
51impl PackageStore for RemotePackageStore {
52    /// Read package contents. Fails if `id` is not an object, not a package, or
53    /// is malformed in some way.
54    async fn fetch(&self, id: AccountAddress) -> ResolverResult<Arc<Package>> {
55        // Check if we have it in the cache
56        if let Some(package) = self.cache.lock().unwrap().get(&id) {
57            info!("Fetch Package: {} cache hit", id);
58            return Ok(package.clone());
59        }
60
61        info!("Fetch Package: {}", id);
62
63        let object = get_verified_object(&self.config, id.into()).await.unwrap();
64        let package = Arc::new(Package::read_from_object(&object).unwrap());
65
66        // Add to the cache
67        self.cache.lock().unwrap().insert(id, package.clone());
68
69        Ok(package)
70    }
71}
72
73#[derive(Subcommand, Debug)]
74pub enum SCommands {
75    /// Sync all end-of-epoch checkpoints
76    Sync {},
77
78    /// Checks a specific transaction using the light client
79    Transaction {
80        /// Transaction hash
81        #[arg(short, long, value_name = "TID")]
82        tid: String,
83    },
84
85    /// Checks a specific object using the light client
86    Object {
87        /// Transaction hash
88        #[arg(short, long, value_name = "OID")]
89        oid: String,
90    },
91}
92
93// The config file for the light client including the root of trust genesis
94// digest
95#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
96pub struct Config {
97    /// Full node url
98    pub full_node_url: String,
99
100    /// Checkpoint summary directory
101    pub checkpoint_summary_dir: PathBuf,
102
103    //  Genesis file name
104    pub genesis_filename: PathBuf,
105
106    /// Object store url
107    pub object_store_url: String,
108
109    /// GraphQL endpoint
110    pub graphql_url: String,
111}
112
113async fn query_last_checkpoint_of_epoch(config: &Config, epoch_id: u64) -> anyhow::Result<u64> {
114    // GraphQL query to get the last checkpoint of an epoch
115    let query = json!({
116        "query": "query ($epochID: Int) { epoch(id: $epochID) { checkpoints(last: 1) { nodes { sequenceNumber } } } }",
117        "variables": { "epochID": epoch_id }
118    });
119
120    // Submit the query by POSTing to the GraphQL endpoint
121    let client = reqwest::Client::new();
122    let resp = client
123        .post(&config.graphql_url)
124        .header("Content-Type", "application/json")
125        .body(query.to_string())
126        .send()
127        .await
128        .expect("Cannot connect to graphql")
129        .text()
130        .await
131        .expect("Cannot parse response");
132
133    // Parse the JSON response to get the last checkpoint of the epoch
134    let v: Value = serde_json::from_str(resp.as_str()).expect("Incorrect JSON response");
135    let checkpoint_number = v["data"]["epoch"]["checkpoints"]["nodes"][0]["sequenceNumber"]
136        .as_u64()
137        .unwrap();
138
139    Ok(checkpoint_number)
140}
141
142// The list of checkpoints at the end of each epoch
143#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
144pub struct CheckpointsList {
145    // List of end of epoch checkpoints
146    pub checkpoints: Vec<u64>,
147}
148
149pub fn read_checkpoint_list(config: &Config) -> anyhow::Result<CheckpointsList> {
150    let mut checkpoints_path = config.checkpoint_summary_dir.clone();
151    checkpoints_path.push("checkpoints.yaml");
152    // Read the resulting file and parse the yaml checkpoint list
153    let reader = fs::File::open(checkpoints_path.clone())?;
154    Ok(serde_yaml::from_reader(reader)?)
155}
156
157pub fn read_checkpoint(
158    config: &Config,
159    checkpoint_number: u64,
160) -> anyhow::Result<Envelope<CheckpointSummary, AuthorityQuorumSignInfo<true>>> {
161    read_checkpoint_general(config, checkpoint_number, None)
162}
163
164pub fn read_checkpoint_general(
165    config: &Config,
166    checkpoint_number: u64,
167    path: Option<&str>,
168) -> anyhow::Result<Envelope<CheckpointSummary, AuthorityQuorumSignInfo<true>>> {
169    // Read the resulting file and parse the yaml checkpoint list
170    let mut checkpoint_path = config.checkpoint_summary_dir.clone();
171    if let Some(path) = path {
172        checkpoint_path.push(path);
173    }
174    // TODO why yaml? rename to .sum
175    checkpoint_path.push(format!("{}.yaml", checkpoint_number));
176    let mut reader = fs::File::open(checkpoint_path.clone())?;
177    let metadata = fs::metadata(&checkpoint_path)?;
178    let mut buffer = vec![0; metadata.len() as usize];
179    reader.read_exact(&mut buffer)?;
180    bcs::from_bytes(&buffer).map_err(|_| anyhow!("Unable to parse checkpoint file"))
181}
182
183pub fn write_checkpoint(
184    config: &Config,
185    summary: &Envelope<CheckpointSummary, AuthorityQuorumSignInfo<true>>,
186) -> anyhow::Result<()> {
187    write_checkpoint_general(config, summary, None)
188}
189
190pub fn write_checkpoint_general(
191    config: &Config,
192    summary: &Envelope<CheckpointSummary, AuthorityQuorumSignInfo<true>>,
193    path: Option<&str>,
194) -> anyhow::Result<()> {
195    // Write the checkpoint summary to a file
196    let mut checkpoint_path = config.checkpoint_summary_dir.clone();
197    if let Some(path) = path {
198        checkpoint_path.push(path);
199    }
200    checkpoint_path.push(format!("{}.json", summary.sequence_number));
201    let writer = fs::File::create(checkpoint_path.clone())?;
202    serde_json::to_writer(&writer, &summary)
203        .map_err(|_| anyhow!("Unable to serialize checkpoint summary"))?;
204    Ok(())
205}
206
207pub fn write_checkpoint_list(
208    config: &Config,
209    checkpoints_list: &CheckpointsList,
210) -> anyhow::Result<()> {
211    // Write the checkpoint list to a file
212    let mut checkpoints_path = config.checkpoint_summary_dir.clone();
213    checkpoints_path.push("checkpoints.yaml");
214    let mut writer = fs::File::create(checkpoints_path.clone())?;
215    let mut bytes = Vec::new();
216    serde_yaml::to_writer(&mut bytes, &checkpoints_list)?;
217    writer
218        .write_all(&bytes)
219        .map_err(|_| anyhow!("Unable to serialize checkpoint list"))
220}
221
222pub async fn download_checkpoint_summary(
223    config: &Config,
224    checkpoint_number: u64,
225) -> anyhow::Result<CertifiedCheckpointSummary> {
226    // TODO switch to object store once available
227    if true {
228        download_checkpoint_summary_from_fullnode(config, checkpoint_number).await
229    } else {
230        download_checkpoint_summary_from_object_store(config, checkpoint_number).await
231    }
232}
233
234pub async fn download_checkpoint_summary_from_fullnode(
235    config: &Config,
236    checkpoint_number: u64,
237) -> anyhow::Result<CertifiedCheckpointSummary> {
238    // Download the checkpoint from the server
239    let client = Client::new(config.full_node_url.as_str());
240    Ok(client.get_checkpoint_summary(checkpoint_number).await?)
241}
242
243pub async fn download_checkpoint_summary_from_object_store(
244    config: &Config,
245    checkpoint_number: u64,
246) -> anyhow::Result<CertifiedCheckpointSummary> {
247    // Download the checkpoint from the server
248
249    let url = Url::parse(&config.object_store_url)?;
250    let (dyn_store, _store_path) = parse_url(&url).unwrap();
251    let path = Path::from(format!("{}.chk", checkpoint_number));
252    let response = dyn_store.get(&path).await?;
253    let bytes = response.bytes().await?;
254    let (_, blob) = bcs::from_bytes::<(u8, CheckpointData)>(&bytes)?;
255
256    info!("Downloaded checkpoint summary: {}", checkpoint_number);
257    Ok(blob.checkpoint_summary)
258}
259
260/// Run binary search to for each end of epoch checkpoint that is missing
261/// between the latest on the list and the latest checkpoint.
262pub async fn sync_checkpoint_list_to_latest(config: &Config) -> anyhow::Result<()> {
263    // Get the local checkpoint list
264    let mut checkpoints_list: CheckpointsList = read_checkpoint_list(config)?;
265    let latest_in_list = if let Some(latest_in_list) = checkpoints_list.checkpoints.last() {
266        *latest_in_list
267    } else {
268        let last_checkpoint_in_first_epoch = query_last_checkpoint_of_epoch(config, 0).await?;
269        checkpoints_list
270            .checkpoints
271            .push(last_checkpoint_in_first_epoch);
272        write_checkpoint_list(config, &checkpoints_list)?;
273        last_checkpoint_in_first_epoch
274    };
275
276    // Download the latest in list checkpoint
277    let summary = download_checkpoint_summary(config, latest_in_list).await?;
278    let mut last_epoch = summary.epoch();
279
280    // Download the very latest checkpoint
281    let client = IotaClientBuilder::default()
282        .build(config.full_node_url.as_str())
283        .await
284        .expect("Cannot connect to full node");
285
286    let latest_seq = client
287        .read_api()
288        .get_latest_checkpoint_sequence_number()
289        .await?;
290    let latest = download_checkpoint_summary(config, latest_seq).await?;
291
292    // Sequentially record all the missing end of epoch checkpoints numbers
293    while last_epoch + 1 < latest.epoch() {
294        let target_epoch = last_epoch + 1;
295        let target_last_checkpoint_number =
296            query_last_checkpoint_of_epoch(config, target_epoch).await?;
297
298        // Add to the list
299        checkpoints_list
300            .checkpoints
301            .push(target_last_checkpoint_number);
302        write_checkpoint_list(config, &checkpoints_list)?;
303
304        // Update
305        last_epoch = target_epoch;
306
307        println!(
308            "Last Epoch: {} Last Checkpoint: {}",
309            target_epoch, target_last_checkpoint_number
310        );
311    }
312
313    Ok(())
314}
315
316pub async fn check_and_sync_checkpoints(config: &Config) -> anyhow::Result<()> {
317    sync_checkpoint_list_to_latest(config)
318        .await
319        .map_err(|e| anyhow!(format!("Cannot refresh list: {e}")))?;
320
321    // Get the local checkpoint list
322    let checkpoints_list: CheckpointsList = read_checkpoint_list(config)
323        .map_err(|e| anyhow!(format!("Cannot read checkpoint list: {e}")))?;
324
325    // Load the genesis committee
326    let mut genesis_path = config.checkpoint_summary_dir.clone();
327    genesis_path.push(&config.genesis_filename);
328    let genesis_committee = Genesis::load(&genesis_path)?
329        .committee()
330        .map_err(|e| anyhow!(format!("Cannot load Genesis: {e}")))?;
331
332    // Check the signatures of all checkpoints
333    // And download any missing ones
334
335    let mut prev_committee = genesis_committee;
336    for ckp_id in checkpoints_list.checkpoints {
337        // check if there is a file with this name ckp_id.yaml in the
338        // checkpoint_summary_dir
339        let mut checkpoint_path = config.checkpoint_summary_dir.clone();
340        checkpoint_path.push(format!("{}.yaml", ckp_id));
341
342        // If file exists read the file otherwise download it from the server
343        let summary = if checkpoint_path.exists() {
344            read_checkpoint(config, ckp_id)
345                .map_err(|e| anyhow!(format!("Cannot read checkpoint: {e}")))?
346        } else {
347            println!("downloading checkpoint summary: {ckp_id}");
348            // Download the checkpoint from the server
349            let summary = download_checkpoint_summary(config, ckp_id)
350                .await
351                .map_err(|e| anyhow!(format!("Cannot download summary: {e}")))?;
352            summary.clone().try_into_verified(&prev_committee)?;
353            // Write the checkpoint summary to a file
354            write_checkpoint(config, &summary)?;
355            summary
356        };
357
358        // Print the id of the checkpoint and the epoch number
359        println!(
360            "Epoch: {} Checkpoint ID: {}",
361            summary.epoch(),
362            summary.digest()
363        );
364
365        // Extract the new committee information
366        if let Some(EndOfEpochData {
367            next_epoch_committee,
368            ..
369        }) = &summary.end_of_epoch_data
370        {
371            let next_committee = next_epoch_committee.iter().cloned().collect();
372            prev_committee =
373                Committee::new(summary.epoch().checked_add(1).unwrap(), next_committee);
374        } else {
375            return Err(anyhow!(
376                "Expected all checkpoints to be end-of-epoch checkpoints"
377            ));
378        }
379    }
380
381    Ok(())
382}
383
384pub async fn get_full_checkpoint(
385    config: &Config,
386    checkpoint_number: u64,
387) -> anyhow::Result<CheckpointData> {
388    // TODO add config option
389    if true {
390        get_full_checkpoint_from_object_store(config, checkpoint_number).await
391    } else {
392        get_full_checkpoint_from_client(config, checkpoint_number).await
393    }
394}
395
396pub async fn get_full_checkpoint_from_client(
397    config: &Config,
398    checkpoint_number: u64,
399) -> anyhow::Result<CheckpointData> {
400    // Downloading the checkpoint from the server
401    let client: Client = Client::new(config.full_node_url.as_str());
402    let full_checkpoint = client.get_full_checkpoint(checkpoint_number).await?;
403
404    Ok(full_checkpoint)
405}
406
407pub async fn get_full_checkpoint_from_object_store(
408    config: &Config,
409    checkpoint_number: u64,
410) -> anyhow::Result<CheckpointData> {
411    let url = Url::parse(&config.object_store_url)
412        .map_err(|_| anyhow!("Cannot parse object store URL"))?;
413    let (dyn_store, _store_path) = parse_url(&url).unwrap();
414    let path = Path::from(format!("{}.chk", checkpoint_number));
415    info!("Request full checkpoint: {}", path);
416    let response = dyn_store
417        .get(&path)
418        .await
419        .map_err(|_| anyhow!("Cannot get full checkpoint from object store"))?;
420    let bytes = response.bytes().await?;
421    let (_, full_checkpoint) = bcs::from_bytes::<(u8, CheckpointData)>(&bytes)?;
422    Ok(full_checkpoint)
423}
424
425pub fn extract_verified_effects_and_events(
426    checkpoint: &CheckpointData,
427    committee: &Committee,
428    tid: TransactionDigest,
429) -> anyhow::Result<(TransactionEffects, Option<TransactionEvents>)> {
430    let summary = &checkpoint.checkpoint_summary;
431
432    // Verify the checkpoint summary using the committee
433    summary.verify_with_contents(committee, Some(&checkpoint.checkpoint_contents))?;
434
435    // Check the validity of the transaction
436    let contents = &checkpoint.checkpoint_contents;
437    let (matching_tx, _) = checkpoint
438        .transactions
439        .iter()
440        .zip(contents.iter())
441        // Note that we get the digest of the effects to ensure this is
442        // indeed the correct effects that are authenticated in the contents.
443        .find(|(tx, digest)| {
444            tx.effects.execution_digests() == **digest && digest.transaction == tid
445        })
446        .ok_or(anyhow!("Transaction not found in checkpoint contents"))?;
447
448    // Check the events are all correct.
449    let events_digest = matching_tx.events.as_ref().map(|events| events.digest());
450    anyhow::ensure!(
451        events_digest.as_ref() == matching_tx.effects.events_digest(),
452        "Events digest does not match"
453    );
454
455    // Since we do not check objects we do not return them
456    Ok((matching_tx.effects.clone(), matching_tx.events.clone()))
457}
458
459pub async fn get_verified_effects_and_events(
460    config: &Config,
461    tid: TransactionDigest,
462) -> anyhow::Result<(TransactionEffects, Option<TransactionEvents>)> {
463    let iota_client: iota_sdk::IotaClient = IotaClientBuilder::default()
464        .build(config.full_node_url.as_str())
465        .await
466        .unwrap();
467    let read_api = iota_client.read_api();
468
469    info!("Getting effects and events for TID: {}", tid);
470    // Lookup the transaction id and get the checkpoint sequence number
471    let options = IotaTransactionBlockResponseOptions::new();
472    let seq = read_api
473        .get_transaction_with_options(tid, options)
474        .await
475        .map_err(|e| anyhow!(format!("Cannot get transaction: {e}")))?
476        .checkpoint
477        .ok_or(anyhow!("Transaction not found"))?;
478
479    // Download the full checkpoint for this sequence number
480    let full_check_point = get_full_checkpoint(config, seq)
481        .await
482        .map_err(|e| anyhow!(format!("Cannot get full checkpoint: {e}")))?;
483
484    // Load the list of stored checkpoints
485    let checkpoints_list: CheckpointsList = read_checkpoint_list(config)?;
486
487    // find the stored checkpoint before the seq checkpoint
488    let prev_ckp_id = checkpoints_list
489        .checkpoints
490        .iter()
491        .filter(|ckp_id| **ckp_id < seq)
492        .next_back();
493
494    let committee = if let Some(prev_ckp_id) = prev_ckp_id {
495        // Read it from the store
496        let prev_ckp = read_checkpoint(config, *prev_ckp_id)?;
497
498        // Check we have the right checkpoint
499        anyhow::ensure!(
500            prev_ckp.epoch().checked_add(1).unwrap() == full_check_point.checkpoint_summary.epoch(),
501            "Checkpoint sequence number does not match. Need to Sync."
502        );
503
504        // Get the committee from the previous checkpoint
505        let current_committee = prev_ckp
506            .end_of_epoch_data
507            .as_ref()
508            .ok_or(anyhow!(
509                "Expected all checkpoints to be end-of-epoch checkpoints"
510            ))?
511            .next_epoch_committee
512            .iter()
513            .cloned()
514            .collect();
515
516        // Make a committee object using this
517        Committee::new(prev_ckp.epoch().checked_add(1).unwrap(), current_committee)
518    } else {
519        // Since we did not find a small committee checkpoint we use the genesis
520        let mut genesis_path = config.checkpoint_summary_dir.clone();
521        genesis_path.push(&config.genesis_filename);
522        Genesis::load(&genesis_path)?
523            .committee()
524            .map_err(|e| anyhow!(format!("Cannot load Genesis: {e}")))?
525    };
526
527    info!("Extracting effects and events for TID: {}", tid);
528    extract_verified_effects_and_events(&full_check_point, &committee, tid)
529        .map_err(|e| anyhow!(format!("Cannot extract effects and events: {e}")))
530}
531
532pub async fn get_verified_object(config: &Config, id: ObjectID) -> anyhow::Result<Object> {
533    let iota_client: iota_sdk::IotaClient = IotaClientBuilder::default()
534        .build(config.full_node_url.as_str())
535        .await
536        .unwrap();
537
538    info!("Getting object: {}", id);
539
540    let read_api = iota_client.read_api();
541    let object_json = read_api
542        .get_object_with_options(id, IotaObjectDataOptions::bcs_lossless())
543        .await
544        .expect("Cannot get object");
545    let object = object_json
546        .into_object()
547        .expect("Cannot make into object data");
548    let object: Object = object.try_into().expect("Cannot reconstruct object");
549
550    // Need to authenticate this object
551    let (effects, _) = get_verified_effects_and_events(config, object.previous_transaction)
552        .await
553        .expect("Cannot get effects and events");
554
555    // check that this object ID, version and hash is in the effects
556    let target_object_ref = object.compute_object_reference();
557    effects
558        .all_changed_objects()
559        .iter()
560        .find(|object_ref| object_ref.0 == target_object_ref)
561        .ok_or(anyhow!("Object not found"))
562        .expect("Object not found");
563
564    Ok(object)
565}