iota_indexer/store/
package_resolver.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::sync::Arc;
6
7use async_trait::async_trait;
8use diesel::{ExpressionMethods, OptionalExtension, QueryDsl, RunQueryDsl};
9use iota_package_resolver::{Package, PackageStore, error::Error as PackageResolverError};
10use iota_types::object::Object;
11use move_core_types::account_address::AccountAddress;
12
13use crate::{db::ConnectionPool, errors::IndexerError, schema::objects, store::diesel_macro::*};
14
15/// A package resolver that reads packages from the database.
16pub struct IndexerStorePackageResolver {
17    cp: ConnectionPool,
18}
19
20impl Clone for IndexerStorePackageResolver {
21    fn clone(&self) -> IndexerStorePackageResolver {
22        Self {
23            cp: self.cp.clone(),
24        }
25    }
26}
27
28impl IndexerStorePackageResolver {
29    pub fn new(cp: ConnectionPool) -> Self {
30        Self { cp }
31    }
32}
33
34#[async_trait]
35impl PackageStore for IndexerStorePackageResolver {
36    async fn fetch(&self, id: AccountAddress) -> Result<Arc<Package>, PackageResolverError> {
37        let pkg = self
38            .get_package_from_db_in_blocking_task(id)
39            .await
40            .map_err(|e| PackageResolverError::Store {
41                store: "PostgresDB",
42                source: Arc::new(e),
43            })?;
44        Ok(Arc::new(pkg))
45    }
46}
47
48impl IndexerStorePackageResolver {
49    fn get_package_from_db(&self, id: AccountAddress) -> Result<Package, IndexerError> {
50        let Some(bcs) = read_only_blocking!(&self.cp, |conn| {
51            let query = objects::dsl::objects
52                .select(objects::dsl::serialized_object)
53                .filter(objects::dsl::object_id.eq(id.to_vec()));
54            query.get_result::<Vec<u8>>(conn).optional()
55        })?
56        else {
57            return Err(IndexerError::PostgresRead(format!(
58                "Package not found in DB: {:?}",
59                id
60            )));
61        };
62        let object = bcs::from_bytes::<Object>(&bcs)?;
63        Package::read_from_object(&object).map_err(|e| {
64            IndexerError::PostgresRead(format!("Failed parsing object to package: {:?}", e))
65        })
66    }
67
68    async fn get_package_from_db_in_blocking_task(
69        &self,
70        id: AccountAddress,
71    ) -> Result<Package, IndexerError> {
72        let this = self.clone();
73        tokio::task::spawn_blocking(move || this.get_package_from_db(id)).await?
74    }
75}