1use std::{
5 collections::HashMap,
6 fs,
7 io::{Read, Write},
8 path::PathBuf,
9 sync::{Arc, Mutex},
10};
11
12use anyhow::anyhow;
13use async_trait::async_trait;
14use clap::Subcommand;
15use iota_config::genesis::Genesis;
16use iota_json_rpc_types::{IotaObjectDataOptions, IotaTransactionBlockResponseOptions};
17use iota_package_resolver::{Package, PackageStore, Result as ResolverResult};
18use iota_rest_api::{CheckpointData, Client};
19use iota_sdk::IotaClientBuilder;
20use iota_types::{
21 base_types::ObjectID,
22 committee::Committee,
23 crypto::AuthorityQuorumSignInfo,
24 digests::TransactionDigest,
25 effects::{TransactionEffects, TransactionEffectsAPI, TransactionEvents},
26 message_envelope::Envelope,
27 messages_checkpoint::{CertifiedCheckpointSummary, CheckpointSummary, EndOfEpochData},
28 object::Object,
29};
30use log::info;
31use move_core_types::account_address::AccountAddress;
32use object_store::{parse_url, path::Path};
33use serde_json::{Value, json};
34use url::Url;
35
/// Package store that obtains Move packages through `get_verified_object`
/// and keeps every package it has already fetched in an in-memory cache.
pub struct RemotePackageStore {
    /// Settings passed to `get_verified_object` when a package is not cached.
    config: Config,
    /// Packages fetched so far, keyed by the address they were requested under.
    cache: Mutex<HashMap<AccountAddress, Arc<Package>>>,
}
40
41impl RemotePackageStore {
42 pub fn new(config: Config) -> Self {
43 Self {
44 config,
45 cache: Mutex::new(HashMap::new()),
46 }
47 }
48}
49
50#[async_trait]
51impl PackageStore for RemotePackageStore {
52 async fn fetch(&self, id: AccountAddress) -> ResolverResult<Arc<Package>> {
55 if let Some(package) = self.cache.lock().unwrap().get(&id) {
57 info!("Fetch Package: {} cache hit", id);
58 return Ok(package.clone());
59 }
60
61 info!("Fetch Package: {}", id);
62
63 let object = get_verified_object(&self.config, id.into()).await.unwrap();
64 let package = Arc::new(Package::read_from_object(&object).unwrap());
65
66 self.cache.lock().unwrap().insert(id, package.clone());
68
69 Ok(package)
70 }
71}
72
// Command-line subcommands, derived with clap's `Subcommand`.
//
// NOTE(review): plain `//` comments are used here on purpose; `///` doc
// comments on clap-derived items are picked up as help text and would change
// the CLI output.
#[derive(Subcommand, Debug)]
pub enum SCommands {
    // Takes no arguments. Presumably triggers the checkpoint sync; the
    // dispatch code is not part of this section, so confirm at the call site.
    Sync {},

    // Carries a transaction identifier as an unparsed string (`-t` / `--tid`).
    Transaction {
        #[arg(short, long, value_name = "TID")]
        tid: String,
    },

    // Carries an object identifier as an unparsed string (`-o` / `--oid`).
    Object {
        #[arg(short, long, value_name = "OID")]
        oid: String,
    },
}
92
/// Settings shared by every helper in this file. The struct is serde
/// (de)serializable.
#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
pub struct Config {
    /// URL of the full node; used to build both the REST `Client` and the
    /// `IotaClientBuilder` JSON-RPC client.
    pub full_node_url: String,

    /// Directory holding `checkpoints.yaml`, the per-checkpoint summary files
    /// and the genesis file.
    pub checkpoint_summary_dir: PathBuf,

    /// Genesis file name; it is joined onto `checkpoint_summary_dir` before
    /// being loaded.
    pub genesis_filename: PathBuf,

    /// URL handed to `object_store::parse_url` to fetch `<seq>.chk` blobs.
    pub object_store_url: String,

