iota_analytics_indexer/
package_store.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{path::Path, sync::Arc};
6
7use async_trait::async_trait;
8use iota_package_resolver::{
9    Package, PackageStore, PackageStoreWithLruCache, Result, error::Error as PackageResolverError,
10};
11use iota_rest_api::Client;
12use iota_types::{base_types::ObjectID, object::Object};
13use move_core_types::account_address::AccountAddress;
14use thiserror::Error;
15use typed_store::{
16    DBMapUtils, Map, TypedStoreError,
17    rocks::{DBMap, MetricConf},
18    traits::{TableSummary, TypedStoreDebug},
19};
20
21const STORE: &str = "RocksDB";
22
23#[derive(Error, Debug)]
24pub enum Error {
25    #[error("{0}")]
26    TypedStore(#[from] TypedStoreError),
27}
28
29impl From<Error> for PackageResolverError {
30    fn from(source: Error) -> Self {
31        match source {
32            Error::TypedStore(store_error) => Self::Store {
33                store: STORE,
34                source: Arc::new(store_error),
35            },
36        }
37    }
38}
39
40#[derive(DBMapUtils)]
41pub struct PackageStoreTables {
42    pub(crate) packages: DBMap<ObjectID, Object>,
43}
44
45impl PackageStoreTables {
46    pub fn new(path: &Path) -> Arc<Self> {
47        Arc::new(Self::open_tables_read_write(
48            path.to_path_buf(),
49            MetricConf::new("package"),
50            None,
51            None,
52        ))
53    }
54    pub(crate) fn update(&self, package: &Object) -> Result<()> {
55        let mut batch = self.packages.batch();
56        batch
57            .insert_batch(&self.packages, std::iter::once((package.id(), package)))
58            .map_err(Error::TypedStore)?;
59        batch.write().map_err(Error::TypedStore)?;
60        Ok(())
61    }
62}
63
64/// Store which keeps package objects in a local rocksdb store. It is expected
65/// that this store is kept updated with latest version of package objects while
66/// iterating over checkpoints. If the local db is missing (or gets deleted),
67/// packages are fetched from a full node and local store is updated
68#[derive(Clone)]
69pub struct LocalDBPackageStore {
70    package_store_tables: Arc<PackageStoreTables>,
71    fallback_client: Client,
72}
73
74impl LocalDBPackageStore {
75    pub fn new(path: &Path, rest_url: &str) -> Self {
76        Self {
77            package_store_tables: PackageStoreTables::new(path),
78            fallback_client: Client::new(rest_url),
79        }
80    }
81
82    pub fn update(&self, object: &Object) -> Result<()> {
83        let Some(_package) = object.data.try_as_package() else {
84            return Ok(());
85        };
86        self.package_store_tables.update(object)?;
87        Ok(())
88    }
89
90    pub async fn get(&self, id: AccountAddress) -> Result<Object> {
91        let object = if let Some(object) = self
92            .package_store_tables
93            .packages
94            .get(&ObjectID::from(id))
95            .map_err(Error::TypedStore)?
96        {
97            object
98        } else {
99            let object = self
100                .fallback_client
101                .get_object(ObjectID::from(id))
102                .await
103                .map_err(|_| PackageResolverError::PackageNotFound(id))?;
104            self.update(&object)?;
105            object
106        };
107        Ok(object)
108    }
109}
110
111#[async_trait]
112impl PackageStore for LocalDBPackageStore {
113    async fn fetch(&self, id: AccountAddress) -> Result<Arc<Package>> {
114        let object = self.get(id).await?;
115        Ok(Arc::new(Package::read_from_object(&object)?))
116    }
117}
118
119pub(crate) type PackageCache = PackageStoreWithLruCache<LocalDBPackageStore>;