iota_graphql_rpc/types/
move_package.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::collections::{BTreeMap, BTreeSet, HashMap};
6
7use async_graphql::{
8    connection::{Connection, CursorType, Edge},
9    dataloader::Loader,
10    *,
11};
12use diesel::{
13    BoolExpressionMethods, ExpressionMethods, JoinOnDsl, QueryDsl, Selectable,
14    prelude::QueryableByName,
15};
16use iota_indexer::{models::objects::StoredHistoryObject, schema::packages};
17use iota_package_resolver::{Package as ParsedMovePackage, error::Error as PackageCacheError};
18use iota_types::{is_system_package, move_package::MovePackage as NativeMovePackage, object::Data};
19use serde::{Deserialize, Serialize};
20
21use crate::{
22    connection::ScanConnection,
23    consistency::{Checkpointed, ConsistentNamedCursor},
24    data::{DataLoader, Db, DbConnection, QueryExecutor},
25    error::Error,
26    filter, query,
27    raw_query::RawQuery,
28    types::{
29        balance::{self, Balance},
30        base64::Base64,
31        big_int::BigInt,
32        coin::Coin,
33        cursor::{BcsCursor, JsonCursor, Page, RawPaginated, ScanLimited, Target},
34        iota_address::{IotaAddress, addr},
35        iota_names_registration::{DomainFormat, IotaNamesRegistration},
36        move_module::MoveModule,
37        move_object::MoveObject,
38        object::{self, Object, ObjectFilter, ObjectImpl, ObjectOwner, ObjectStatus},
39        owner::OwnerImpl,
40        stake::StakedIota,
41        transaction_block::{self, TransactionBlock, TransactionBlockFilter},
42        type_filter::ExactTypeFilter,
43        uint53::UInt53,
44    },
45};
46
/// GraphQL-facing view of a Move package: the generic object wrapper paired
/// with the package in its native representation.
#[derive(Clone)]
pub(crate) struct MovePackage {
    /// Representation of this Move package as a generic Object. Owner- and
    /// object-level fields (address, version, digest, ...) are answered from
    /// here.
    pub super_: Object,

    /// Package-specific data in its native representation. The serialized
    /// module map, linkage table and type-origin table are all read from here.
    pub native: NativeMovePackage,
}
56
// NOTE: `InputObject` publishes the `///` comments on this type and its fields
// as GraphQL schema descriptions, so they are user-facing text.
//
// This filter is consumed by `MovePackage::paginate_by_checkpoint`, which
// additionally clamps the `before_checkpoint` bound by the checkpoint being
// viewed.
/// Filter for paginating `MovePackage`s that were created within a range of
/// checkpoints.
#[derive(InputObject, Debug, Default, Clone)]
pub(crate) struct MovePackageCheckpointFilter {
    /// Fetch packages that were published strictly after this checkpoint.
    /// Omitting this fetches packages published since genesis.
    pub after_checkpoint: Option<UInt53>,

    /// Fetch packages that were published strictly before this checkpoint.
    /// Omitting this fetches packages published up to the latest checkpoint
    /// (inclusive).
    pub before_checkpoint: Option<UInt53>,
}
70
// NOTE: `InputObject` publishes the `///` comments on this type and its fields
// as GraphQL schema descriptions, so they are user-facing text.
//
// This filter is consumed by `MovePackage::paginate_by_version`, which forwards
// it unchanged to the system/user package version query builders.
/// Filter for paginating versions of a given `MovePackage`.
#[derive(InputObject, Debug, Default, Clone)]
pub(crate) struct MovePackageVersionFilter {
    /// Fetch versions of this package that are strictly newer than this
    /// version. Omitting this fetches versions since the original version.
    pub after_version: Option<UInt53>,

    /// Fetch versions of this package that are strictly older than this
    /// version. Omitting this fetches versions up to the latest version
    /// (inclusive).
    pub before_version: Option<UInt53>,
}
83
/// Filter for a point query of a MovePackage, supporting querying different
/// versions of a package by their version. Note that different versions of the
/// same user package exist at different IDs to each other, so this is different
/// from looking up the historical version of an object.
///
/// Values are built with `MovePackage::by_id_at`, `MovePackage::by_version` and
/// `MovePackage::latest_at`, and are resolved by `MovePackage::query`.
pub(crate) enum PackageLookup {
    /// Get the package at the given address, as of the given checkpoint.
    ById { checkpoint_viewed_at: u64 },

    /// Get the package whose original ID matches the storage ID of the package
    /// at the given address, but whose version is `version`.
    ///
    /// For system packages `MovePackage::query` reads the object at `version`
    /// directly; for user packages it first translates the address through a
    /// data loader keyed by `PackageVersionKey`.
    Versioned {
        version: u64,
        checkpoint_viewed_at: u64,
    },

    /// Get the package whose original ID matches the storage ID of the package
    /// at the given address, but that has the max version at the given
    /// checkpoint.
    ///
    /// For user packages `MovePackage::query` translates the address through a
    /// data loader keyed by `LatestKey`.
    Latest { checkpoint_viewed_at: u64 },
}
105
// NOTE: `SimpleObject` publishes the `///` comments on this type and its fields
// as GraphQL schema descriptions, so they are user-facing text.
//
// Instances are built by `MovePackage::linkage`, one per entry of the native
// package's linkage table.
/// Information used by a package to link to a specific version of its
/// dependency.
#[derive(SimpleObject)]
struct Linkage {
    /// The ID on-chain of the first version of the dependency.
    original_id: IotaAddress,

    /// The ID on-chain of the version of the dependency that this package
    /// depends on.
    upgraded_id: IotaAddress,

