iota_graphql_rpc/types/
object.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{
6    collections::{BTreeMap, BTreeSet, HashMap},
7    fmt::Write,
8};
9
10use async_graphql::{
11    connection::{Connection, CursorType, Edge},
12    dataloader::Loader,
13    *,
14};
15use diesel::{BoolExpressionMethods, ExpressionMethods, JoinOnDsl, QueryDsl, SelectableHelper};
16use iota_indexer::{
17    models::objects::StoredHistoryObject,
18    schema::{objects_history, objects_version},
19    types::{ObjectStatus as NativeObjectStatus, OwnerType},
20};
21use iota_types::{
22    TypeTag,
23    object::{
24        MoveObject as NativeMoveObject, Object as NativeObject, Owner as NativeOwner,
25        bounded_visitor::BoundedVisitor,
26    },
27};
28use move_core_types::{
29    annotated_value::{MoveStruct, MoveTypeLayout},
30    language_storage::StructTag,
31};
32use serde::{Deserialize, Serialize};
33
34use crate::{
35    connection::ScanConnection,
36    consistency::{Checkpointed, View, build_objects_query},
37    data::{DataLoader, Db, DbConnection, QueryExecutor, package_resolver::PackageResolver},
38    error::Error,
39    filter, or_filter,
40    raw_query::RawQuery,
41    types::{
42        available_range::AvailableRange,
43        balance::{self, Balance},
44        base64::Base64,
45        big_int::BigInt,
46        coin::Coin,
47        coin_metadata::CoinMetadata,
48        cursor::{self, Page, RawPaginated, ScanLimited, Target},
49        digest::Digest,
50        display::{Display, DisplayEntry},
51        dynamic_field::{DynamicField, DynamicFieldName},
52        intersect,
53        iota_address::{IotaAddress, addr},
54        iota_names_registration::{DomainFormat, IotaNamesRegistration},
55        move_object::MoveObject,
56        move_package::MovePackage,
57        owner::{Owner, OwnerImpl},
58        stake::StakedIota,
59        transaction_block,
60        transaction_block::{TransactionBlock, TransactionBlockFilter},
61        type_filter::{ExactTypeFilter, TypeFilter},
62        uint53::UInt53,
63    },
64};
65
/// An on-chain object as seen by the GraphQL service: its address, the data
/// that could be loaded for it (see [`ObjectKind`]), and the consistency
/// bounds (checkpoint and root version) under which it was read.
#[derive(Clone, Debug)]
pub(crate) struct Object {
    /// Address (ID) of the object.
    pub address: IotaAddress,
    /// How the object was loaded and which data is available for it.
    pub kind: ObjectKind,
    /// The checkpoint sequence number at which this object was viewed.
    pub checkpoint_viewed_at: u64,
    /// Root parent object version for dynamic fields.
    ///
    /// This enables consistent dynamic field reads in the case of chained
    /// dynamic object fields, e.g., `Parent -> DOF1 -> DOF2`. In such
    /// cases, the object versions may end up like `Parent >= DOF1, DOF2`
    /// but `DOF1 < DOF2`. Thus, database queries for dynamic fields must
    /// bound the object versions by the version of the root object of the tree.
    ///
    /// Essentially, lamport timestamps of objects are updated for all top-level
    /// mutable objects provided as inputs to a transaction as well as any
    /// mutated dynamic child objects. However, any dynamic child objects
    /// that were loaded but not actually mutated don't end up having
    /// their versions updated.
    root_version: u64,
}
87
/// Type to implement GraphQL fields that are shared by all Objects.
///
/// It is a thin wrapper around a borrowed [`Object`]; the field
/// implementations read everything they need through that reference.
pub(crate) struct ObjectImpl<'o>(pub &'o Object);
90
/// How an [`Object`] was loaded, together with the data available for it.
///
/// The first two variants carry the full native object; the last one carries
/// only a version number.
#[derive(Clone, Debug)]
// NOTE(review): the lint expectation is presumably triggered by the size gap
// between the variants holding native/stored objects and the `u64`-only
// variant — confirm before boxing or removing it.
#[expect(clippy::large_enum_variant)]
pub(crate) enum ObjectKind {
    /// An object loaded from serialized data, such as the contents of a
    /// transaction that hasn't been indexed yet.
    NotIndexed(NativeObject),
    /// An object fetched from the index.
    Indexed(NativeObject, StoredHistoryObject),
    /// The object is wrapped or deleted and only partial information can be
    /// loaded from the indexer. The `u64` is the version of the object.
    WrappedOrDeleted(u64),
}
103
// Payload-free counterpart of the internal `ObjectKind` enum (same three
// variants, no data). It is published in the GraphQL schema under the name
// `ObjectKind`, as set by the `graphql(name = ...)` attribute below.
#[derive(Enum, Copy, Clone, Eq, PartialEq, Debug)]
#[graphql(name = "ObjectKind")]
pub enum ObjectStatus {
    /// The object is loaded from serialized data, such as the contents of a
    /// transaction that hasn't been indexed yet.
    NotIndexed,
    /// The object is fetched from the index.
    Indexed,
    /// The object is deleted or wrapped and only partial information can be
    /// loaded from the indexer.
    WrappedOrDeleted,
}
116
// GraphQL input object that pins an object by the triple
// (address, version, digest).
#[derive(Clone, Debug, PartialEq, Eq, InputObject)]
pub(crate) struct ObjectRef {
    /// ID of the object.
    pub address: IotaAddress,
    /// Version or sequence number of the object.
    pub version: UInt53,
    /// Digest of the object.
    pub digest: Digest,
}
126
/// Constrains the set of objects returned. All filters are optional, and the
/// resulting set of objects are ones whose
///
/// - Type matches the `type` filter,
/// - AND, whose owner matches the `owner` filter,
/// - AND, whose ID is in `objectIds` OR whose ID and version is in
///   `objectKeys`.
// `Default` yields a filter with every field `None`, i.e. no constraint.
#[derive(InputObject, Default, Debug, Clone, Eq, PartialEq)]
pub(crate) struct ObjectFilter {
    /// This field is used to specify the type of objects that should be
    /// included in the query results.
    ///
    /// Objects can be filtered by their type's package, package::module, or
    /// their fully qualified type name.
    ///
    /// Generic types can be queried by either the generic type name, e.g.
    /// `0x2::coin::Coin`, or by the full type name, such as
    /// `0x2::coin::Coin<0x2::iota::IOTA>`.
    // NOTE(review): the trailing underscore avoids the Rust keyword `type`;
    // presumably the schema exposes this argument as `type` (as the struct
    // docs above call it) — confirm against the generated schema.
    pub type_: Option<TypeFilter>,

    /// Filter for live objects by their current owners.
    pub owner: Option<IotaAddress>,

    /// Filter for live objects by their IDs.
    pub object_ids: Option<Vec<IotaAddress>>,

