iota_graphql_rpc/types/object.rs

// Copyright (c) Mysten Labs, Inc.
// Modifications Copyright (c) 2024 IOTA Stiftung
// SPDX-License-Identifier: Apache-2.0

use std::{
    collections::{BTreeMap, BTreeSet, HashMap},
    fmt::Write,
};

use async_graphql::{
    connection::{Connection, CursorType, Edge},
    dataloader::Loader,
    *,
};
use diesel::{BoolExpressionMethods, ExpressionMethods, JoinOnDsl, QueryDsl, SelectableHelper};
use iota_indexer::{
    models::objects::{StoredHistoryObject, StoredObject},
    schema::{objects, objects_history, objects_version},
    types::{ObjectStatus as NativeObjectStatus, OwnerType},
};
use iota_types::{
    TypeTag,
    object::{
        MoveObject as NativeMoveObject, Object as NativeObject, Owner as NativeOwner,
        bounded_visitor::BoundedVisitor,
    },
};
use move_core_types::{
    annotated_value::{MoveStruct, MoveTypeLayout},
    language_storage::StructTag,
};
use serde::{Deserialize, Serialize};

use crate::{
    connection::ScanConnection,
    consistency::{Checkpointed, View, build_objects_query},
    data::{DataLoader, Db, DbConnection, QueryExecutor, package_resolver::PackageResolver},
    error::Error,
    filter, or_filter,
    raw_query::RawQuery,
    types::{
        available_range::AvailableRange,
        balance::{self, Balance},
        base64::Base64,
        big_int::BigInt,
        coin::Coin,
        coin_metadata::CoinMetadata,
        cursor::{self, Page, RawPaginated, ScanLimited, Target},
        digest::Digest,
        display::{Display, DisplayEntry},
        dynamic_field::{DynamicField, DynamicFieldName},
        intersect,
        iota_address::{IotaAddress, addr},
        iota_names_registration::{NameFormat, NameRegistration},
        move_object::MoveObject,
        move_package::MovePackage,
        owner::{Owner, OwnerImpl},
        stake::StakedIota,
        transaction_block,
        transaction_block::{TransactionBlock, TransactionBlockFilter},
        type_filter::{ExactTypeFilter, TypeFilter},
        uint53::UInt53,
    },
};

#[derive(Clone, Debug)]
pub(crate) struct Object {
    pub address: IotaAddress,
    pub kind: ObjectKind,
    /// The checkpoint sequence number at which this was viewed.
    pub checkpoint_viewed_at: u64,
    /// Root parent object version for dynamic fields.
    ///
    /// This enables consistent dynamic field reads in the case of chained
    /// dynamic object fields, e.g., `Parent -> DOF1 -> DOF2`. In such
    /// cases, the object versions may end up like `Parent >= DOF1, DOF2`
    /// but `DOF1 < DOF2`. Thus, database queries for dynamic fields must
    /// bound the object versions by the version of the root object of the tree.
    ///
    /// Essentially, lamport timestamps of objects are updated for all top-level
    /// mutable objects provided as inputs to a transaction as well as any
    /// mutated dynamic child objects. However, any dynamic child objects
    /// that were loaded but not actually mutated don't end up having
    /// their versions updated.
    root_version: u64,
}
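
// Illustrative sketch (not part of the original module): the version-bounding
// rule that `root_version` exists to support, reduced to plain integers. A
// consistent dynamic field read picks the newest child version that does not
// exceed the root object's version.
#[cfg(test)]
mod root_version_bound_sketch {
    /// Latest candidate version that is `<= root_version`, if any.
    fn latest_under_bound(candidate_versions: &[u64], root_version: u64) -> Option<u64> {
        candidate_versions
            .iter()
            .copied()
            .filter(|v| *v <= root_version)
            .max()
    }

    #[test]
    fn child_reads_are_bounded_by_the_root_version() {
        // Suppose the root `Parent` is at version 10, while versions 3, 9, and 12
        // of a nested dynamic object field exist. Bounding by the root's version
        // keeps the read consistent: version 12 post-dates the root and is ignored.
        assert_eq!(latest_under_bound(&[3, 9, 12], 10), Some(9));
        // If every candidate is newer than the bound, nothing is visible.
        assert_eq!(latest_under_bound(&[12], 10), None);
    }
}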

/// Type to implement GraphQL fields that are shared by all Objects.
pub(crate) struct ObjectImpl<'o>(pub &'o Object);

#[derive(Clone, Debug)]
#[expect(clippy::large_enum_variant)]
pub(crate) enum ObjectKind {
    /// An object loaded from serialized data, such as the contents of a
    /// transaction that hasn't been indexed yet.
    NotIndexed(NativeObject),
    /// An object fetched from the index.
    Indexed(NativeObject, StoredHistoryObject),
    /// The object is wrapped or deleted and only partial information can be
    /// loaded from the indexer. The `u64` is the version of the object.
    WrappedOrDeleted(u64),
}

#[derive(Enum, Copy, Clone, Eq, PartialEq, Debug)]
#[graphql(name = "ObjectKind")]
pub enum ObjectStatus {
    /// The object is loaded from serialized data, such as the contents of a
    /// transaction that hasn't been indexed yet.
    NotIndexed,
    /// The object is fetched from the index.
    Indexed,
    /// The object is deleted or wrapped and only partial information can be
    /// loaded from the indexer.
    WrappedOrDeleted,
}

#[derive(Clone, Debug, PartialEq, Eq, InputObject)]
pub(crate) struct ObjectRef {
    /// ID of the object.
    pub address: IotaAddress,
    /// Version or sequence number of the object.
    pub version: UInt53,
    /// Digest of the object.
    pub digest: Digest,
}
/// Constrains the set of objects returned. All filters are optional, and the
/// resulting set of objects is made up of those whose
///
/// - Type matches the `type` filter,
/// - AND, whose owner matches the `owner` filter,
/// - AND, whose ID is in `objectIds` OR whose ID and version is in
///   `objectKeys`.
#[derive(InputObject, Default, Debug, Clone, Eq, PartialEq)]
pub(crate) struct ObjectFilter {
    /// This field is used to specify the type of objects that should be
    /// included in the query results.
    ///
    /// Objects can be filtered by their type's package, package::module, or
    /// their fully qualified type name.
    ///
    /// Generic types can be queried by either the generic type name, e.g.
    /// `0x2::coin::Coin`, or by the full type name, such as
    /// `0x2::coin::Coin<0x2::iota::IOTA>`.
    pub type_: Option<TypeFilter>,

    /// Filter for live objects by their current owners.
    pub owner: Option<IotaAddress>,

    /// Filter for live objects by their IDs.
    pub object_ids: Option<Vec<IotaAddress>>,

    /// Filter for live or potentially historical objects by their ID and
    /// version.
    pub object_keys: Option<Vec<ObjectKey>>,
}
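
// Illustrative sketch (not part of the original module): the combination
// semantics documented on `ObjectFilter`, expressed over simplified stand-in
// values. Absent filters are simply skipped in the real query; this sketch
// assumes all of them are present so the AND/OR structure is visible.
#[cfg(test)]
mod object_filter_semantics_sketch {
    /// Simplified stand-in for an object row: (type, owner, id, version).
    type Row<'a> = (&'a str, &'a str, &'a str, u64);

    /// The documented combination: type AND owner AND (ID in `ids` OR (ID, version) in `keys`).
    fn matches(row: Row<'_>, ty: &str, owner: &str, ids: &[&str], keys: &[(&str, u64)]) -> bool {
        let (row_ty, row_owner, row_id, row_version) = row;
        row_ty == ty
            && row_owner == owner
            && (ids.contains(&row_id) || keys.contains(&(row_id, row_version)))
    }

    #[test]
    fn ids_and_keys_form_a_disjunction() {
        let row = ("0x2::coin::Coin", "0xa11ce", "0x42", 7);
        // Matching by ID alone is enough...
        assert!(matches(row, "0x2::coin::Coin", "0xa11ce", &["0x42"], &[]));
        // ...and so is matching by (ID, version) when the ID list does not hit.
        assert!(matches(row, "0x2::coin::Coin", "0xa11ce", &[], &[("0x42", 7)]));
        // The type and owner filters, by contrast, are conjunctive.
        assert!(!matches(row, "0x2::coin::Coin", "0xb0b", &["0x42"], &[]));
    }
}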

#[derive(InputObject, Debug, Clone, Eq, PartialEq)]
pub(crate) struct ObjectKey {
    pub object_id: IotaAddress,
    pub version: UInt53,
}

/// The object's owner type: Immutable, Shared, Parent, or Address.
#[derive(Union, Clone)]
pub(crate) enum ObjectOwner {
    Immutable(Immutable),
    Shared(Shared),
    Parent(Box<Parent>),
    Address(AddressOwner),
}

/// An immutable object is an object that can't be mutated, transferred, or
/// deleted. Immutable objects have no owner, so anyone can use them.
#[derive(SimpleObject, Clone)]
pub(crate) struct Immutable {
    #[graphql(name = "_")]
    dummy: Option<bool>,
}

/// A shared object is an object that is shared using the
/// 0x2::transfer::share_object function. Unlike owned objects, once an object
/// is shared, it stays mutable and is accessible by anyone.
#[derive(SimpleObject, Clone)]
pub(crate) struct Shared {
    initial_shared_version: UInt53,
}

/// If the object's owner is a Parent, this object is part of a dynamic field
/// (it is the value of the dynamic field, or the intermediate Field object
/// itself). Also note that if the owner is a parent, then it's guaranteed to be
/// an object.
#[derive(SimpleObject, Clone)]
pub(crate) struct Parent {
    parent: Option<Object>,
}

/// An address-owned object is owned by a specific 32-byte address that is
/// either an account address (derived from a particular signature scheme) or
/// an object ID. An address-owned object is accessible only to its owner and no
/// others.
#[derive(SimpleObject, Clone)]
pub(crate) struct AddressOwner {
    owner: Option<Owner>,
}

/// Filter for a point query of an Object.
pub(crate) enum ObjectLookup {
    LatestAt {
        /// The checkpoint sequence number at which this was viewed.
        checkpoint_viewed_at: u64,
    },

    UnderParent {
        /// The parent version to be used as an upper bound for the query. Look
        /// for the latest version of a child object whose version is
        /// less than or equal to this upper bound.
        parent_version: u64,
        /// The checkpoint sequence number at which this was viewed.
        checkpoint_viewed_at: u64,
    },

    VersionAt {
        /// The exact version of the object to be fetched.
        version: u64,
        /// The checkpoint sequence number at which this was viewed.
        checkpoint_viewed_at: u64,
    },

    /// Variant analogous to [`VersionAt`](Self::VersionAt) but for optimistic
    /// transactions, using the most recent non-checkpointed data, not bound
    /// by any checkpoint sequence number.
    OptimisticVersion {
        /// The exact version of the object to be fetched.
        version: u64,
    },
}

pub(crate) type Cursor = cursor::BcsCursor<HistoricalObjectCursor>;

/// The inner struct for the `Object`'s cursor. The `object_id` is used as the
/// cursor, while the `checkpoint_viewed_at` sets the consistent upper bound for
/// the cursor.
#[derive(Serialize, Deserialize, Clone, PartialEq, Eq, Debug)]
pub(crate) struct HistoricalObjectCursor {
    #[serde(rename = "o")]
    object_id: Vec<u8>,
    /// The checkpoint sequence number this was viewed at.
    #[serde(rename = "c")]
    checkpoint_viewed_at: u64,
}
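
// Illustrative check (not part of the original module): the cursor above is just
// a BCS-serializable (object_id, checkpoint_viewed_at) pair, so it survives a
// BCS round-trip unchanged. The wrapping `cursor::BcsCursor` handles the
// on-the-wire string encoding.
#[cfg(test)]
mod cursor_round_trip_sketch {
    use super::HistoricalObjectCursor;

    #[test]
    fn bcs_round_trip_preserves_the_cursor() {
        let cursor = HistoricalObjectCursor::new(vec![0xab; 32], 42);
        let bytes = bcs::to_bytes(&cursor).expect("serializing a cursor should not fail");
        let decoded: HistoricalObjectCursor =
            bcs::from_bytes(&bytes).expect("deserializing a cursor should not fail");
        assert_eq!(cursor, decoded);
    }
}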

/// Interface implemented by on-chain values that are addressable by an ID (also
/// referred to as their address). This includes Move objects and packages.
#[expect(clippy::duplicated_attributes)]
#[derive(Interface)]
#[graphql(
    name = "IObject",
    field(name = "version", ty = "UInt53"),
    field(
        name = "status",
        ty = "ObjectStatus",
        desc = r#"
            The current status of the object as read from the off-chain store. The
            possible states are:
            - NOT_INDEXED: The object is loaded from serialized data, such as the
            contents of a genesis or system package upgrade transaction.
            - INDEXED: The object is retrieved from the off-chain index and
            represents the most recent or historical state of the object.
            - WRAPPED_OR_DELETED: The object is deleted or wrapped and only partial
            information can be loaded.
        "#
    ),
    field(
        name = "digest",
        ty = "Option<String>",
        desc = "32-byte hash that identifies the object's current contents, encoded as a Base58 \
                string."
    ),
    field(
        name = "owner",
        ty = "Option<ObjectOwner>",
        desc = "The owner type of this object: Immutable, Shared, Parent, Address\n\
                Immutable and Shared Objects do not have owners."
    ),
    field(
        name = "previous_transaction_block",
        ty = "Option<TransactionBlock>",
        desc = "The transaction block that created this version of the object."
    ),
    field(name = "storage_rebate", ty = "Option<BigInt>", desc = "",),
    field(
        name = "received_transaction_blocks",
        arg(name = "first", ty = "Option<u64>"),
        arg(name = "after", ty = "Option<transaction_block::Cursor>"),
        arg(name = "last", ty = "Option<u64>"),
        arg(name = "before", ty = "Option<transaction_block::Cursor>"),
        arg(name = "filter", ty = "Option<TransactionBlockFilter>"),
        arg(name = "scan_limit", ty = "Option<u64>"),
        ty = "ScanConnection<String, TransactionBlock>",
        desc = "The transaction blocks that sent objects to this object."
    ),
    field(
        name = "bcs",
        ty = "Option<Base64>",
        desc = "The Base64-encoded BCS serialization of the object's content."
    )
)]
pub(crate) enum IObject {
    Object(Object),
    MovePackage(MovePackage),
    MoveObject(MoveObject),
    Coin(Coin),
    CoinMetadata(CoinMetadata),
    StakedIota(StakedIota),
}

/// `DataLoader` key for fetching an `Object` at a specific version, constrained
/// by a consistency cursor (if that version was created after the checkpoint
/// the query is viewing at, the look-up will fail).
#[derive(Copy, Clone, Hash, Eq, PartialEq, Debug)]
struct HistoricalKey {
    id: IotaAddress,
    version: u64,
    checkpoint_viewed_at: u64,
}

/// `DataLoader` key for fetching objects that haven't been checkpointed yet.
/// This is used specifically for loading objects that are part of optimistic
/// transaction effects.
#[derive(Copy, Clone, Hash, Eq, PartialEq, Debug)]
struct OptimisticKey {
    id: IotaAddress,
    version: u64,
}

/// `DataLoader` key for fetching the latest version of an object whose parent
/// object has version `parent_version`, as of `checkpoint_viewed_at`. This
/// look-up can fail to find a valid object if the key is not self-consistent,
/// for example if the `parent_version` is set to a higher version
/// than the object's actual parent as of `checkpoint_viewed_at`.
#[derive(Copy, Clone, Hash, Eq, PartialEq, Debug)]
struct ParentVersionKey {
    id: IotaAddress,
    parent_version: u64,
    checkpoint_viewed_at: u64,
}

/// `DataLoader` key for fetching the latest version of an object as of a given
/// checkpoint.
#[derive(Copy, Clone, Hash, Eq, PartialEq, Debug)]
struct LatestAtKey {
    id: IotaAddress,
    checkpoint_viewed_at: u64,
}

/// An object in IOTA is a package (set of Move bytecode modules) or object
/// (typed data structure with fields) with additional metadata detailing its
/// id, version, transaction digest, and an owner field indicating how this
/// object can be accessed.
#[Object]
impl Object {
    pub(crate) async fn address(&self) -> IotaAddress {
        OwnerImpl::from(self).address().await
    }

    /// Objects owned by this object, optionally `filter`-ed.
    pub(crate) async fn objects(
        &self,
        ctx: &Context<'_>,
        first: Option<u64>,
        after: Option<Cursor>,
        last: Option<u64>,
        before: Option<Cursor>,
        filter: Option<ObjectFilter>,
    ) -> Result<Connection<String, MoveObject>> {
        OwnerImpl::from(self)
            .objects(ctx, first, after, last, before, filter)
            .await
    }

    /// Total balance of all coins with marker type owned by this object. If
    /// type is not supplied, it defaults to `0x2::iota::IOTA`.
    pub(crate) async fn balance(
        &self,
        ctx: &Context<'_>,
        type_: Option<ExactTypeFilter>,
    ) -> Result<Option<Balance>> {
        OwnerImpl::from(self).balance(ctx, type_).await
    }

    /// The balances of all coin types owned by this object.
    pub(crate) async fn balances(
        &self,
        ctx: &Context<'_>,
        first: Option<u64>,
        after: Option<balance::Cursor>,
        last: Option<u64>,
        before: Option<balance::Cursor>,
    ) -> Result<Connection<String, Balance>> {
        OwnerImpl::from(self)
            .balances(ctx, first, after, last, before)
            .await
    }

    /// The coin objects for this object.
    ///
    /// `type` is a filter on the coin's type parameter, defaulting to
    /// `0x2::iota::IOTA`.
    pub(crate) async fn coins(
        &self,
        ctx: &Context<'_>,
        first: Option<u64>,
        after: Option<Cursor>,
        last: Option<u64>,
        before: Option<Cursor>,
        type_: Option<ExactTypeFilter>,
    ) -> Result<Connection<String, Coin>> {
        OwnerImpl::from(self)
            .coins(ctx, first, after, last, before, type_)
            .await
    }

    /// The `0x3::staking_pool::StakedIota` objects owned by this object.
    pub(crate) async fn staked_iotas(
        &self,
        ctx: &Context<'_>,
        first: Option<u64>,
        after: Option<Cursor>,
        last: Option<u64>,
        before: Option<Cursor>,
    ) -> Result<Connection<String, StakedIota>> {
        OwnerImpl::from(self)
            .staked_iotas(ctx, first, after, last, before)
            .await
    }

    /// The name explicitly configured as the default name pointing to this
    /// address.
    pub(crate) async fn iota_names_default_name(
        &self,
        ctx: &Context<'_>,
        format: Option<NameFormat>,
    ) -> Result<Option<String>> {
        OwnerImpl::from(self)
            .iota_names_default_name(ctx, format)
            .await
    }

    /// The NameRegistration NFTs owned by this address. These grant the
    /// owner the capability to manage the associated name.
    pub(crate) async fn iota_names_registrations(
        &self,
        ctx: &Context<'_>,
        first: Option<u64>,
        after: Option<Cursor>,
        last: Option<u64>,
        before: Option<Cursor>,
    ) -> Result<Connection<String, NameRegistration>> {
        OwnerImpl::from(self)
            .iota_names_registrations(ctx, first, after, last, before)
            .await
    }

    pub(crate) async fn version(&self) -> UInt53 {
        ObjectImpl(self).version().await
    }

    /// The current status of the object as read from the off-chain store. The
    /// possible states are:
    /// - NOT_INDEXED: The object is loaded from serialized data, such as the
    ///   contents of a genesis or system package upgrade transaction.
    /// - INDEXED: The object is retrieved from the off-chain index and
    ///   represents the most recent or historical state of the object.
    /// - WRAPPED_OR_DELETED: The object is deleted or wrapped and only partial
    ///   information can be loaded.
    pub(crate) async fn status(&self) -> ObjectStatus {
        ObjectImpl(self).status().await
    }

    /// 32-byte hash that identifies the object's current contents, encoded as a
    /// Base58 string.
    pub(crate) async fn digest(&self) -> Option<String> {
        ObjectImpl(self).digest().await
    }

    /// The owner type of this object: Immutable, Shared, Parent, Address
    /// Immutable and Shared Objects do not have owners.
    pub(crate) async fn owner(&self, ctx: &Context<'_>) -> Option<ObjectOwner> {
        ObjectImpl(self).owner(ctx).await
    }

    /// The transaction block that created this version of the object.
    pub(crate) async fn previous_transaction_block(
        &self,
        ctx: &Context<'_>,
    ) -> Result<Option<TransactionBlock>> {
        ObjectImpl(self).previous_transaction_block(ctx).await
    }

    /// The amount of IOTA we would rebate if this object gets deleted or
    /// mutated. This number is recalculated based on the present storage
    /// gas price.
    pub(crate) async fn storage_rebate(&self) -> Option<BigInt> {
        ObjectImpl(self).storage_rebate().await
    }

    /// The transaction blocks that sent objects to this object.
    ///
    /// `scanLimit` restricts the number of candidate transactions scanned when
    /// gathering a page of results. It is required for queries that apply
    /// more than two complex filters (on function, kind, sender, recipient,
    /// input object, changed object, or ids), and can be at most
    /// `serviceConfig.maxScanLimit`.
    ///
    /// When the scan limit is reached the page will be returned even if it has
    /// fewer than `first` results when paginating forward (`last` when
    /// paginating backwards). If there are more transactions to scan,
    /// `pageInfo.hasNextPage` (or `pageInfo.hasPreviousPage`) will be set to
    /// `true`, and `PageInfo.endCursor` (or `PageInfo.startCursor`) will be set
    /// to the last transaction that was scanned as opposed to the last (or
    /// first) transaction in the page.
    ///
    /// Requesting the next (or previous) page after this cursor will resume the
    /// search, scanning the next `scanLimit` many transactions in the
    /// direction of pagination, and so on until all transactions in the
    /// scanning range have been visited.
    ///
    /// By default, the scanning range includes all transactions known to
    /// GraphQL, but it can be restricted by the `after` and `before`
    /// cursors, and the `beforeCheckpoint`, `afterCheckpoint` and
    /// `atCheckpoint` filters.
    pub(crate) async fn received_transaction_blocks(
        &self,
        ctx: &Context<'_>,
        first: Option<u64>,
        after: Option<transaction_block::Cursor>,
        last: Option<u64>,
        before: Option<transaction_block::Cursor>,
        filter: Option<TransactionBlockFilter>,
        scan_limit: Option<u64>,
    ) -> Result<ScanConnection<String, TransactionBlock>> {
        ObjectImpl(self)
            .received_transaction_blocks(ctx, first, after, last, before, filter, scan_limit)
            .await
    }

    /// The Base64-encoded BCS serialization of the object's content.
    pub(crate) async fn bcs(&self) -> Result<Option<Base64>> {
        ObjectImpl(self).bcs().await
    }

    /// The set of named templates defined on-chain for the type of this object,
    /// to be handled off-chain. The server substitutes data from the object
    /// into these templates to generate a display string per template.
    async fn display(&self, ctx: &Context<'_>) -> Result<Option<Vec<DisplayEntry>>> {
        ObjectImpl(self).display(ctx).await
    }

    /// Access a dynamic field on an object using its name. Names are arbitrary
    /// Move values whose type has `copy`, `drop`, and `store`, and are
    /// specified using their type, and their BCS contents, Base64 encoded.
    ///
    /// Dynamic fields on wrapped objects can be accessed by using the same API
    /// under the Owner type.
    async fn dynamic_field(
        &self,
        ctx: &Context<'_>,
        name: DynamicFieldName,
    ) -> Result<Option<DynamicField>> {
        OwnerImpl::from(self)
            .dynamic_field(ctx, name, Some(self.root_version()))
            .await
    }

    /// Access a dynamic object field on an object using its name. Names are
    /// arbitrary Move values whose type has `copy`, `drop`, and `store`,
    /// and are specified using their type, and their BCS contents, Base64
    /// encoded. The value of a dynamic object field can also be accessed
    /// off-chain directly via its address (e.g. using `Query.object`).
    ///
    /// Dynamic fields on wrapped objects can be accessed by using the same API
    /// under the Owner type.
    async fn dynamic_object_field(
        &self,
        ctx: &Context<'_>,
        name: DynamicFieldName,
    ) -> Result<Option<DynamicField>> {
        OwnerImpl::from(self)
            .dynamic_object_field(ctx, name, Some(self.root_version()))
            .await
    }

    /// The dynamic fields and dynamic object fields on an object.
    ///
    /// Dynamic fields on wrapped objects can be accessed by using the same API
    /// under the Owner type.
    async fn dynamic_fields(
        &self,
        ctx: &Context<'_>,
        first: Option<u64>,
        after: Option<Cursor>,
        last: Option<u64>,
        before: Option<Cursor>,
    ) -> Result<Connection<String, DynamicField>> {
        OwnerImpl::from(self)
            .dynamic_fields(ctx, first, after, last, before, Some(self.root_version()))
            .await
    }

    /// Attempts to convert the object into a MoveObject
    async fn as_move_object(&self) -> Option<MoveObject> {
        MoveObject::try_from(self).ok()
    }

    /// Attempts to convert the object into a MovePackage
    async fn as_move_package(&self) -> Option<MovePackage> {
        MovePackage::try_from(self).ok()
    }
}

impl ObjectImpl<'_> {
    pub(crate) async fn version(&self) -> UInt53 {
        self.0.version_impl().into()
    }

    pub(crate) async fn status(&self) -> ObjectStatus {
        ObjectStatus::from(&self.0.kind)
    }

    pub(crate) async fn digest(&self) -> Option<String> {
        self.0
            .native_impl()
            .map(|native| native.digest().base58_encode())
    }

    pub(crate) async fn owner(&self, ctx: &Context<'_>) -> Option<ObjectOwner> {
        use NativeOwner as O;

        let native = self.0.native_impl()?;

        match native.owner {
            O::AddressOwner(address) => {
                let address = IotaAddress::from(address);
                Some(ObjectOwner::Address(AddressOwner {
                    owner: Some(Owner {
                        address,
                        checkpoint_viewed_at: self.0.checkpoint_viewed_at,
                        root_version: None,
                    }),
                }))
            }
            O::Immutable => Some(ObjectOwner::Immutable(Immutable { dummy: None })),
            O::ObjectOwner(address) => {
                let parent = Object::query(
                    ctx,
                    address.into(),
                    Object::latest_at(self.0.checkpoint_viewed_at),
                )
                .await
                .ok()
                .flatten();

                Some(ObjectOwner::Parent(Box::new(Parent { parent })))
            }
            O::Shared {
                initial_shared_version,
            } => Some(ObjectOwner::Shared(Shared {
                initial_shared_version: initial_shared_version.value().into(),
            })),
        }
    }

    pub(crate) async fn previous_transaction_block(
        &self,
        ctx: &Context<'_>,
    ) -> Result<Option<TransactionBlock>> {
        let Some(native) = self.0.native_impl() else {
            return Ok(None);
        };
        let digest = native.previous_transaction;

        TransactionBlock::query(ctx, digest.into(), self.0.checkpoint_viewed_at)
            .await
            .extend()
    }

    pub(crate) async fn storage_rebate(&self) -> Option<BigInt> {
        self.0
            .native_impl()
            .map(|native| BigInt::from(native.storage_rebate))
    }

    pub(crate) async fn received_transaction_blocks(
        &self,
        ctx: &Context<'_>,
        first: Option<u64>,
        after: Option<transaction_block::Cursor>,
        last: Option<u64>,
        before: Option<transaction_block::Cursor>,
        filter: Option<TransactionBlockFilter>,
        scan_limit: Option<u64>,
    ) -> Result<ScanConnection<String, TransactionBlock>> {
        let page = Page::from_params(ctx.data_unchecked(), first, after, last, before)?;

        let Some(filter) = filter
            .unwrap_or_default()
            .intersect(TransactionBlockFilter {
                recv_address: Some(self.0.address),
                ..Default::default()
            })
        else {
            return Ok(ScanConnection::new(false, false));
        };

        TransactionBlock::paginate(ctx, page, filter, self.0.checkpoint_viewed_at, scan_limit)
            .await
            .extend()
    }

    pub(crate) async fn bcs(&self) -> Result<Option<Base64>> {
        use ObjectKind as K;
        Ok(match &self.0.kind {
            K::WrappedOrDeleted(_) => None,
            // WrappedOrDeleted objects are also read from the historical objects table, and they do
            // not have a serialized object, so the column is also nullable for stored historical
            // objects.
            K::Indexed(_, stored) => stored.serialized_object.as_ref().map(Base64::from),

            K::NotIndexed(native) => {
                let bytes = bcs::to_bytes(native)
                    .map_err(|e| {
                        Error::Internal(format!(
                            "Failed to serialize object at {}: {e}",
                            self.0.address
                        ))
                    })
                    .extend()?;
                Some(Base64::from(&bytes))
            }
        })
    }

    /// `display` is part of the `IMoveObject` interface, but is implemented on
    /// `ObjectImpl` to allow for a convenience function on `Object`.
    pub(crate) async fn display(&self, ctx: &Context<'_>) -> Result<Option<Vec<DisplayEntry>>> {
        let Some(native) = self.0.native_impl() else {
            return Ok(None);
        };

        let move_object = native
            .data
            .try_as_move()
            .ok_or_else(|| Error::Internal("Failed to convert object into MoveObject".to_string()))
            .extend()?;

        let (struct_tag, move_struct) = deserialize_move_struct(move_object, ctx.data_unchecked())
            .await
            .extend()?;

        let Some(display) = Display::query(ctx.data_unchecked(), struct_tag.into())
            .await
            .extend()?
        else {
            return Ok(None);
        };

        Ok(Some(display.render(&move_struct).extend()?))
    }
}

impl Object {
    /// Construct a GraphQL object from a native object, without its stored
    /// (indexed) counterpart.
    ///
    /// `checkpoint_viewed_at` represents the checkpoint sequence number at
    /// which this `Object` was constructed. This is stored on `Object`
    /// so that when viewing that entity's state, it will be as if it was
    /// read at the same checkpoint.
    ///
    /// `root_version` represents the version of the root object in some nested
    /// chain of dynamic fields. This should typically be left `None`,
    /// unless the object(s) being resolved is a dynamic field, or if
    /// `root_version` has been explicitly set for this object. If None, then
    /// we use [`version_for_dynamic_fields`] to infer a root version to then
    /// propagate from this object down to its dynamic fields.
    pub(crate) fn from_native(
        address: IotaAddress,
        native: NativeObject,
        checkpoint_viewed_at: u64,
        root_version: Option<u64>,
    ) -> Object {
        let root_version = root_version.unwrap_or_else(|| version_for_dynamic_fields(&native));
        Object {
            address,
            kind: ObjectKind::NotIndexed(native),
            checkpoint_viewed_at,
            root_version,
        }
    }

    pub(crate) fn native_impl(&self) -> Option<&NativeObject> {
        use ObjectKind as K;

        match &self.kind {
            K::NotIndexed(native) | K::Indexed(native, _) => Some(native),
            K::WrappedOrDeleted(_) => None,
        }
    }

    pub(crate) fn version_impl(&self) -> u64 {
        use ObjectKind as K;

        match &self.kind {
            K::NotIndexed(native) | K::Indexed(native, _) => native.version().value(),
            K::WrappedOrDeleted(object_version) => *object_version,
        }
    }

    /// Root parent object version for dynamic fields.
    ///
    /// Check [`Object::root_version`] for details.
    pub(crate) fn root_version(&self) -> u64 {
        self.root_version
    }

    /// Query the database for a `page` of objects, optionally `filter`-ed.
    ///
    /// `checkpoint_viewed_at` represents the checkpoint sequence number at
    /// which this page was queried. Each entity returned in the
    /// connection will inherit this checkpoint, so that when viewing that
    /// entity's state, it will be as if it was read at the same checkpoint.
    pub(crate) async fn paginate(
        db: &Db,
        page: Page<Cursor>,
        filter: ObjectFilter,
        checkpoint_viewed_at: u64,
    ) -> Result<Connection<String, Object>, Error> {
        Self::paginate_subtype(db, page, filter, checkpoint_viewed_at, Ok).await
    }

    /// Query the database for a `page` of some sub-type of Object. The page
    /// uses the bytes of an Object ID and the checkpoint when the query was
    /// made as the cursor, and can optionally be further `filter`-ed. The
    /// subtype is created using the `downcast` function, which is allowed
    /// to fail if the conversion is not possible.
    ///
    /// `checkpoint_viewed_at` represents the checkpoint sequence number at
    /// which this page was queried. Each entity returned in the
    /// connection will inherit this checkpoint, so that when viewing that
    /// entity's state, it will be as if it was read at the same checkpoint.
    ///
    /// If the `page`'s cursors carry a `checkpoint_viewed_at`, this function
    /// defers to that value. Otherwise, it uses the value from the parameter.
    /// This is so that paginated queries are consistent with the previous
    /// query that created the cursor.
    pub(crate) async fn paginate_subtype<T: OutputType>(
        db: &Db,
        page: Page<Cursor>,
        filter: ObjectFilter,
        checkpoint_viewed_at: u64,
        downcast: impl Fn(Object) -> Result<T, Error>,
    ) -> Result<Connection<String, T>, Error> {
        // If cursors are provided, defer to the `checkpoint_viewed_at` in the cursor if
        // they are consistent. Otherwise, use the value from the parameter. This is so
        // that paginated queries are consistent with the previous query that created
        // the cursor.
        let cursor_viewed_at = page.validate_cursor_consistency()?;
        let checkpoint_viewed_at = cursor_viewed_at.unwrap_or(checkpoint_viewed_at);

        let Some((prev, next, results)) = db
            .execute_repeatable(move |conn| {
                let Some(range) = AvailableRange::result(conn, checkpoint_viewed_at)? else {
                    return Ok::<_, diesel::result::Error>(None);
                };

                Ok(Some(page.paginate_raw_query::<StoredHistoryObject>(
                    conn,
                    checkpoint_viewed_at,
                    objects_query(&filter, range, &page),
                )?))
            })
            .await?
        else {
            return Err(Error::Client(
                "Requested data is outside the available range".to_string(),
            ));
        };

        let mut conn: Connection<String, T> = Connection::new(prev, next);

        for stored in results {
            // To maintain consistency, the returned cursor should have the same upper-bound
            // as the checkpoint found on the cursor.
            let cursor = stored.cursor(checkpoint_viewed_at).encode_cursor();
            let object =
                Object::try_from_stored_history_object(stored, checkpoint_viewed_at, None)?;
            conn.edges.push(Edge::new(cursor, downcast(object)?));
        }

        Ok(conn)
    }

    /// Look-up the latest version of the object as of a given checkpoint.
    pub(crate) fn latest_at(checkpoint_viewed_at: u64) -> ObjectLookup {
        ObjectLookup::LatestAt {
            checkpoint_viewed_at,
        }
    }

    /// Look-up the latest version of an object whose version is less than or
    /// equal to its parent's version, as of a given checkpoint.
    pub(crate) fn under_parent(parent_version: u64, checkpoint_viewed_at: u64) -> ObjectLookup {
        ObjectLookup::UnderParent {
            parent_version,
            checkpoint_viewed_at,
        }
    }

    /// Look-up a specific version of the object, as of a given checkpoint.
    pub(crate) fn at_version(version: u64, checkpoint_viewed_at: u64) -> ObjectLookup {
        ObjectLookup::VersionAt {
            version,
            checkpoint_viewed_at,
        }
    }

    /// Look-up a specific version of the object from optimistic transactions.
    pub(crate) fn at_optimistic_version(version: u64) -> ObjectLookup {
        ObjectLookup::OptimisticVersion { version }
    }

    pub(crate) async fn query(
        ctx: &Context<'_>,
        id: IotaAddress,
        key: ObjectLookup,
    ) -> Result<Option<Self>, Error> {
        let DataLoader(loader) = &ctx.data_unchecked();

        match key {
            ObjectLookup::VersionAt {
                version,
                checkpoint_viewed_at,
            } => {
                loader
                    .load_one(HistoricalKey {
                        id,
                        version,
                        checkpoint_viewed_at,
                    })
                    .await
            }

            ObjectLookup::OptimisticVersion { version } => {
                loader.load_one(OptimisticKey { id, version }).await
            }

            ObjectLookup::UnderParent {
                parent_version,
                checkpoint_viewed_at,
            } => {
                loader
                    .load_one(ParentVersionKey {
                        id,
                        parent_version,
                        checkpoint_viewed_at,
                    })
                    .await
            }

            ObjectLookup::LatestAt {
                checkpoint_viewed_at,
            } => {
                loader
                    .load_one(LatestAtKey {
                        id,
                        checkpoint_viewed_at,
                    })
                    .await
            }
        }
    }

    /// Query for a singleton object identified by its type. Note: the object is
    /// assumed to be a singleton (we return the first object found with this
    /// type, or nothing if none exists).
    pub(crate) async fn query_singleton(
        db: &Db,
        type_: StructTag,
        checkpoint_viewed_at: u64,
    ) -> Result<Option<Object>, Error> {
        let filter = ObjectFilter {
            type_: Some(TypeFilter::ByType(type_)),
            ..Default::default()
        };

        let connection = Self::paginate(db, Page::bounded(1), filter, checkpoint_viewed_at).await?;

        Ok(connection.edges.into_iter().next().map(|edge| edge.node))
    }

    /// `checkpoint_viewed_at` represents the checkpoint sequence number at
    /// which this `Object` was constructed. This is stored on `Object`
    /// so that when viewing that entity's state, it will be as if it was
    /// read at the same checkpoint.
    ///
    /// `root_version` represents the version of the root object in some nested
    /// chain of dynamic fields. This should typically be left `None`,
    /// unless the object(s) being resolved is a dynamic field, or if
    /// `root_version` has been explicitly set for this object. If None, then
    /// we use [`version_for_dynamic_fields`] to infer a root version to then
    /// propagate from this object down to its dynamic fields.
    pub(crate) fn try_from_stored_history_object(
        history_object: StoredHistoryObject,
        checkpoint_viewed_at: u64,
        root_version: Option<u64>,
    ) -> Result<Self, Error> {
        let address = addr(&history_object.object_id)?;

        let object_status =
            NativeObjectStatus::try_from(history_object.object_status).map_err(|_| {
                Error::Internal(format!(
                    "Unknown object status {} for object {} at version {}",
                    history_object.object_status, address, history_object.object_version
                ))
            })?;

        match object_status {
            NativeObjectStatus::Active => {
                let Some(serialized_object) = &history_object.serialized_object else {
                    return Err(Error::Internal(format!(
                        "Live object {} at version {} cannot have missing serialized_object field",
                        address, history_object.object_version
                    )));
                };

                let native_object = bcs::from_bytes(serialized_object).map_err(|_| {
                    Error::Internal(format!("Failed to deserialize object {address}"))
                })?;

                let root_version =
                    root_version.unwrap_or_else(|| version_for_dynamic_fields(&native_object));
                Ok(Self {
                    address,
                    kind: ObjectKind::Indexed(native_object, history_object),
                    checkpoint_viewed_at,
                    root_version,
                })
            }
            NativeObjectStatus::WrappedOrDeleted => Ok(Self {
                address,
                kind: ObjectKind::WrappedOrDeleted(history_object.object_version as u64),
                checkpoint_viewed_at,
                root_version: history_object.object_version as u64,
            }),
        }
    }

    pub(crate) fn try_from_stored_object(
        stored_object: StoredObject,
        checkpoint_viewed_at: u64,
    ) -> Result<Self, Error> {
        let address = addr(&stored_object.object_id)?;

        let native_object = bcs::from_bytes(&stored_object.serialized_object)
            .map_err(|_| Error::Internal(format!("Failed to deserialize object {address}")))?;

        let root_version = version_for_dynamic_fields(&native_object);

        let stored_history_like = StoredHistoryObject {
            object_id: stored_object.object_id,
            object_version: stored_object.object_version,
            object_digest: Some(stored_object.object_digest),
            object_status: NativeObjectStatus::Active as i16,
            checkpoint_sequence_number: checkpoint_viewed_at as i64,
            serialized_object: Some(stored_object.serialized_object),
            object_type: stored_object.object_type,
            object_type_package: stored_object.object_type_package,
            object_type_module: stored_object.object_type_module,
            object_type_name: stored_object.object_type_name,
            owner_type: Some(stored_object.owner_type),
            owner_id: stored_object.owner_id,
            coin_type: stored_object.coin_type,
            coin_balance: stored_object.coin_balance,
            df_kind: stored_object.df_kind,
        };

        Ok(Self {
            address,
            kind: ObjectKind::Indexed(native_object, stored_history_like),
            checkpoint_viewed_at,
            root_version,
        })
    }
}

/// We're deliberately choosing to use a child object's version as the root
/// here, and letting the caller override it with the actual root object's
/// version if it has access to it.
///
/// Using the child object's version as the root means that we're seeing the
/// dynamic field tree under this object at the state resulting from the
/// transaction that produced this version.
///
/// See [`Object::root_version`] for more details on parent/child object version
/// mechanics.
fn version_for_dynamic_fields(native: &NativeObject) -> u64 {
    native.as_inner().version().into()
}

impl ObjectFilter {
    /// Try to create a filter whose results are the intersection of objects in
    /// `self`'s results and objects in `other`'s results. This may not be
    /// possible if the resulting filter is inconsistent in some way (e.g. a
    /// filter that requires one field to be two different values
    /// simultaneously).
    pub(crate) fn intersect(self, other: ObjectFilter) -> Option<Self> {
        macro_rules! intersect {
            ($field:ident, $body:expr) => {
                intersect::field(self.$field, other.$field, $body)
            };
        }

        // Treat `object_ids` and `object_keys` as a single filter on IDs, and
        // optionally versions, and compute the intersection of that.
        let keys = intersect::field(self.keys(), other.keys(), |k, l| {
            let mut combined = BTreeMap::new();

            for (id, v) in k {
                if let Some(w) = l.get(&id).copied() {
                    combined.insert(id, intersect::field(v, w, intersect::by_eq)?);
                }
            }

            // If the intersection is empty, it means there were some ID or Key filters in
            // both `self` and `other`, but they don't overlap, so the final
            // result is inconsistent.
            (!combined.is_empty()).then_some(combined)
        })?;

        // Extract the ID and Key filters back out. At this point, we know that if there
        // were ID/Key filters in both `self` and `other`, then they intersected
        // to form a consistent set of constraints, so it is safe to interpret
        // the lack of any ID/Key filters respectively as a lack of that kind of
        // constraint, rather than a constraint on the empty set.

        let object_ids = {
            let partition: Vec<_> = keys
                .iter()
                .flatten()
                .filter_map(|(id, v)| v.is_none().then_some(*id))
                .collect();

            (!partition.is_empty()).then_some(partition)
        };

        let object_keys = {
            let partition: Vec<_> = keys
                .iter()
                .flatten()
                .filter_map(|(id, v)| {
                    Some(ObjectKey {
                        object_id: *id,
                        version: (*v)?.into(),
                    })
                })
                .collect();

            (!partition.is_empty()).then_some(partition)
        };

        Some(Self {
            type_: intersect!(type_, TypeFilter::intersect)?,
            owner: intersect!(owner, intersect::by_eq)?,
            object_ids,
            object_keys,
        })
    }

    /// Extract the Object ID and Key filters into one combined map from Object
    /// IDs in this filter, to the versions they should have (or None if the
    /// filter mentions the ID but no version for it).
    fn keys(&self) -> Option<BTreeMap<IotaAddress, Option<u64>>> {
        if self.object_keys.is_none() && self.object_ids.is_none() {
            return None;
        }

        Some(BTreeMap::from_iter(
            self.object_keys
                .iter()
                .flatten()
                .map(|key| (key.object_id, Some(key.version.into())))
                // Chain ID filters after Key filters so if there is overlap, we overwrite the key
                // filter with the ID filter.
                .chain(self.object_ids.iter().flatten().map(|id| (*id, None))),
        ))
    }

    /// Applies ObjectFilter to the input `RawQuery` and returns a new
    /// `RawQuery`.
    pub(crate) fn apply(&self, mut query: RawQuery) -> RawQuery {
        // Start by applying the filters on IDs and/or keys because they are combined as
        // a disjunction, while the remaining queries are conjunctions.
        if let Some(object_ids) = &self.object_ids {
            // Maximally strict - match a vec of 0 elements
            if object_ids.is_empty() {
                query = or_filter!(query, "1=0");
            } else {
                let mut inner = String::new();
                let mut prefix = "object_id IN (";
                for id in object_ids {
                    // SAFETY: Writing to a `String` cannot fail.
                    write!(
                        &mut inner,
                        "{prefix}'\\x{}'::bytea",
                        hex::encode(id.into_vec())
                    )
                    .unwrap();
                    prefix = ", ";
                }
                inner.push(')');
                query = or_filter!(query, inner);
            }
        }

        if let Some(object_keys) = &self.object_keys {
            // Maximally strict - match a vec of 0 elements
            if object_keys.is_empty() {
                query = or_filter!(query, "1=0");
            } else {
                let mut inner = String::new();
                let mut prefix = "(";
                for ObjectKey { object_id, version } in object_keys {
                    // SAFETY: Writing to a `String` cannot fail.
                    write!(
                        &mut inner,
                        "{prefix}(object_id = '\\x{}'::bytea AND object_version = {})",
                        hex::encode(object_id.into_vec()),
                        version
                    )
                    .unwrap();
                    prefix = " OR ";
                }
                inner.push(')');
                query = or_filter!(query, inner);
            }
        }

        if let Some(owner) = self.owner {
            query = filter!(
                query,
                format!(
                    "owner_id = '\\x{}'::bytea AND owner_type = {}",
                    hex::encode(owner.into_vec()),
                    OwnerType::Address as i16
                )
            );
        }

        if let Some(type_) = &self.type_ {
            return type_.apply_raw(
                query,
                "object_type",
                "object_type_package",
                "object_type_module",
                "object_type_name",
            );
        }

        query
    }

    pub(crate) fn has_filters(&self) -> bool {
        self != &Default::default()
    }
}
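
// Illustrative sketch (not part of the original module): the ID/Key merge that
// `ObjectFilter::intersect` performs, restated over plain maps. `None` stands
// for "any version"; IDs must appear on both sides, and concrete versions must
// agree, otherwise the whole intersection is inconsistent.
#[cfg(test)]
mod object_filter_intersection_sketch {
    use std::collections::BTreeMap;

    fn intersect_keys(
        a: BTreeMap<&'static str, Option<u64>>,
        b: BTreeMap<&'static str, Option<u64>>,
    ) -> Option<BTreeMap<&'static str, Option<u64>>> {
        let mut combined = BTreeMap::new();
        for (id, v) in a {
            if let Some(w) = b.get(id).copied() {
                let merged = match (v, w) {
                    // Conflicting concrete versions make the whole intersection inconsistent.
                    (Some(x), Some(y)) if x != y => return None,
                    // Otherwise the concrete version (if any) wins over the wildcard.
                    (Some(x), _) | (_, Some(x)) => Some(x),
                    (None, None) => None,
                };
                combined.insert(id, merged);
            }
        }
        // Two non-empty ID/Key filters that share no IDs are also inconsistent.
        (!combined.is_empty()).then_some(combined)
    }

    #[test]
    fn overlapping_ids_must_agree_on_version() {
        let a = BTreeMap::from([("0x1", None), ("0x2", Some(5))]);
        let b = BTreeMap::from([("0x2", None), ("0x3", Some(9))]);
        // Only `0x2` appears on both sides; it keeps its concrete version.
        assert_eq!(
            intersect_keys(a.clone(), b),
            Some(BTreeMap::from([("0x2", Some(5))]))
        );

        // A version conflict on a shared ID drops the whole intersection.
        let c = BTreeMap::from([("0x2", Some(6))]);
        assert_eq!(intersect_keys(a, c), None);
    }
}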

impl HistoricalObjectCursor {
    pub(crate) fn new(object_id: Vec<u8>, checkpoint_viewed_at: u64) -> Self {
        Self {
            object_id,
            checkpoint_viewed_at,
        }
    }
}

impl Checkpointed for Cursor {
    fn checkpoint_viewed_at(&self) -> u64 {
        self.checkpoint_viewed_at
    }
}

impl ScanLimited for Cursor {}

impl RawPaginated<Cursor> for StoredHistoryObject {
    fn filter_ge(cursor: &Cursor, query: RawQuery) -> RawQuery {
        filter!(
            query,
            format!(
                "candidates.object_id >= '\\x{}'::bytea",
                hex::encode(cursor.object_id.clone())
            )
        )
    }

    fn filter_le(cursor: &Cursor, query: RawQuery) -> RawQuery {
        filter!(
            query,
            format!(
                "candidates.object_id <= '\\x{}'::bytea",
                hex::encode(cursor.object_id.clone())
            )
        )
    }

    fn order(asc: bool, query: RawQuery) -> RawQuery {
        if asc {
            query.order_by("candidates.object_id ASC")
        } else {
            query.order_by("candidates.object_id DESC")
        }
    }
}
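
// Illustrative check (not part of the original module): the raw-query helpers
// above splice object IDs into SQL as Postgres bytea literals via `hex::encode`.
// This pins down the exact literal shape they produce.
#[cfg(test)]
mod bytea_literal_sketch {
    #[test]
    fn object_ids_render_as_lowercase_hex_bytea_literals() {
        let object_id = vec![0xab, 0x01, 0xff];
        let literal = format!("'\\x{}'::bytea", hex::encode(&object_id));
        assert_eq!(literal, r"'\xab01ff'::bytea");
    }
}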

impl Target<Cursor> for StoredHistoryObject {
    fn cursor(&self, checkpoint_viewed_at: u64) -> Cursor {
        Cursor::new(HistoricalObjectCursor::new(
            self.object_id.clone(),
            checkpoint_viewed_at,
        ))
    }
}

impl Loader<HistoricalKey> for Db {
    type Value = Object;
    type Error = Error;

    async fn load(&self, keys: &[HistoricalKey]) -> Result<HashMap<HistoricalKey, Object>, Error> {
        use objects_history::dsl as h;
        use objects_version::dsl as v;

        let id_versions: BTreeSet<_> = keys
            .iter()
            .map(|key| (key.id.into_vec(), key.version as i64))
            .collect();

        let objects: Vec<StoredHistoryObject> = self
            .execute(move |conn| {
                conn.results(move || {
                    let mut query = h::objects_history
                        .inner_join(
                            v::objects_version.on(v::cp_sequence_number
                                .eq(h::checkpoint_sequence_number)
                                .and(v::object_id.eq(h::object_id))
                                .and(v::object_version.eq(h::object_version))),
                        )
                        .select(StoredHistoryObject::as_select())
                        .into_boxed();

                    for (id, version) in id_versions.iter().cloned() {
                        query =
                            query.or_filter(v::object_id.eq(id).and(v::object_version.eq(version)));
                    }

                    query
                })
            })
            .await
            .map_err(|e| Error::Internal(format!("Failed to fetch objects: {e}")))?;

        let mut id_version_to_stored = BTreeMap::new();
        for stored in objects {
            let key = (addr(&stored.object_id)?, stored.object_version as u64);
            id_version_to_stored.insert(key, stored);
        }

        let mut result = HashMap::new();
        for key in keys {
            let Some(stored) = id_version_to_stored.get(&(key.id, key.version)) else {
                continue;
            };

            // Filter by key's checkpoint viewed at here. Doing this in memory because it
            // should be quite rare that this query actually filters something,
            // but encoding it in SQL is complicated.
            if key.checkpoint_viewed_at < stored.checkpoint_sequence_number as u64 {
                continue;
            }

            let object = Object::try_from_stored_history_object(
                stored.clone(),
                key.checkpoint_viewed_at,
                // This conversion will use the object's own version as the `Object::root_version`.
                None,
            )?;
            result.insert(*key, object);
        }

        Ok(result)
    }
}

impl Loader<OptimisticKey> for Db {
    type Value = Object;
    type Error = Error;

    async fn load(&self, keys: &[OptimisticKey]) -> Result<HashMap<OptimisticKey, Object>, Error> {
        use objects::dsl as o;

        let id_versions: BTreeSet<_> = keys
            .iter()
            .map(|key| (key.id.into_vec(), key.version as i64))
            .collect();

        let objects: Vec<StoredObject> = self
            .execute(move |conn| {
                conn.results(move || {
                    let mut query = o::objects.select(StoredObject::as_select()).into_boxed();
                    for (id, version) in id_versions.iter().cloned() {
                        query =
                            query.or_filter(o::object_id.eq(id).and(o::object_version.eq(version)));
                    }
                    query
                })
            })
            .await
            .map_err(|e| Error::Internal(format!("Failed to fetch optimistic objects: {e}")))?;

        let mut result = HashMap::new();
        let id_version_to_stored = objects
            .into_iter()
            .map(|stored| {
                addr(&stored.object_id).map(|id| ((id, stored.object_version as u64), stored))
            })
            .collect::<Result<BTreeMap<_, _>, _>>()?;

        // Collect keys that were not found in the objects table.
        let mut missing_keys = Vec::new();
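        // Objects served from the live `objects` table are not pinned to a
        // particular checkpoint, so the converted `Object`s are stamped with
        // `u64::MAX` as their `checkpoint_viewed_at`, i.e. they are treated as
        // being viewed at the latest possible checkpoint.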
        for key in keys {
            if let Some(stored) = id_version_to_stored.get(&(key.id, key.version)) {
                let object = Object::try_from_stored_object(stored.clone(), u64::MAX)?;
                result.insert(*key, object);
            } else {
                missing_keys.push(*key);
            }
        }

        // For missing keys, fall back to the objects_history table.
        if !missing_keys.is_empty() {
            let historical_keys: Vec<HistoricalKey> = missing_keys
                .iter()
                .map(|key| HistoricalKey {
                    id: key.id,
                    version: key.version,
                    checkpoint_viewed_at: u64::MAX,
                })
                .collect();

            let historical_result: HashMap<HistoricalKey, Object> =
                self.load(&historical_keys).await?;

            for (historical_key, object) in historical_result {
                let optimistic_key = OptimisticKey {
                    id: historical_key.id,
                    version: historical_key.version,
                };
                result.insert(optimistic_key, object);
            }
        }

        Ok(result)
    }
}

impl Loader<ParentVersionKey> for Db {
    type Value = Object;
    type Error = Error;

    async fn load(
        &self,
        keys: &[ParentVersionKey],
    ) -> Result<HashMap<ParentVersionKey, Object>, Error> {
        // Group keys by checkpoint viewed at and parent version -- we'll issue a
        // separate query for each group.
        #[derive(Eq, PartialEq, Ord, PartialOrd, Clone, Copy)]
        struct GroupKey {
            checkpoint_viewed_at: u64,
            parent_version: u64,
        }

        let mut keys_by_cursor_and_parent_version: BTreeMap<_, BTreeSet<_>> = BTreeMap::new();
        for key in keys {
            let group_key = GroupKey {
                checkpoint_viewed_at: key.checkpoint_viewed_at,
                parent_version: key.parent_version,
            };

            keys_by_cursor_and_parent_version
                .entry(group_key)
                .or_default()
                .insert(key.id.into_vec());
        }

        // Issue concurrent reads for each group of keys.
        let futures = keys_by_cursor_and_parent_version
            .into_iter()
            .map(|(group_key, ids)| {
                self.execute(move |conn| {
                    let stored: Vec<StoredHistoryObject> = conn.results(move || {
                        use objects_history::dsl as h;
                        use objects_version::dsl as v;

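                        // For each requested object, keep only the newest
                        // version that does not exceed the parent's version:
                        // `DISTINCT ON (object_id)` combined with the
                        // `object_id, object_version DESC` ordering yields one
                        // row per object, the highest qualifying version.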
                        h::objects_history
                            .inner_join(
                                v::objects_version.on(v::cp_sequence_number
                                    .eq(h::checkpoint_sequence_number)
                                    .and(v::object_id.eq(h::object_id))
                                    .and(v::object_version.eq(h::object_version))),
                            )
                            .select(StoredHistoryObject::as_select())
                            .filter(v::object_id.eq_any(ids.iter().cloned()))
                            .filter(v::object_version.le(group_key.parent_version as i64))
                            .distinct_on(v::object_id)
                            .order_by(v::object_id)
                            .then_order_by(v::object_version.desc())
                            .into_boxed()
                    })?;

                    Ok::<_, diesel::result::Error>(
                        stored
                            .into_iter()
                            .map(|stored| (group_key, stored))
                            .collect::<Vec<_>>(),
                    )
                })
            });

        // Wait for the reads to all finish, and gather them into the result map.
        let groups = futures::future::join_all(futures).await;

        let mut results = HashMap::new();
        for group in groups {
            for (group_key, stored) in
                group.map_err(|e| Error::Internal(format!("Failed to fetch objects: {e}")))?
            {
                // This particular object is invalid -- it didn't exist at the checkpoint we are
                // viewing at.
                if group_key.checkpoint_viewed_at < stored.checkpoint_sequence_number as u64 {
                    continue;
                }

                let object = Object::try_from_stored_history_object(
                    stored,
                    group_key.checkpoint_viewed_at,
                    // If `ParentVersionKey::parent_version` is set, it must have been
                    // correctly propagated from the `Object::root_version` of some object.
                    Some(group_key.parent_version),
                )?;

                let key = ParentVersionKey {
                    id: object.address,
                    checkpoint_viewed_at: group_key.checkpoint_viewed_at,
                    parent_version: group_key.parent_version,
                };

                results.insert(key, object);
            }
        }

        Ok(results)
    }
}
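
// `ParentVersionKey` is what keeps chained dynamic-field reads consistent: a
// child object is loaded at the newest version that does not exceed the root
// parent's version (see `Object::root_version`). A hypothetical call site,
// shown only to illustrate how the pieces fit together:
//
//     dl.load_one(ParentVersionKey {
//         id: child_id,
//         parent_version: root_version,
//         checkpoint_viewed_at,
//     })
//     .await?;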

impl Loader<LatestAtKey> for Db {
    type Value = Object;
    type Error = Error;

    async fn load(&self, keys: &[LatestAtKey]) -> Result<HashMap<LatestAtKey, Object>, Error> {
        // Group keys by checkpoint viewed at -- we'll issue a separate query for each
        // group.
        let mut keys_by_checkpoint: BTreeMap<_, BTreeSet<_>> = BTreeMap::new();

        for key in keys {
            keys_by_checkpoint
                .entry(key.checkpoint_viewed_at)
                .or_default()
                .insert(key.id);
        }

        // Issue concurrent reads for each group of keys.
        let futures =
            keys_by_checkpoint
                .into_iter()
                .map(|(checkpoint_viewed_at, ids)| {
                    self.execute_repeatable(move |conn| {
                        let Some(range) = AvailableRange::result(conn, checkpoint_viewed_at)?
                        else {
                            return Ok::<Vec<(u64, StoredHistoryObject)>, diesel::result::Error>(
                                vec![],
                            );
                        };

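                        // Reuse the consistent objects query with an
                        // `object_ids` filter and a page sized to the number
                        // of ids, so each requested object comes back at its
                        // latest version within `range`.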
                        let filter = ObjectFilter {
                            object_ids: Some(ids.iter().cloned().collect()),
                            ..Default::default()
                        };

                        Ok(conn
                            .results(move || {
                                build_objects_query(
                                    View::Consistent,
                                    range,
                                    &Page::bounded(ids.len() as u64),
                                    |q| filter.apply(q),
                                    |q| q,
                                )
                                .into_boxed()
                            })?
                            .into_iter()
                            .map(|r| (checkpoint_viewed_at, r))
                            .collect())
                    })
                });

        // Wait for the reads to all finish, and gather them into the result map.
        let groups = futures::future::join_all(futures).await;

        let mut results = HashMap::new();
        for group in groups {
            for (checkpoint_viewed_at, stored) in
                group.map_err(|e| Error::Internal(format!("Failed to fetch objects: {e}")))?
            {
                let object =
                    Object::try_from_stored_history_object(stored, checkpoint_viewed_at, None)?;

                let key = LatestAtKey {
                    id: object.address,
                    checkpoint_viewed_at,
                };

                results.insert(key, object);
            }
        }

        Ok(results)
    }
}
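
// Taken together, the loaders above cover the different ways an `Object` can
// be addressed: `HistoricalKey` fetches an exact (id, version) pair bounded by
// the checkpoint it is viewed at, `OptimisticKey` fetches an exact pair from
// the live table with a fallback to history, `ParentVersionKey` fetches the
// newest version at or below a parent's version, and `LatestAtKey` fetches the
// latest version visible at a checkpoint.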

impl From<&ObjectKind> for ObjectStatus {
    fn from(kind: &ObjectKind) -> Self {
        match kind {
            ObjectKind::NotIndexed(_) => ObjectStatus::NotIndexed,
            ObjectKind::Indexed(_, _) => ObjectStatus::Indexed,
            ObjectKind::WrappedOrDeleted(_) => ObjectStatus::WrappedOrDeleted,
        }
    }
}

impl From<&Object> for OwnerImpl {
    fn from(object: &Object) -> Self {
        OwnerImpl {
            address: object.address,
            checkpoint_viewed_at: object.checkpoint_viewed_at,
        }
    }
}

pub(crate) async fn deserialize_move_struct(
    move_object: &NativeMoveObject,
    resolver: &PackageResolver,
) -> Result<(StructTag, MoveStruct), Error> {
    let struct_tag = StructTag::from(move_object.type_().clone());
    let contents = move_object.contents();
    let move_type_layout = resolver
        .type_layout(TypeTag::from(struct_tag.clone()))
        .await
        .map_err(|e| {
            Error::Internal(format!(
                "Error fetching layout for type {}: {e}",
                struct_tag.to_canonical_string(/* with_prefix */ true)
            ))
        })?;

    let MoveTypeLayout::Struct(layout) = move_type_layout else {
        return Err(Error::Internal("Object is not a move struct".to_string()));
    };

    // TODO (annotated-visitor): Use custom visitors for extracting a dynamic field,
    // and for creating a GraphQL MoveValue directly (not via an annotated
    // visitor).
    let move_struct = BoundedVisitor::deserialize_struct(contents, &layout).map_err(|e| {
        Error::Internal(format!(
            "Error deserializing move struct for type {}: {e}",
            struct_tag.to_canonical_string(/* with_prefix */ true)
        ))
    })?;

    Ok((struct_tag, move_struct))
}
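
// Callers pair the returned `StructTag` with the layout-annotated `MoveStruct`
// to extract dynamic fields or to build a GraphQL `MoveValue` from the
// object's contents (see the TODO above). A sketch, assuming a
// `resolver: &PackageResolver` is at hand (e.g. taken from the GraphQL
// context):
//
//     let (tag, fields) = deserialize_move_struct(&native_move_object, resolver).await?;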

/// Constructs a raw query to fetch objects from the database. Objects are
/// filtered out if they satisfy the criteria but have a later version in the
/// same checkpoint. If object keys are provided, or no filters are specified at
/// all, then this final condition is not applied.
fn objects_query(filter: &ObjectFilter, range: AvailableRange, page: &Page<Cursor>) -> RawQuery {
    if let (Some(_), Some(_)) = (&filter.object_ids, &filter.object_keys) {
        // If both object IDs and object keys are specified, then we need to query in
        // both historical and consistent views, and then union the results.
        let ids_only_filter = ObjectFilter {
            object_keys: None,
            ..filter.clone()
        };
        let (id_query, id_bindings) = build_objects_query(
            View::Consistent,
            range,
            page,
            move |query| ids_only_filter.apply(query),
            move |newer| newer,
        )
        .finish();

        let keys_only_filter = ObjectFilter {
            object_ids: None,
            ..filter.clone()
        };
        let (key_query, key_bindings) = build_objects_query(
            View::Historical,
            range,
            page,
            move |query| keys_only_filter.apply(query),
            move |newer| newer,
        )
        .finish();

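        // Stitch the two queries together and re-apply ordering and the page
        // limit over the combined candidates; the generated SQL is roughly:
        //
        //     SELECT * FROM ((<ids query>) UNION ALL (<keys query>)) AS candidates
        //     ORDER BY object_id LIMIT <page limit>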
        RawQuery::new(
            format!("SELECT * FROM (({id_query}) UNION ALL ({key_query})) AS candidates"),
            id_bindings.into_iter().chain(key_bindings).collect(),
        )
        .order_by("object_id")
        .limit(page.limit() as i64)
    } else {
        // Only one of object IDs or object keys is specified, or neither is specified.
        let view = if filter.object_keys.is_some() || !filter.has_filters() {
            View::Historical
        } else {
            View::Consistent
        };

        build_objects_query(
            view,
            range,
            page,
            move |query| filter.apply(query),
            move |newer| newer,
        )
    }
}

#[cfg(test)]
mod tests {
    use std::str::FromStr;

    use super::*;

    #[test]
    fn test_owner_filter_intersection() {
        let f0 = ObjectFilter {
            owner: Some(IotaAddress::from_str("0x1").unwrap()),
            ..Default::default()
        };

        let f1 = ObjectFilter {
            owner: Some(IotaAddress::from_str("0x2").unwrap()),
            ..Default::default()
        };

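        // Intersecting a filter with itself is a no-op, while filters that pin
        // different owners have no intersection.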
        assert_eq!(f0.clone().intersect(f0.clone()), Some(f0.clone()));
        assert_eq!(f0.clone().intersect(f1.clone()), None);
    }

    #[test]
    fn test_key_filter_intersection() {
        let i1 = IotaAddress::from_str("0x1").unwrap();
        let i2 = IotaAddress::from_str("0x2").unwrap();
        let i3 = IotaAddress::from_str("0x3").unwrap();
        let i4 = IotaAddress::from_str("0x4").unwrap();

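        // f0 and f1 overlap on id i1 and agree on the key i4 at version 2; f2
        // constrains ids only; f3 repeats i4's key but assigns i2 a version
        // that conflicts with f0.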
        let f0 = ObjectFilter {
            object_ids: Some(vec![i1, i3]),
            object_keys: Some(vec![
                ObjectKey {
                    object_id: i2,
                    version: 1.into(),
                },
                ObjectKey {
                    object_id: i4,
                    version: 2.into(),
                },
            ]),
            ..Default::default()
        };

        let f1 = ObjectFilter {
            object_ids: Some(vec![i1, i2]),
            object_keys: Some(vec![ObjectKey {
                object_id: i4,
                version: 2.into(),
            }]),
            ..Default::default()
        };

        let f2 = ObjectFilter {
            object_ids: Some(vec![i1, i3]),
            ..Default::default()
        };

        let f3 = ObjectFilter {
            object_keys: Some(vec![
                ObjectKey {
                    object_id: i2,
                    version: 2.into(),
                },
                ObjectKey {
                    object_id: i4,
                    version: 2.into(),
                },
            ]),
            ..Default::default()
        };

        assert_eq!(
            f0.clone().intersect(f1.clone()),
            Some(ObjectFilter {
                object_ids: Some(vec![i1]),
                object_keys: Some(vec![
                    ObjectKey {
                        object_id: i2,
                        version: 1.into(),
                    },
                    ObjectKey {
                        object_id: i4,
                        version: 2.into(),
                    },
                ]),
                ..Default::default()
            })
        );

        assert_eq!(
            f1.clone().intersect(f2.clone()),
            Some(ObjectFilter {
                object_ids: Some(vec![i1]),
                ..Default::default()
            })
        );

        assert_eq!(
            f1.clone().intersect(f3.clone()),
            Some(ObjectFilter {
                object_keys: Some(vec![
                    ObjectKey {
                        object_id: i2,
                        version: 2.into(),
                    },
                    ObjectKey {
                        object_id: i4,
                        version: 2.into(),
                    },
                ]),
                ..Default::default()
            })
        );

        // i2 is assigned conflicting versions in f0 and f3.
        assert_eq!(f0.clone().intersect(f3.clone()), None);

        // No overlap between these two.
        assert_eq!(f2.clone().intersect(f3.clone()), None);
    }
}