    /// The version of the dependency that this package depends on.
    version: UInt53,
}
120
// NOTE: `SimpleObject` publishes the `///` comments on this type and its fields
// as GraphQL schema descriptions, so they are user-facing text.
//
// Instances are built by `MovePackage::type_origins`, one per entry of the
// native package's type-origin table.
/// Information about which previous versions of a package introduced its types.
#[derive(SimpleObject)]
struct TypeOrigin {
    /// Module defining the type.
    module: String,

    /// Name of the struct.
    // `struct` is a Rust keyword, so the field carries a trailing underscore
    // and is renamed back to `struct` for the GraphQL schema.
    #[graphql(name = "struct")]
    struct_: String,

    /// The storage ID of the package that first defined this type.
    defining_id: IotaAddress,
}
134
/// A wrapper around the stored representation of a package, used to implement
/// pagination-related traits.
///
/// Rows of this shape are produced by the raw queries that the package
/// paginators run, e.g. the `packages` / `objects_history` join in
/// `MovePackage::paginate_by_checkpoint`, which selects `p.original_id, o.*`.
#[derive(Selectable, QueryableByName)]
#[diesel(table_name = packages)]
struct StoredHistoryPackage {
    /// Original ID of the package, as stored in the `packages` table.
    original_id: Vec<u8>,
    /// The historical object row for this package version; it is converted
    /// into a `MovePackage` once the page has been fetched.
    #[diesel(embed)]
    object: StoredHistoryObject,
}
144
/// Marker error carrying no payload.
///
/// NOTE(review): presumably the error returned when an `Object` cannot be
/// downcast into a `MovePackage` -- confirm against the `TryFrom`
/// implementation, which is outside the visible part of this file.
pub(crate) struct MovePackageDowncastError;
146
/// Cursor used when paginating the modules of a package: a JSON-encoded
/// `ConsistentNamedCursor` holding the module name and the checkpoint the page
/// was viewed at.
pub(crate) type CModule = JsonCursor<ConsistentNamedCursor>;
/// Cursor used when paginating packages: a BCS-encoded `PackageCursor`.
pub(crate) type Cursor = BcsCursor<PackageCursor>;
149
/// The inner struct for the `MovePackage` cursor. The package is identified by
/// the checkpoint it was created in, its original ID, and its version, and the
/// `checkpoint_viewed_at` specifies the checkpoint snapshot that the data came
/// from.
#[derive(Serialize, Deserialize, Clone, PartialEq, Eq, Debug)]
pub(crate) struct PackageCursor {
    /// Sequence number of the checkpoint the package version was created in.
    pub checkpoint_sequence_number: u64,
    /// Original ID of the package, as raw bytes.
    pub original_id: Vec<u8>,
    /// Version of the package.
    pub package_version: u64,
    /// Checkpoint snapshot the page containing this cursor was read from.
    pub checkpoint_viewed_at: u64,
}
161
/// `DataLoader` key for fetching the storage ID of the (user) package that
/// shares an original (aka runtime) ID with the package stored at `address`,
/// and whose version is `version`.
///
/// Note that this is different from looking up the historical version of an
/// object -- the query returns the ID of the package (each version of a user
/// package is at a different ID) -- and it does not work for system packages
/// (whose versions do all reside under the same ID).
#[derive(Copy, Clone, Hash, Eq, PartialEq, Debug)]
struct PackageVersionKey {
    /// Storage ID of any version of the package being looked up.
    address: IotaAddress,
    /// The package version whose storage ID is wanted.
    version: u64,
}
175
/// `DataLoader` key for fetching the latest version of a user package: The
/// package with the largest version whose original ID matches the original ID
/// of the package at `address`.
#[derive(Copy, Clone, Hash, Eq, PartialEq, Debug)]
struct LatestKey {
    /// Storage ID of any version of the package being looked up.
    address: IotaAddress,
    /// Checkpoint as of which "latest" is evaluated.
    checkpoint_viewed_at: u64,
}
184
185/// A MovePackage is a kind of Move object that represents code that has been
186/// published on chain. It exposes information about its modules, type
187/// definitions, functions, and dependencies.
188#[Object]
189impl MovePackage {
190    pub(crate) async fn address(&self) -> IotaAddress {
191        OwnerImpl::from(&self.super_).address().await
192    }
193
194    /// Objects owned by this package, optionally `filter`-ed.
195    ///
196    /// Note that objects owned by a package are inaccessible, because packages
197    /// are immutable and cannot be owned by an address.
198    pub(crate) async fn objects(
199        &self,
200        ctx: &Context<'_>,
201        first: Option<u64>,
202        after: Option<object::Cursor>,
203        last: Option<u64>,
204        before: Option<object::Cursor>,
205        filter: Option<ObjectFilter>,
206    ) -> Result<Connection<String, MoveObject>> {
207        OwnerImpl::from(&self.super_)
208            .objects(ctx, first, after, last, before, filter)
209            .await
210    }
211
212    /// Total balance of all coins with marker type owned by this package. If
213    /// type is not supplied, it defaults to `0x2::iota::IOTA`.
214    ///
215    /// Note that coins owned by a package are inaccessible, because packages
216    /// are immutable and cannot be owned by an address.
217    pub(crate) async fn balance(
218        &self,
219        ctx: &Context<'_>,
220        type_: Option<ExactTypeFilter>,
221    ) -> Result<Option<Balance>> {
222        OwnerImpl::from(&self.super_).balance(ctx, type_).await
223    }
224
225    /// The balances of all coin types owned by this package.
226    ///
227    /// Note that coins owned by a package are inaccessible, because packages
228    /// are immutable and cannot be owned by an address.
229    pub(crate) async fn balances(
230        &self,
231        ctx: &Context<'_>,
232        first: Option<u64>,
233        after: Option<balance::Cursor>,
234        last: Option<u64>,
235        before: Option<balance::Cursor>,
236    ) -> Result<Connection<String, Balance>> {
237        OwnerImpl::from(&self.super_)
238            .balances(ctx, first, after, last, before)
239            .await
240    }
241
242    /// The coin objects owned by this package.
243    ///
244    /// `type` is a filter on the coin's type parameter, defaulting to
245    /// `0x2::iota::IOTA`.
246    ///
247    /// Note that coins owned by a package are inaccessible, because packages
248    /// are immutable and cannot be owned by an address.
249    pub(crate) async fn coins(
250        &self,
251        ctx: &Context<'_>,
252        first: Option<u64>,
253        after: Option<object::Cursor>,
254        last: Option<u64>,
255        before: Option<object::Cursor>,
256        type_: Option<ExactTypeFilter>,
257    ) -> Result<Connection<String, Coin>> {
258        OwnerImpl::from(&self.super_)
259            .coins(ctx, first, after, last, before, type_)
260            .await
261    }
262
263    /// The `0x3::staking_pool::StakedIota` objects owned by this package.
264    ///
265    /// Note that objects owned by a package are inaccessible, because packages
266    /// are immutable and cannot be owned by an address.
267    pub(crate) async fn staked_iotas(
268        &self,
269        ctx: &Context<'_>,
270        first: Option<u64>,
271        after: Option<object::Cursor>,
272        last: Option<u64>,
273        before: Option<object::Cursor>,
274    ) -> Result<Connection<String, StakedIota>> {
275        OwnerImpl::from(&self.super_)
276            .staked_iotas(ctx, first, after, last, before)
277            .await
278    }
279
280    /// The domain explicitly configured as the default domain pointing to this
281    /// object.
282    pub(crate) async fn iota_names_default_name(
283        &self,
284        ctx: &Context<'_>,
285        format: Option<DomainFormat>,
286    ) -> Result<Option<String>> {
287        OwnerImpl::from(&self.super_)
288            .iota_names_default_name(ctx, format)
289            .await
290    }
291
292    /// The IotaNamesRegistration NFTs owned by this package. These grant the
293    /// owner the capability to manage the associated domain.
294    ///
295    /// Note that objects owned by a package are inaccessible, because packages
296    /// are immutable and cannot be owned by an address.
297    pub(crate) async fn iota_names_registrations(
298        &self,
299        ctx: &Context<'_>,
300        first: Option<u64>,
301        after: Option<object::Cursor>,
302        last: Option<u64>,
303        before: Option<object::Cursor>,
304    ) -> Result<Connection<String, IotaNamesRegistration>> {
305        OwnerImpl::from(&self.super_)
306            .iota_names_registrations(ctx, first, after, last, before)
307            .await
308    }
309
310    pub(crate) async fn version(&self) -> UInt53 {
311        ObjectImpl(&self.super_).version().await
312    }
313
314    /// The current status of the object as read from the off-chain store. The
315    /// possible states are: NOT_INDEXED, the object is loaded from
316    /// serialized data, such as the contents of a genesis or system package
317    /// upgrade transaction. LIVE, the version returned is the most recent for
318    /// the object, and it is not deleted or wrapped at that version.
319    /// HISTORICAL, the object was referenced at a specific version or
320    /// checkpoint, so is fetched from historical tables and may not be the
321    /// latest version of the object. WRAPPED_OR_DELETED, the object is deleted
322    /// or wrapped and only partial information can be loaded."
323    pub(crate) async fn status(&self) -> ObjectStatus {
324        ObjectImpl(&self.super_).status().await
325    }
326
327    /// 32-byte hash that identifies the package's contents, encoded as a Base58
328    /// string.
329    pub(crate) async fn digest(&self) -> Option<String> {
330        ObjectImpl(&self.super_).digest().await
331    }
332
333    /// The owner type of this object: Immutable, Shared, Parent, Address
334    /// Packages are always Immutable.
335    pub(crate) async fn owner(&self, ctx: &Context<'_>) -> Option<ObjectOwner> {
336        ObjectImpl(&self.super_).owner(ctx).await
337    }
338
339    /// The transaction block that published or upgraded this package.
340    pub(crate) async fn previous_transaction_block(
341        &self,
342        ctx: &Context<'_>,
343    ) -> Result<Option<TransactionBlock>> {
344        ObjectImpl(&self.super_)
345            .previous_transaction_block(ctx)
346            .await
347    }
348
349    /// The amount of IOTA we would rebate if this object gets deleted or
350    /// mutated. This number is recalculated based on the present storage
351    /// gas price.
352    ///
353    /// Note that packages cannot be deleted or mutated, so this number is
354    /// provided purely for reference.
355    pub(crate) async fn storage_rebate(&self) -> Option<BigInt> {
356        ObjectImpl(&self.super_).storage_rebate().await
357    }
358
359    /// The transaction blocks that sent objects to this package.
360    ///
361    /// Note that objects that have been sent to a package become inaccessible.
362    ///
363    /// `scanLimit` restricts the number of candidate transactions scanned when
364    /// gathering a page of results. It is required for queries that apply
365    /// more than two complex filters (on function, kind, sender, recipient,
366    /// input object, changed object, or ids), and can be at most
367    /// `serviceConfig.maxScanLimit`.
368    ///
369    /// When the scan limit is reached the page will be returned even if it has
370    /// fewer than `first` results when paginating forward (`last` when
371    /// paginating backwards). If there are more transactions to scan,
372    /// `pageInfo.hasNextPage` (or `pageInfo.hasPreviousPage`) will be set to
373    /// `true`, and `PageInfo.endCursor` (or `PageInfo.startCursor`) will be set
374    /// to the last transaction that was scanned as opposed to the last (or
375    /// first) transaction in the page.
376    ///
377    /// Requesting the next (or previous) page after this cursor will resume the
378    /// search, scanning the next `scanLimit` many transactions in the
379    /// direction of pagination, and so on until all transactions in the
380    /// scanning range have been visited.
381    ///
382    /// By default, the scanning range includes all transactions known to
383    /// GraphQL, but it can be restricted by the `after` and `before`
384    /// cursors, and the `beforeCheckpoint`, `afterCheckpoint` and
385    /// `atCheckpoint` filters.
386    pub(crate) async fn received_transaction_blocks(
387        &self,
388        ctx: &Context<'_>,
389        first: Option<u64>,
390        after: Option<transaction_block::Cursor>,
391        last: Option<u64>,
392        before: Option<transaction_block::Cursor>,
393        filter: Option<TransactionBlockFilter>,
394        scan_limit: Option<u64>,
395    ) -> Result<ScanConnection<String, TransactionBlock>> {
396        ObjectImpl(&self.super_)
397            .received_transaction_blocks(ctx, first, after, last, before, filter, scan_limit)
398            .await
399    }
400
401    /// The Base64-encoded BCS serialization of the package's content.
402    pub(crate) async fn bcs(&self) -> Result<Option<Base64>> {
403        ObjectImpl(&self.super_).bcs().await
404    }
405
406    /// Fetch another version of this package (the package that shares this
407    /// package's original ID, but has the specified `version`).
408    async fn package_at_version(
409        &self,
410        ctx: &Context<'_>,
411        version: u64,
412    ) -> Result<Option<MovePackage>> {
413        MovePackage::query(
414            ctx,
415            self.super_.address,
416            MovePackage::by_version(version, self.checkpoint_viewed_at_impl()),
417        )
418        .await
419        .extend()
420    }
421
422    /// Fetch all versions of this package (packages that share this package's
423    /// original ID), optionally bounding the versions exclusively from
424    /// below with `afterVersion`, or from above with `beforeVersion`.
425    async fn package_versions(
426        &self,
427        ctx: &Context<'_>,
428        first: Option<u64>,
429        after: Option<Cursor>,
430        last: Option<u64>,
431        before: Option<Cursor>,
432        filter: Option<MovePackageVersionFilter>,
433    ) -> Result<Connection<String, MovePackage>> {
434        let page = Page::from_params(ctx.data_unchecked(), first, after, last, before)?;
435
436        MovePackage::paginate_by_version(
437            ctx.data_unchecked(),
438            page,
439            self.super_.address,
440            filter,
441            self.checkpoint_viewed_at_impl(),
442        )
443        .await
444        .extend()
445    }
446
447    /// Fetch the latest version of this package (the package with the highest
448    /// `version` that shares this packages's original ID)
449    async fn latest_package(&self, ctx: &Context<'_>) -> Result<MovePackage> {
450        Ok(MovePackage::query(
451            ctx,
452            self.super_.address,
453            MovePackage::latest_at(self.checkpoint_viewed_at_impl()),
454        )
455        .await
456        .extend()?
457        .ok_or_else(|| Error::Internal("No latest version found".to_string()))?)
458    }
459
460    /// A representation of the module called `name` in this package, including
461    /// the structs and functions it defines.
462    async fn module(&self, name: String) -> Result<Option<MoveModule>> {
463        self.module_impl(&name).extend()
464    }
465
    /// Paginate through the MoveModules defined in this package.
    // Returns `Ok(None)` rather than an empty connection when the page contains
    // no modules.
    pub async fn modules(
        &self,
        ctx: &Context<'_>,
        first: Option<u64>,
        after: Option<CModule>,
        last: Option<u64>,
        before: Option<CModule>,
    ) -> Result<Option<Connection<String, MoveModule>>> {
        use std::ops::Bound as B;

        let page = Page::from_params(ctx.data_unchecked(), first, after, last, before)?;
        // A checkpoint carried by the cursors takes precedence over the
        // checkpoint this package was viewed at.
        let cursor_viewed_at = page.validate_cursor_consistency()?;
        let checkpoint_viewed_at =
            cursor_viewed_at.unwrap_or_else(|| self.checkpoint_viewed_at_impl());

        // Modules are keyed by name; both cursors act as exclusive bounds on
        // the range of names to visit.
        let parsed = self.parsed_package()?;
        let module_range = parsed.modules().range::<String, _>((
            page.after().map_or(B::Unbounded, |a| B::Excluded(&a.name)),
            page.before().map_or(B::Unbounded, |b| B::Excluded(&b.name)),
        ));

        let mut connection = Connection::new(false, false);
        let modules = if page.is_from_front() {
            module_range.take(page.limit()).collect()
        } else {
            // Paginating backwards: take from the end of the range, then
            // restore the range's original order.
            let mut ms: Vec<_> = module_range.rev().take(page.limit()).collect();
            ms.reverse();
            ms
        };

        // There is a previous page if the package has any module that sorts
        // strictly before the first one on this page.
        connection.has_previous_page = modules.first().is_some_and(|(fst, _)| {
            parsed
                .modules()
                .range::<String, _>((B::Unbounded, B::Excluded(*fst)))
                .next()
                .is_some()
        });

        // Likewise, there is a next page if any module sorts strictly after the
        // last one on this page.
        connection.has_next_page = modules.last().is_some_and(|(lst, _)| {
            parsed
                .modules()
                .range::<String, _>((B::Excluded(*lst), B::Unbounded))
                .next()
                .is_some()
        });

        for (name, parsed) in modules {
            // Every parsed module is expected to have serialized bytes in the
            // native package; a mismatch is reported as an internal error.
            let Some(native) = self.native.serialized_module_map().get(name) else {
                return Err(Error::Internal(format!(
                    "Module '{name}' exists in PackageCache but not in serialized map.",
                ))
                .extend());
            };

            // The cursor records the module name together with the checkpoint
            // the page was viewed at.
            let cursor = JsonCursor::new(ConsistentNamedCursor {
                name: name.clone(),
                c: checkpoint_viewed_at,
            })
            .encode_cursor();
            connection.edges.push(Edge::new(
                cursor,
                MoveModule {
                    storage_id: self.super_.address,
                    native: native.clone(),
                    parsed: parsed.clone(),
                    checkpoint_viewed_at,
                },
            ))
        }

        if connection.edges.is_empty() {
            Ok(None)
        } else {
            Ok(Some(connection))
        }
    }
543
544    /// The transitive dependencies of this package.
545    async fn linkage(&self) -> Option<Vec<Linkage>> {
546        let linkage = self
547            .native
548            .linkage_table()
549            .iter()
550            .map(|(&runtime_id, upgrade_info)| Linkage {
551                original_id: runtime_id.into(),
552                upgraded_id: upgrade_info.upgraded_id.into(),
553                version: upgrade_info.upgraded_version.value().into(),
554            })
555            .collect();
556
557        Some(linkage)
558    }
559
560    /// The (previous) versions of this package that introduced its types.
561    async fn type_origins(&self) -> Option<Vec<TypeOrigin>> {
562        let type_origins = self
563            .native
564            .type_origin_table()
565            .iter()
566            .map(|origin| TypeOrigin {
567                module: origin.module_name.clone(),
568                struct_: origin.datatype_name.clone(),
569                defining_id: origin.package.into(),
570            })
571            .collect();
572
573        Some(type_origins)
574    }
575
576    /// BCS representation of the package's modules.  Modules appear as a
577    /// sequence of pairs (module name, followed by module bytes), in
578    /// alphabetic order by module name.
579    async fn module_bcs(&self) -> Result<Option<Base64>> {
580        let bcs = bcs::to_bytes(self.native.serialized_module_map())
581            .map_err(|_| {
582                Error::Internal(format!("Failed to serialize package {}", self.native.id()))
583            })
584            .extend()?;
585
586        Ok(Some(bcs.into()))
587    }
588}
589
590impl MovePackage {
591    fn parsed_package(&self) -> Result<ParsedMovePackage, Error> {
592        ParsedMovePackage::read_from_package(&self.native)
593            .map_err(|e| Error::Internal(format!("Error reading package: {e}")))
594    }
595
596    /// This package was viewed at a snapshot of the chain state at this
597    /// checkpoint (identified by its sequence number).
598    fn checkpoint_viewed_at_impl(&self) -> u64 {
599        self.super_.checkpoint_viewed_at
600    }
601
602    pub(crate) fn module_impl(&self, name: &str) -> Result<Option<MoveModule>, Error> {
603        use PackageCacheError as E;
604        match (
605            self.native.serialized_module_map().get(name),
606            self.parsed_package()?.module(name),
607        ) {
608            (Some(native), Ok(parsed)) => Ok(Some(MoveModule {
609                storage_id: self.super_.address,
610                native: native.clone(),
611                parsed: parsed.clone(),
612                checkpoint_viewed_at: self.checkpoint_viewed_at_impl(),
613            })),
614
615            (None, _) | (_, Err(E::ModuleNotFound(_, _))) => Ok(None),
616            (_, Err(e)) => Err(Error::Internal(format!(
617                "Unexpected error fetching module: {e}"
618            ))),
619        }
620    }
621
622    /// Look-up the package by its Storage ID, as of a given checkpoint.
623    pub(crate) fn by_id_at(checkpoint_viewed_at: u64) -> PackageLookup {
624        PackageLookup::ById {
625            checkpoint_viewed_at,
626        }
627    }
628
629    /// Look-up a specific version of the package, identified by the storage ID
630    /// of any version of the package, and the desired version (the actual
631    /// object loaded might be at a different object ID).
632    pub(crate) fn by_version(version: u64, checkpoint_viewed_at: u64) -> PackageLookup {
633        PackageLookup::Versioned {
634            version,
635            checkpoint_viewed_at,
636        }
637    }
638
639    /// Look-up the package that shares the same original ID as the package at
640    /// `address`, but has the latest version, as of the given checkpoint.
641    pub(crate) fn latest_at(checkpoint_viewed_at: u64) -> PackageLookup {
642        PackageLookup::Latest {
643            checkpoint_viewed_at,
644        }
645    }
646
647    pub(crate) async fn query(
648        ctx: &Context<'_>,
649        address: IotaAddress,
650        key: PackageLookup,
651    ) -> Result<Option<Self>, Error> {
652        let (address, key) = match key {
653            PackageLookup::ById {
654                checkpoint_viewed_at,
655            } => (address, Object::latest_at(checkpoint_viewed_at)),
656
657            PackageLookup::Versioned {
658                version,
659                checkpoint_viewed_at,
660            } => {
661                if is_system_package(address) {
662                    (address, Object::at_version(version, checkpoint_viewed_at))
663                } else {
664                    let DataLoader(loader) = &ctx.data_unchecked();
665                    let Some(translation) = loader
666                        .load_one(PackageVersionKey { address, version })
667                        .await?
668                    else {
669                        return Ok(None);
670                    };
671
672                    (translation, Object::latest_at(checkpoint_viewed_at))
673                }
674            }
675
676            PackageLookup::Latest {
677                checkpoint_viewed_at,
678            } => {
679                if is_system_package(address) {
680                    (address, Object::latest_at(checkpoint_viewed_at))
681                } else {
682                    let DataLoader(loader) = &ctx.data_unchecked();
683                    let Some(translation) = loader
684                        .load_one(LatestKey {
685                            address,
686                            checkpoint_viewed_at,
687                        })
688                        .await?
689                    else {
690                        return Ok(None);
691                    };
692
693                    (translation, Object::latest_at(checkpoint_viewed_at))
694                }
695            }
696        };
697
698        let Some(object) = Object::query(ctx, address, key).await? else {
699            return Ok(None);
700        };
701
702        Ok(Some(MovePackage::try_from(&object).map_err(|_| {
703            Error::Internal(format!("{address} is not a package"))
704        })?))
705    }
706
707    /// Query the database for a `page` of Move packages. The Page uses the
708    /// checkpoint sequence number the package was created at, its original
709    /// ID, and its version as the cursor. The query can optionally be
710    /// filtered by a bound on the checkpoints the packages were created in.
711    ///
712    /// The `checkpoint_viewed_at` parameter represents the checkpoint sequence
713    /// number at which this page was queried. Each entity returned in the
714    /// connection will inherit this checkpoint, so that when viewing that
715    /// entity's state, it will be as if it is being viewed at this
716    /// checkpoint.
717    ///
718    /// The cursors in `page` may also include checkpoint viewed at fields. If
719    /// these are set, they take precedence over the checkpoint that
720    /// pagination is being conducted in.
721    pub(crate) async fn paginate_by_checkpoint(
722        db: &Db,
723        page: Page<Cursor>,
724        filter: Option<MovePackageCheckpointFilter>,
725        checkpoint_viewed_at: u64,
726    ) -> Result<Connection<String, MovePackage>, Error> {
727        let cursor_viewed_at = page.validate_cursor_consistency()?;
728        let checkpoint_viewed_at = cursor_viewed_at.unwrap_or(checkpoint_viewed_at);
729
730        let after_checkpoint: Option<u64> = filter
731            .as_ref()
732            .and_then(|f| f.after_checkpoint)
733            .map(|v| v.into());
734
735        // Clamp the "before checkpoint" bound by "checkpoint viewed at".
736        let before_checkpoint = filter
737            .as_ref()
738            .and_then(|f| f.before_checkpoint)
739            .map(|v| v.into())
740            .unwrap_or(u64::MAX)
741            .min(checkpoint_viewed_at + 1);
742
743        let (prev, next, results) = db
744            .execute(move |conn| {
745                let mut q = query!(
746                    r#"
747                    SELECT
748                            p.original_id,
749                            o.*
750                    FROM
751                            packages p
752                    INNER JOIN
753                            objects_history o
754                    ON
755                            p.package_id = o.object_id
756                    AND     p.package_version = o.object_version
757                    AND     p.checkpoint_sequence_number = o.checkpoint_sequence_number
758                "#
759                );
760
761                q = filter!(
762                    q,
763                    format!("o.checkpoint_sequence_number < {before_checkpoint}")
764                );
765                if let Some(after) = after_checkpoint {
766                    q = filter!(q, format!("{after} < o.checkpoint_sequence_number"));
767                }
768
769                page.paginate_raw_query::<StoredHistoryPackage>(conn, checkpoint_viewed_at, q)
770            })
771            .await?;
772
773        let mut conn = Connection::new(prev, next);
774
775        // The "checkpoint viewed at" sets a consistent upper bound for the nested
776        // queries.
777        for stored in results {
778            let cursor = stored.cursor(checkpoint_viewed_at).encode_cursor();
779            let package =
780                MovePackage::try_from_stored_history_object(stored.object, checkpoint_viewed_at)?;
781            conn.edges.push(Edge::new(cursor, package));
782        }
783
784        Ok(conn)
785    }
786
787    /// Query the database for a `page` of Move packages. The Page uses the
788    /// checkpoint sequence number the package was created at, its original
789    /// ID, and its version as the cursor. The query is filtered by the ID
790    /// of a package and will only return packages from the same family
791    /// (sharing the same original ID as the package whose ID was given), and
792    /// can optionally be filtered by an upper and lower bound on package
793    /// version.
794    ///
795    /// The `checkpoint_viewed_at` parameter represents the checkpoint sequence
796    /// number at which this page was queried. Each entity returned in the
797    /// connection will inherit this checkpoint, so that when viewing that
798    /// entity's state, it will be as if it is being viewed at this
799    /// checkpoint.
800    ///
801    /// The cursors in `page` may also include checkpoint viewed at fields. If
802    /// these are set, they take precedence over the checkpoint that
803    /// pagination is being conducted in.
804    pub(crate) async fn paginate_by_version(
805        db: &Db,
806        page: Page<Cursor>,
807        package: IotaAddress,
808        filter: Option<MovePackageVersionFilter>,
809        checkpoint_viewed_at: u64,
810    ) -> Result<Connection<String, MovePackage>, Error> {
811        let cursor_viewed_at = page.validate_cursor_consistency()?;
812        let checkpoint_viewed_at = cursor_viewed_at.unwrap_or(checkpoint_viewed_at);
813        let (prev, next, results) = db
814            .execute(move |conn| {
815                page.paginate_raw_query::<StoredHistoryPackage>(
816                    conn,
817                    checkpoint_viewed_at,
818                    if is_system_package(package) {
819                        system_package_version_query(package, filter)
820                    } else {
821                        user_package_version_query(package, filter)
822                    },
823                )
824            })
825            .await?;
826
827        let mut conn = Connection::new(prev, next);
828
829        // The "checkpoint viewed at" sets a consistent upper bound for the nested
830        // queries.
831        for stored in results {
832            let cursor = stored.cursor(checkpoint_viewed_at).encode_cursor();
833            let package =
834                MovePackage::try_from_stored_history_object(stored.object, checkpoint_viewed_at)?;
835            conn.edges.push(Edge::new(cursor, package));
836        }
837
838        Ok(conn)
839    }
840
841    /// `checkpoint_viewed_at` points to the checkpoint snapshot that this
842    /// `MovePackage` came from. This is stored in the `MovePackage` so that
843    /// related fields from the package are read from the same checkpoint
844    /// (consistently).
845    pub(crate) fn try_from_stored_history_object(
846        history_object: StoredHistoryObject,
847        checkpoint_viewed_at: u64,
848    ) -> Result<Self, Error> {
849        let object = Object::try_from_stored_history_object(
850            history_object,
851            checkpoint_viewed_at,
852            // root_version
853            None,
854        )?;
855        Self::try_from(&object).map_err(|_| Error::Internal("Not a package!".to_string()))
856    }
857}
858
859impl Checkpointed for Cursor {
860    fn checkpoint_viewed_at(&self) -> u64 {
861        self.checkpoint_viewed_at
862    }
863}
864
865impl RawPaginated<Cursor> for StoredHistoryPackage {
866    fn filter_ge(cursor: &Cursor, query: RawQuery) -> RawQuery {
867        filter!(
868            query,
869            format!(
870                "o.checkpoint_sequence_number > {cp} OR (\
871                 o.checkpoint_sequence_number = {cp} AND
872                 original_id > '\\x{id}'::bytea OR (\
873                 original_id = '\\x{id}'::bytea AND \
874                 o.object_version >= {pv}\
875                 ))",
876                cp = cursor.checkpoint_sequence_number,
877                id = hex::encode(&cursor.original_id),
878                pv = cursor.package_version,
879            )
880        )
881    }
882
883    fn filter_le(cursor: &Cursor, query: RawQuery) -> RawQuery {
884        filter!(
885            query,
886            format!(
887                "o.checkpoint_sequence_number < {cp} OR (\
888                 o.checkpoint_sequence_number = {cp} AND
889                 original_id < '\\x{id}'::bytea OR (\
890                 original_id = '\\x{id}'::bytea AND \
891                 o.object_version <= {pv}\
892                 ))",
893                cp = cursor.checkpoint_sequence_number,
894                id = hex::encode(&cursor.original_id),
895                pv = cursor.package_version,
896            )
897        )
898    }
899
900    fn order(asc: bool, query: RawQuery) -> RawQuery {
901        if asc {
902            query
903                .order_by("o.checkpoint_sequence_number ASC")
904                .order_by("original_id ASC")
905                .order_by("o.object_version ASC")
906        } else {
907            query
908                .order_by("o.checkpoint_sequence_number DESC")
909                .order_by("original_id DESC")
910                .order_by("o.object_version DESC")
911        }
912    }
913}
914
915impl Target<Cursor> for StoredHistoryPackage {
916    fn cursor(&self, checkpoint_viewed_at: u64) -> Cursor {
917        Cursor::new(PackageCursor {
918            checkpoint_sequence_number: self.object.checkpoint_sequence_number as u64,
919            original_id: self.original_id.clone(),
920            package_version: self.object.object_version as u64,
921            checkpoint_viewed_at,
922        })
923    }
924}
925
926impl ScanLimited for BcsCursor<PackageCursor> {}
927
928impl Loader<PackageVersionKey> for Db {
929    type Value = IotaAddress;
930    type Error = Error;
931
932    async fn load(
933        &self,
934        keys: &[PackageVersionKey],
935    ) -> Result<HashMap<PackageVersionKey, IotaAddress>, Error> {
936        use packages::dsl;
937        let other = diesel::alias!(packages as other);
938
939        let id_versions: BTreeSet<_> = keys
940            .iter()
941            .map(|k| (k.address.into_vec(), k.version as i64))
942            .collect();
943
944        let stored_packages: Vec<(Vec<u8>, i64, Vec<u8>)> = self
945            .execute(move |conn| {
946                conn.results(|| {
947                    let mut query = dsl::packages
948                        .inner_join(other.on(dsl::original_id.eq(other.field(dsl::original_id))))
949                        .select((
950                            dsl::package_id,
951                            other.field(dsl::package_version),
952                            other.field(dsl::package_id),
953                        ))
954                        .into_boxed();
955
956                    for (id, version) in id_versions.iter().cloned() {
957                        query = query.or_filter(
958                            dsl::package_id
959                                .eq(id)
960                                .and(other.field(dsl::package_version).eq(version)),
961                        );
962                    }
963
964                    query
965                })
966            })
967            .await
968            .map_err(|e| Error::Internal(format!("Failed to load packages: {e}")))?;
969
970        let mut result = HashMap::new();
971        for (id, version, other_id) in stored_packages {
972            result.insert(
973                PackageVersionKey {
974                    address: addr(&id)?,
975                    version: version as u64,
976                },
977                addr(&other_id)?,
978            );
979        }
980
981        Ok(result)
982    }
983}
984
985impl Loader<LatestKey> for Db {
986    type Value = IotaAddress;
987    type Error = Error;
988
989    async fn load(&self, keys: &[LatestKey]) -> Result<HashMap<LatestKey, IotaAddress>, Error> {
990        use packages::dsl;
991        let other = diesel::alias!(packages as other);
992
993        let mut ids_by_cursor: BTreeMap<_, BTreeSet<_>> = BTreeMap::new();
994        for key in keys {
995            ids_by_cursor
996                .entry(key.checkpoint_viewed_at)
997                .or_default()
998                .insert(key.address.into_vec());
999        }
1000
1001        // Issue concurrent reads for each group of IDs
1002        let futures = ids_by_cursor
1003            .into_iter()
1004            .map(|(checkpoint_viewed_at, ids)| {
1005                self.execute(move |conn| {
1006                    let results: Vec<(Vec<u8>, Vec<u8>)> = conn.results(|| {
1007                        let o_original_id = other.field(dsl::original_id);
1008                        let o_package_id = other.field(dsl::package_id);
1009                        let o_cp_seq_num = other.field(dsl::checkpoint_sequence_number);
1010                        let o_version = other.field(dsl::package_version);
1011
1012                        let query = dsl::packages
1013                            .inner_join(other.on(dsl::original_id.eq(o_original_id)))
1014                            .select((dsl::package_id, o_package_id))
1015                            .filter(dsl::package_id.eq_any(ids.iter().cloned()))
1016                            .filter(o_cp_seq_num.le(checkpoint_viewed_at as i64))
1017                            .order_by((dsl::package_id, dsl::original_id, o_version.desc()))
1018                            .distinct_on((dsl::package_id, dsl::original_id));
1019                        query
1020                    })?;
1021
1022                    Ok::<_, diesel::result::Error>(
1023                        results
1024                            .into_iter()
1025                            .map(|(p, latest)| (checkpoint_viewed_at, p, latest))
1026                            .collect::<Vec<_>>(),
1027                    )
1028                })
1029            });
1030
1031        // Wait for the reads to all finish, and gather them into the result map.
1032        let groups = futures::future::join_all(futures).await;
1033
1034        let mut results = HashMap::new();
1035        for group in groups {
1036            for (checkpoint_viewed_at, address, latest) in
1037                group.map_err(|e| Error::Internal(format!("Failed to fetch packages: {e}")))?
1038            {
1039                results.insert(
1040                    LatestKey {
1041                        address: addr(&address)?,
1042                        checkpoint_viewed_at,
1043                    },
1044                    addr(&latest)?,
1045                );
1046            }
1047        }
1048
1049        Ok(results)
1050    }
1051}
1052
1053impl TryFrom<&Object> for MovePackage {
1054    type Error = MovePackageDowncastError;
1055
1056    fn try_from(object: &Object) -> Result<Self, MovePackageDowncastError> {
1057        let Some(native) = object.native_impl() else {
1058            return Err(MovePackageDowncastError);
1059        };
1060
1061        if let Data::Package(move_package) = &native.data {
1062            Ok(Self {
1063                super_: object.clone(),
1064                native: move_package.clone(),
1065            })
1066        } else {
1067            Err(MovePackageDowncastError)
1068        }
1069    }
1070}
1071
1072/// Query for fetching all the versions of a system package (assumes that
1073/// `package` has already been verified as a system package). This is an
1074/// `objects_history` query disguised as a package query.
1075fn system_package_version_query(
1076    package: IotaAddress,
1077    filter: Option<MovePackageVersionFilter>,
1078) -> RawQuery {
1079    // Query uses a left join to force the query planner to use `objects_version` in
1080    // the outer loop.
1081    let mut q = query!(
1082        r#"
1083            SELECT
1084                    o.object_id AS original_id,
1085                    o.*
1086            FROM
1087                    objects_version v
1088            LEFT JOIN
1089                    objects_history o
1090            ON
1091                    v.object_id = o.object_id
1092            AND     v.object_version = o.object_version
1093            AND     v.cp_sequence_number = o.checkpoint_sequence_number
1094        "#
1095    );
1096
1097    q = filter!(
1098        q,
1099        format!(
1100            "v.object_id = '\\x{}'::bytea",
1101            hex::encode(package.into_vec())
1102        )
1103    );
1104
1105    if let Some(after) = filter.as_ref().and_then(|f| f.after_version) {
1106        let a: u64 = after.into();
1107        q = filter!(q, format!("v.object_version > {a}"));
1108    }
1109
1110    if let Some(before) = filter.as_ref().and_then(|f| f.before_version) {
1111        let b: u64 = before.into();
1112        q = filter!(q, format!("v.object_version < {b}"));
1113    }
1114
1115    q
1116}
1117
1118/// Query for fetching all the versions of a non-system package (assumes that
1119/// `package` has already been verified as a system package)
1120fn user_package_version_query(
1121    package: IotaAddress,
1122    filter: Option<MovePackageVersionFilter>,
1123) -> RawQuery {
1124    let mut q = query!(
1125        r#"
1126            SELECT
1127                    p.original_id,
1128                    o.*
1129            FROM
1130                    packages q
1131            INNER JOIN
1132                    packages p
1133            ON
1134                    q.original_id = p.original_id
1135            INNER JOIN
1136                    objects_history o
1137            ON
1138                    p.package_id = o.object_id
1139            AND     p.package_version = o.object_version
1140            AND     p.checkpoint_sequence_number = o.checkpoint_sequence_number
1141        "#
1142    );
1143
1144    q = filter!(
1145        q,
1146        format!(
1147            "q.package_id = '\\x{}'::bytea",
1148            hex::encode(package.into_vec())
1149        )
1150    );
1151
1152    if let Some(after) = filter.as_ref().and_then(|f| f.after_version) {
1153        let a: u64 = after.into();
1154        q = filter!(q, format!("p.package_version > {a}"));
1155    }
1156
1157    if let Some(before) = filter.as_ref().and_then(|f| f.before_version) {
1158        let b: u64 = before.into();
1159        q = filter!(q, format!("p.package_version < {b}"));
1160    }
1161
1162    q
1163}