    /// Filter for live or potentially historical objects by their ID and
    /// version.
    pub object_keys: Option<Vec<ObjectKey>>,
}
157
// Identifies one specific version of an object by (ID, version). Used as the
// element type of `ObjectFilter::object_keys`.
#[derive(InputObject, Debug, Clone, Eq, PartialEq)]
pub(crate) struct ObjectKey {
    pub object_id: IotaAddress,
    pub version: UInt53,
}
163
/// The object's owner type: Immutable, Shared, Parent, or Address.
// Built by `ObjectImpl::owner` from the native owner of the object: the
// native `AddressOwner` maps to `Address`, the native `ObjectOwner` maps to
// `Parent`, and `Immutable`/`Shared` map to the variants of the same name.
#[derive(Union, Clone)]
pub(crate) enum ObjectOwner {
    Immutable(Immutable),
    Shared(Shared),
    Parent(Parent),
    Address(AddressOwner),
}
172
/// An immutable object is an object that can't be mutated, transferred, or
/// deleted. Immutable objects have no owner, so anyone can use them.
#[derive(SimpleObject, Clone)]
pub(crate) struct Immutable {
    // Placeholder field exposed as `_`; `ObjectImpl::owner` always sets it to
    // `None`.
    // NOTE(review): presumably present only because a GraphQL object type
    // must declare at least one field — confirm.
    #[graphql(name = "_")]
    dummy: Option<bool>,
}
180
/// A shared object is an object that is shared using the
/// 0x2::transfer::share_object function. Unlike owned objects, once an object
/// is shared, it stays mutable and is accessible by anyone.
#[derive(SimpleObject, Clone)]
pub(crate) struct Shared {
    // Copied from the `initial_shared_version` of the native `Shared` owner
    // in `ObjectImpl::owner`.
    initial_shared_version: UInt53,
}
188
/// If the object's owner is a Parent, this object is part of a dynamic field
/// (it is the value of the dynamic field, or the intermediate Field object
/// itself). Also note that if the owner is a parent, then it's guaranteed to be
/// an object.
#[derive(SimpleObject, Clone)]
pub(crate) struct Parent {
    // Latest version of the parent as of the child's `checkpoint_viewed_at`.
    // `ObjectImpl::owner` discards lookup errors, so this is `None` both when
    // the query fails and when it finds nothing.
    parent: Option<Object>,
}
197
/// An address-owned object is owned by a specific 32-byte address that is
/// either an account address (derived from a particular signature scheme) or
/// an object ID. An address-owned object is accessible only to its owner and no
/// others.
#[derive(SimpleObject, Clone)]
pub(crate) struct AddressOwner {
    // `ObjectImpl::owner` always fills this with `Some(..)`, carrying the
    // owned object's `checkpoint_viewed_at` and no root version.
    owner: Option<Owner>,
}
206
/// Filter for a point query of an Object.
pub(crate) enum ObjectLookup {
    /// Look up the object as of a checkpoint, with no version constraint.
    LatestAt {
        /// The checkpoint sequence number at which the object is viewed.
        checkpoint_viewed_at: u64,
    },

    /// Look up a child object with its version bounded by a parent's version.
    UnderParent {
        /// The parent version to be used as an upper bound for the query. Look
        /// for the latest version of a child object whose version is
        /// less than or equal to this upper bound.
        parent_version: u64,
        /// The checkpoint sequence number at which the object is viewed.
        checkpoint_viewed_at: u64,
    },

    /// Look up the object at one exact version.
    VersionAt {
        /// The exact version of the object to be fetched.
        version: u64,
        /// The checkpoint sequence number at which the object is viewed.
        checkpoint_viewed_at: u64,
    },
}
230
/// Pagination cursor for objects: a [`HistoricalObjectCursor`] wrapped in the
/// BCS-based cursor type from the `cursor` module.
pub(crate) type Cursor = cursor::BcsCursor<HistoricalObjectCursor>;
232
/// The inner struct for the `Object`'s cursor. The `object_id` is used as the
/// cursor, while the `checkpoint_viewed_at` sets the consistent upper bound for
/// the cursor.
#[derive(Serialize, Deserialize, Clone, PartialEq, Eq, Debug)]
pub(crate) struct HistoricalObjectCursor {
    /// Bytes of the object ID that the cursor points at.
    // Serialized under the one-letter key `o`.
    // NOTE(review): the short serde names presumably keep self-describing
    // encodings of the cursor compact — confirm before renaming, since
    // changing them would alter any such encoding.
    #[serde(rename = "o")]
    object_id: Vec<u8>,
    /// The checkpoint sequence number this was viewed at.
    #[serde(rename = "c")]
    checkpoint_viewed_at: u64,
}
244
245/// Interface implemented by on-chain values that are addressable by an ID (also
246/// referred to as its address). This includes Move objects and packages.
247#[expect(clippy::duplicated_attributes)]
248#[derive(Interface)]
249#[graphql(
250    name = "IObject",
251    field(name = "version", ty = "UInt53"),
252    field(
253        name = "status",
254        ty = "ObjectStatus",
255        desc = "The current status of the object as read from the off-chain store. The possible \
256                states are: NOT_INDEXED, the object is loaded from serialized data, such as the \
257                contents of a genesis or system package upgrade transaction. LIVE, the version \
258                returned is the most recent for the object, and it is not deleted or wrapped at \
259                that version. HISTORICAL, the object was referenced at a specific version or \
260                checkpoint, so is fetched from historical tables and may not be the latest version \
261                of the object. WRAPPED_OR_DELETED, the object is deleted or wrapped and only \
262                partial information can be loaded."
263    ),
264    field(
265        name = "digest",
266        ty = "Option<String>",
267        desc = "32-byte hash that identifies the object's current contents, encoded as a Base58 \
268                string."
269    ),
270    field(
271        name = "owner",
272        ty = "Option<ObjectOwner>",
273        desc = "The owner type of this object: Immutable, Shared, Parent, Address\n\
274                Immutable and Shared Objects do not have owners."
275    ),
276    field(
277        name = "previous_transaction_block",
278        ty = "Option<TransactionBlock>",
279        desc = "The transaction block that created this version of the object."
280    ),
281    field(name = "storage_rebate", ty = "Option<BigInt>", desc = "",),
282    field(
283        name = "received_transaction_blocks",
284        arg(name = "first", ty = "Option<u64>"),
285        arg(name = "after", ty = "Option<transaction_block::Cursor>"),
286        arg(name = "last", ty = "Option<u64>"),
287        arg(name = "before", ty = "Option<transaction_block::Cursor>"),
288        arg(name = "filter", ty = "Option<TransactionBlockFilter>"),
289        arg(name = "scan_limit", ty = "Option<u64>"),
290        ty = "ScanConnection<String, TransactionBlock>",
291        desc = "The transaction blocks that sent objects to this object."
292    ),
293    field(
294        name = "bcs",
295        ty = "Option<Base64>",
296        desc = "The Base64-encoded BCS serialization of the object's content."
297    )
298)]
299pub(crate) enum IObject {
300    Object(Object),
301    MovePackage(MovePackage),
302    MoveObject(MoveObject),
303    Coin(Coin),
304    CoinMetadata(CoinMetadata),
305    StakedIota(StakedIota),
306}
307
/// `DataLoader` key for fetching an `Object` at a specific version, constrained
/// by a consistency cursor (if that version was created after the checkpoint
/// the query is viewing at, then it will fail).
#[derive(Copy, Clone, Hash, Eq, PartialEq, Debug)]
struct HistoricalKey {
    /// ID of the object to fetch.
    id: IotaAddress,
    /// Exact version of the object to fetch.
    version: u64,
    /// Checkpoint the query is viewing at.
    checkpoint_viewed_at: u64,
}
317
/// `DataLoader` key for fetching the latest version of an object whose parent
/// object has version `parent_version`, as of `checkpoint_viewed_at`. This
/// look-up can fail to find a valid object if the key is not self-consistent,
/// for example if the `parent_version` is set to a higher version
/// than the object's actual parent as of `checkpoint_viewed_at`.
#[derive(Copy, Clone, Hash, Eq, PartialEq, Debug)]
struct ParentVersionKey {
    /// ID of the (child) object to fetch.
    id: IotaAddress,
    /// Version of the parent object.
    parent_version: u64,
    /// Checkpoint the query is viewing at.
    checkpoint_viewed_at: u64,
}
329
/// `DataLoader` key for fetching the latest version of an object as of a given
/// checkpoint.
#[derive(Copy, Clone, Hash, Eq, PartialEq, Debug)]
struct LatestAtKey {
    /// ID of the object to fetch.
    id: IotaAddress,
    /// Checkpoint as of which the latest version is looked up.
    checkpoint_viewed_at: u64,
}
337
/// An object in IOTA is a package (set of Move bytecode modules) or object
/// (typed data structure with fields) with additional metadata detailing its
/// id, version, transaction digest, owner field indicating how this object can
/// be accessed.
// Every resolver in this block is a thin delegation: owner-style fields
// (address, owned objects, balances, coins, stakes, names, dynamic fields) go
// through `OwnerImpl::from(self)`, object-style fields (version, status,
// digest, owner, previous transaction, rebate, received transactions, bcs,
// display) go through `ObjectImpl(self)`.
#[Object]
impl Object {
    pub(crate) async fn address(&self) -> IotaAddress {
        OwnerImpl::from(self).address().await
    }