    /// GraphQL endpoint that is POSTed to when looking up the last checkpoint
    /// of an epoch.
    pub graphql_url: String,
}
112
113async fn query_last_checkpoint_of_epoch(config: &Config, epoch_id: u64) -> anyhow::Result<u64> {
114 let query = json!({
116 "query": "query ($epochID: Int) { epoch(id: $epochID) { checkpoints(last: 1) { nodes { sequenceNumber } } } }",
117 "variables": { "epochID": epoch_id }
118 });
119
120 let client = reqwest::Client::new();
122 let resp = client
123 .post(&config.graphql_url)
124 .header("Content-Type", "application/json")
125 .body(query.to_string())
126 .send()
127 .await
128 .expect("Cannot connect to graphql")
129 .text()
130 .await
131 .expect("Cannot parse response");
132
133 let v: Value = serde_json::from_str(resp.as_str()).expect("Incorrect JSON response");
135 let checkpoint_number = v["data"]["epoch"]["checkpoints"]["nodes"][0]["sequenceNumber"]
136 .as_u64()
137 .unwrap();
138
139 Ok(checkpoint_number)
140}
141
/// Serde (de)serializable list of checkpoint sequence numbers; this is the
/// content of `checkpoints.yaml`.
#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
pub struct CheckpointsList {
    /// Checkpoint sequence numbers. The sync code appends one entry per epoch
    /// (that epoch's last checkpoint), in increasing epoch order.
    pub checkpoints: Vec<u64>,
}
148
149pub fn read_checkpoint_list(config: &Config) -> anyhow::Result<CheckpointsList> {
150 let mut checkpoints_path = config.checkpoint_summary_dir.clone();
151 checkpoints_path.push("checkpoints.yaml");
152 let reader = fs::File::open(checkpoints_path.clone())?;
154 Ok(serde_yaml::from_reader(reader)?)
155}
156
157pub fn read_checkpoint(
158 config: &Config,
159 checkpoint_number: u64,
160) -> anyhow::Result<Envelope<CheckpointSummary, AuthorityQuorumSignInfo<true>>> {
161 read_checkpoint_general(config, checkpoint_number, None)
162}
163
164pub fn read_checkpoint_general(
165 config: &Config,
166 checkpoint_number: u64,
167 path: Option<&str>,
168) -> anyhow::Result<Envelope<CheckpointSummary, AuthorityQuorumSignInfo<true>>> {
169 let mut checkpoint_path = config.checkpoint_summary_dir.clone();
171 if let Some(path) = path {
172 checkpoint_path.push(path);
173 }
174 checkpoint_path.push(format!("{}.yaml", checkpoint_number));
176 let mut reader = fs::File::open(checkpoint_path.clone())?;
177 let metadata = fs::metadata(&checkpoint_path)?;
178 let mut buffer = vec![0; metadata.len() as usize];
179 reader.read_exact(&mut buffer)?;
180 bcs::from_bytes(&buffer).map_err(|_| anyhow!("Unable to parse checkpoint file"))
181}
182
183pub fn write_checkpoint(
184 config: &Config,
185 summary: &Envelope<CheckpointSummary, AuthorityQuorumSignInfo<true>>,
186) -> anyhow::Result<()> {
187 write_checkpoint_general(config, summary, None)
188}
189
190pub fn write_checkpoint_general(
191 config: &Config,
192 summary: &Envelope<CheckpointSummary, AuthorityQuorumSignInfo<true>>,
193 path: Option<&str>,
194) -> anyhow::Result<()> {
195 let mut checkpoint_path = config.checkpoint_summary_dir.clone();
197 if let Some(path) = path {
198 checkpoint_path.push(path);
199 }
200 checkpoint_path.push(format!("{}.json", summary.sequence_number));
201 let writer = fs::File::create(checkpoint_path.clone())?;
202 serde_json::to_writer(&writer, &summary)
203 .map_err(|_| anyhow!("Unable to serialize checkpoint summary"))?;
204 Ok(())
205}
206
207pub fn write_checkpoint_list(
208 config: &Config,
209 checkpoints_list: &CheckpointsList,
210) -> anyhow::Result<()> {
211 let mut checkpoints_path = config.checkpoint_summary_dir.clone();
213 checkpoints_path.push("checkpoints.yaml");
214 let mut writer = fs::File::create(checkpoints_path.clone())?;
215 let mut bytes = Vec::new();
216 serde_yaml::to_writer(&mut bytes, &checkpoints_list)?;
217 writer
218 .write_all(&bytes)
219 .map_err(|_| anyhow!("Unable to serialize checkpoint list"))
220}
221
222pub async fn download_checkpoint_summary(
223 config: &Config,
224 checkpoint_number: u64,
225) -> anyhow::Result<CertifiedCheckpointSummary> {
226 if true {
228 download_checkpoint_summary_from_fullnode(config, checkpoint_number).await
229 } else {
230 download_checkpoint_summary_from_object_store(config, checkpoint_number).await
231 }
232}
233
234pub async fn download_checkpoint_summary_from_fullnode(
235 config: &Config,
236 checkpoint_number: u64,
237) -> anyhow::Result<CertifiedCheckpointSummary> {
238 let client = Client::new(config.full_node_url.as_str());
240 Ok(client.get_checkpoint_summary(checkpoint_number).await?)
241}
242
243pub async fn download_checkpoint_summary_from_object_store(
244 config: &Config,
245 checkpoint_number: u64,
246) -> anyhow::Result<CertifiedCheckpointSummary> {
247 let url = Url::parse(&config.object_store_url)?;
250 let (dyn_store, _store_path) = parse_url(&url).unwrap();
251 let path = Path::from(format!("{}.chk", checkpoint_number));
252 let response = dyn_store.get(&path).await?;
253 let bytes = response.bytes().await?;
254 let (_, blob) = bcs::from_bytes::<(u8, CheckpointData)>(&bytes)?;
255
256 info!("Downloaded checkpoint summary: {}", checkpoint_number);
257 Ok(blob.checkpoint_summary)
258}
259
260pub async fn sync_checkpoint_list_to_latest(config: &Config) -> anyhow::Result<()> {
263 let mut checkpoints_list: CheckpointsList = read_checkpoint_list(config)?;
265 let latest_in_list = if let Some(latest_in_list) = checkpoints_list.checkpoints.last() {
266 *latest_in_list
267 } else {
268 let last_checkpoint_in_first_epoch = query_last_checkpoint_of_epoch(config, 0).await?;
269 checkpoints_list
270 .checkpoints
271 .push(last_checkpoint_in_first_epoch);
272 write_checkpoint_list(config, &checkpoints_list)?;
273 last_checkpoint_in_first_epoch
274 };
275
276 let summary = download_checkpoint_summary(config, latest_in_list).await?;
278 let mut last_epoch = summary.epoch();
279
280 let client = IotaClientBuilder::default()
282 .build(config.full_node_url.as_str())
283 .await
284 .expect("Cannot connect to full node");
285
286 let latest_seq = client
287 .read_api()
288 .get_latest_checkpoint_sequence_number()
289 .await?;
290 let latest = download_checkpoint_summary(config, latest_seq).await?;
291
292 while last_epoch + 1 < latest.epoch() {
294 let target_epoch = last_epoch + 1;
295 let target_last_checkpoint_number =
296 query_last_checkpoint_of_epoch(config, target_epoch).await?;
297
298 checkpoints_list
300 .checkpoints
301 .push(target_last_checkpoint_number);
302 write_checkpoint_list(config, &checkpoints_list)?;
303
304 last_epoch = target_epoch;
306
307 println!(
308 "Last Epoch: {} Last Checkpoint: {}",
309 target_epoch, target_last_checkpoint_number
310 );
311 }
312
313 Ok(())
314}
315
316pub async fn check_and_sync_checkpoints(config: &Config) -> anyhow::Result<()> {
317 sync_checkpoint_list_to_latest(config)
318 .await
319 .map_err(|e| anyhow!(format!("Cannot refresh list: {e}")))?;
320
321 let checkpoints_list: CheckpointsList = read_checkpoint_list(config)
323 .map_err(|e| anyhow!(format!("Cannot read checkpoint list: {e}")))?;
324
325 let mut genesis_path = config.checkpoint_summary_dir.clone();
327 genesis_path.push(&config.genesis_filename);
328 let genesis_committee = Genesis::load(&genesis_path)?
329 .committee()
330 .map_err(|e| anyhow!(format!("Cannot load Genesis: {e}")))?;
331
332 let mut prev_committee = genesis_committee;
336 for ckp_id in checkpoints_list.checkpoints {
337 let mut checkpoint_path = config.checkpoint_summary_dir.clone();
340 checkpoint_path.push(format!("{}.yaml", ckp_id));
341
342 let summary = if checkpoint_path.exists() {
344 read_checkpoint(config, ckp_id)
345 .map_err(|e| anyhow!(format!("Cannot read checkpoint: {e}")))?
346 } else {
347 println!("downloading checkpoint summary: {ckp_id}");
348 let summary = download_checkpoint_summary(config, ckp_id)
350 .await
351 .map_err(|e| anyhow!(format!("Cannot download summary: {e}")))?;
352 summary.clone().try_into_verified(&prev_committee)?;
353 write_checkpoint(config, &summary)?;
355 summary
356 };
357
358 println!(
360 "Epoch: {} Checkpoint ID: {}",
361 summary.epoch(),
362 summary.digest()
363 );
364
365 if let Some(EndOfEpochData {
367 next_epoch_committee,
368 ..
369 }) = &summary.end_of_epoch_data
370 {
371 let next_committee = next_epoch_committee.iter().cloned().collect();
372 prev_committee =
373 Committee::new(summary.epoch().checked_add(1).unwrap(), next_committee);
374 } else {
375 return Err(anyhow!(
376 "Expected all checkpoints to be end-of-epoch checkpoints"
377 ));
378 }
379 }
380
381 Ok(())
382}
383
384pub async fn get_full_checkpoint(
385 config: &Config,
386 checkpoint_number: u64,
387) -> anyhow::Result<CheckpointData> {
388 if true {
390 get_full_checkpoint_from_object_store(config, checkpoint_number).await
391 } else {
392 get_full_checkpoint_from_client(config, checkpoint_number).await
393 }
394}
395
396pub async fn get_full_checkpoint_from_client(
397 config: &Config,
398 checkpoint_number: u64,
399) -> anyhow::Result<CheckpointData> {
400 let client: Client = Client::new(config.full_node_url.as_str());
402 let full_checkpoint = client.get_full_checkpoint(checkpoint_number).await?;
403
404 Ok(full_checkpoint)
405}
406
407pub async fn get_full_checkpoint_from_object_store(
408 config: &Config,
409 checkpoint_number: u64,
410) -> anyhow::Result<CheckpointData> {
411 let url = Url::parse(&config.object_store_url)
412 .map_err(|_| anyhow!("Cannot parse object store URL"))?;
413 let (dyn_store, _store_path) = parse_url(&url).unwrap();
414 let path = Path::from(format!("{}.chk", checkpoint_number));
415 info!("Request full checkpoint: {}", path);
416 let response = dyn_store
417 .get(&path)
418 .await
419 .map_err(|_| anyhow!("Cannot get full checkpoint from object store"))?;
420 let bytes = response.bytes().await?;
421 let (_, full_checkpoint) = bcs::from_bytes::<(u8, CheckpointData)>(&bytes)?;
422 Ok(full_checkpoint)
423}
424
425pub fn extract_verified_effects_and_events(
426 checkpoint: &CheckpointData,
427 committee: &Committee,
428 tid: TransactionDigest,
429) -> anyhow::Result<(TransactionEffects, Option<TransactionEvents>)> {
430 let summary = &checkpoint.checkpoint_summary;
431
432 summary.verify_with_contents(committee, Some(&checkpoint.checkpoint_contents))?;
434
435 let contents = &checkpoint.checkpoint_contents;
437 let (matching_tx, _) = checkpoint
438 .transactions
439 .iter()
440 .zip(contents.iter())
441 .find(|(tx, digest)| {
444 tx.effects.execution_digests() == **digest && digest.transaction == tid
445 })
446 .ok_or(anyhow!("Transaction not found in checkpoint contents"))?;
447
448 let events_digest = matching_tx.events.as_ref().map(|events| events.digest());
450 anyhow::ensure!(
451 events_digest.as_ref() == matching_tx.effects.events_digest(),
452 "Events digest does not match"
453 );
454
455 Ok((matching_tx.effects.clone(), matching_tx.events.clone()))
457}
458
459pub async fn get_verified_effects_and_events(
460 config: &Config,
461 tid: TransactionDigest,
462) -> anyhow::Result<(TransactionEffects, Option<TransactionEvents>)> {
463 let iota_client: iota_sdk::IotaClient = IotaClientBuilder::default()
464 .build(config.full_node_url.as_str())
465 .await
466 .unwrap();
467 let read_api = iota_client.read_api();
468
469 info!("Getting effects and events for TID: {}", tid);
470 let options = IotaTransactionBlockResponseOptions::new();
472 let seq = read_api
473 .get_transaction_with_options(tid, options)
474 .await
475 .map_err(|e| anyhow!(format!("Cannot get transaction: {e}")))?
476 .checkpoint
477 .ok_or(anyhow!("Transaction not found"))?;
478
479 let full_check_point = get_full_checkpoint(config, seq)
481 .await
482 .map_err(|e| anyhow!(format!("Cannot get full checkpoint: {e}")))?;
483
484 let checkpoints_list: CheckpointsList = read_checkpoint_list(config)?;
486
487 let prev_ckp_id = checkpoints_list
489 .checkpoints
490 .iter()
491 .filter(|ckp_id| **ckp_id < seq)
492 .next_back();
493
494 let committee = if let Some(prev_ckp_id) = prev_ckp_id {
495 let prev_ckp = read_checkpoint(config, *prev_ckp_id)?;
497
498 anyhow::ensure!(
500 prev_ckp.epoch().checked_add(1).unwrap() == full_check_point.checkpoint_summary.epoch(),
501 "Checkpoint sequence number does not match. Need to Sync."
502 );
503
504 let current_committee = prev_ckp
506 .end_of_epoch_data
507 .as_ref()
508 .ok_or(anyhow!(
509 "Expected all checkpoints to be end-of-epoch checkpoints"
510 ))?
511 .next_epoch_committee
512 .iter()
513 .cloned()
514 .collect();
515
516 Committee::new(prev_ckp.epoch().checked_add(1).unwrap(), current_committee)
518 } else {
519 let mut genesis_path = config.checkpoint_summary_dir.clone();
521 genesis_path.push(&config.genesis_filename);
522 Genesis::load(&genesis_path)?
523 .committee()
524 .map_err(|e| anyhow!(format!("Cannot load Genesis: {e}")))?
525 };
526
527 info!("Extracting effects and events for TID: {}", tid);
528 extract_verified_effects_and_events(&full_check_point, &committee, tid)
529 .map_err(|e| anyhow!(format!("Cannot extract effects and events: {e}")))
530}
531
532pub async fn get_verified_object(config: &Config, id: ObjectID) -> anyhow::Result<Object> {
533 let iota_client: iota_sdk::IotaClient = IotaClientBuilder::default()
534 .build(config.full_node_url.as_str())
535 .await
536 .unwrap();
537
538 info!("Getting object: {}", id);
539
540 let read_api = iota_client.read_api();
541 let object_json = read_api
542 .get_object_with_options(id, IotaObjectDataOptions::bcs_lossless())
543 .await
544 .expect("Cannot get object");
545 let object = object_json
546 .into_object()
547 .expect("Cannot make into object data");
548 let object: Object = object.try_into().expect("Cannot reconstruct object");
549
550 let (effects, _) = get_verified_effects_and_events(config, object.previous_transaction)
552 .await
553 .expect("Cannot get effects and events");
554
555 let target_object_ref = object.compute_object_reference();
557 effects
558 .all_changed_objects()
559 .iter()
560 .find(|object_ref| object_ref.0 == target_object_ref)
561 .ok_or(anyhow!("Object not found"))
562 .expect("Object not found");
563
564 Ok(object)
565}