    /// Objects owned by this object, optionally `filter`-ed.
    pub(crate) async fn objects(
        &self,
        ctx: &Context<'_>,
        first: Option<u64>,
        after: Option<Cursor>,
        last: Option<u64>,
        before: Option<Cursor>,
        filter: Option<ObjectFilter>,
    ) -> Result<Connection<String, MoveObject>> {
        OwnerImpl::from(self)
            .objects(ctx, first, after, last, before, filter)
            .await
    }

    /// Total balance of all coins with marker type owned by this object. If
    /// type is not supplied, it defaults to `0x2::iota::IOTA`.
    pub(crate) async fn balance(
        &self,
        ctx: &Context<'_>,
        type_: Option<ExactTypeFilter>,
    ) -> Result<Option<Balance>> {
        OwnerImpl::from(self).balance(ctx, type_).await
    }

    /// The balances of all coin types owned by this object.
    pub(crate) async fn balances(
        &self,
        ctx: &Context<'_>,
        first: Option<u64>,
        after: Option<balance::Cursor>,
        last: Option<u64>,
        before: Option<balance::Cursor>,
    ) -> Result<Connection<String, Balance>> {
        OwnerImpl::from(self)
            .balances(ctx, first, after, last, before)
            .await
    }

    /// The coin objects for this object.
    ///
    /// `type` is a filter on the coin's type parameter, defaulting to
    /// `0x2::iota::IOTA`.
    pub(crate) async fn coins(
        &self,
        ctx: &Context<'_>,
        first: Option<u64>,
        after: Option<Cursor>,
        last: Option<u64>,
        before: Option<Cursor>,
        type_: Option<ExactTypeFilter>,
    ) -> Result<Connection<String, Coin>> {
        OwnerImpl::from(self)
            .coins(ctx, first, after, last, before, type_)
            .await
    }

    /// The `0x3::staking_pool::StakedIota` objects owned by this object.
    pub(crate) async fn staked_iotas(
        &self,
        ctx: &Context<'_>,
        first: Option<u64>,
        after: Option<Cursor>,
        last: Option<u64>,
        before: Option<Cursor>,
    ) -> Result<Connection<String, StakedIota>> {
        OwnerImpl::from(self)
            .staked_iotas(ctx, first, after, last, before)
            .await
    }

    /// The domain explicitly configured as the default domain pointing to this
    /// address.
    pub(crate) async fn iota_names_default_name(
        &self,
        ctx: &Context<'_>,
        format: Option<DomainFormat>,
    ) -> Result<Option<String>> {
        OwnerImpl::from(self)
            .iota_names_default_name(ctx, format)
            .await
    }

    /// The IotaNamesRegistration NFTs owned by this address. These grant the
    /// owner the capability to manage the associated domain.
    pub(crate) async fn iota_names_registrations(
        &self,
        ctx: &Context<'_>,
        first: Option<u64>,
        after: Option<Cursor>,
        last: Option<u64>,
        before: Option<Cursor>,
    ) -> Result<Connection<String, IotaNamesRegistration>> {
        OwnerImpl::from(self)
            .iota_names_registrations(ctx, first, after, last, before)
            .await
    }

    pub(crate) async fn version(&self) -> UInt53 {
        ObjectImpl(self).version().await
    }

    /// The current status of the object as read from the off-chain store. The
    /// possible states are: NOT_INDEXED, the object is loaded from
    /// serialized data, such as the contents of a genesis or system package
    /// upgrade transaction. INDEXED, the object is fetched from the index.
    /// WRAPPED_OR_DELETED, the object is deleted or wrapped and only partial
    /// information can be loaded.
    pub(crate) async fn status(&self) -> ObjectStatus {
        ObjectImpl(self).status().await
    }

    /// 32-byte hash that identifies the object's current contents, encoded as a
    /// Base58 string.
    pub(crate) async fn digest(&self) -> Option<String> {
        ObjectImpl(self).digest().await
    }

    /// The owner type of this object: Immutable, Shared, Parent, Address
    /// Immutable and Shared Objects do not have owners.
    pub(crate) async fn owner(&self, ctx: &Context<'_>) -> Option<ObjectOwner> {
        ObjectImpl(self).owner(ctx).await
    }

    /// The transaction block that created this version of the object.
    pub(crate) async fn previous_transaction_block(
        &self,
        ctx: &Context<'_>,
    ) -> Result<Option<TransactionBlock>> {
        ObjectImpl(self).previous_transaction_block(ctx).await
    }

    /// The amount of IOTA we would rebate if this object gets deleted or
    /// mutated. This number is recalculated based on the present storage
    /// gas price.
    pub(crate) async fn storage_rebate(&self) -> Option<BigInt> {
        ObjectImpl(self).storage_rebate().await
    }

    /// The transaction blocks that sent objects to this object.
    ///
    /// `scanLimit` restricts the number of candidate transactions scanned when
    /// gathering a page of results. It is required for queries that apply
    /// more than two complex filters (on function, kind, sender, recipient,
    /// input object, changed object, or ids), and can be at most
    /// `serviceConfig.maxScanLimit`.
    ///
    /// When the scan limit is reached the page will be returned even if it has
    /// fewer than `first` results when paginating forward (`last` when
    /// paginating backwards). If there are more transactions to scan,
    /// `pageInfo.hasNextPage` (or `pageInfo.hasPreviousPage`) will be set to
    /// `true`, and `PageInfo.endCursor` (or `PageInfo.startCursor`) will be set
    /// to the last transaction that was scanned as opposed to the last (or
    /// first) transaction in the page.
    ///
    /// Requesting the next (or previous) page after this cursor will resume the
    /// search, scanning the next `scanLimit` many transactions in the
    /// direction of pagination, and so on until all transactions in the
    /// scanning range have been visited.
    ///
    /// By default, the scanning range includes all transactions known to
    /// GraphQL, but it can be restricted by the `after` and `before`
    /// cursors, and the `beforeCheckpoint`, `afterCheckpoint` and
    /// `atCheckpoint` filters.
    pub(crate) async fn received_transaction_blocks(
        &self,
        ctx: &Context<'_>,
        first: Option<u64>,
        after: Option<transaction_block::Cursor>,
        last: Option<u64>,
        before: Option<transaction_block::Cursor>,
        filter: Option<TransactionBlockFilter>,
        scan_limit: Option<u64>,
    ) -> Result<ScanConnection<String, TransactionBlock>> {
        ObjectImpl(self)
            .received_transaction_blocks(ctx, first, after, last, before, filter, scan_limit)
            .await
    }

    /// The Base64-encoded BCS serialization of the object's content.
    pub(crate) async fn bcs(&self) -> Result<Option<Base64>> {
        ObjectImpl(self).bcs().await
    }

    /// The set of named templates defined on-chain for the type of this object,
    /// to be handled off-chain. The server substitutes data from the object
    /// into these templates to generate a display string per template.
    async fn display(&self, ctx: &Context<'_>) -> Result<Option<Vec<DisplayEntry>>> {
        ObjectImpl(self).display(ctx).await
    }

    /// Access a dynamic field on an object using its name. Names are arbitrary
    /// Move values whose type have `copy`, `drop`, and `store`, and are
    /// specified using their type, and their BCS contents, Base64 encoded.
    ///
    /// Dynamic fields on wrapped objects can be accessed by using the same API
    /// under the Owner type.
    async fn dynamic_field(
        &self,
        ctx: &Context<'_>,
        name: DynamicFieldName,
    ) -> Result<Option<DynamicField>> {
        // Hand this object's root version down so the read is bounded by the
        // root of the dynamic-field tree (see the `root_version` field docs).
        OwnerImpl::from(self)
            .dynamic_field(ctx, name, Some(self.root_version()))
            .await
    }

    /// Access a dynamic object field on an object using its name. Names are
    /// arbitrary Move values whose type have `copy`, `drop`, and `store`,
    /// and are specified using their type, and their BCS contents, Base64
    /// encoded. The value of a dynamic object field can also be accessed
    /// off-chain directly via its address (e.g. using `Query.object`).
    ///
    /// Dynamic fields on wrapped objects can be accessed by using the same API
    /// under the Owner type.
    async fn dynamic_object_field(
        &self,
        ctx: &Context<'_>,
        name: DynamicFieldName,
    ) -> Result<Option<DynamicField>> {
        // Same root-version bound as in `dynamic_field`.
        OwnerImpl::from(self)
            .dynamic_object_field(ctx, name, Some(self.root_version()))
            .await
    }

    /// The dynamic fields and dynamic object fields on an object.
    ///
    /// Dynamic fields on wrapped objects can be accessed by using the same API
    /// under the Owner type.
    async fn dynamic_fields(
        &self,
        ctx: &Context<'_>,
        first: Option<u64>,
        after: Option<Cursor>,
        last: Option<u64>,
        before: Option<Cursor>,
    ) -> Result<Connection<String, DynamicField>> {
        // Same root-version bound as in `dynamic_field`.
        OwnerImpl::from(self)
            .dynamic_fields(ctx, first, after, last, before, Some(self.root_version()))
            .await
    }

    /// Attempts to convert the object into a MoveObject
    async fn as_move_object(&self) -> Option<MoveObject> {
        // A failed conversion is reported as `None`, not as an error.
        MoveObject::try_from(self).ok()
    }

    /// Attempts to convert the object into a MovePackage
    async fn as_move_package(&self) -> Option<MovePackage> {
        // A failed conversion is reported as `None`, not as an error.
        MovePackage::try_from(self).ok()
    }
}
603
604impl ObjectImpl<'_> {
605    pub(crate) async fn version(&self) -> UInt53 {
606        self.0.version_impl().into()
607    }
608
609    pub(crate) async fn status(&self) -> ObjectStatus {
610        ObjectStatus::from(&self.0.kind)
611    }
612
613    pub(crate) async fn digest(&self) -> Option<String> {
614        self.0
615            .native_impl()
616            .map(|native| native.digest().base58_encode())
617    }
618
619    pub(crate) async fn owner(&self, ctx: &Context<'_>) -> Option<ObjectOwner> {
620        use NativeOwner as O;
621
622        let native = self.0.native_impl()?;
623
624        match native.owner {
625            O::AddressOwner(address) => {
626                let address = IotaAddress::from(address);
627                Some(ObjectOwner::Address(AddressOwner {
628                    owner: Some(Owner {
629                        address,
630                        checkpoint_viewed_at: self.0.checkpoint_viewed_at,
631                        root_version: None,
632                    }),
633                }))
634            }
635            O::Immutable => Some(ObjectOwner::Immutable(Immutable { dummy: None })),
636            O::ObjectOwner(address) => {
637                let parent = Object::query(
638                    ctx,
639                    address.into(),
640                    Object::latest_at(self.0.checkpoint_viewed_at),
641                )
642                .await
643                .ok()
644                .flatten();
645
646                Some(ObjectOwner::Parent(Parent { parent }))
647            }
648            O::Shared {
649                initial_shared_version,
650            } => Some(ObjectOwner::Shared(Shared {
651                initial_shared_version: initial_shared_version.value().into(),
652            })),
653        }
654    }
655
656    pub(crate) async fn previous_transaction_block(
657        &self,
658        ctx: &Context<'_>,
659    ) -> Result<Option<TransactionBlock>> {
660        let Some(native) = self.0.native_impl() else {
661            return Ok(None);
662        };
663        let digest = native.previous_transaction;
664
665        TransactionBlock::query(ctx, digest.into(), self.0.checkpoint_viewed_at)
666            .await
667            .extend()
668    }
669
670    pub(crate) async fn storage_rebate(&self) -> Option<BigInt> {
671        self.0
672            .native_impl()
673            .map(|native| BigInt::from(native.storage_rebate))
674    }
675
676    pub(crate) async fn received_transaction_blocks(
677        &self,
678        ctx: &Context<'_>,
679        first: Option<u64>,
680        after: Option<transaction_block::Cursor>,
681        last: Option<u64>,
682        before: Option<transaction_block::Cursor>,
683        filter: Option<TransactionBlockFilter>,
684        scan_limit: Option<u64>,
685    ) -> Result<ScanConnection<String, TransactionBlock>> {
686        let page = Page::from_params(ctx.data_unchecked(), first, after, last, before)?;
687
688        let Some(filter) = filter
689            .unwrap_or_default()
690            .intersect(TransactionBlockFilter {
691                recv_address: Some(self.0.address),
692                ..Default::default()
693            })
694        else {
695            return Ok(ScanConnection::new(false, false));
696        };
697
698        TransactionBlock::paginate(ctx, page, filter, self.0.checkpoint_viewed_at, scan_limit)
699            .await
700            .extend()
701    }
702
703    pub(crate) async fn bcs(&self) -> Result<Option<Base64>> {
704        use ObjectKind as K;
705        Ok(match &self.0.kind {
706            K::WrappedOrDeleted(_) => None,
707            // WrappedOrDeleted objects are also read from the historical objects table, and they do
708            // not have a serialized object, so the column is also nullable for stored historical
709            // objects.
710            K::Indexed(_, stored) => stored.serialized_object.as_ref().map(Base64::from),
711
712            K::NotIndexed(native) => {
713                let bytes = bcs::to_bytes(native)
714                    .map_err(|e| {
715                        Error::Internal(format!(
716                            "Failed to serialize object at {}: {e}",
717                            self.0.address
718                        ))
719                    })
720                    .extend()?;
721                Some(Base64::from(&bytes))
722            }
723        })
724    }
725
726    /// `display` is part of the `IMoveObject` interface, but is implemented on
727    /// `ObjectImpl` to allow for a convenience function on `Object`.
728    pub(crate) async fn display(&self, ctx: &Context<'_>) -> Result<Option<Vec<DisplayEntry>>> {
729        let Some(native) = self.0.native_impl() else {
730            return Ok(None);
731        };
732
733        let move_object = native
734            .data
735            .try_as_move()
736            .ok_or_else(|| Error::Internal("Failed to convert object into MoveObject".to_string()))
737            .extend()?;
738
739        let (struct_tag, move_struct) = deserialize_move_struct(move_object, ctx.data_unchecked())
740            .await
741            .extend()?;
742
743        let Some(display) = Display::query(ctx.data_unchecked(), struct_tag.into())
744            .await
745            .extend()?
746        else {
747            return Ok(None);
748        };
749
750        Ok(Some(display.render(&move_struct).extend()?))
751    }
752}
753
754impl Object {
755    /// Construct a GraphQL object from a native object, without its stored
756    /// (indexed) counterpart.
757    ///
758    /// `checkpoint_viewed_at` represents the checkpoint sequence number at
759    /// which this `Object` was constructed in. This is stored on `Object`
760    /// so that when viewing that entity's state, it will be as if it was
761    /// read at the same checkpoint.
762    ///
763    /// `root_version` represents the version of the root object in some nested
764    /// chain of dynamic fields. This should typically be left `None`,
765    /// unless the object(s) being resolved is a dynamic field, or if
766    /// `root_version` has been explicitly set for this object. If None, then
767    /// we use [`version_for_dynamic_fields`] to infer a root version to then
768    /// propagate from this object down to its dynamic fields.
769    pub(crate) fn from_native(
770        address: IotaAddress,
771        native: NativeObject,
772        checkpoint_viewed_at: u64,
773        root_version: Option<u64>,
774    ) -> Object {
775        let root_version = root_version.unwrap_or_else(|| version_for_dynamic_fields(&native));
776        Object {
777            address,
778            kind: ObjectKind::NotIndexed(native),
779            checkpoint_viewed_at,
780            root_version,
781        }
782    }
783
784    pub(crate) fn native_impl(&self) -> Option<&NativeObject> {
785        use ObjectKind as K;
786
787        match &self.kind {
788            K::NotIndexed(native) | K::Indexed(native, _) => Some(native),
789            K::WrappedOrDeleted(_) => None,
790        }
791    }
792
793    pub(crate) fn version_impl(&self) -> u64 {
794        use ObjectKind as K;
795
796        match &self.kind {
797            K::NotIndexed(native) | K::Indexed(native, _) => native.version().value(),
798            K::WrappedOrDeleted(object_version) => *object_version,
799        }
800    }
801
802    /// Root parent object version for dynamic fields.
803    ///
804    /// Check [`Object::root_version`] for details.
805    pub(crate) fn root_version(&self) -> u64 {
806        self.root_version
807    }
808
809    /// Query the database for a `page` of objects, optionally `filter`-ed.
810    ///
811    /// `checkpoint_viewed_at` represents the checkpoint sequence number at
812    /// which this page was queried for. Each entity returned in the
813    /// connection will inherit this checkpoint, so that when viewing that
814    /// entity's state, it will be as if it was read at the same checkpoint.
815    pub(crate) async fn paginate(
816        db: &Db,
817        page: Page<Cursor>,
818        filter: ObjectFilter,
819        checkpoint_viewed_at: u64,
820    ) -> Result<Connection<String, Object>, Error> {
821        Self::paginate_subtype(db, page, filter, checkpoint_viewed_at, Ok).await
822    }
823
824    /// Query the database for a `page` of some sub-type of Object. The page
825    /// uses the bytes of an Object ID and the checkpoint when the query was
826    /// made as the cursor, and can optionally be further `filter`-ed. The
827    /// subtype is created using the `downcast` function, which is allowed
828    /// to fail, if the downcast has failed.
829    ///
830    /// `checkpoint_viewed_at` represents the checkpoint sequence number at
831    /// which this page was queried for. Each entity returned in the
832    /// connection will inherit this checkpoint, so that when viewing that
833    /// entity's state, it will be as if it was read at the same checkpoint.
834    ///
835    /// If a `Page<Cursor>` is also provided, then this function will defer to
836    /// the `checkpoint_viewed_at` in the cursors. Otherwise, use the value
837    /// from the parameter, or set to None. This is so that paginated
838    /// queries are consistent with the previous query that created the
839    /// cursor.
840    pub(crate) async fn paginate_subtype<T: OutputType>(
841        db: &Db,
842        page: Page<Cursor>,
843        filter: ObjectFilter,
844        checkpoint_viewed_at: u64,
845        downcast: impl Fn(Object) -> Result<T, Error>,
846    ) -> Result<Connection<String, T>, Error> {
847        // If cursors are provided, defer to the `checkpoint_viewed_at` in the cursor if
848        // they are consistent. Otherwise, use the value from the parameter, or
849        // set to None. This is so that paginated queries are consistent with
850        // the previous query that created the cursor.
851        let cursor_viewed_at = page.validate_cursor_consistency()?;
852        let checkpoint_viewed_at = cursor_viewed_at.unwrap_or(checkpoint_viewed_at);
853
854        let Some((prev, next, results)) = db
855            .execute_repeatable(move |conn| {
856                let Some(range) = AvailableRange::result(conn, checkpoint_viewed_at)? else {
857                    return Ok::<_, diesel::result::Error>(None);
858                };
859
860                Ok(Some(page.paginate_raw_query::<StoredHistoryObject>(
861                    conn,
862                    checkpoint_viewed_at,
863                    objects_query(&filter, range, &page),
864                )?))
865            })
866            .await?
867        else {
868            return Err(Error::Client(
869                "Requested data is outside the available range".to_string(),
870            ));
871        };
872
873        let mut conn: Connection<String, T> = Connection::new(prev, next);
874
875        for stored in results {
876            // To maintain consistency, the returned cursor should have the same upper-bound
877            // as the checkpoint found on the cursor.
878            let cursor = stored.cursor(checkpoint_viewed_at).encode_cursor();
879            let object =
880                Object::try_from_stored_history_object(stored, checkpoint_viewed_at, None)?;
881            conn.edges.push(Edge::new(cursor, downcast(object)?));
882        }
883
884        Ok(conn)
885    }
886
887    /// Look-up the latest version of the object as of a given checkpoint.
888    pub(crate) fn latest_at(checkpoint_viewed_at: u64) -> ObjectLookup {
889        ObjectLookup::LatestAt {
890            checkpoint_viewed_at,
891        }
892    }
893
894    /// Look-up the latest version of an object whose version is less than or
895    /// equal to its parent's version, as of a given checkpoint.
896    pub(crate) fn under_parent(parent_version: u64, checkpoint_viewed_at: u64) -> ObjectLookup {
897        ObjectLookup::UnderParent {
898            parent_version,
899            checkpoint_viewed_at,
900        }
901    }
902
903    /// Look-up a specific version of the object, as of a given checkpoint.
904    pub(crate) fn at_version(version: u64, checkpoint_viewed_at: u64) -> ObjectLookup {
905        ObjectLookup::VersionAt {
906            version,
907            checkpoint_viewed_at,
908        }
909    }
910
911    pub(crate) async fn query(
912        ctx: &Context<'_>,
913        id: IotaAddress,
914        key: ObjectLookup,
915    ) -> Result<Option<Self>, Error> {
916        let DataLoader(loader) = &ctx.data_unchecked();
917
918        match key {
919            ObjectLookup::VersionAt {
920                version,
921                checkpoint_viewed_at,
922            } => {
923                loader
924                    .load_one(HistoricalKey {
925                        id,
926                        version,
927                        checkpoint_viewed_at,
928                    })
929                    .await
930            }
931
932            ObjectLookup::UnderParent {
933                parent_version,
934                checkpoint_viewed_at,
935            } => {
936                loader
937                    .load_one(ParentVersionKey {
938                        id,
939                        parent_version,
940                        checkpoint_viewed_at,
941                    })
942                    .await
943            }
944
945            ObjectLookup::LatestAt {
946                checkpoint_viewed_at,
947            } => {
948                loader
949                    .load_one(LatestAtKey {
950                        id,
951                        checkpoint_viewed_at,
952                    })
953                    .await
954            }
955        }
956    }
957
958    /// Query for a singleton object identified by its type. Note: the object is
959    /// assumed to be a singleton (we either find at least one object with
960    /// this type and then return it, or return nothing).
961    pub(crate) async fn query_singleton(
962        db: &Db,
963        type_: StructTag,
964        checkpoint_viewed_at: u64,
965    ) -> Result<Option<Object>, Error> {
966        let filter = ObjectFilter {
967            type_: Some(TypeFilter::ByType(type_)),
968            ..Default::default()
969        };
970
971        let connection = Self::paginate(db, Page::bounded(1), filter, checkpoint_viewed_at).await?;
972
973        Ok(connection.edges.into_iter().next().map(|edge| edge.node))
974    }
975
976    /// `checkpoint_viewed_at` represents the checkpoint sequence number at
977    /// which this `Object` was constructed in. This is stored on `Object`
978    /// so that when viewing that entity's state, it will be as if it was
979    /// read at the same checkpoint.
980    ///
981    /// `root_version` represents the version of the root object in some nested
982    /// chain of dynamic fields. This should typically be left `None`,
983    /// unless the object(s) being resolved is a dynamic field, or if
984    /// `root_version` has been explicitly set for this object. If None, then
985    /// we use [`version_for_dynamic_fields`] to infer a root version to then
986    /// propagate from this object down to its dynamic fields.
987    pub(crate) fn try_from_stored_history_object(
988        history_object: StoredHistoryObject,
989        checkpoint_viewed_at: u64,
990        root_version: Option<u64>,
991    ) -> Result<Self, Error> {
992        let address = addr(&history_object.object_id)?;
993
994        let object_status =
995            NativeObjectStatus::try_from(history_object.object_status).map_err(|_| {
996                Error::Internal(format!(
997                    "Unknown object status {} for object {} at version {}",
998                    history_object.object_status, address, history_object.object_version
999                ))
1000            })?;
1001
1002        match object_status {
1003            NativeObjectStatus::Active => {
1004                let Some(serialized_object) = &history_object.serialized_object else {
1005                    return Err(Error::Internal(format!(
1006                        "Live object {} at version {} cannot have missing serialized_object field",
1007                        address, history_object.object_version
1008                    )));
1009                };
1010
1011                let native_object = bcs::from_bytes(serialized_object).map_err(|_| {
1012                    Error::Internal(format!("Failed to deserialize object {address}"))
1013                })?;
1014
1015                let root_version =
1016                    root_version.unwrap_or_else(|| version_for_dynamic_fields(&native_object));
1017                Ok(Self {
1018                    address,
1019                    kind: ObjectKind::Indexed(native_object, history_object),
1020                    checkpoint_viewed_at,
1021                    root_version,
1022                })
1023            }
1024            NativeObjectStatus::WrappedOrDeleted => Ok(Self {
1025                address,
1026                kind: ObjectKind::WrappedOrDeleted(history_object.object_version as u64),
1027                checkpoint_viewed_at,
1028                root_version: history_object.object_version as u64,
1029            }),
1030        }
1031    }
1032}
1033
1034/// We're deliberately choosing to use a child object's version as the root
1035/// here, and letting the caller override it with the actual root object's
1036/// version if it has access to it.
1037///
1038/// Using the child object's version as the root means that we're seeing the
1039/// dynamic field tree under this object at the state resulting from the
1040/// transaction that produced this version.
1041///
1042/// See [`Object::root_version`] for more details on parent/child object version
1043/// mechanics.
1044fn version_for_dynamic_fields(native: &NativeObject) -> u64 {
1045    native.as_inner().version().into()
1046}
1047
1048impl ObjectFilter {
1049    /// Try to create a filter whose results are the intersection of objects in
1050    /// `self`'s results and objects in `other`'s results. This may not be
1051    /// possible if the resulting filter is inconsistent in some way (e.g. a
1052    /// filter that requires one field to be two different values
1053    /// simultaneously).
1054    pub(crate) fn intersect(self, other: ObjectFilter) -> Option<Self> {
1055        macro_rules! intersect {
1056            ($field:ident, $body:expr) => {
1057                intersect::field(self.$field, other.$field, $body)
1058            };
1059        }
1060
1061        // Treat `object_ids` and `object_keys` as a single filter on IDs, and
1062        // optionally versions, and compute the intersection of that.
1063        let keys = intersect::field(self.keys(), other.keys(), |k, l| {
1064            let mut combined = BTreeMap::new();
1065
1066            for (id, v) in k {
1067                if let Some(w) = l.get(&id).copied() {
1068                    combined.insert(id, intersect::field(v, w, intersect::by_eq)?);
1069                }
1070            }
1071
1072            // If the intersection is empty, it means, there were some ID or Key filters in
1073            // both `self` and `other`, but they don't overlap, so the final
1074            // result is inconsistent.
1075            (!combined.is_empty()).then_some(combined)
1076        })?;
1077
1078        // Extract the ID and Key filters back out. At this point, we know that if there
1079        // were ID/Key filters in both `self` and `other`, then they intersected
1080        // to form a consistent set of constraints, so it is safe to interpret
1081        // the lack of any ID/Key filters respectively as a lack of that kind of
1082        // constraint, rather than a constraint on the empty set.
1083
1084        let object_ids = {
1085            let partition: Vec<_> = keys
1086                .iter()
1087                .flatten()
1088                .filter_map(|(id, v)| v.is_none().then_some(*id))
1089                .collect();
1090
1091            (!partition.is_empty()).then_some(partition)
1092        };
1093
1094        let object_keys = {
1095            let partition: Vec<_> = keys
1096                .iter()
1097                .flatten()
1098                .filter_map(|(id, v)| {
1099                    Some(ObjectKey {
1100                        object_id: *id,
1101                        version: (*v)?.into(),
1102                    })
1103                })
1104                .collect();
1105
1106            (!partition.is_empty()).then_some(partition)
1107        };
1108
1109        Some(Self {
1110            type_: intersect!(type_, TypeFilter::intersect)?,
1111            owner: intersect!(owner, intersect::by_eq)?,
1112            object_ids,
1113            object_keys,
1114        })
1115    }
1116
1117    /// Extract the Object ID and Key filters into one combined map from Object
1118    /// IDs in this filter, to the versions they should have (or None if the
1119    /// filter mentions the ID but no version for it).
1120    fn keys(&self) -> Option<BTreeMap<IotaAddress, Option<u64>>> {
1121        if self.object_keys.is_none() && self.object_ids.is_none() {
1122            return None;
1123        }
1124
1125        Some(BTreeMap::from_iter(
1126            self.object_keys
1127                .iter()
1128                .flatten()
1129                .map(|key| (key.object_id, Some(key.version.into())))
1130                // Chain ID filters after Key filters so if there is overlap, we overwrite the key
1131                // filter with the ID filter.
1132                .chain(self.object_ids.iter().flatten().map(|id| (*id, None))),
1133        ))
1134    }
1135
1136    /// Applies ObjectFilter to the input `RawQuery` and returns a new
1137    /// `RawQuery`.
1138    pub(crate) fn apply(&self, mut query: RawQuery) -> RawQuery {
1139        // Start by applying the filters on IDs and/or keys because they are combined as
1140        // a disjunction, while the remaining queries are conjunctions.
1141        if let Some(object_ids) = &self.object_ids {
1142            // Maximally strict - match a vec of 0 elements
1143            if object_ids.is_empty() {
1144                query = or_filter!(query, "1=0");
1145            } else {
1146                let mut inner = String::new();
1147                let mut prefix = "object_id IN (";
1148                for id in object_ids {
1149                    // SAFETY: Writing to a `String` cannot fail.
1150                    write!(
1151                        &mut inner,
1152                        "{prefix}'\\x{}'::bytea",
1153                        hex::encode(id.into_vec())
1154                    )
1155                    .unwrap();
1156                    prefix = ", ";
1157                }
1158                inner.push(')');
1159                query = or_filter!(query, inner);
1160            }
1161        }
1162
1163        if let Some(object_keys) = &self.object_keys {
1164            // Maximally strict - match a vec of 0 elements
1165            if object_keys.is_empty() {
1166                query = or_filter!(query, "1=0");
1167            } else {
1168                let mut inner = String::new();
1169                let mut prefix = "(";
1170                for ObjectKey { object_id, version } in object_keys {
1171                    // SAFETY: Writing to a `String` cannot fail.
1172                    write!(
1173                        &mut inner,
1174                        "{prefix}(object_id = '\\x{}'::bytea AND object_version = {})",
1175                        hex::encode(object_id.into_vec()),
1176                        version
1177                    )
1178                    .unwrap();
1179                    prefix = " OR ";
1180                }
1181                inner.push(')');
1182                query = or_filter!(query, inner);
1183            }
1184        }
1185
1186        if let Some(owner) = self.owner {
1187            query = filter!(
1188                query,
1189                format!(
1190                    "owner_id = '\\x{}'::bytea AND owner_type = {}",
1191                    hex::encode(owner.into_vec()),
1192                    OwnerType::Address as i16
1193                )
1194            );
1195        }
1196
1197        if let Some(type_) = &self.type_ {
1198            return type_.apply_raw(
1199                query,
1200                "object_type",
1201                "object_type_package",
1202                "object_type_module",
1203                "object_type_name",
1204            );
1205        }
1206
1207        query
1208    }
1209
1210    pub(crate) fn has_filters(&self) -> bool {
1211        self != &Default::default()
1212    }
1213}
1214
1215impl HistoricalObjectCursor {
1216    pub(crate) fn new(object_id: Vec<u8>, checkpoint_viewed_at: u64) -> Self {
1217        Self {
1218            object_id,
1219            checkpoint_viewed_at,
1220        }
1221    }
1222}
1223
impl Checkpointed for Cursor {
    /// The checkpoint sequence number stored in this cursor.
    fn checkpoint_viewed_at(&self) -> u64 {
        self.checkpoint_viewed_at
    }
}
1229
// No items are overridden here, so `Cursor` relies entirely on whatever
// `ScanLimited` provides by default.
impl ScanLimited for Cursor {}
1231
1232impl RawPaginated<Cursor> for StoredHistoryObject {
1233    fn filter_ge(cursor: &Cursor, query: RawQuery) -> RawQuery {
1234        filter!(
1235            query,
1236            format!(
1237                "candidates.object_id >= '\\x{}'::bytea",
1238                hex::encode(cursor.object_id.clone())
1239            )
1240        )
1241    }
1242
1243    fn filter_le(cursor: &Cursor, query: RawQuery) -> RawQuery {
1244        filter!(
1245            query,
1246            format!(
1247                "candidates.object_id <= '\\x{}'::bytea",
1248                hex::encode(cursor.object_id.clone())
1249            )
1250        )
1251    }
1252
1253    fn order(asc: bool, query: RawQuery) -> RawQuery {
1254        if asc {
1255            query.order_by("candidates.object_id ASC")
1256        } else {
1257            query.order_by("candidates.object_id DESC")
1258        }
1259    }
1260}
1261
1262impl Target<Cursor> for StoredHistoryObject {
1263    fn cursor(&self, checkpoint_viewed_at: u64) -> Cursor {
1264        Cursor::new(HistoricalObjectCursor::new(
1265            self.object_id.clone(),
1266            checkpoint_viewed_at,
1267        ))
1268    }
1269}
1270
1271impl Loader<HistoricalKey> for Db {
1272    type Value = Object;
1273    type Error = Error;
1274
1275    async fn load(&self, keys: &[HistoricalKey]) -> Result<HashMap<HistoricalKey, Object>, Error> {
1276        use objects_history::dsl as h;
1277        use objects_version::dsl as v;
1278
1279        let id_versions: BTreeSet<_> = keys
1280            .iter()
1281            .map(|key| (key.id.into_vec(), key.version as i64))
1282            .collect();
1283
1284        let objects: Vec<StoredHistoryObject> = self
1285            .execute(move |conn| {
1286                conn.results(move || {
1287                    let mut query = h::objects_history
1288                        .inner_join(
1289                            v::objects_version.on(v::cp_sequence_number
1290                                .eq(h::checkpoint_sequence_number)
1291                                .and(v::object_id.eq(h::object_id))
1292                                .and(v::object_version.eq(h::object_version))),
1293                        )
1294                        .select(StoredHistoryObject::as_select())
1295                        .into_boxed();
1296
1297                    for (id, version) in id_versions.iter().cloned() {
1298                        query =
1299                            query.or_filter(v::object_id.eq(id).and(v::object_version.eq(version)));
1300                    }
1301
1302                    query
1303                })
1304            })
1305            .await
1306            .map_err(|e| Error::Internal(format!("Failed to fetch objects: {e}")))?;
1307
1308        let mut id_version_to_stored = BTreeMap::new();
1309        for stored in objects {
1310            let key = (addr(&stored.object_id)?, stored.object_version as u64);
1311            id_version_to_stored.insert(key, stored);
1312        }
1313
1314        let mut result = HashMap::new();
1315        for key in keys {
1316            let Some(stored) = id_version_to_stored.get(&(key.id, key.version)) else {
1317                continue;
1318            };
1319
1320            // Filter by key's checkpoint viewed at here. Doing this in memory because it
1321            // should be quite rare that this query actually filters something,
1322            // but encoding it in SQL is complicated.
1323            if key.checkpoint_viewed_at < stored.checkpoint_sequence_number as u64 {
1324                continue;
1325            }
1326
1327            let object = Object::try_from_stored_history_object(
1328                stored.clone(),
1329                key.checkpoint_viewed_at,
1330                // This conversion will use the object's own version as the `Object::root_version`.
1331                None,
1332            )?;
1333            result.insert(*key, object);
1334        }
1335
1336        Ok(result)
1337    }
1338}
1339
1340impl Loader<ParentVersionKey> for Db {
1341    type Value = Object;
1342    type Error = Error;
1343
1344    async fn load(
1345        &self,
1346        keys: &[ParentVersionKey],
1347    ) -> Result<HashMap<ParentVersionKey, Object>, Error> {
1348        // Group keys by checkpoint viewed at and parent version -- we'll issue a
1349        // separate query for each group.
1350        #[derive(Eq, PartialEq, Ord, PartialOrd, Clone, Copy)]
1351        struct GroupKey {
1352            checkpoint_viewed_at: u64,
1353            parent_version: u64,
1354        }
1355
1356        let mut keys_by_cursor_and_parent_version: BTreeMap<_, BTreeSet<_>> = BTreeMap::new();
1357        for key in keys {
1358            let group_key = GroupKey {
1359                checkpoint_viewed_at: key.checkpoint_viewed_at,
1360                parent_version: key.parent_version,
1361            };
1362
1363            keys_by_cursor_and_parent_version
1364                .entry(group_key)
1365                .or_default()
1366                .insert(key.id.into_vec());
1367        }
1368
1369        // Issue concurrent reads for each group of keys.
1370        let futures = keys_by_cursor_and_parent_version
1371            .into_iter()
1372            .map(|(group_key, ids)| {
1373                self.execute(move |conn| {
1374                    let stored: Vec<StoredHistoryObject> = conn.results(move || {
1375                        use objects_history::dsl as h;
1376                        use objects_version::dsl as v;
1377
1378                        h::objects_history
1379                            .inner_join(
1380                                v::objects_version.on(v::cp_sequence_number
1381                                    .eq(h::checkpoint_sequence_number)
1382                                    .and(v::object_id.eq(h::object_id))
1383                                    .and(v::object_version.eq(h::object_version))),
1384                            )
1385                            .select(StoredHistoryObject::as_select())
1386                            .filter(v::object_id.eq_any(ids.iter().cloned()))
1387                            .filter(v::object_version.le(group_key.parent_version as i64))
1388                            .distinct_on(v::object_id)
1389                            .order_by(v::object_id)
1390                            .then_order_by(v::object_version.desc())
1391                            .into_boxed()
1392                    })?;
1393
1394                    Ok::<_, diesel::result::Error>(
1395                        stored
1396                            .into_iter()
1397                            .map(|stored| (group_key, stored))
1398                            .collect::<Vec<_>>(),
1399                    )
1400                })
1401            });
1402
1403        // Wait for the reads to all finish, and gather them into the result map.
1404        let groups = futures::future::join_all(futures).await;
1405
1406        let mut results = HashMap::new();
1407        for group in groups {
1408            for (group_key, stored) in
1409                group.map_err(|e| Error::Internal(format!("Failed to fetch objects: {e}")))?
1410            {
1411                // This particular object is invalid -- it didn't exist at the checkpoint we are
1412                // viewing at.
1413                if group_key.checkpoint_viewed_at < stored.checkpoint_sequence_number as u64 {
1414                    continue;
1415                }
1416
1417                let object = Object::try_from_stored_history_object(
1418                    stored,
1419                    group_key.checkpoint_viewed_at,
1420                    // If `LatestAtKey::parent_version` is set, it must have been correctly
1421                    // propagated from the `Object::root_version` of some object.
1422                    Some(group_key.parent_version),
1423                )?;
1424
1425                let key = ParentVersionKey {
1426                    id: object.address,
1427                    checkpoint_viewed_at: group_key.checkpoint_viewed_at,
1428                    parent_version: group_key.parent_version,
1429                };
1430
1431                results.insert(key, object);
1432            }
1433        }
1434
1435        Ok(results)
1436    }
1437}
1438
1439impl Loader<LatestAtKey> for Db {
1440    type Value = Object;
1441    type Error = Error;
1442
1443    async fn load(&self, keys: &[LatestAtKey]) -> Result<HashMap<LatestAtKey, Object>, Error> {
1444        // Group keys by checkpoint viewed at -- we'll issue a separate query for each
1445        // group.
1446        let mut keys_by_cursor_and_parent_version: BTreeMap<_, BTreeSet<_>> = BTreeMap::new();
1447
1448        for key in keys {
1449            keys_by_cursor_and_parent_version
1450                .entry(key.checkpoint_viewed_at)
1451                .or_default()
1452                .insert(key.id);
1453        }
1454
1455        // Issue concurrent reads for each group of keys.
1456        let futures =
1457            keys_by_cursor_and_parent_version
1458                .into_iter()
1459                .map(|(checkpoint_viewed_at, ids)| {
1460                    self.execute_repeatable(move |conn| {
1461                        let Some(range) = AvailableRange::result(conn, checkpoint_viewed_at)?
1462                        else {
1463                            return Ok::<Vec<(u64, StoredHistoryObject)>, diesel::result::Error>(
1464                                vec![],
1465                            );
1466                        };
1467
1468                        let filter = ObjectFilter {
1469                            object_ids: Some(ids.iter().cloned().collect()),
1470                            ..Default::default()
1471                        };
1472
1473                        Ok(conn
1474                            .results(move || {
1475                                build_objects_query(
1476                                    View::Consistent,
1477                                    range,
1478                                    &Page::bounded(ids.len() as u64),
1479                                    |q| filter.apply(q),
1480                                    |q| q,
1481                                )
1482                                .into_boxed()
1483                            })?
1484                            .into_iter()
1485                            .map(|r| (checkpoint_viewed_at, r))
1486                            .collect())
1487                    })
1488                });
1489
1490        // Wait for the reads to all finish, and gather them into the result map.
1491        let groups = futures::future::join_all(futures).await;
1492
1493        let mut results = HashMap::new();
1494        for group in groups {
1495            for (checkpoint_viewed_at, stored) in
1496                group.map_err(|e| Error::Internal(format!("Failed to fetch objects: {e}")))?
1497            {
1498                let object =
1499                    Object::try_from_stored_history_object(stored, checkpoint_viewed_at, None)?;
1500
1501                let key = LatestAtKey {
1502                    id: object.address,
1503                    checkpoint_viewed_at,
1504                };
1505
1506                results.insert(key, object);
1507            }
1508        }
1509
1510        Ok(results)
1511    }
1512}
1513
1514impl From<&ObjectKind> for ObjectStatus {
1515    fn from(kind: &ObjectKind) -> Self {
1516        match kind {
1517            ObjectKind::NotIndexed(_) => ObjectStatus::NotIndexed,
1518            ObjectKind::Indexed(_, _) => ObjectStatus::Indexed,
1519            ObjectKind::WrappedOrDeleted(_) => ObjectStatus::WrappedOrDeleted,
1520        }
1521    }
1522}
1523
1524impl From<&Object> for OwnerImpl {
1525    fn from(object: &Object) -> Self {
1526        OwnerImpl {
1527            address: object.address,
1528            checkpoint_viewed_at: object.checkpoint_viewed_at,
1529        }
1530    }
1531}
1532
1533pub(crate) async fn deserialize_move_struct(
1534    move_object: &NativeMoveObject,
1535    resolver: &PackageResolver,
1536) -> Result<(StructTag, MoveStruct), Error> {
1537    let struct_tag = StructTag::from(move_object.type_().clone());
1538    let contents = move_object.contents();
1539    let move_type_layout = resolver
1540        .type_layout(TypeTag::from(struct_tag.clone()))
1541        .await
1542        .map_err(|e| {
1543            Error::Internal(format!(
1544                "Error fetching layout for type {}: {e}",
1545                struct_tag.to_canonical_string(/* with_prefix */ true)
1546            ))
1547        })?;
1548
1549    let MoveTypeLayout::Struct(layout) = move_type_layout else {
1550        return Err(Error::Internal("Object is not a move struct".to_string()));
1551    };
1552
1553    // TODO (annotated-visitor): Use custom visitors for extracting a dynamic field,
1554    // and for creating a GraphQL MoveValue directly (not via an annotated
1555    // visitor).
1556    let move_struct = BoundedVisitor::deserialize_struct(contents, &layout).map_err(|e| {
1557        Error::Internal(format!(
1558            "Error deserializing move struct for type {}: {e}",
1559            struct_tag.to_canonical_string(/* with_prefix */ true)
1560        ))
1561    })?;
1562
1563    Ok((struct_tag, move_struct))
1564}
1565
1566/// Constructs a raw query to fetch objects from the database. Objects are
1567/// filtered out if they satisfy the criteria but have a later version in the
1568/// same checkpoint. If object keys are provided, or no filters are specified at
1569/// all, then this final condition is not applied.
1570fn objects_query(filter: &ObjectFilter, range: AvailableRange, page: &Page<Cursor>) -> RawQuery
1571where
1572{
1573    if let (Some(_), Some(_)) = (&filter.object_ids, &filter.object_keys) {
1574        // If both object IDs and object keys are specified, then we need to query in
1575        // both historical and consistent views, and then union the results.
1576        let ids_only_filter = ObjectFilter {
1577            object_keys: None,
1578            ..filter.clone()
1579        };
1580        let (id_query, id_bindings) = build_objects_query(
1581            View::Consistent,
1582            range,
1583            page,
1584            move |query| ids_only_filter.apply(query),
1585            move |newer| newer,
1586        )
1587        .finish();
1588
1589        let keys_only_filter = ObjectFilter {
1590            object_ids: None,
1591            ..filter.clone()
1592        };
1593        let (key_query, key_bindings) = build_objects_query(
1594            View::Historical,
1595            range,
1596            page,
1597            move |query| keys_only_filter.apply(query),
1598            move |newer| newer,
1599        )
1600        .finish();
1601
1602        RawQuery::new(
1603            format!(
1604                "SELECT * FROM (({id_query}) UNION ALL ({key_query})) AS candidates",
1605                id_query = id_query,
1606                key_query = key_query,
1607            ),
1608            id_bindings.into_iter().chain(key_bindings).collect(),
1609        )
1610        .order_by("object_id")
1611        .limit(page.limit() as i64)
1612    } else {
1613        // Only one of object IDs or object keys is specified, or neither are specified.
1614        let view = if filter.object_keys.is_some() || !filter.has_filters() {
1615            View::Historical
1616        } else {
1617            View::Consistent
1618        };
1619
1620        build_objects_query(
1621            view,
1622            range,
1623            page,
1624            move |query| filter.apply(query),
1625            move |newer| newer,
1626        )
1627    }
1628}
1629
#[cfg(test)]
mod tests {
    use std::str::FromStr;

    use super::*;

    /// Owner-only filters: intersecting a filter with itself yields the same
    /// filter, while two filters with different owners yield `None`.
    #[test]
    fn test_owner_filter_intersection() {
        let owned_by = |hex: &str| ObjectFilter {
            owner: Some(IotaAddress::from_str(hex).unwrap()),
            ..Default::default()
        };

        let first = owned_by("0x1");
        let second = owned_by("0x2");

        let same_owner = first.clone().intersect(first.clone());
        assert_eq!(same_owner, Some(first.clone()));

        let different_owners = first.clone().intersect(second.clone());
        assert_eq!(different_owners, None);
    }

    /// Filters made of object IDs and/or object keys: checks the expected
    /// intersection for each pairing of the four fixtures below.
    #[test]
    fn test_key_filter_intersection() {
        let [i1, i2, i3, i4] =
            ["0x1", "0x2", "0x3", "0x4"].map(|hex| IotaAddress::from_str(hex).unwrap());

        // Small constructors so each fixture fits on a line or two. Fields that
        // are not passed in keep their `Default` value.
        let key = |object_id, version| ObjectKey { object_id, version };
        let with_ids = |object_ids| ObjectFilter {
            object_ids: Some(object_ids),
            ..Default::default()
        };
        let with_keys = |object_keys| ObjectFilter {
            object_keys: Some(object_keys),
            ..Default::default()
        };
        let with_ids_and_keys = |object_ids, object_keys| ObjectFilter {
            object_ids: Some(object_ids),
            object_keys: Some(object_keys),
            ..Default::default()
        };

        let f0 = with_ids_and_keys(
            vec![i1, i3],
            vec![key(i2, 1.into()), key(i4, 2.into())],
        );
        let f1 = with_ids_and_keys(vec![i1, i2], vec![key(i4, 2.into())]);
        let f2 = with_ids(vec![i1, i3]);
        let f3 = with_keys(vec![key(i2, 2.into()), key(i4, 2.into())]);

        let f0_and_f1 = f0.clone().intersect(f1.clone());
        assert_eq!(
            f0_and_f1,
            Some(with_ids_and_keys(
                vec![i1],
                vec![key(i2, 1.into()), key(i4, 2.into())],
            ))
        );

        let f1_and_f2 = f1.clone().intersect(f2.clone());
        assert_eq!(f1_and_f2, Some(with_ids(vec![i1])));

        let f1_and_f3 = f1.clone().intersect(f3.clone());
        assert_eq!(
            f1_and_f3,
            Some(with_keys(vec![key(i2, 2.into()), key(i4, 2.into())]))
        );

        // i2 is pinned to version 1 by f0 and to version 2 by f3: the
        // conflicting assignment leaves nothing in common.
        let f0_and_f3 = f0.clone().intersect(f3.clone());
        assert_eq!(f0_and_f3, None);

        // f2 only lists i1 and i3, f3 only has keys for i2 and i4: no overlap.
        let f2_and_f3 = f2.clone().intersect(f3.clone());
        assert_eq!(f2_and_f3, None);
    }
}